package cluster

import (
	"math"
	"sort"

	"github.com/Shopify/sarama"
)

// NotificationType defines the type of notification
type NotificationType uint8

// String describes the notification type. Any value without a dedicated
// description (including UnknownNotification) is reported as "unknown".
func (t NotificationType) String() string {
	switch t {
	case RebalanceStart:
		return "rebalance start"
	case RebalanceOK:
		return "rebalance OK"
	case RebalanceError:
		return "rebalance error"
	}
	return "unknown"
}

const (
	// UnknownNotification is the zero value of NotificationType.
	UnknownNotification NotificationType = iota
	// RebalanceStart is the type carried by notifications created via
	// newNotification.
	RebalanceStart
	// RebalanceOK is the type carried by notifications created via
	// (*Notification).success.
	RebalanceOK
	// RebalanceError is described as "rebalance error" by String.
	// NOTE(review): it is not assigned anywhere in this file; presumably
	// emitted when a rebalance fails - confirm against the consumer code.
	RebalanceError
)

// Notification are state events emitted by the consumers on rebalance
type Notification struct {
	// Type exposes the notification type
	Type NotificationType

	// Claimed contains topic/partitions that were claimed by this rebalance cycle
	Claimed map[string][]int32

	// Released contains topic/partitions that were released as part of this rebalance cycle
	Released map[string][]int32

	// Current are topic/partitions that are currently claimed to the consumer
	Current map[string][]int32
}

// newNotification returns a RebalanceStart notification whose Current field
// is the given topic/partition map. Claimed and Released are left nil.
func newNotification(current map[string][]int32) *Notification {
	return &Notification{
		Type:    RebalanceStart,
		Current: current,
	}
}

// success returns a new RebalanceOK notification for the given set of
// current claims. The receiver is not modified.
//
// Claimed holds, per topic in current, the result of diffing the new
// partitions against the receiver's partitions for that topic; Released
// holds, per topic in the receiver's Current, the diff in the opposite
// direction. NOTE(review): int32Slice.Diff is defined elsewhere; presumably
// it yields the elements of the receiver slice that are absent from the
// argument - confirm.
func (n *Notification) success(current map[string][]int32) *Notification {
	o := &Notification{
		Type:     RebalanceOK,
		Claimed:  make(map[string][]int32),
		Released: make(map[string][]int32),
		Current:  current,
	}
	for topic, partitions := range current {
		o.Claimed[topic] = int32Slice(partitions).Diff(int32Slice(n.Current[topic]))
	}
	for topic, partitions := range n.Current {
		o.Released[topic] = int32Slice(partitions).Diff(int32Slice(current[topic]))
	}
	return o
}

// --------------------------------------------------------------------

// topicInfo pairs the partitions of a single topic with the IDs of the
// members among which those partitions are distributed.
type topicInfo struct {
	Partitions []int32
	MemberIDs  []string
}

// Perform distributes the partitions among the members using RoundRobin when
// s is StrategyRoundRobin and Ranges for every other strategy value.
func (info topicInfo) Perform(s Strategy) map[string][]int32 {
	if s == StrategyRoundRobin {
		return info.RoundRobin()
	}
	return info.Ranges()
}

// Ranges assigns each member one contiguous chunk of Partitions and returns
// the assignment keyed by member ID. Members are processed in sorted order;
// note that MemberIDs is sorted in place. Chunk boundaries are the member's
// fractional share rounded to the nearest index, and members whose chunk is
// empty are omitted from the result. With no members the result is empty.
func (info topicInfo) Ranges() map[string][]int32 {
	sort.Strings(info.MemberIDs)

	mlen := len(info.MemberIDs)
	plen := len(info.Partitions)
	res := make(map[string][]int32, mlen)
	if mlen == 0 {
		return res
	}

	// Average number of partitions per member; constant across iterations.
	n := float64(plen) / float64(mlen)
	for pos, memberID := range info.MemberIDs {
		i := float64(pos)
		lo := int(math.Floor(i*n + 0.5))
		hi := int(math.Floor((i+1)*n + 0.5))

		// Clip the capacity so that appending to one member's slice cannot
		// overwrite the partitions handed to the next member, which live in
		// the same backing array.
		sub := info.Partitions[lo:hi:hi]
		if len(sub) > 0 {
			res[memberID] = sub
		}
	}
	return res
}

// RoundRobin deals Partitions out one at a time to the members, cycling
// through them in sorted order, and returns the assignment keyed by member
// ID. MemberIDs is sorted in place. With no members the result is empty.
func (info topicInfo) RoundRobin() map[string][]int32 {
	sort.Strings(info.MemberIDs)

	mlen := len(info.MemberIDs)
	res := make(map[string][]int32, mlen)
	if mlen == 0 {
		// Nothing to assign to; also avoids an integer modulo by zero below.
		return res
	}

	for i, pnum := range info.Partitions {
		memberID := info.MemberIDs[i%mlen]
		res[memberID] = append(res[memberID], pnum)
	}
	return res
}

// --------------------------------------------------------------------

// balancer collects, per topic, the partitions reported by the client and
// the members registered for that topic.
type balancer struct {
	client sarama.Client
	topics map[string]topicInfo
}

// newBalancerFromMeta creates a balancer and registers every member for each
// topic listed in its metadata. The first error returned while registering a
// topic aborts the construction and is returned with a nil balancer.
func newBalancerFromMeta(client sarama.Client, members map[string]sarama.ConsumerGroupMemberMetadata) (*balancer, error) {
	b := newBalancer(client)
	for memberID, meta := range members {
		for _, topic := range meta.Topics {
			if err := b.Topic(topic, memberID); err != nil {
				return nil, err
			}
		}
	}
	return b, nil
}

// newBalancer creates an empty balancer that uses client to look up
// partitions.
func newBalancer(client sarama.Client) *balancer {
	return &balancer{
		client: client,
		topics: make(map[string]topicInfo),
	}
}

// Topic registers memberID for the topic called name. The topic's partitions
// are requested from the client only the first time a topic is seen; an error
// from that lookup is returned and nothing is recorded.
func (r *balancer) Topic(name string, memberID string) error {
	topic, ok := r.topics[name]
	if !ok {
		nums, err := r.client.Partitions(name)
		if err != nil {
			return err
		}
		topic = topicInfo{
			Partitions: nums,
			MemberIDs:  make([]string, 0, 1),
		}
	}
	topic.MemberIDs = append(topic.MemberIDs, memberID)
	// topicInfo is stored by value, so the updated copy has to be written back.
	r.topics[name] = topic
	return nil
}

// Perform runs the given strategy for every registered topic and returns the
// combined assignment as member ID -> topic -> partitions.
func (r *balancer) Perform(s Strategy) map[string]map[string][]int32 {
	res := make(map[string]map[string][]int32, 1)
	for topic, info := range r.topics {
		for memberID, partitions := range info.Perform(s) {
			if _, ok := res[memberID]; !ok {
				res[memberID] = make(map[string][]int32, 1)
			}
			res[memberID][topic] = partitions
		}
	}
	return res
}