Handling Asynchronous Errors on GCP with go-cloud and Pub/Sub
Living in a world where people don't care about the response of others is no longer a joke; it's an optimization technique. However, actions have reactions, and sometimes even systems riot when we forget that YAML needs more space.
Before jumping on the asynchronous ship, remember that synchronous error reporting is your friend: you don't want to build the next Titanic. Don't optimize too soon.
When we step into the asynchronous world, it's a little harder to handle these failed requests. When a message can't be processed, it doesn't just disappear (or at least it shouldn't). Instead, it stays in the system, causing more errors on every retry: bloated logs, restarted servers, or other messages blocked from processing while we try to debug what's going on. So how do we handle this?
Parking Lane for Errors
The technique we’ll discuss here involves creating a parking lane for messages that cause an error in the process implementation. While this won’t avoid full system crashes (e.g., due to wrong allocations), it will quickly catch invalid data.
Messages are published into an input Pub/Sub topic called job-requests and delivered to the process via a subscription. If the process fails in any non-recoverable state, it pushes the message into the job-fails topic, which parks it until support can have a look at it. Parked jobs can be pushed back to job-requests once the implementation, or the message itself, is corrected.
Preparing Pub/Subs with Terraform
We are going to use the GCP Terraform provider and create two topics with their subscriptions so we can later use them in our Go code. Retaining acked messages on the parking lane will allow us to seek the message for seven days. The retention helps to empty the parking lane while we can still work with the message.
provider "google" {
  project = "<your-project>"
}

# input topic for requests
resource "google_pubsub_topic" "job_requests" {
  name = "job-requests"
}

resource "google_pubsub_subscription" "job_requests" {
  name                 = "job-requests-subscription"
  topic                = google_pubsub_topic.job_requests.name
  ack_deadline_seconds = 20
}

# output topic for failed messages
resource "google_pubsub_topic" "job_fails" {
  name = "job-fails"
}

resource "google_pubsub_subscription" "job_fails" {
  name  = "job-fails-subscription"
  topic = google_pubsub_topic.job_fails.name

  # retain messages even after acking so they can be retrieved
  # by subscription seek if needed (default: 7 days)
  retain_acked_messages = true
}
Note that we need the job-fails-subscription to retain the failed messages. Publishing a message into a topic without any subscriptions discards the message.
Connecting to the Topics with go-cloud
The go-cloud library is a portable open-source implementation of multiple cloud products like storage buckets, Pub/Sub, or secrets. We’ll first need to download it using:
go get gocloud.dev
The above command downloads the portable library with implementations for all supported clouds, so converting this code to AWS, Azure, etc. should be pretty straightforward.
Let's head into the code. First, we'll need to import the library along with the GCP Pub/Sub driver. We'll also import the original Pub/Sub message implementation so we can access the GCP-specific fields.
import (
	"gocloud.dev/pubsub"             // portable library
	_ "gocloud.dev/pubsub/gcppubsub" // GCP driver

	// GCP implementation of the messages
	gcpPubSub "google.golang.org/genproto/googleapis/pubsub/v1"
)
Next, we’ll need to connect to the message subscription and fails topics for the parking lane:
const requests = "gcppubsub://<project>/job-requests-subscription"
const parkingLane = "gcppubsub://<project>/job-fails"

// !! You will want to create your own context in case you want to handle interrupts !!
ctx := context.Background()

in, err := pubsub.OpenSubscription(ctx, requests)
if err != nil {
	// ...
}

fails, err := pubsub.OpenTopic(ctx, parkingLane)
if err != nil {
	// ...
}

// don't forget to close the subscription and topic once done
defer func() {
	err := in.Shutdown(ctx)
	if err != nil {
		// ...
	}

	err = fails.Shutdown(ctx)
	if err != nil {
		// ...
	}
}()
Handling Messages
Now to the implementation of the message handler. We receive a message from the input subscription. If the unmarshal fails, we report the message in the logs and send it to the parking lane. Lastly, we acknowledge the message in the input subscription so Pub/Sub doesn't retry it:
// receive the message from the input subscription
msg, err := in.Receive(ctx)
if err != nil {
	panic(err)
}

// process our message
var say YourMessageType
err = json.Unmarshal(msg.Body, &say)
if err != nil {
	// convert to the GCP-specific Pub/Sub message
	var originalMsg *gcpPubSub.PubsubMessage
	msg.As(&originalMsg)

	// log the error with the original message ID
	log.Println("An error occurred when processing the pubsub message:", originalMsg.MessageId, ", err:", err.Error())

	err = fails.Send(ctx, &pubsub.Message{
		Body: msg.Body,
		Metadata: map[string]string{
			// add the original message ID to the metadata
			"id": originalMsg.MessageId,
		},
	})
	if err != nil {
		// ...
	}
}

// acknowledge the message so it's not repeated
msg.Ack()
If you are wondering about the originalMsg conversion: the go-cloud version of the message doesn't contain MessageId, which we use for tracking the message in the logs. Go-cloud, however, comes with a method called As, which helps you convert to the original platform-specific models.
Monitoring
Messages that end up in the parking lane can be monitored using the GCP dashboards or Stackdriver Alerts. You can view the messages directly in the UI for fast debugging or seek old messages in case they were already acked.