38 lines
806 B
Go
38 lines
806 B
Go
|
package apm
|
||
|
|
||
|
import (
|
||
|
"github.com/Shopify/sarama"
|
||
|
"github.com/SkyAPM/go2sky/propagation"
|
||
|
)
|
||
|
|
||
|
func KafkaExtractor(msg *sarama.ConsumerMessage) propagation.Extractor {
|
||
|
return func() (s string, e error) {
|
||
|
if msg == nil || msg.Headers == nil {
|
||
|
return "", nil
|
||
|
}
|
||
|
for _, header := range msg.Headers {
|
||
|
if string(header.Key) == propagation.Header {
|
||
|
return string(header.Value), nil
|
||
|
}
|
||
|
}
|
||
|
return "", nil
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func KafkaInjector(msg *sarama.ProducerMessage) propagation.Injector {
|
||
|
return func(header string) error {
|
||
|
if msg == nil {
|
||
|
return nil
|
||
|
}
|
||
|
if msg.Headers == nil {
|
||
|
msg.Headers = []sarama.RecordHeader{}
|
||
|
}
|
||
|
sw8Header := sarama.RecordHeader{
|
||
|
Key: []byte(propagation.Header),
|
||
|
Value: []byte(header),
|
||
|
}
|
||
|
msg.Headers = append(msg.Headers, sw8Header)
|
||
|
return nil
|
||
|
}
|
||
|
}
|