37 lines
1.1 KiB
Go
37 lines
1.1 KiB
Go
package apm
|
|
|
|
import (
|
|
"context"
|
|
"github.com/Shopify/sarama"
|
|
"github.com/SkyAPM/go2sky"
|
|
)
|
|
|
|
func (client *Client) initKafka(config KafkaConfig) {
|
|
if config.extractor == nil {
|
|
client.kafkaExtractor = KafkaExtractor
|
|
} else {
|
|
client.kafkaExtractor = config.extractor
|
|
}
|
|
if config.injector == nil {
|
|
client.kafkaInjector = KafkaInjector
|
|
} else {
|
|
client.kafkaInjector = config.injector
|
|
}
|
|
}
|
|
|
|
func (client *Client) CreateKafkaEntrySpan(ctx context.Context, topic, method string, msg *sarama.ConsumerMessage) (go2sky.Span, context.Context) {
|
|
span, nCtx := client.CreateEntrySpan(ctx, topic, client.kafkaExtractor(msg))
|
|
span.Tag(TagMQType, MQTypeKafka)
|
|
span.Tag(go2sky.TagMQTopic, topic)
|
|
span.Tag(TagMQMethod, method)
|
|
return span, nCtx
|
|
}
|
|
|
|
func (client *Client) CreateKafkaExitSpan(ctx context.Context, topic, method, address string, msg *sarama.ProducerMessage) go2sky.Span {
|
|
span := client.CreateExitSpan(ctx, topic, address, client.kafkaInjector(msg))
|
|
span.Tag(TagMQType, MQTypeKafka)
|
|
span.Tag(go2sky.TagMQTopic, topic)
|
|
span.Tag(TagMQMethod, method)
|
|
return span
|
|
}
|