291 lines
5.9 KiB
Go
291 lines
5.9 KiB
Go
package cluster
|
|
|
|
import (
|
|
"sort"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/Shopify/sarama"
|
|
)
|
|
|
|
// PartitionConsumer allows code to consume individual partitions from the cluster.
|
|
//
|
|
// See docs for Consumer.Partitions() for more on how to implement this.
|
|
type PartitionConsumer interface {
|
|
sarama.PartitionConsumer
|
|
|
|
// Topic returns the consumed topic name
|
|
Topic() string
|
|
|
|
// Partition returns the consumed partition
|
|
Partition() int32
|
|
|
|
// InitialOffset returns the offset used for creating the PartitionConsumer instance.
|
|
// The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest
|
|
InitialOffset() int64
|
|
|
|
// MarkOffset marks the offset of a message as preocessed.
|
|
MarkOffset(offset int64, metadata string)
|
|
|
|
// ResetOffset resets the offset to a previously processed message.
|
|
ResetOffset(offset int64, metadata string)
|
|
}
|
|
|
|
type partitionConsumer struct {
|
|
sarama.PartitionConsumer
|
|
|
|
state partitionState
|
|
mu sync.Mutex
|
|
|
|
topic string
|
|
partition int32
|
|
initialOffset int64
|
|
|
|
closeOnce sync.Once
|
|
closeErr error
|
|
|
|
dying, dead chan none
|
|
}
|
|
|
|
func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) {
|
|
offset := info.NextOffset(defaultOffset)
|
|
pcm, err := manager.ConsumePartition(topic, partition, offset)
|
|
|
|
// Resume from default offset, if requested offset is out-of-range
|
|
if err == sarama.ErrOffsetOutOfRange {
|
|
info.Offset = -1
|
|
offset = defaultOffset
|
|
pcm, err = manager.ConsumePartition(topic, partition, offset)
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return &partitionConsumer{
|
|
PartitionConsumer: pcm,
|
|
state: partitionState{Info: info},
|
|
|
|
topic: topic,
|
|
partition: partition,
|
|
initialOffset: offset,
|
|
|
|
dying: make(chan none),
|
|
dead: make(chan none),
|
|
}, nil
|
|
}
|
|
|
|
// Topic implements PartitionConsumer
|
|
func (c *partitionConsumer) Topic() string { return c.topic }
|
|
|
|
// Partition implements PartitionConsumer
|
|
func (c *partitionConsumer) Partition() int32 { return c.partition }
|
|
|
|
// InitialOffset implements PartitionConsumer
|
|
func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset }
|
|
|
|
// AsyncClose implements PartitionConsumer
|
|
func (c *partitionConsumer) AsyncClose() {
|
|
c.closeOnce.Do(func() {
|
|
c.closeErr = c.PartitionConsumer.Close()
|
|
close(c.dying)
|
|
})
|
|
}
|
|
|
|
// Close implements PartitionConsumer
|
|
func (c *partitionConsumer) Close() error {
|
|
c.AsyncClose()
|
|
<-c.dead
|
|
return c.closeErr
|
|
}
|
|
|
|
func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) {
|
|
defer close(c.dead)
|
|
|
|
for {
|
|
select {
|
|
case err, ok := <-c.Errors():
|
|
if !ok {
|
|
return
|
|
}
|
|
select {
|
|
case errors <- err:
|
|
case <-stopper:
|
|
return
|
|
case <-c.dying:
|
|
return
|
|
}
|
|
case <-stopper:
|
|
return
|
|
case <-c.dying:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
|
|
defer close(c.dead)
|
|
|
|
for {
|
|
select {
|
|
case msg, ok := <-c.Messages():
|
|
if !ok {
|
|
return
|
|
}
|
|
select {
|
|
case messages <- msg:
|
|
case <-stopper:
|
|
return
|
|
case <-c.dying:
|
|
return
|
|
}
|
|
case err, ok := <-c.Errors():
|
|
if !ok {
|
|
return
|
|
}
|
|
select {
|
|
case errors <- err:
|
|
case <-stopper:
|
|
return
|
|
case <-c.dying:
|
|
return
|
|
}
|
|
case <-stopper:
|
|
return
|
|
case <-c.dying:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *partitionConsumer) getState() partitionState {
|
|
c.mu.Lock()
|
|
state := c.state
|
|
c.mu.Unlock()
|
|
|
|
return state
|
|
}
|
|
|
|
func (c *partitionConsumer) markCommitted(offset int64) {
|
|
c.mu.Lock()
|
|
if offset == c.state.Info.Offset {
|
|
c.state.Dirty = false
|
|
}
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
// MarkOffset implements PartitionConsumer
|
|
func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
|
|
c.mu.Lock()
|
|
if next := offset + 1; next > c.state.Info.Offset {
|
|
c.state.Info.Offset = next
|
|
c.state.Info.Metadata = metadata
|
|
c.state.Dirty = true
|
|
}
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
// ResetOffset implements PartitionConsumer
|
|
func (c *partitionConsumer) ResetOffset(offset int64, metadata string) {
|
|
c.mu.Lock()
|
|
if next := offset + 1; next <= c.state.Info.Offset {
|
|
c.state.Info.Offset = next
|
|
c.state.Info.Metadata = metadata
|
|
c.state.Dirty = true
|
|
}
|
|
c.mu.Unlock()
|
|
}
|
|
|
|
// --------------------------------------------------------------------
|
|
|
|
type partitionState struct {
|
|
Info offsetInfo
|
|
Dirty bool
|
|
LastCommit time.Time
|
|
}
|
|
|
|
// --------------------------------------------------------------------
|
|
|
|
type partitionMap struct {
|
|
data map[topicPartition]*partitionConsumer
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func newPartitionMap() *partitionMap {
|
|
return &partitionMap{
|
|
data: make(map[topicPartition]*partitionConsumer),
|
|
}
|
|
}
|
|
|
|
func (m *partitionMap) IsSubscribedTo(topic string) bool {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
for tp := range m.data {
|
|
if tp.Topic == topic {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer {
|
|
m.mu.RLock()
|
|
pc, _ := m.data[topicPartition{topic, partition}]
|
|
m.mu.RUnlock()
|
|
return pc
|
|
}
|
|
|
|
func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) {
|
|
m.mu.Lock()
|
|
m.data[topicPartition{topic, partition}] = pc
|
|
m.mu.Unlock()
|
|
}
|
|
|
|
func (m *partitionMap) Snapshot() map[topicPartition]partitionState {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
snap := make(map[topicPartition]partitionState, len(m.data))
|
|
for tp, pc := range m.data {
|
|
snap[tp] = pc.getState()
|
|
}
|
|
return snap
|
|
}
|
|
|
|
func (m *partitionMap) Stop() {
|
|
m.mu.RLock()
|
|
defer m.mu.RUnlock()
|
|
|
|
var wg sync.WaitGroup
|
|
for tp := range m.data {
|
|
wg.Add(1)
|
|
go func(p *partitionConsumer) {
|
|
_ = p.Close()
|
|
wg.Done()
|
|
}(m.data[tp])
|
|
}
|
|
wg.Wait()
|
|
}
|
|
|
|
func (m *partitionMap) Clear() {
|
|
m.mu.Lock()
|
|
for tp := range m.data {
|
|
delete(m.data, tp)
|
|
}
|
|
m.mu.Unlock()
|
|
}
|
|
|
|
func (m *partitionMap) Info() map[string][]int32 {
|
|
info := make(map[string][]int32)
|
|
m.mu.RLock()
|
|
for tp := range m.data {
|
|
info[tp.Topic] = append(info[tp.Topic], tp.Partition)
|
|
}
|
|
m.mu.RUnlock()
|
|
|
|
for topic := range info {
|
|
sort.Sort(int32Slice(info[topic]))
|
|
}
|
|
return info
|
|
}
|