Handling Asynchronous Errors on GCP with go-cloud and Pub/Sub

Peter Malina
Sep 18, 2019 · 5 min read


Living in a world where people don’t care about the response of others is no longer a joke, but rather an optimization technique. However, actions have reactions, and sometimes even systems riot when we forget that YAML needs one more space of indentation.

Before jumping on the asynchronous ship, remember that synchronous error reporting is your friend — you don’t want to build the next Titanic. Don’t optimize too soon.

Synchronous Error Reporting.

When we step into the asynchronous world, it’s a little harder to handle these failed requests. When a message can’t be processed, it doesn’t just disappear (or at least it shouldn’t). Instead, it stays in the system, causing more errors on every retry: bloating logs, restarting servers, or blocking other messages from being processed while we try to debug what’s going on. So how do we handle this?

Parking Lane for Errors

The technique we’ll discuss here involves creating a parking lane for messages that cause an error in the processing implementation. While this won’t prevent full system crashes (e.g., due to bad resource allocation), it will quickly catch invalid data.

Messages are published into an input Pub/Sub topic called job-requests and delivered to the process via a subscription. If the process hits any non-recoverable state, it pushes the message into the job-fails topic, which parks it until support can have a look at it. While failed jobs are parked, they can be pushed back to job-requests once the implementation, or the message itself, is corrected.

Parking Lane for Failed Messages

Preparing Pub/Subs with Terraform

We are going to use the GCP Terraform provider to create two topics with their subscriptions so we can later use them in our Go code. Retaining acked messages on the parking lane allows us to seek back to a message for up to seven days. This retention lets us empty the parking lane while still being able to recover the messages.

provider "google" {
project = "<your-project>"
}
# input topic for requests
resource "google_pubsub_topic" "job_requests" {
name = "job-requests"
}
resource "google_pubsub_subscription" "job_requests" {
name = "job-requests-subscription"
topic = google_pubsub_topic.job_requests.name
ack_deadline_seconds = 20
}
# output topic for failed messages
resource "google_pubsub_topic" "job_fails" {
name = "job-fails"
}
resource "google_pubsub_subscription" "job_fails" {
name = "job-fails-subscription"
topic = google_pubsub_topic.job_fails.name
# retain messages even after acking so they can be retrieved
# by subscription seek if needed (Default: 7 days).
retain_acked_messages = true
}

Note that we need the job-fails-subscription to retain the failed messages: publishing a message into a topic without any subscription simply discards it.
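Because acked messages are retained, you can also replay the parking lane later by seeking the subscription back in time: messages published after the given timestamp are marked unacknowledged again and redelivered. As a sketch (assuming the subscription name from the Terraform config above and an example timestamp), the gcloud CLI can do this:

gcloud pubsub subscriptions seek job-fails-subscription \
  --time=2019-09-17T00:00:00Z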

Connecting to the Topics with go-cloud

The go-cloud library is a portable, open-source implementation of multiple cloud products like storage buckets, Pub/Sub, or secrets. We’ll first need to download it using:

go get gocloud.dev

The above command downloads the portable library with implementations for all supported clouds, so it should be pretty straightforward to port this code to AWS, Azure, etc.
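To illustrate that portability (a hypothetical sketch, not something we need for this tutorial — the queue URL and region below are made up): switching to AWS SQS is mostly a matter of swapping the driver import and the URL scheme:

import (
	"context"

	"gocloud.dev/pubsub"
	_ "gocloud.dev/pubsub/awssnssqs" // AWS driver instead of the GCP one
)

// the same portable OpenSubscription call, just a different URL scheme
ctx := context.Background()
in, err := pubsub.OpenSubscription(ctx,
	"awssqs://sqs.us-east-1.amazonaws.com/123456789012/job-requests?region=us-east-1")
if err != nil {
	// ...
}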

Let’s head into the code. First, we’ll need to import the library along with the GCP Pub/Sub driver. We’ll also import the original Pub/Sub message implementation so we can access the GCP-specific fields.

import (
	"gocloud.dev/pubsub"             // portable library
	_ "gocloud.dev/pubsub/gcppubsub" // GCP driver

	// GCP implementation of the messages
	gcpPubSub "google.golang.org/genproto/googleapis/pubsub/v1"
)

Next, we’ll need to connect to the message subscription and fails topics for the parking lane:

const requests = "gcppubsub://<project>/job-requests-subscription"
const parkingLane = "gcppubsub://<project>/job-fails"

// !! You will want to create your own context in case you want to handle interrupts !!
ctx := context.Background()

in, err := pubsub.OpenSubscription(ctx, requests)
if err != nil {
	// ...
}
fails, err := pubsub.OpenTopic(ctx, parkingLane)
if err != nil {
	// ...
}

// don't forget to close the subscription and topic once done
defer func() {
	err := in.Shutdown(ctx)
	if err != nil {
		// ...
	}
	err = fails.Shutdown(ctx)
	if err != nil {
		// ...
	}
}()
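As the comment in the snippet above hints, a long-running worker usually wants a context that is cancelled on SIGINT/SIGTERM, so that Receive returns and the deferred Shutdown calls get a chance to run. A minimal sketch, assuming Go 1.16+ for signal.NotifyContext:

import (
	"context"
	"os"
	"os/signal"
	"syscall"
)

// cancel the context when the process is interrupted or terminated;
// Receive then returns an error and the deferred Shutdowns run
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()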

Handling Messages

Now to the implementation of the message handler. We receive a message from the input subscription. If the unmarshal fails, the error is reported in the logs and the message is sent to the parking lane. Lastly, we acknowledge the message on the input subscription so it’s not retried by Pub/Sub:

// receive the message from the input subscription
msg, err := in.Receive(ctx)
if err != nil {
	panic(err)
}

// process our message
var say YourMessageType
err = json.Unmarshal(msg.Body, &say)
if err != nil {
	// convert to the GCP-specific Pub/Sub message via As so we can
	// read fields such as MessageId (see the note below)
	originalMsg := &gcpPubSub.PubsubMessage{}
	msg.As(&originalMsg)

	// log the error with the original message ID
	log.Println("An error occurred when processing the pubsub message:", originalMsg.MessageId, ", err:", err.Error())

	// park the message on the job-fails topic
	err = fails.Send(ctx, &pubsub.Message{
		Body: msg.Body,
		Metadata: map[string]string{
			// add the original message ID to the metadata
			"id": originalMsg.MessageId,
		},
	})
	if err != nil {
		// if parking fails, don't ack: let Pub/Sub redeliver the message
		panic(err)
	}
}

// acknowledge the message so it's not repeated
msg.Ack()

If you are wondering about the originalMsg conversion: the go-cloud version of the message doesn’t expose MessageId, which we use for tracking the message in the logs. Go-cloud, however, comes with a method called As which converts a message to the original platform-specific model.
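In a real worker, you’ll typically wrap the receive-and-handle logic above in a loop so messages keep being processed until the context is cancelled. A minimal sketch, where handle is a hypothetical helper containing the unmarshal/park/ack logic from the previous snippet:

for {
	msg, err := in.Receive(ctx)
	if err != nil {
		// Receive returns an error once the subscription is shut down
		// or the context is cancelled
		break
	}
	// handle is a hypothetical helper wrapping the logic shown above
	handle(ctx, msg, fails)
}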

Monitoring

Messages that end up in the parking lane can be monitored using the GCP dashboards or Stackdriver Alerts. You can view the messages directly in the UI for fast debugging or seek old messages in case they were already acked.

Google Cloud Pub/Sub Subscription Panel
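For quick inspection from a terminal, you can also pull parked messages; a sketch using the gcloud CLI with the subscription name from the Terraform config (without --auto-ack, the pulled messages are not acknowledged):

gcloud pubsub subscriptions pull job-fails-subscription --limit=10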

Last Words

If you’re mystified by Pub/Sub, make sure to read this page about concepts such as subscriptions and topics.

Questions? Don’t hesitate to write a comment, or ping me on Twitter or LinkedIn.
