70 lines
2.0 KiB
Go
70 lines
2.0 KiB
Go
|
package cluster
|
||
|
|
||
|
import (
|
||
|
"sync"
|
||
|
|
||
|
"github.com/Shopify/sarama"
|
||
|
)
|
||
|
|
||
|
// OffsetStash allows to accumulate offsets and
|
||
|
// mark them as processed in a bulk
|
||
|
type OffsetStash struct {
|
||
|
offsets map[topicPartition]offsetInfo
|
||
|
mu sync.Mutex
|
||
|
}
|
||
|
|
||
|
// NewOffsetStash inits a blank stash
|
||
|
func NewOffsetStash() *OffsetStash {
|
||
|
return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)}
|
||
|
}
|
||
|
|
||
|
// MarkOffset stashes the provided message offset
|
||
|
func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
|
||
|
s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
|
||
|
}
|
||
|
|
||
|
// MarkPartitionOffset stashes the offset for the provided topic/partition combination
|
||
|
func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
|
||
|
key := topicPartition{Topic: topic, Partition: partition}
|
||
|
if info := s.offsets[key]; offset >= info.Offset {
|
||
|
info.Offset = offset
|
||
|
info.Metadata = metadata
|
||
|
s.offsets[key] = info
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// ResetPartitionOffset stashes the offset for the provided topic/partition combination.
|
||
|
// Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets
|
||
|
func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
|
||
|
key := topicPartition{Topic: topic, Partition: partition}
|
||
|
if info := s.offsets[key]; offset <= info.Offset {
|
||
|
info.Offset = offset
|
||
|
info.Metadata = metadata
|
||
|
s.offsets[key] = info
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// ResetOffset stashes the provided message offset
|
||
|
// See ResetPartitionOffset for explanation
|
||
|
func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
|
||
|
s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
|
||
|
}
|
||
|
|
||
|
// Offsets returns the latest stashed offsets by topic-partition
|
||
|
func (s *OffsetStash) Offsets() map[string]int64 {
|
||
|
s.mu.Lock()
|
||
|
defer s.mu.Unlock()
|
||
|
|
||
|
res := make(map[string]int64, len(s.offsets))
|
||
|
for tp, info := range s.offsets {
|
||
|
res[tp.String()] = info.Offset
|
||
|
}
|
||
|
return res
|
||
|
}
|