package cluster

import (
	"sort"
	"sync"
	"sync/atomic"
	"time"

	"github.com/Shopify/sarama"
)

// Consumer is a cluster group consumer
type Consumer struct {
	client    *Client
	ownClient bool

	consumer sarama.Consumer
	subs     *partitionMap

	consumerID string
	groupID    string

	memberID     string
	generationID int32
	membershipMu sync.RWMutex

	coreTopics  []string
	extraTopics []string

	dying, dead chan none
	closeOnce   sync.Once

	consuming     int32
	messages      chan *sarama.ConsumerMessage
	errors        chan error
	partitions    chan PartitionConsumer
	notifications chan *Notification

	commitMu sync.Mutex
}

// NewConsumer initializes a new consumer
func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	consumer, err := NewConsumerFromClient(client, groupID, topics)
	if err != nil {
		return nil, err
	}
	consumer.ownClient = true
	return consumer, nil
}

// NewConsumerFromClient initializes a new consumer from an existing client.
//
// Please note that clients cannot be shared between consumers (due to Kafka internals),
// they can only be re-used which requires the user to call Close() on the first consumer
// before using this method again to initialize another one. Attempts to use a client with
// more than one consumer at a time will return errors.
func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) {
	if !client.claim() {
		return nil, errClientInUse
	}

	consumer, err := sarama.NewConsumerFromClient(client.Client)
	if err != nil {
		client.release()
		return nil, err
	}

	sort.Strings(topics)
	c := &Consumer{
		client:   client,
		consumer: consumer,
		subs:     newPartitionMap(),
		groupID:  groupID,

		coreTopics: topics,

		dying: make(chan none),
		dead:  make(chan none),

		messages:      make(chan *sarama.ConsumerMessage),
		errors:        make(chan error, client.config.ChannelBufferSize),
		partitions:    make(chan PartitionConsumer, 1),
		notifications: make(chan *Notification),
	}
	if err := c.client.RefreshCoordinator(groupID); err != nil {
		client.release()
		return nil, err
	}

	go c.mainLoop()
	return c, nil
}

// Messages returns the read channel for the messages that are returned by
// the broker.
//
// This channel will only return messages if the Config.Group.Mode option is
// set to ConsumerModeMultiplex (the default).
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages }

// Partitions returns the read channels for individual partitions of this broker.
//
// This channel will only return values if the Config.Group.Mode option is
// set to ConsumerModePartitions.
//
// The Partitions() channel must be listened to for the life of this consumer;
// when a rebalance happens old partitions will be closed (naturally come to
// completion) and new ones will be emitted. The returned channel will only close
// when the consumer is completely shut down.
func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions }

// Errors returns a read channel of errors that occur during offset management, if
// enabled. By default, errors are logged and not returned over this channel. If
// you want to implement any custom error handling, set your config's
// Consumer.Return.Errors setting to true, and read from this channel.
func (c *Consumer) Errors() <-chan error { return c.errors }
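// Example usage in multiplex mode (a minimal sketch, not taken from this
// package's tests; the broker address, group ID, topic name and the process()
// helper are placeholders, and cluster.NewConfig() is assumed to be this
// package's config constructor):
//
//	config := cluster.NewConfig()
//	config.Consumer.Return.Errors = true
//	config.Group.Return.Notifications = true
//
//	consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "my-group", []string{"my-topic"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer consumer.Close()
//
//	for {
//		select {
//		case msg, ok := <-consumer.Messages():
//			if !ok {
//				return
//			}
//			process(msg)                 // hypothetical handler
//			consumer.MarkOffset(msg, "") // mark message as processed
//		case err := <-consumer.Errors():
//			log.Println("error:", err)
//		case ntf := <-consumer.Notifications():
//			log.Println("rebalanced:", ntf)
//		}
//	}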
// Notifications returns a channel of Notifications that occur during consumer
// rebalancing. Notifications will only be emitted over this channel if your
// config's Group.Return.Notifications setting is set to true.
func (c *Consumer) Notifications() <-chan *Notification { return c.notifications }

// HighWaterMarks returns the current high water marks for each topic and partition.
// Consistency between partitions is not guaranteed since high water marks are updated separately.
func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() }

// MarkOffset marks the provided message as processed, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// Note: calling MarkOffset does not necessarily commit the offset to the backend
// store immediately for efficiency reasons, and it may never be committed if
// your application crashes. This means that you may end up processing the same
// message twice, and your processing should ideally be idempotent.
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
	if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
		sub.MarkOffset(msg.Offset, metadata)
	}
}

// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
// See MarkOffset for additional explanation.
func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
	if sub := c.subs.Fetch(topic, partition); sub != nil {
		sub.MarkOffset(offset, metadata)
	}
}

// MarkOffsets marks stashed offsets as processed.
// See MarkOffset for additional explanation.
func (c *Consumer) MarkOffsets(s *OffsetStash) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for tp, info := range s.offsets {
		if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
			sub.MarkOffset(info.Offset, info.Metadata)
		}
		delete(s.offsets, tp)
	}
}
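// The Mark* helpers above only stage offsets in memory; they are committed in
// the background (see CommitOffsets and cmLoop below). A minimal sketch of
// batching marks via an OffsetStash, assuming the package exposes a
// NewOffsetStash constructor whose MarkOffset(msg, metadata) method stages the
// message offset:
//
//	stash := cluster.NewOffsetStash()
//	count := 0
//	for msg := range consumer.Messages() {
//		stash.MarkOffset(msg, "") // stage the offset in the stash only
//		if count++; count%100 == 0 {
//			consumer.MarkOffsets(stash) // hand all staged offsets to the consumer
//		}
//	}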
// ResetOffset marks the provided message as processed, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// The difference between ResetOffset and MarkOffset is that ResetOffset allows
// rewinding to an earlier offset.
func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
	if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
		sub.ResetOffset(msg.Offset, metadata)
	}
}

// ResetPartitionOffset marks an offset of the provided topic/partition as processed.
// See ResetOffset for additional explanation.
func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
	sub := c.subs.Fetch(topic, partition)
	if sub != nil {
		sub.ResetOffset(offset, metadata)
	}
}

// ResetOffsets marks stashed offsets as processed.
// See ResetOffset for additional explanation.
func (c *Consumer) ResetOffsets(s *OffsetStash) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for tp, info := range s.offsets {
		if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
			sub.ResetOffset(info.Offset, info.Metadata)
		}
		delete(s.offsets, tp)
	}
}

// Subscriptions returns the consumed topics and partitions
func (c *Consumer) Subscriptions() map[string][]int32 {
	return c.subs.Info()
}

// CommitOffsets allows you to manually commit previously marked offsets. By default
// there is no need to call this function as the consumer will commit offsets
// automatically using the Config.Consumer.Offsets.CommitInterval setting.
//
// Please be aware that calling this function during an internal rebalance cycle may
// return broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).
func (c *Consumer) CommitOffsets() error {
	c.commitMu.Lock()
	defer c.commitMu.Unlock()

	memberID, generationID := c.membership()
	req := &sarama.OffsetCommitRequest{
		Version:                 2,
		ConsumerGroup:           c.groupID,
		ConsumerGroupGeneration: generationID,
		ConsumerID:              memberID,
		RetentionTime:           -1,
	}

	if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 {
		req.RetentionTime = int64(ns / time.Millisecond)
	}

	snap := c.subs.Snapshot()
	dirty := false
	for tp, state := range snap {
		if state.Dirty {
			dirty = true
			req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata)
		}
	}
	if !dirty {
		return nil
	}

	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}

	resp, err := broker.CommitOffset(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}

	for topic, errs := range resp.Errors {
		for partition, kerr := range errs {
			if kerr != sarama.ErrNoError {
				err = kerr
			} else if state, ok := snap[topicPartition{topic, partition}]; ok {
				if sub := c.subs.Fetch(topic, partition); sub != nil {
					sub.markCommitted(state.Info.Offset)
				}
			}
		}
	}
	return err
}
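// A minimal sketch of marking a batch of messages and then committing manually
// (the batch size and error handling are illustrative only; all calls used are
// defined above):
//
//	for i := 0; i < 1000; i++ {
//		msg, ok := <-consumer.Messages()
//		if !ok {
//			break
//		}
//		consumer.MarkOffset(msg, "")
//	}
//	if err := consumer.CommitOffsets(); err != nil {
//		log.Println("commit failed:", err)
//	}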
// Close safely closes the consumer and releases all resources
func (c *Consumer) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.dying)
		<-c.dead

		if e := c.release(); e != nil {
			err = e
		}
		if e := c.consumer.Close(); e != nil {
			err = e
		}
		close(c.messages)
		close(c.errors)

		if e := c.leaveGroup(); e != nil {
			err = e
		}
		close(c.partitions)
		close(c.notifications)

		// drain
		for range c.messages {
		}
		for range c.errors {
		}
		for p := range c.partitions {
			_ = p.Close()
		}
		for range c.notifications {
		}

		c.client.release()
		if c.ownClient {
			if e := c.client.Close(); e != nil {
				err = e
			}
		}
	})
	return
}

func (c *Consumer) mainLoop() {
	defer close(c.dead)
	defer atomic.StoreInt32(&c.consuming, 0)

	for {
		atomic.StoreInt32(&c.consuming, 0)

		// Check if close was requested
		select {
		case <-c.dying:
			return
		default:
		}

		// Start next consume cycle
		c.nextTick()
	}
}

func (c *Consumer) nextTick() {
	// Remember previous subscriptions
	var notification *Notification
	if c.client.config.Group.Return.Notifications {
		notification = newNotification(c.subs.Info())
	}

	// Refresh coordinator
	if err := c.refreshCoordinator(); err != nil {
		c.rebalanceError(err, nil)
		return
	}

	// Release subscriptions
	if err := c.release(); err != nil {
		c.rebalanceError(err, nil)
		return
	}

	// Issue rebalance start notification
	if c.client.config.Group.Return.Notifications {
		c.handleNotification(notification)
	}

	// Rebalance, fetch new subscriptions
	subs, err := c.rebalance()
	if err != nil {
		c.rebalanceError(err, notification)
		return
	}

	// Coordinate loops, make sure everything is
	// stopped on exit
	tomb := newLoopTomb()
	defer tomb.Close()

	// Start the heartbeat
	tomb.Go(c.hbLoop)

	// Subscribe to topic/partitions
	if err := c.subscribe(tomb, subs); err != nil {
		c.rebalanceError(err, notification)
		return
	}

	// Update/issue notification with new claims
	if c.client.config.Group.Return.Notifications {
		notification = notification.success(subs)
		c.handleNotification(notification)
	}

	// Start topic watcher loop
	tomb.Go(c.twLoop)

	// Start consuming and committing offsets
	tomb.Go(c.cmLoop)
	atomic.StoreInt32(&c.consuming, 1)

	// Wait for signals
	select {
	case <-tomb.Dying():
	case <-c.dying:
	}
}

// heartbeat loop, triggered by the mainLoop
func (c *Consumer) hbLoop(stopped <-chan none) {
	ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			switch err := c.heartbeat(); err {
			case nil, sarama.ErrNoError:
			case sarama.ErrNotCoordinatorForConsumer, sarama.ErrRebalanceInProgress:
				return
			default:
				c.handleError(&Error{Ctx: "heartbeat", error: err})
				return
			}
		case <-stopped:
			return
		case <-c.dying:
			return
		}
	}
}

// topic watcher loop, triggered by the mainLoop
func (c *Consumer) twLoop(stopped <-chan none) {
	ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			topics, err := c.client.Topics()
			if err != nil {
				c.handleError(&Error{Ctx: "topics", error: err})
				return
			}

			for _, topic := range topics {
				if !c.isKnownCoreTopic(topic) &&
					!c.isKnownExtraTopic(topic) &&
					c.isPotentialExtraTopic(topic) {
					return
				}
			}
		case <-stopped:
			return
		case <-c.dying:
			return
		}
	}
}

// commit loop, triggered by the mainLoop
func (c *Consumer) cmLoop(stopped <-chan none) {
	ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); err != nil {
				c.handleError(&Error{Ctx: "commit", error: err})
				return
			}
		case <-stopped:
			return
		case <-c.dying:
			return
		}
	}
}

func (c *Consumer) rebalanceError(err error, n *Notification) {
	if n != nil {
		n.Type = RebalanceError
		c.handleNotification(n)
	}

	switch err {
	case sarama.ErrRebalanceInProgress:
	default:
		c.handleError(&Error{Ctx: "rebalance", error: err})
	}

	select {
	case <-c.dying:
	case <-time.After(c.client.config.Metadata.Retry.Backoff):
	}
}

func (c *Consumer) handleNotification(n *Notification) {
	if c.client.config.Group.Return.Notifications {
		select {
		case c.notifications <- n:
		case <-c.dying:
			return
		}
	}
}

func (c *Consumer) handleError(e *Error) {
	if c.client.config.Consumer.Return.Errors {
		select {
		case c.errors <- e:
		case <-c.dying:
			return
		}
	} else {
		sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error())
	}
}

// Releases the consumer and commits offsets, called from rebalance() and Close()
func (c *Consumer) release() (err error) {
	// Stop all consumers
	c.subs.Stop()

	// Clear subscriptions on exit
	defer c.subs.Clear()

	// Wait for messages to be processed
	timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime)
	defer timeout.Stop()

	select {
	case <-c.dying:
	case <-timeout.C:
	}

	// Commit offsets, continue on errors
	if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil {
		err = e
	}

	return
}
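// The background loops and release() above are driven by a handful of config
// durations. A minimal tuning sketch (the values are illustrative only, and
// cluster.NewConfig() is assumed to provide defaults for everything else):
//
//	config := cluster.NewConfig()
//	config.Group.Heartbeat.Interval = 3 * time.Second                 // hbLoop tick
//	config.Group.Session.Timeout = 30 * time.Second                   // session timeout sent on join
//	config.Consumer.Offsets.CommitInterval = time.Second              // cmLoop tick
//	config.Group.Offsets.Retry.Max = 3                                // commit retries in cmLoop/release
//	config.Group.Offsets.Synchronization.DwellTime = 3 * time.Second  // wait before committing on release
//	config.Metadata.RefreshFrequency = time.Minute                    // twLoop ticks at half this rate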
// --------------------------------------------------------------------

// Performs a heartbeat, part of the mainLoop()
func (c *Consumer) heartbeat() error {
	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}

	memberID, generationID := c.membership()
	resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	})
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}
	return resp.Err
}

// Performs a rebalance, part of the mainLoop()
func (c *Consumer) rebalance() (map[string][]int32, error) {
	memberID, _ := c.membership()
	sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID)

	allTopics, err := c.client.Topics()
	if err != nil {
		return nil, err
	}
	c.extraTopics = c.selectExtraTopics(allTopics)
	sort.Strings(c.extraTopics)

	// Re-join consumer group
	strategy, err := c.joinGroup()
	switch {
	case err == sarama.ErrUnknownMemberId:
		c.membershipMu.Lock()
		c.memberID = ""
		c.membershipMu.Unlock()
		return nil, err
	case err != nil:
		return nil, err
	}

	// Sync consumer group state, fetch subscriptions
	subs, err := c.syncGroup(strategy)
	switch {
	case err == sarama.ErrRebalanceInProgress:
		return nil, err
	case err != nil:
		_ = c.leaveGroup()
		return nil, err
	}
	return subs, nil
}

// Performs the subscription, part of the mainLoop()
func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
	// fetch offsets
	offsets, err := c.fetchOffsets(subs)
	if err != nil {
		_ = c.leaveGroup()
		return err
	}

	// create consumers in parallel
	var mu sync.Mutex
	var wg sync.WaitGroup

	for topic, partitions := range subs {
		for _, partition := range partitions {
			wg.Add(1)

			info := offsets[topic][partition]
			go func(topic string, partition int32) {
				if e := c.createConsumer(tomb, topic, partition, info); e != nil {
					mu.Lock()
					err = e
					mu.Unlock()
				}
				wg.Done()
			}(topic, partition)
		}
	}
	wg.Wait()

	if err != nil {
		_ = c.release()
		_ = c.leaveGroup()
	}
	return err
}

// --------------------------------------------------------------------

// Send a request to the broker to join group on rebalance()
func (c *Consumer) joinGroup() (*balancer, error) {
	memberID, _ := c.membership()
	req := &sarama.JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       memberID,
		SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}

	meta := &sarama.ConsumerGroupMemberMetadata{
		Version:  1,
		Topics:   append(c.coreTopics, c.extraTopics...),
		UserData: c.client.config.Group.Member.UserData,
	}
	err := req.AddGroupProtocolMetadata(string(StrategyRange), meta)
	if err != nil {
		return nil, err
	}
	err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta)
	if err != nil {
		return nil, err
	}

	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	}

	resp, err := broker.JoinGroup(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	} else if resp.Err != sarama.ErrNoError {
		c.closeCoordinator(broker, resp.Err)
		return nil, resp.Err
	}

	var strategy *balancer
	if resp.LeaderId == resp.MemberId {
		members, err := resp.GetMembers()
		if err != nil {
			return nil, err
		}

		strategy, err = newBalancerFromMeta(c.client, members)
		if err != nil {
			return nil, err
		}
	}

	c.membershipMu.Lock()
	c.memberID = resp.MemberId
	c.generationID = resp.GenerationId
	c.membershipMu.Unlock()

	return strategy, nil
}
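// joinGroup advertises both built-in strategies; which one the group leader
// actually applies is chosen via Config.Group.PartitionStrategy (used by
// syncGroup below). A minimal sketch, assuming cluster.NewConfig() as the
// config constructor:
//
//	config := cluster.NewConfig()
//	config.Group.PartitionStrategy = cluster.StrategyRoundRobin // the alternative to cluster.StrategyRange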
// Send a request to the broker to sync the group on rebalance().
// Returns a list of topics and partitions to consume.
func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
	memberID, generationID := c.membership()
	req := &sarama.SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}

	if strategy != nil {
		for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) {
			if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
				Topics: topics,
			}); err != nil {
				return nil, err
			}
		}
	}

	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	}

	resp, err := broker.SyncGroup(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	} else if resp.Err != sarama.ErrNoError {
		c.closeCoordinator(broker, resp.Err)
		return nil, resp.Err
	}

	// Return if there is nothing to subscribe to
	if len(resp.MemberAssignment) == 0 {
		return nil, nil
	}

	// Get assigned subscriptions
	members, err := resp.GetMemberAssignment()
	if err != nil {
		return nil, err
	}

	// Sort partitions, for each topic
	for topic := range members.Topics {
		sort.Sort(int32Slice(members.Topics[topic]))
	}
	return members.Topics, nil
}

// Fetches latest committed offsets for all subscriptions
func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) {
	offsets := make(map[string]map[int32]offsetInfo, len(subs))
	req := &sarama.OffsetFetchRequest{
		Version:       1,
		ConsumerGroup: c.groupID,
	}

	for topic, partitions := range subs {
		offsets[topic] = make(map[int32]offsetInfo, len(partitions))
		for _, partition := range partitions {
			offsets[topic][partition] = offsetInfo{Offset: -1}
			req.AddPartition(topic, partition)
		}
	}

	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	}

	resp, err := broker.FetchOffset(req)
	if err != nil {
		c.closeCoordinator(broker, err)
		return nil, err
	}

	for topic, partitions := range subs {
		for _, partition := range partitions {
			block := resp.GetBlock(topic, partition)
			if block == nil {
				return nil, sarama.ErrIncompleteResponse
			}

			if block.Err == sarama.ErrNoError {
				offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata}
			} else {
				return nil, block.Err
			}
		}
	}
	return offsets, nil
}

// Send a request to the broker to leave the group on failed rebalance() and on Close()
func (c *Consumer) leaveGroup() error {
	broker, err := c.client.Coordinator(c.groupID)
	if err != nil {
		c.closeCoordinator(broker, err)
		return err
	}

	memberID, _ := c.membership()
	if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: memberID,
	}); err != nil {
		c.closeCoordinator(broker, err)
	}
	return err
}

// --------------------------------------------------------------------

func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
	memberID, _ := c.membership()
	sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))

	// Create partitionConsumer
	pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial)
	if err != nil {
		return err
	}

	// Store in subscriptions
	c.subs.Store(topic, partition, pc)

	// Start partition consumer goroutine
	tomb.Go(func(stopper <-chan none) {
		if c.client.config.Group.Mode == ConsumerModePartitions {
			pc.waitFor(stopper, c.errors)
		} else {
			pc.multiplex(stopper, c.messages, c.errors)
		}
	})

	if c.client.config.Group.Mode == ConsumerModePartitions {
		c.partitions <- pc
	}
	return nil
}
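// createConsumer above dispatches each claimed partition either into the
// shared Messages()/Errors() channels (multiplex mode) or onto the
// Partitions() channel. A minimal sketch of partition mode, assuming
// PartitionConsumer exposes the embedded sarama Messages() channel (broker
// address and topic name are placeholders):
//
//	config := cluster.NewConfig()
//	config.Group.Mode = cluster.ConsumerModePartitions
//
//	consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "my-group", []string{"my-topic"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer consumer.Close()
//
//	for pc := range consumer.Partitions() {
//		go func(pc cluster.PartitionConsumer) {
//			for msg := range pc.Messages() {
//				consumer.MarkOffset(msg, "")
//			}
//		}(pc)
//	}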
// commitOffsetsWithRetry retries CommitOffsets up to `retries` additional times on error.
func (c *Consumer) commitOffsetsWithRetry(retries int) error {
	err := c.CommitOffsets()
	if err != nil && retries > 0 {
		return c.commitOffsetsWithRetry(retries - 1)
	}
	return err
}

// closeCoordinator closes the coordinator broker connection and, on
// coordinator-related errors, triggers a coordinator refresh.
func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) {
	if broker != nil {
		_ = broker.Close()
	}

	switch err {
	case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer:
		_ = c.client.RefreshCoordinator(c.groupID)
	}
}

// selectExtraTopics filters allTopics down to the topics that match the
// configured whitelist/blacklist and are not already core topics.
func (c *Consumer) selectExtraTopics(allTopics []string) []string {
	extra := allTopics[:0]
	for _, topic := range allTopics {
		if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) {
			extra = append(extra, topic)
		}
	}
	return extra
}

func (c *Consumer) isKnownCoreTopic(topic string) bool {
	pos := sort.SearchStrings(c.coreTopics, topic)
	return pos < len(c.coreTopics) && c.coreTopics[pos] == topic
}

func (c *Consumer) isKnownExtraTopic(topic string) bool {
	pos := sort.SearchStrings(c.extraTopics, topic)
	return pos < len(c.extraTopics) && c.extraTopics[pos] == topic
}

func (c *Consumer) isPotentialExtraTopic(topic string) bool {
	rx := c.client.config.Group.Topics
	if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) {
		return false
	}
	if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) {
		return true
	}
	return false
}

func (c *Consumer) refreshCoordinator() error {
	if err := c.refreshMetadata(); err != nil {
		return err
	}
	return c.client.RefreshCoordinator(c.groupID)
}

func (c *Consumer) refreshMetadata() (err error) {
	if c.client.config.Metadata.Full {
		err = c.client.RefreshMetadata()
	} else {
		var topics []string
		if topics, err = c.client.Topics(); err == nil && len(topics) != 0 {
			err = c.client.RefreshMetadata(topics...)
		}
	}

	// maybe we didn't have authorization to describe all topics
	switch err {
	case sarama.ErrTopicAuthorizationFailed:
		err = c.client.RefreshMetadata(c.coreTopics...)
	}
	return
}

// membership returns the current member and generation IDs under the read lock.
func (c *Consumer) membership() (memberID string, generationID int32) {
	c.membershipMu.RLock()
	memberID, generationID = c.memberID, c.generationID
	c.membershipMu.RUnlock()
	return
}
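// isPotentialExtraTopic above consults the Group.Topics regexps. A minimal
// sketch of subscribing to additional topics by whitelist (the patterns and
// topic names are placeholders; whether the core topic list may be left empty
// when a whitelist is set is not established here):
//
//	config := cluster.NewConfig()
//	config.Group.Topics.Whitelist = regexp.MustCompile(`^events\..*`)
//	config.Group.Topics.Blacklist = regexp.MustCompile(`\.internal$`)
//
//	consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "my-group", []string{"core-topic"}, config)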