finclip-app-manager/vendor/gitlab.finogeeks.club/finclip-backend/apm/client_kafka.go

37 lines
1.1 KiB
Go

package apm
import (
"context"
"github.com/Shopify/sarama"
"github.com/SkyAPM/go2sky"
)
func (client *Client) initKafka(config KafkaConfig) {
if config.extractor == nil {
client.kafkaExtractor = KafkaExtractor
} else {
client.kafkaExtractor = config.extractor
}
if config.injector == nil {
client.kafkaInjector = KafkaInjector
} else {
client.kafkaInjector = config.injector
}
}
func (client *Client) CreateKafkaEntrySpan(ctx context.Context, topic, method string, msg *sarama.ConsumerMessage) (go2sky.Span, context.Context) {
span, nCtx := client.CreateEntrySpan(ctx, topic, client.kafkaExtractor(msg))
span.Tag(TagMQType, MQTypeKafka)
span.Tag(go2sky.TagMQTopic, topic)
span.Tag(TagMQMethod, method)
return span, nCtx
}
func (client *Client) CreateKafkaExitSpan(ctx context.Context, topic, method, address string, msg *sarama.ProducerMessage) go2sky.Span {
span := client.CreateExitSpan(ctx, topic, address, client.kafkaInjector(msg))
span.Tag(TagMQType, MQTypeKafka)
span.Tag(go2sky.TagMQTopic, topic)
span.Tag(TagMQMethod, method)
return span
}