package kafka import ( "context" "finclip-app-manager/infrastructure/config" "os" "os/signal" "strings" "syscall" "time" "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" "gitlab.finogeeks.club/finclip-backend/apm" ) type ConsumerCallbackHandler func(ctx context.Context, input []byte) error //使用者需要自定义该回调函数实现体 func StartKafakClusterClient(kafkaAddr, topic, group string, handler ConsumerCallbackHandler) { version, err := sarama.ParseKafkaVersion(config.GetConfig().KafkaVersion) if err != nil { log.Errorln("kafka err:" + err.Error()) return } kafkaConfig := cluster.NewConfig() kafkaConfig.Consumer.Return.Errors = true kafkaConfig.Group.Return.Notifications = true kafkaConfig.Consumer.Offsets.Initial = sarama.OffsetNewest //OffsetOldest OffsetNewest kafkaConfig.Consumer.Offsets.CommitInterval = 1 * time.Second kafkaConfig.Version = version if config.GetConfig().KafkaUser != "" && config.GetConfig().KafkaPwd != "" { kafkaConfig.Net.SASL.Enable = true kafkaConfig.Net.SASL.User = config.Cfg.KafkaUser kafkaConfig.Net.SASL.Password = config.Cfg.KafkaPwd kafkaConfig.Net.SASL.Handshake = true if config.GetConfig().KafkaMechanism == sarama.SASLTypeSCRAMSHA256 { kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA256} } kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA256 } else if config.GetConfig().KafkaMechanism == sarama.SASLTypeSCRAMSHA512 { kafkaConfig.Net.SASL.SCRAMClientGeneratorFunc = func() sarama.SCRAMClient { return &XDGSCRAMClient{HashGeneratorFcn: SHA512} } kafkaConfig.Net.SASL.Mechanism = sarama.SASLTypeSCRAMSHA512 } else { kafkaConfig.Net.SASL.Mechanism = sarama.SASLMechanism(config.GetConfig().KafkaMechanism) } } var consumer *cluster.Consumer for { var err error consumer, err = cluster.NewConsumer(strings.Split(kafkaAddr, ","), group, strings.Split(topic, ","), kafkaConfig) if err != nil { log.Errorln("Failed to start kafka consumer,err:" + err.Error()) time.Sleep(time.Millisecond * 5000) continue } else { break } } defer consumer.Close() // Create signal channel sigchan := make(chan os.Signal, 1) signal.Notify(sigchan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM) log.Debugln("Sarama consumer up and running!...") // Consume all channels, wait for signal to exit for { select { case msg, more := <-consumer.Messages(): if more { func() { span, ctx := apm.ApmClient().CreateKafkaEntrySpan(context.Background(), msg.Topic, "handler", msg) defer span.End() consumer.MarkOffset(msg, "") log.Debugf("Message claimed: value = %s, timestamp = %v, topic = %s", string(msg.Value), msg.Timestamp, msg.Topic) if err = handler(ctx, msg.Value); err != nil { log.Errorln("Callback handler err:" + err.Error()) } }() } case ntf, more := <-consumer.Notifications(): if more { log.Debugf("Rebalanced: %+v", ntf) } case err, more := <-consumer.Errors(): if more { log.Debugln("Error:" + err.Error()) } case <-sigchan: { log.Debugln("Exit signal!") } return } } }