920 lines
23 KiB
Go
920 lines
23 KiB
Go
package cluster
|
|
|
|
import (
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/Shopify/sarama"
|
|
)
|
|
|
|
// Consumer is a cluster group consumer
|
|
type Consumer struct {
|
|
client *Client
|
|
ownClient bool
|
|
|
|
consumer sarama.Consumer
|
|
subs *partitionMap
|
|
|
|
consumerID string
|
|
groupID string
|
|
|
|
memberID string
|
|
generationID int32
|
|
membershipMu sync.RWMutex
|
|
|
|
coreTopics []string
|
|
extraTopics []string
|
|
|
|
dying, dead chan none
|
|
closeOnce sync.Once
|
|
|
|
consuming int32
|
|
messages chan *sarama.ConsumerMessage
|
|
errors chan error
|
|
partitions chan PartitionConsumer
|
|
notifications chan *Notification
|
|
|
|
commitMu sync.Mutex
|
|
}
|
|
|
|
// NewConsumer initializes a new consumer
|
|
func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
|
|
client, err := NewClient(addrs, config)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
consumer, err := NewConsumerFromClient(client, groupID, topics)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
consumer.ownClient = true
|
|
return consumer, nil
|
|
}
|
|
|
|
// NewConsumerFromClient initializes a new consumer from an existing client.
|
|
//
|
|
// Please note that clients cannot be shared between consumers (due to Kafka internals),
|
|
// they can only be re-used which requires the user to call Close() on the first consumer
|
|
// before using this method again to initialize another one. Attempts to use a client with
|
|
// more than one consumer at a time will return errors.
|
|
func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) {
|
|
if !client.claim() {
|
|
return nil, errClientInUse
|
|
}
|
|
|
|
consumer, err := sarama.NewConsumerFromClient(client.Client)
|
|
if err != nil {
|
|
client.release()
|
|
return nil, err
|
|
}
|
|
|
|
sort.Strings(topics)
|
|
c := &Consumer{
|
|
client: client,
|
|
consumer: consumer,
|
|
subs: newPartitionMap(),
|
|
groupID: groupID,
|
|
|
|
coreTopics: topics,
|
|
|
|
dying: make(chan none),
|
|
dead: make(chan none),
|
|
|
|
messages: make(chan *sarama.ConsumerMessage),
|
|
errors: make(chan error, client.config.ChannelBufferSize),
|
|
partitions: make(chan PartitionConsumer, 1),
|
|
notifications: make(chan *Notification),
|
|
}
|
|
if err := c.client.RefreshCoordinator(groupID); err != nil {
|
|
client.release()
|
|
return nil, err
|
|
}
|
|
|
|
go c.mainLoop()
|
|
return c, nil
|
|
}
|
|
|
|
// Messages returns the read channel for the messages that are returned by
|
|
// the broker.
|
|
//
|
|
// This channel will only return if Config.Group.Mode option is set to
|
|
// ConsumerModeMultiplex (default).
|
|
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages }
|
|
|
|
// Partitions returns the read channels for individual partitions of this broker.
|
|
//
|
|
// This will channel will only return if Config.Group.Mode option is set to
|
|
// ConsumerModePartitions.
|
|
//
|
|
// The Partitions() channel must be listened to for the life of this consumer;
|
|
// when a rebalance happens old partitions will be closed (naturally come to
|
|
// completion) and new ones will be emitted. The returned channel will only close
|
|
// when the consumer is completely shut down.
|
|
func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions }
|
|
|
|
// Errors returns a read channel of errors that occur during offset management, if
|
|
// enabled. By default, errors are logged and not returned over this channel. If
|
|
// you want to implement any custom error handling, set your config's
|
|
// Consumer.Return.Errors setting to true, and read from this channel.
|
|
func (c *Consumer) Errors() <-chan error { return c.errors }
|
|
|
|
// Notifications returns a channel of Notifications that occur during consumer
|
|
// rebalancing. Notifications will only be emitted over this channel, if your config's
|
|
// Group.Return.Notifications setting to true.
|
|
func (c *Consumer) Notifications() <-chan *Notification { return c.notifications }
|
|
|
|
// HighWaterMarks returns the current high water marks for each topic and partition
|
|
// Consistency between partitions is not guaranteed since high water marks are updated separately.
|
|
func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() }
|
|
|
|
// MarkOffset marks the provided message as processed, alongside a metadata string
|
|
// that represents the state of the partition consumer at that point in time. The
|
|
// metadata string can be used by another consumer to restore that state, so it
|
|
// can resume consumption.
|
|
//
|
|
// Note: calling MarkOffset does not necessarily commit the offset to the backend
|
|
// store immediately for efficiency reasons, and it may never be committed if
|
|
// your application crashes. This means that you may end up processing the same
|
|
// message twice, and your processing should ideally be idempotent.
|
|
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
|
|
if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
|
|
sub.MarkOffset(msg.Offset, metadata)
|
|
}
|
|
}
|
|
|
|
// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
|
|
// See MarkOffset for additional explanation.
|
|
func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
|
|
if sub := c.subs.Fetch(topic, partition); sub != nil {
|
|
sub.MarkOffset(offset, metadata)
|
|
}
|
|
}
|
|
|
|
// MarkOffsets marks stashed offsets as processed.
|
|
// See MarkOffset for additional explanation.
|
|
func (c *Consumer) MarkOffsets(s *OffsetStash) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
for tp, info := range s.offsets {
|
|
if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
|
|
sub.MarkOffset(info.Offset, info.Metadata)
|
|
}
|
|
delete(s.offsets, tp)
|
|
}
|
|
}
|
|
|
|
// ResetOffsets marks the provided message as processed, alongside a metadata string
|
|
// that represents the state of the partition consumer at that point in time. The
|
|
// metadata string can be used by another consumer to restore that state, so it
|
|
// can resume consumption.
|
|
//
|
|
// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset
|
|
func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
|
|
if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
|
|
sub.ResetOffset(msg.Offset, metadata)
|
|
}
|
|
}
|
|
|
|
// ResetPartitionOffset marks an offset of the provided topic/partition as processed.
|
|
// See ResetOffset for additional explanation.
|
|
func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
|
|
sub := c.subs.Fetch(topic, partition)
|
|
if sub != nil {
|
|
sub.ResetOffset(offset, metadata)
|
|
}
|
|
}
|
|
|
|
// ResetOffsets marks stashed offsets as processed.
|
|
// See ResetOffset for additional explanation.
|
|
func (c *Consumer) ResetOffsets(s *OffsetStash) {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
for tp, info := range s.offsets {
|
|
if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
|
|
sub.ResetOffset(info.Offset, info.Metadata)
|
|
}
|
|
delete(s.offsets, tp)
|
|
}
|
|
}
|
|
|
|
// Subscriptions returns the consumed topics and partitions
|
|
func (c *Consumer) Subscriptions() map[string][]int32 {
|
|
return c.subs.Info()
|
|
}
|
|
|
|
// CommitOffsets allows to manually commit previously marked offsets. By default there is no
|
|
// need to call this function as the consumer will commit offsets automatically
|
|
// using the Config.Consumer.Offsets.CommitInterval setting.
|
|
//
|
|
// Please be aware that calling this function during an internal rebalance cycle may return
|
|
// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).
|
|
func (c *Consumer) CommitOffsets() error {
|
|
c.commitMu.Lock()
|
|
defer c.commitMu.Unlock()
|
|
|
|
memberID, generationID := c.membership()
|
|
req := &sarama.OffsetCommitRequest{
|
|
Version: 2,
|
|
ConsumerGroup: c.groupID,
|
|
ConsumerGroupGeneration: generationID,
|
|
ConsumerID: memberID,
|
|
RetentionTime: -1,
|
|
}
|
|
|
|
if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 {
|
|
req.RetentionTime = int64(ns / time.Millisecond)
|
|
}
|
|
|
|
snap := c.subs.Snapshot()
|
|
dirty := false
|
|
for tp, state := range snap {
|
|
if state.Dirty {
|
|
dirty = true
|
|
req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata)
|
|
}
|
|
}
|
|
if !dirty {
|
|
return nil
|
|
}
|
|
|
|
broker, err := c.client.Coordinator(c.groupID)
|
|
if err != nil {
|
|
c.closeCoordinator(broker, err)
|
|
return err
|
|
}
|
|
|
|
resp, err := broker.CommitOffset(req)
|
|
if err != nil {
|
|
c.closeCoordinator(broker, err)
|
|
return err
|
|
}
|
|
|
|
for topic, errs := range resp.Errors {
|
|
for partition, kerr := range errs {
|
|
if kerr != sarama.ErrNoError {
|
|
err = kerr
|
|
} else if state, ok := snap[topicPartition{topic, partition}]; ok {
|
|
if sub := c.subs.Fetch(topic, partition); sub != nil {
|
|
sub.markCommitted(state.Info.Offset)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
// Close safely closes the consumer and releases all resources
|
|
func (c *Consumer) Close() (err error) {
|
|
c.closeOnce.Do(func() {
|
|
close(c.dying)
|
|
<-c.dead
|
|
|
|
if e := c.release(); e != nil {
|
|
err = e
|
|
}
|
|
if e := c.consumer.Close(); e != nil {
|
|
err = e
|
|
}
|
|
close(c.messages)
|
|
close(c.errors)
|
|
|
|
if e := c.leaveGroup(); e != nil {
|
|
err = e
|
|
}
|
|
close(c.partitions)
|
|
close(c.notifications)
|
|
|
|
// drain
|
|
for range c.messages {
|
|
}
|
|
for range c.errors {
|
|
}
|
|
for p := range c.partitions {
|
|
_ = p.Close()
|
|
}
|
|
for range c.notifications {
|
|
}
|
|
|
|
c.client.release()
|
|
if c.ownClient {
|
|
if e := c.client.Close(); e != nil {
|
|
err = e
|
|
}
|
|
}
|
|
})
|
|
return
|
|
}
|
|
|
|
func (c *Consumer) mainLoop() {
|
|
defer close(c.dead)
|
|
defer atomic.StoreInt32(&c.consuming, 0)
|
|
|
|
for {
|
|
atomic.StoreInt32(&c.consuming, 0)
|
|
|
|
// Check if close was requested
|
|
select {
|
|
case <-c.dying:
|
|
return
|
|
default:
|
|
}
|
|
|
|
// Start next consume cycle
|
|
c.nextTick()
|
|
}
|
|
}
|
|
|
|
func (c *Consumer) nextTick() {
|
|
// Remember previous subscriptions
|
|
var notification *Notification
|
|
if c.client.config.Group.Return.Notifications {
|
|
notification = newNotification(c.subs.Info())
|
|
}
|
|
|
|
// Refresh coordinator
|
|
if err := c.refreshCoordinator(); err != nil {
|
|
c.rebalanceError(err, nil)
|
|
return
|
|
}
|
|
|
|
// Release subscriptions
|
|
if err := c.release(); err != nil {
|
|
c.rebalanceError(err, nil)
|
|
return
|
|
}
|
|
|
|
// Issue rebalance start notification
|
|
if c.client.config.Group.Return.Notifications {
|
|
c.handleNotification(notification)
|
|
}
|
|
|
|
// Rebalance, fetch new subscriptions
|
|
subs, err := c.rebalance()
|
|
if err != nil {
|
|
c.rebalanceError(err, notification)
|
|
return
|
|
}
|
|
|
|
// Coordinate loops, make sure everything is
|
|
// stopped on exit
|
|
tomb := newLoopTomb()
|
|
defer tomb.Close()
|
|
|
|
// Start the heartbeat
|
|
tomb.Go(c.hbLoop)
|
|
|
|
// Subscribe to topic/partitions
|
|
if err := c.subscribe(tomb, subs); err != nil {
|
|
c.rebalanceError(err, notification)
|
|
return
|
|
}
|
|
|
|
// Update/issue notification with new claims
|
|
if c.client.config.Group.Return.Notifications {
|
|
notification = notification.success(subs)
|
|
c.handleNotification(notification)
|
|
}
|
|
|
|
// Start topic watcher loop
|
|
tomb.Go(c.twLoop)
|
|
|
|
// Start consuming and committing offsets
|
|
tomb.Go(c.cmLoop)
|
|
atomic.StoreInt32(&c.consuming, 1)
|
|
|
|
// Wait for signals
|
|
select {
|
|
case <-tomb.Dying():
|
|
case <-c.dying:
|
|
}
|
|
}
|
|
|
|
// heartbeat loop, triggered by the mainLoop
|
|
func (c *Consumer) hbLoop(stopped <-chan none) {
|
|
ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
switch err := c.heartbeat(); err {
|
|
case nil, sarama.ErrNoError:
|
|
case sarama.ErrNotCoordinatorForConsumer, sarama.ErrRebalanceInProgress:
|
|
return
|
|
default:
|
|
c.handleError(&Error{Ctx: "heartbeat", error: err})
|
|
return
|
|
}
|
|
case <-stopped:
|
|
return
|
|
case <-c.dying:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// topic watcher loop, triggered by the mainLoop
|
|
func (c *Consumer) twLoop(stopped <-chan none) {
|
|
ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
topics, err := c.client.Topics()
|
|
if err != nil {
|
|
c.handleError(&Error{Ctx: "topics", error: err})
|
|
return
|
|
}
|
|
|
|
for _, topic := range topics {
|
|
if !c.isKnownCoreTopic(topic) &&
|
|
!c.isKnownExtraTopic(topic) &&
|
|
c.isPotentialExtraTopic(topic) {
|
|
return
|
|
}
|
|
}
|
|
case <-stopped:
|
|
return
|
|
case <-c.dying:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// commit loop, triggered by the mainLoop
|
|
func (c *Consumer) cmLoop(stopped <-chan none) {
|
|
ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
if err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); err != nil {
|
|
c.handleError(&Error{Ctx: "commit", error: err})
|
|
return
|
|
}
|
|
case <-stopped:
|
|
return
|
|
case <-c.dying:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Consumer) rebalanceError(err error, n *Notification) {
|
|
if n != nil {
|
|
n.Type = RebalanceError
|
|
c.handleNotification(n)
|
|
}
|
|
|
|
switch err {
|
|
case sarama.ErrRebalanceInProgress:
|
|
default:
|
|
c.handleError(&Error{Ctx: "rebalance", error: err})
|
|
}
|
|
|
|
select {
|
|
case <-c.dying:
|
|
case <-time.After(c.client.config.Metadata.Retry.Backoff):
|
|
}
|
|
}
|
|
|
|
func (c *Consumer) handleNotification(n *Notification) {
|
|
if c.client.config.Group.Return.Notifications {
|
|
select {
|
|
case c.notifications <- n:
|
|
case <-c.dying:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *Consumer) handleError(e *Error) {
|
|
if c.client.config.Consumer.Return.Errors {
|
|
select {
|
|
case c.errors <- e:
|
|
case <-c.dying:
|
|
return
|
|
}
|
|
} else {
|
|
sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error())
|
|
}
|
|
}
|
|
|
|
// Releases the consumer and commits offsets, called from rebalance() and Close()
|
|
func (c *Consumer) release() (err error) {
|
|
// Stop all consumers
|
|
c.subs.Stop()
|
|
|
|
// Clear subscriptions on exit
|
|
defer c.subs.Clear()
|
|
|
|
// Wait for messages to be processed
|
|
timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime)
|
|
defer timeout.Stop()
|
|
|
|
select {
|
|
case <-c.dying:
|
|
case <-timeout.C:
|
|
}
|
|
|
|
// Commit offsets, continue on errors
|
|
if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil {
|
|
err = e
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// --------------------------------------------------------------------
|
|
|
|
// Performs a heartbeat, part of the mainLoop()
|
|
func (c *Consumer) heartbeat() error {
|
|
broker, err := c.client.Coordinator(c.groupID)
|
|
if err != nil {
|
|
c.closeCoordinator(broker, err)
|
|
return err
|
|
}
|
|
|
|
memberID, generationID := c.membership()
|
|
resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{
|
|
GroupId: c.groupID,
|
|
MemberId: memberID,
|
|
GenerationId: generationID,
|
|
})
|
|
if err != nil {
|
|
c.closeCoordinator(broker, err)
|
|
return err
|
|
}
|
|
return resp.Err
|
|
}
|
|
|
|
// Performs a rebalance, part of the mainLoop()
|
|
func (c *Consumer) rebalance() (map[string][]int32, error) {
|
|
memberID, _ := c.membership()
|
|
sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID)
|
|
|
|
allTopics, err := c.client.Topics()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.extraTopics = c.selectExtraTopics(allTopics)
|
|
sort.Strings(c.extraTopics)
|
|
|
|
// Re-join consumer group
|
|
strategy, err := c.joinGroup()
|
|
switch {
|
|
case err == sarama.ErrUnknownMemberId:
|
|
c.membershipMu.Lock()
|
|
c.memberID = ""
|
|
c.membershipMu.Unlock()
|
|
return nil, err
|
|
case err != nil:
|
|
return nil, err
|
|
}
|
|
|
|
// Sync consumer group state, fetch subscriptions
|
|
subs, err := c.syncGroup(strategy)
|
|
switch {
|
|
case err == sarama.ErrRebalanceInProgress:
|
|
return nil, err
|
|
case err != nil:
|
|
_ = c.leaveGroup()
|
|
return nil, err
|
|
}
|
|
return subs, nil
|
|
}
|
|
|
|
// Performs the subscription, part of the mainLoop()
|
|
func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
|
|
// fetch offsets
|
|
offsets, err := c.fetchOffsets(subs)
|
|
if err != nil {
|
|
_ = c.leaveGroup()
|
|
return err
|
|
}
|
|
|
|
// create consumers in parallel
|
|
var mu sync.Mutex
|
|
var wg sync.WaitGroup
|
|
|
|
for topic, partitions := range subs {
|
|
for _, partition := range partitions {
|
|
wg.Add(1)
|
|
|
|
info := offsets[topic][partition]
|
|
go func(topic string, partition int32) {
|
|
if e := c.createConsumer(tomb, topic, partition, info); e != nil {
|
|
mu.Lock()
|
|
err = e
|
|
mu.Unlock()
|
|
}
|
|
wg.Done()
|
|
}(topic, partition)
|
|
}
|
|
}
|
|
wg.Wait()
|
|
|
|
if err != nil {
|
|
_ = c.release()
|
|
_ = c.leaveGroup()
|
|
}
|
|
return err
|
|
}
|
|
|
|
// --------------------------------------------------------------------
|
|
|
|
// Send a request to the broker to join group on rebalance()
|
|
func (c *Consumer) joinGroup() (*balancer, error) {
|
|
memberID, _ := c.membership()
|
|
req := &sarama.JoinGroupRequest{
|
|
GroupId: c.groupID,
|
|
MemberId: memberID,
|
|
SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond),
|
|
ProtocolType: "consumer",
|
|
}
|
|
|
|
meta := &sarama.ConsumerGroupMemberMetadata{
|
|
Version: 1,
|
|
Topics: append(c.coreTopics, c.extraTopics...),
|
|
UserData: c.client.config.Group.Member.UserData,
|
|
}
|
|
err := req.AddGroupProtocolMetadata(string(StrategyRange), meta)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
broker, err := c.client.Coordinator(c.groupID)
|
|
if err != nil {
|
|
c.closeCoordinator(broker, err)
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := broker.JoinGroup(req)
|
|
if err != nil {
|
|
c.closeCoordinator(broker, err)
|
|
return nil, err
|
|
} else if resp.Err != sarama.ErrNoError {
|
|
c.closeCoordinator(broker, resp.Err)
|
|
return nil, resp.Err
|
|
}
|
|
|
|
var strategy *balancer
|
|
if resp.LeaderId == resp.MemberId {
|
|
members, err := resp.GetMembers()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
strategy, err = newBalancerFromMeta(c.client, members)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
c.membershipMu.Lock()
|
|
c.memberID = resp.MemberId
|
|
c.generationID = resp.GenerationId
|
|
c.membershipMu.Unlock()
|
|
|
|
return strategy, nil
|
|
}
|
|
|
|
// Send a request to the broker to sync the group on rebalance().
|
|
// Returns a list of topics and partitions to consume.
|
|
func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
|
|
memberID, generationID := c.membership()
|
|
req := &sarama.SyncGroupRequest{
|
|
GroupId: c.groupID,
|
|
MemberId: memberID,
|
|
GenerationId: generationID,
|
|
}
|
|
|
|
if strategy != nil {
|
|
for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) {
|
|
if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
|
|
Topics: topics,
|
|
}); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
broker, err := c.client.Coordinator(c.groupID)
|
|
if err != nil {
|
|
c.closeCoordinator(broker, err)
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := broker.SyncGroup(req)
|
|
if err != nil {
|
|
c.closeCoordinator(broker, err)
|
|
return nil, err
|
|
} else if resp.Err != sarama.ErrNoError {
|
|
c.closeCoordinator(broker, resp.Err)
|
|
return nil, resp.Err
|
|
}
|
|
|
|
// Return if there is nothing to subscribe to
|
|
if len(resp.MemberAssignment) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
// Get assigned subscriptions
|
|
members, err := resp.GetMemberAssignment()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Sort partitions, for each topic
|
|
for topic := range members.Topics {
|
|
sort.Sort(int32Slice(members.Topics[topic]))
|
|
}
|
|
return members.Topics, nil
|
|
}
|
|
|
|
// Fetches latest committed offsets for all subscriptions
|
|
func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) {
|
|
offsets := make(map[string]map[int32]offsetInfo, len(subs))
|
|
req := &sarama.OffsetFetchRequest{
|
|
Version: 1,
|
|
ConsumerGroup: c.groupID,
|
|
}
|
|
|
|
for topic, partitions := range subs {
|
|
offsets[topic] = make(map[int32]offsetInfo, len(partitions))
|
|
for _, partition := range partitions {
|
|
offsets[topic][partition] = offsetInfo{Offset: -1}
|
|
req.AddPartition(topic, partition)
|
|
}
|
|
}
|
|
|
|
broker, err := c.client.Coordinator(c.groupID)
|
|
if err != nil {
|
|
c.closeCoordinator(broker, err)
|
|
return nil, err
|
|
}
|
|
|
|
resp, err := broker.FetchOffset(req)
|
|
if err != nil {
|
|
c.closeCoordinator(broker, err)
|
|
return nil, err
|
|
}
|
|
|
|
for topic, partitions := range subs {
|
|
for _, partition := range partitions {
|
|
block := resp.GetBlock(topic, partition)
|
|
if block == nil {
|
|
return nil, sarama.ErrIncompleteResponse
|
|
}
|
|
|
|
if block.Err == sarama.ErrNoError {
|
|
offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata}
|
|
} else {
|
|
return nil, block.Err
|
|
}
|
|
}
|
|
}
|
|
return offsets, nil
|
|
}
|
|
|
|
// Send a request to the broker to leave the group on failes rebalance() and on Close()
|
|
func (c *Consumer) leaveGroup() error {
|
|
broker, err := c.client.Coordinator(c.groupID)
|
|
if err != nil {
|
|
c.closeCoordinator(broker, err)
|
|
return err
|
|
}
|
|
|
|
memberID, _ := c.membership()
|
|
if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{
|
|
GroupId: c.groupID,
|
|
MemberId: memberID,
|
|
}); err != nil {
|
|
c.closeCoordinator(broker, err)
|
|
}
|
|
return err
|
|
}
|
|
|
|
// --------------------------------------------------------------------
|
|
|
|
func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
|
|
memberID, _ := c.membership()
|
|
sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))
|
|
|
|
// Create partitionConsumer
|
|
pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Store in subscriptions
|
|
c.subs.Store(topic, partition, pc)
|
|
|
|
// Start partition consumer goroutine
|
|
tomb.Go(func(stopper <-chan none) {
|
|
if c.client.config.Group.Mode == ConsumerModePartitions {
|
|
pc.waitFor(stopper, c.errors)
|
|
} else {
|
|
pc.multiplex(stopper, c.messages, c.errors)
|
|
}
|
|
})
|
|
|
|
if c.client.config.Group.Mode == ConsumerModePartitions {
|
|
c.partitions <- pc
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Consumer) commitOffsetsWithRetry(retries int) error {
|
|
err := c.CommitOffsets()
|
|
if err != nil && retries > 0 {
|
|
return c.commitOffsetsWithRetry(retries - 1)
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) {
|
|
if broker != nil {
|
|
_ = broker.Close()
|
|
}
|
|
|
|
switch err {
|
|
case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer:
|
|
_ = c.client.RefreshCoordinator(c.groupID)
|
|
}
|
|
}
|
|
|
|
func (c *Consumer) selectExtraTopics(allTopics []string) []string {
|
|
extra := allTopics[:0]
|
|
for _, topic := range allTopics {
|
|
if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) {
|
|
extra = append(extra, topic)
|
|
}
|
|
}
|
|
return extra
|
|
}
|
|
|
|
func (c *Consumer) isKnownCoreTopic(topic string) bool {
|
|
pos := sort.SearchStrings(c.coreTopics, topic)
|
|
return pos < len(c.coreTopics) && c.coreTopics[pos] == topic
|
|
}
|
|
|
|
func (c *Consumer) isKnownExtraTopic(topic string) bool {
|
|
pos := sort.SearchStrings(c.extraTopics, topic)
|
|
return pos < len(c.extraTopics) && c.extraTopics[pos] == topic
|
|
}
|
|
|
|
func (c *Consumer) isPotentialExtraTopic(topic string) bool {
|
|
rx := c.client.config.Group.Topics
|
|
if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) {
|
|
return false
|
|
}
|
|
if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (c *Consumer) refreshCoordinator() error {
|
|
if err := c.refreshMetadata(); err != nil {
|
|
return err
|
|
}
|
|
return c.client.RefreshCoordinator(c.groupID)
|
|
}
|
|
|
|
func (c *Consumer) refreshMetadata() (err error) {
|
|
if c.client.config.Metadata.Full {
|
|
err = c.client.RefreshMetadata()
|
|
} else {
|
|
var topics []string
|
|
if topics, err = c.client.Topics(); err == nil && len(topics) != 0 {
|
|
err = c.client.RefreshMetadata(topics...)
|
|
}
|
|
}
|
|
|
|
// maybe we didn't have authorization to describe all topics
|
|
switch err {
|
|
case sarama.ErrTopicAuthorizationFailed:
|
|
err = c.client.RefreshMetadata(c.coreTopics...)
|
|
}
|
|
return
|
|
}
|
|
|
|
func (c *Consumer) membership() (memberID string, generationID int32) {
|
|
c.membershipMu.RLock()
|
|
memberID, generationID = c.memberID, c.generationID
|
|
c.membershipMu.RUnlock()
|
|
return
|
|
}
|