December 22, 2022 11:57 pm GMT

[OpenTelemetry] Observability of Async Processes with Custom Propagator

It's the 23rd day of the Makuake Advent Calendar 2022.

As we know, OpenTelemetry is an observability framework for generating and exporting telemetry data. As more companies adopt microservices and SLIs/SLOs, we need it to answer new, never-before-seen (and complex) questions.

In that context, observability of systems that communicate through asynchronous messaging is just as important as observability of systems that communicate through synchronous messaging such as HTTP or gRPC.

How about a notification system as an example:

  • How long does it take for the end-user to receive the notification after the notification event is fired?
  • What's the error rate across the notification lifecycle?

Answering those questions might seem to require expensive, custom-built infrastructure.

Instead, in this article, I'll demonstrate an easy way to do it with OpenTelemetry.

Example Case

All source code is here:

https://github.com/ymtdzzz/batch-tracing-sample

[Figure: example case]

Processing flow:

  • A batch job enqueues the notification content as messages into a queue (RabbitMQ)
  • A worker (consumer) asynchronously receives each message and sends a request to the notification server (/email or /push)
  • The notification server responds with 200 or 500

It's assumed that each component has already been instrumented, and that HTTP communication is instrumented by the net/http auto-instrumentation library.

Problem

[Figure: problem]

In the current state, the batch process and the subsequent worker processing cannot be traced together: they show up as separate, unconnected traces.

[Figure: trace a]

[Figure: trace b]

Moreover, there doesn't seem to be an instrumentation library for RabbitMQ in Go.

https://opentelemetry.io/registry/?s=rabbitmq&component=&language=

What Should We Do?

To propagate context, we can use an OpenTelemetry Propagator, which works for both synchronous and asynchronous messaging!

Implement Custom Propagator for RabbitMQ

What's a Propagator?

https://opentelemetry.io/docs/reference/specification/context/api-propagators/

The Propagators API defines interfaces for propagating context across process boundaries: how the sender *Inject*s context into a message and how the receiver *Extract*s it from that message. A Propagator works through a Carrier, which is responsible for actually writing values to and reading values from a particular type of message.

Fortunately, RabbitMQ lets us attach key-value Headers to messages (docs), so we can use TextMapPropagator.
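To make the split between Propagator and Carrier concrete, here is a minimal, standard-library-only sketch. It is not the OpenTelemetry SDK: `toyPropagator`, `stringMapCarrier`, and `tableCarrier` are hypothetical names invented for illustration, and `TextMapCarrier` is redeclared locally (mirroring the Go SDK interface of the same name) so the snippet runs on its own. The header value follows the W3C Trace Context `traceparent` layout (version, trace ID, span ID, flags), with the sampled flag hard-coded for brevity.

```go
package main

import (
	"fmt"
	"strings"
)

// TextMapCarrier mirrors the Go SDK interface of the same name,
// redeclared here so the sketch has no external dependencies.
type TextMapCarrier interface {
	Get(key string) string
	Set(key string, value string)
	Keys() []string
}

// toyPropagator (hypothetical) knows the header format but nothing about
// the message type; it only talks to the carrier.
type toyPropagator struct{}

// Inject writes a W3C-style traceparent value through the carrier.
func (toyPropagator) Inject(traceID, spanID string, c TextMapCarrier) {
	c.Set("traceparent", fmt.Sprintf("00-%s-%s-01", traceID, spanID))
}

// Extract reads the value back; ok is false if it is missing or malformed.
func (toyPropagator) Extract(c TextMapCarrier) (traceID, spanID string, ok bool) {
	parts := strings.Split(c.Get("traceparent"), "-")
	if len(parts) != 4 || len(parts[1]) != 32 || len(parts[2]) != 16 {
		return "", "", false
	}
	return parts[1], parts[2], true
}

// stringMapCarrier adapts a plain map[string]string.
type stringMapCarrier map[string]string

func (m stringMapCarrier) Get(key string) string { return m[key] }
func (m stringMapCarrier) Set(key, value string) { m[key] = value }
func (m stringMapCarrier) Keys() []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	return keys
}

// tableCarrier adapts map[string]interface{}, the same shape as amqp.Table.
type tableCarrier map[string]interface{}

func (t tableCarrier) Get(key string) string {
	s, _ := t[key].(string) // non-string or missing values become ""
	return s
}
func (t tableCarrier) Set(key, value string) { t[key] = value }
func (t tableCarrier) Keys() []string {
	keys := make([]string, 0, len(t))
	for k := range t {
		keys = append(keys, k)
	}
	return keys
}

func main() {
	p := toyPropagator{}
	// The same propagator works with both message shapes.
	for _, c := range []TextMapCarrier{stringMapCarrier{}, tableCarrier{}} {
		p.Inject("4bf92f3577b34da6a3ce929d0e0e4736", "00f067aa0ba902b7", c)
		traceID, spanID, ok := p.Extract(c)
		fmt.Println(c.Keys(), traceID, spanID, ok)
	}
}
```

The propagator code never changes between the two carriers, which is why supporting a new transport such as RabbitMQ only requires writing a carrier.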

[Figure: propagation flow]

Propagator Implementation

Actually, since it is the Carrier, not the Propagator, that manipulates the TextMap, all we have to do is implement a struct that satisfies the TextMapCarrier interface!

TextMapCarrier interface (doc):

    type TextMapCarrier interface {
        // Get returns the value associated with the passed key.
        Get(key string) string
        // Set stores the key-value pair.
        Set(key string, value string)
        // Keys lists the keys stored in this carrier.
        Keys() []string
    }

Carrier implementation for this interface (source code):

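    // AMQPCarrier adapts AMQP message headers to the TextMapCarrier interface.
    type AMQPCarrier struct {
        headers amqp.Table
    }

    // Get returns the value stored under key, formatted as a string.
    func (c *AMQPCarrier) Get(key string) string {
        return fmt.Sprintf("%s", c.headers[key])
    }

    // Set stores the key-value pair in the headers.
    func (c *AMQPCarrier) Set(key string, value string) {
        c.headers[key] = value
    }

    // Keys lists the header keys.
    func (c *AMQPCarrier) Keys() []string {
        // Length 0 with capacity len(c.headers), so that append does not
        // leave empty strings at the front of the slice.
        keys := make([]string, 0, len(c.headers))
        for k := range c.headers {
            keys = append(keys, k)
        }
        return keys
    }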

amqp.Table is just map[string]interface{}. The Get() implementation is a little rough, but it's enough for an example ;)
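If the roughness of Get() matters in your setup, one possible tightening is sketched below. With fmt.Sprintf("%s", ...), a missing key comes back as the string "%!s(<nil>)" rather than an empty string, and Set() panics when the headers map is nil. The names `Table`, `SafeCarrier`, and `NewSafeCarrier` are hypothetical; `Table` is a local stand-in for amqp.Table so the snippet runs without the RabbitMQ client. This is an assumption-laden variant, not the code in the linked repository.

```go
package main

import "fmt"

// Table is a local stand-in for amqp.Table (map[string]interface{}).
type Table map[string]interface{}

// SafeCarrier (hypothetical) is a stricter take on the carrier above.
type SafeCarrier struct {
	headers Table
}

// NewSafeCarrier guards against nil headers so Set never panics.
// Note that a freshly allocated map is not visible to the caller, so
// publishers should pass in the Table they intend to send.
func NewSafeCarrier(h Table) *SafeCarrier {
	if h == nil {
		h = Table{}
	}
	return &SafeCarrier{headers: h}
}

// Get returns "" for missing keys and for values that are not strings.
func (c *SafeCarrier) Get(key string) string {
	s, ok := c.headers[key].(string)
	if !ok {
		return ""
	}
	return s
}

// Set stores the key-value pair.
func (c *SafeCarrier) Set(key, value string) {
	c.headers[key] = value
}

// Keys lists the keys stored in this carrier.
func (c *SafeCarrier) Keys() []string {
	keys := make([]string, 0, len(c.headers))
	for k := range c.headers {
		keys = append(keys, k)
	}
	return keys
}

func main() {
	c := NewSafeCarrier(Table{"retries": 3})
	c.Set("traceparent", "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
	fmt.Printf("%q\n", c.Get("traceparent")) // the stored string
	fmt.Printf("%q\n", c.Get("missing"))     // empty string, not "%!s(<nil>)"
	fmt.Printf("%q\n", c.Get("retries"))     // non-string header is ignored
}
```

Returning an empty string for absent keys lets a propagator treat "no parent context" as the ordinary case instead of having to reject a formatted nil.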

Sender Side Implementation

On the sender side, we inject the context into the headers and then just send the message (source code).

    // Create an empty amqp.Table to hold the message headers
    headers := amqp.Table{}
    // Wrap it in the custom Carrier
    carrier := internal.NewAMQPCarrier(headers)
    // Inject the trace context from ctx into the headers
    otel.GetTextMapPropagator().Inject(ctx, carrier)
    err = ch.PublishWithContext(
        ctx,
        "",     // exchange (default)
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "application/octet-stream",
            Body:        msg,
            Headers:     headers, // Headers now carry the injected context
        },
    )
    if err != nil {
        panic(err)
    }
    log.Println("Message has been sent")

Receiver Side Implementation

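The receiver side mirrors this: wrap the received headers in the Carrier, extract the context, and start a child span from it.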

        // Assign the received headers to custom Carrier
        carrier := internal.NewAMQPCarrier(d.Headers)
        // Extract the context
        ctx := otel.GetTextMapPropagator().Extract(context.Background(), carrier)
        // Generate child Span with received context as parent Span
        ctx, span := otel.Tracer("notification").Start(ctx, "consume")
        msg, err := internal.DecodeNotificationMessage(d.Body)
        if err != nil {
            panic(err)
        }
        log.Printf("received msg: %v\n", msg)
        internal.CallServer(ctx, &client, msg)
        span.End()

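The sender and receiver halves can be tried end to end without a broker. The sketch below is a toy simulation under stated assumptions: a buffered Go channel stands in for the RabbitMQ queue, and `message`, `span`, `publish`, and `consume` are hypothetical names rather than OpenTelemetry or amqp APIs. It only illustrates how the consumer's span ends up in the same trace as the batch span once the `traceparent` header travels with the message.

```go
package main

import (
	"fmt"
	"strings"
)

// message stands in for an AMQP publishing/delivery: headers plus body.
type message struct {
	Headers map[string]interface{}
	Body    []byte
}

// span is a toy record of what a tracer would export.
type span struct {
	Name     string
	TraceID  string
	SpanID   string
	ParentID string
}

// publish attaches the producer span's identity to the headers and enqueues.
func publish(queue chan<- message, parent span, body []byte) {
	headers := map[string]interface{}{
		"traceparent": "00-" + parent.TraceID + "-" + parent.SpanID + "-01",
	}
	queue <- message{Headers: headers, Body: body}
}

// consume dequeues one message and starts a child span from its headers.
// Without a usable header the span keeps an empty TraceID here; a real
// tracer would start a fresh trace instead.
func consume(queue <-chan message, newSpanID string) span {
	m := <-queue
	child := span{Name: "consume", SpanID: newSpanID}
	if raw, ok := m.Headers["traceparent"].(string); ok {
		if parts := strings.Split(raw, "-"); len(parts) == 4 {
			child.TraceID, child.ParentID = parts[1], parts[2]
		}
	}
	return child
}

func main() {
	queue := make(chan message, 1) // buffered, so publish does not block
	batch := span{
		Name:    "batch",
		TraceID: "4bf92f3577b34da6a3ce929d0e0e4736",
		SpanID:  "00f067aa0ba902b7",
	}
	publish(queue, batch, []byte("hello"))
	worker := consume(queue, "b7ad6b7169203331")
	fmt.Printf("%+v\n", worker)
}
```

Because the worker's span carries the batch span's trace ID and names it as parent, a backend can stitch both sides into one trace.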
We're All Set

Now, let's start the apps and check the Jaeger UI endpoint (http://localhost:16686/).

[Figure: final result a]

By connecting the traces, we can now investigate any errors throughout the notification lifecycle easily.

[Figure: final result b]

Moreover, since the duration of the entire trace can now be measured, we can analyze performance bottlenecks and detect slow notifications from the end user's perspective.
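As a rough illustration of the kind of analysis a connected trace allows, the sketch below computes the end-to-end duration per trace and the share of traces containing a failed span. Everything in it is hypothetical: `spanRecord`, `sampleSpans`, `traceDurations`, and `failedTraceRate` are invented names, and the sample data is made up. In practice a backend such as Jaeger does this for you.

```go
package main

import (
	"fmt"
	"time"
)

// spanRecord is a toy view of an exported span.
type spanRecord struct {
	TraceID string
	Name    string
	Start   time.Time
	End     time.Time
	Failed  bool
}

// sampleSpans returns made-up data: two traces, each with a batch span
// and a later consume span.
func sampleSpans() []spanRecord {
	base := time.Unix(0, 0)
	at := func(sec int) time.Time { return base.Add(time.Duration(sec) * time.Second) }
	return []spanRecord{
		{TraceID: "a", Name: "batch", Start: at(0), End: at(1)},
		{TraceID: "a", Name: "consume", Start: at(3), End: at(4)},
		{TraceID: "b", Name: "batch", Start: at(0), End: at(1)},
		{TraceID: "b", Name: "consume", Start: at(2), End: at(5), Failed: true},
	}
}

// traceDurations returns, per trace ID, the time from the earliest span
// start to the latest span end.
func traceDurations(spans []spanRecord) map[string]time.Duration {
	type window struct{ start, end time.Time }
	windows := map[string]window{}
	for _, s := range spans {
		w, seen := windows[s.TraceID]
		if !seen {
			w = window{start: s.Start, end: s.End}
		}
		if s.Start.Before(w.start) {
			w.start = s.Start
		}
		if s.End.After(w.end) {
			w.end = s.End
		}
		windows[s.TraceID] = w
	}
	out := make(map[string]time.Duration, len(windows))
	for id, w := range windows {
		out[id] = w.end.Sub(w.start)
	}
	return out
}

// failedTraceRate returns the fraction of traces with at least one failed span.
func failedTraceRate(spans []spanRecord) float64 {
	failed := map[string]bool{}
	for _, s := range spans {
		failed[s.TraceID] = failed[s.TraceID] || s.Failed
	}
	if len(failed) == 0 {
		return 0
	}
	n := 0
	for _, f := range failed {
		if f {
			n++
		}
	}
	return float64(n) / float64(len(failed))
}

func main() {
	spans := sampleSpans()
	fmt.Println(traceDurations(spans))
	fmt.Println(failedTraceRate(spans))
}
```

The per-trace window includes the time a message spends waiting in the queue, which is the part that separate, unconnected traces could not show.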

I hope you learned something new from this post. Please let me know if you have any feedback, in the comments or on Twitter!


Original Link: https://dev.to/ymtdzzz/opentelemetry-observability-of-async-processes-with-custom-propagator-2043
