98 lines
3.1 KiB
Go
98 lines
3.1 KiB
Go
|
package kafka
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"finclip-app-manager/infrastructure/config"
|
||
|
"os"
|
||
|
"os/signal"
|
||
|
"strings"
|
||
|
"syscall"
|
||
|
"time"
|
||
|
|
||
|
"github.com/Shopify/sarama"
|
||
|
cluster "github.com/bsm/sarama-cluster"
|
||
|
"gitlab.finogeeks.club/finclip-backend/apm"
|
||
|
)
|
||
|
|
||
|
// ConsumerCallbackHandler is the callback signature used to process one
// message payload: it receives a context and the raw message bytes and
// returns an error on failure.
type ConsumerCallbackHandler func(ctx context.Context, input []byte) error // callers must supply their own implementation of this callback
|
||
|
|
||
|
func StartKafakClusterClient(kafkaAddr, topic, group string, handler ConsumerCallbackHandler) {
|
||
|
version, err := sarama.ParseKafkaVersion(config.GetConfig().KafkaVersion)
|
||
|
if err != nil {
|
||
|
log.Errorln("kafka err:" + err.Error())
|
||
|
return
|
||
|
}
|
||
|
kafkaConfig := cluster.NewConfig()
|
||
|
kafkaConfig.Consumer.Return.Errors = true
|
||
|
kafkaConfig.Group.Return.Notifications = true
|
||
|
kafkaConfig.Consumer.Offsets.Initial = sarama.OffsetNewest //OffsetOldest OffsetNewest
|
||
|
kafkaConfig.Consumer.Offsets.CommitInterval = 1 * time.Second
|
||
|
kafkaConfig.Version = version
|
||
|
if config.GetConfig().KafkaUser != "" && config.GetConfig().KafkaPwd != "" {
|
||
|
kafkaConfig.Net.SASL.Enable = true
|
||
|
kafkaConfig.Net.SASL.User = config.Cfg.KafkaUser
|
||
|
kafkaConfig.Net.SASL.Password = config.Cfg.KafkaPwd
|
||
|
kafkaConfig.Net.SASL.Handshake = true
|
||
|
if config.GetConfig().KafkaMechanism == sarama.SASLTypeSCRAMSHA256 {
|
||
|
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} }
|
||
|
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256
|
||
|
} else if config.GetConfig().KafkaMechanism == sarama.SASLTypeSCRAMSHA512 {
|
||
|
kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} }
|
||
|
kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512
|
||
|
} else {
|
||
|
kafkaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(config.GetConfig().KafkaMechanism)
|
||
|
}
|
||
|
}
|
||
|
var consumer *cluster.Consumer
|
||
|
for {
|
||
|
var err error
|
||
|
consumer, err = cluster.NewConsumer(strings.Split(kafkaAddr, ","), group, strings.Split(topic, ","), kafkaConfig)
|
||
|
if err != nil {
|
||
|
log.Errorln("Failed to start kafka consumer,err:" + err.Error())
|
||
|
time.Sleep(time.Millisecond * 5000)
|
||
|
continue
|
||
|
} else {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
|
||
|
defer consumer.Close()
|
||
|
|
||
|
// Create signal channel
|
||
|
sigchan := make(chan os.Signal, 1)
|
||
|
signal.Notify(sigchan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
|
||
|
|
||
|
log.Debugln("Sarama consumer up and running!...")
|
||
|
|
||
|
// Consume all channels, wait for signal to exit
|
||
|
for {
|
||
|
select {
|
||
|
case msg, more := <-consumer.Messages():
|
||
|
if more {
|
||
|
func() {
|
||
|
span, ctx := apm.ApmClient().CreateKafkaEntrySpan(context.Background(), msg.Topic, "handler", msg)
|
||
|
defer span.End()
|
||
|
consumer.MarkOffset(msg, "")
|
||
|
log.Debugf("Message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic)
|
||
|
if err = handler(ctx, msg.Value); err != nil {
|
||
|
log.Errorln("Callback handler err:" + err.Error())
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
case ntf, more := <-consumer.Notifications():
|
||
|
if more {
|
||
|
log.Debugf("Rebalanced: %+v", ntf)
|
||
|
}
|
||
|
case err, more := <-consumer.Errors():
|
||
|
if more {
|
||
|
log.Debugln("Error:" + err.Error())
|
||
|
}
|
||
|
case <-sigchan:
|
||
|
{
|
||
|
log.Debugln("Exit signal!")
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|