77 lines
2.4 KiB
Go
77 lines
2.4 KiB
Go
package kafka
|
|
|
|
import (
	"context"
	"errors"
	"sync"

	"finclip-app-manager/infrastructure/config"

	"github.com/Shopify/sarama"

	"gitlab.finogeeks.club/finclip-backend/apm"
)
|
|
|
|
// once guards the lazy, one-time construction of the Kafka producer:
// sendMsg runs initProducerKafka through it on every call, and only the
// first call actually executes the initializer.
var (
	once sync.Once
)
|
|
|
|
func initProducerKafka() {
|
|
version, err := sarama.ParseKafkaVersion(config.GetConfig().KafkaVersion)
|
|
if err != nil {
|
|
log.Errorln("kafka err:" + err.Error())
|
|
return
|
|
}
|
|
|
|
kafkaConfig := sarama.NewConfig()
|
|
// 等待服务器所有副本都保存成功后的响应
|
|
kafkaConfig.Producer.RequiredAcks = sarama.WaitForAll
|
|
// 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区
|
|
kafkaConfig.Producer.Partitioner = sarama.NewRandomPartitioner
|
|
// 是否等待成功和失败后的响应
|
|
kafkaConfig.Producer.Return.Successes = true
|
|
kafkaConfig.Version = version
|
|
if config.GetConfig().KafkaUser != "" && config.GetConfig().KafkaPwd != "" {
|
|
kafkaConfig.Net.SASL.Enable = true
|
|
kafkaConfig.Net.SASL.User = config.Cfg.KafkaUser
|
|
kafkaConfig.Net.SASL.Password = config.Cfg.KafkaPwd
|
|
kafkaConfig.Net.SASL.Handshake = true
|
|
if config.GetConfig().KafkaMechanism == sarama.SASLTypeSCRAMSHA256 {
|
|
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
|
|
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
|
|
} else if config.GetConfig().KafkaMechanism == sarama.SASLTypeSCRAMSHA512 {
|
|
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
|
|
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
|
} else {
|
|
kafkaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(config.GetConfig().KafkaMechanism)
|
|
}
|
|
}
|
|
// 使用给定代理地址和配置创建一个同步生产者
|
|
producer, err = sarama.NewSyncProducer([]string{config.GetConfig().KafkaAddr}, kafkaConfig)
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
|
|
func sendMsg(ctx context.Context, topic, value string) error {
|
|
once.Do(initProducerKafka)
|
|
|
|
msg := &sarama.ProducerMessage{
|
|
Topic: topic,
|
|
}
|
|
|
|
span := apm.ApmClient().CreateKafkaExitSpan(ctx, topic, "SendKafkaMsg", config.GetConfig().KafkaAddr, msg)
|
|
defer span.End()
|
|
|
|
//将字符串转换为字节数组
|
|
msg.Value = sarama.ByteEncoder(value)
|
|
_, _, err := producer.SendMessage(msg)
|
|
|
|
if err != nil {
|
|
log.Errorf("Send kafka[%s] message[%s], err:%s,", topic, value, err.Error())
|
|
return err
|
|
}
|
|
|
|
log.Infof("Send kafka[%s] message success, msg=%s", topic, value)
|
|
return nil
|
|
}
|