942 lines
25 KiB
Go
942 lines
25 KiB
Go
package sarama
|
|
|
|
import (
|
|
"fmt"
|
|
"strings"
|
|
)
|
|
|
|
// TestReporter declares methods matching go's testing.T. Using this interface
// instead of *testing.T avoids importing `testing` in the main part of the
// library.
type TestReporter interface {
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
	Fatal(args ...interface{})
	Fatalf(format string, args ...interface{})
}
|
|
|
|
// MockResponse is a response builder interface it defines one method that
|
|
// allows generating a response based on a request body. MockResponses are used
|
|
// to program behavior of MockBroker in tests.
|
|
type MockResponse interface {
|
|
For(reqBody versionedDecoder) (res encoder)
|
|
}
|
|
|
|
// MockWrapper is a mock response builder that returns a particular concrete
|
|
// response regardless of the actual request passed to the `For` method.
|
|
type MockWrapper struct {
|
|
res encoder
|
|
}
|
|
|
|
func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
|
|
return mw.res
|
|
}
|
|
|
|
func NewMockWrapper(res encoder) *MockWrapper {
|
|
return &MockWrapper{res: res}
|
|
}
|
|
|
|
// MockSequence is a mock response builder that is created from a sequence of
|
|
// concrete responses. Every time when a `MockBroker` calls its `For` method
|
|
// the next response from the sequence is returned. When the end of the
|
|
// sequence is reached the last element from the sequence is returned.
|
|
type MockSequence struct {
|
|
responses []MockResponse
|
|
}
|
|
|
|
func NewMockSequence(responses ...interface{}) *MockSequence {
|
|
ms := &MockSequence{}
|
|
ms.responses = make([]MockResponse, len(responses))
|
|
for i, res := range responses {
|
|
switch res := res.(type) {
|
|
case MockResponse:
|
|
ms.responses[i] = res
|
|
case encoder:
|
|
ms.responses[i] = NewMockWrapper(res)
|
|
default:
|
|
panic(fmt.Sprintf("Unexpected response type: %T", res))
|
|
}
|
|
}
|
|
return ms
|
|
}
|
|
|
|
func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
|
|
res = mc.responses[0].For(reqBody)
|
|
if len(mc.responses) > 1 {
|
|
mc.responses = mc.responses[1:]
|
|
}
|
|
return res
|
|
}
|
|
|
|
type MockListGroupsResponse struct {
|
|
groups map[string]string
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
|
|
return &MockListGroupsResponse{
|
|
groups: make(map[string]string),
|
|
t: t,
|
|
}
|
|
}
|
|
|
|
func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
|
|
request := reqBody.(*ListGroupsRequest)
|
|
_ = request
|
|
response := &ListGroupsResponse{
|
|
Groups: m.groups,
|
|
}
|
|
return response
|
|
}
|
|
|
|
func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
|
|
m.groups[groupID] = protocolType
|
|
return m
|
|
}
|
|
|
|
type MockDescribeGroupsResponse struct {
|
|
groups map[string]*GroupDescription
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
|
|
return &MockDescribeGroupsResponse{
|
|
t: t,
|
|
groups: make(map[string]*GroupDescription),
|
|
}
|
|
}
|
|
|
|
func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
|
|
m.groups[groupID] = description
|
|
return m
|
|
}
|
|
|
|
func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
|
|
request := reqBody.(*DescribeGroupsRequest)
|
|
|
|
response := &DescribeGroupsResponse{}
|
|
for _, requestedGroup := range request.Groups {
|
|
if group, ok := m.groups[requestedGroup]; ok {
|
|
response.Groups = append(response.Groups, group)
|
|
} else {
|
|
// Mimic real kafka - if a group doesn't exist, return
|
|
// an entry with state "Dead"
|
|
response.Groups = append(response.Groups, &GroupDescription{
|
|
GroupId: requestedGroup,
|
|
State: "Dead",
|
|
})
|
|
}
|
|
}
|
|
|
|
return response
|
|
}
|
|
|
|
// MockMetadataResponse is a `MetadataResponse` builder.
|
|
type MockMetadataResponse struct {
|
|
controllerID int32
|
|
leaders map[string]map[int32]int32
|
|
brokers map[string]int32
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
|
|
return &MockMetadataResponse{
|
|
leaders: make(map[string]map[int32]int32),
|
|
brokers: make(map[string]int32),
|
|
t: t,
|
|
}
|
|
}
|
|
|
|
func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
|
|
partitions := mmr.leaders[topic]
|
|
if partitions == nil {
|
|
partitions = make(map[int32]int32)
|
|
mmr.leaders[topic] = partitions
|
|
}
|
|
partitions[partition] = brokerID
|
|
return mmr
|
|
}
|
|
|
|
func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
|
|
mmr.brokers[addr] = brokerID
|
|
return mmr
|
|
}
|
|
|
|
func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
|
|
mmr.controllerID = brokerID
|
|
return mmr
|
|
}
|
|
|
|
func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
|
|
metadataRequest := reqBody.(*MetadataRequest)
|
|
metadataResponse := &MetadataResponse{
|
|
Version: metadataRequest.version(),
|
|
ControllerID: mmr.controllerID,
|
|
}
|
|
for addr, brokerID := range mmr.brokers {
|
|
metadataResponse.AddBroker(addr, brokerID)
|
|
}
|
|
|
|
// Generate set of replicas
|
|
replicas := []int32{}
|
|
offlineReplicas := []int32{}
|
|
for _, brokerID := range mmr.brokers {
|
|
replicas = append(replicas, brokerID)
|
|
}
|
|
|
|
if len(metadataRequest.Topics) == 0 {
|
|
for topic, partitions := range mmr.leaders {
|
|
for partition, brokerID := range partitions {
|
|
metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
|
|
}
|
|
}
|
|
return metadataResponse
|
|
}
|
|
for _, topic := range metadataRequest.Topics {
|
|
for partition, brokerID := range mmr.leaders[topic] {
|
|
metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
|
|
}
|
|
}
|
|
return metadataResponse
|
|
}
|
|
|
|
// MockOffsetResponse is an `OffsetResponse` builder.
|
|
type MockOffsetResponse struct {
|
|
offsets map[string]map[int32]map[int64]int64
|
|
t TestReporter
|
|
version int16
|
|
}
|
|
|
|
func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
|
|
return &MockOffsetResponse{
|
|
offsets: make(map[string]map[int32]map[int64]int64),
|
|
t: t,
|
|
}
|
|
}
|
|
|
|
func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
|
|
mor.version = version
|
|
return mor
|
|
}
|
|
|
|
func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
|
|
partitions := mor.offsets[topic]
|
|
if partitions == nil {
|
|
partitions = make(map[int32]map[int64]int64)
|
|
mor.offsets[topic] = partitions
|
|
}
|
|
times := partitions[partition]
|
|
if times == nil {
|
|
times = make(map[int64]int64)
|
|
partitions[partition] = times
|
|
}
|
|
times[time] = offset
|
|
return mor
|
|
}
|
|
|
|
func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
|
|
offsetRequest := reqBody.(*OffsetRequest)
|
|
offsetResponse := &OffsetResponse{Version: mor.version}
|
|
for topic, partitions := range offsetRequest.blocks {
|
|
for partition, block := range partitions {
|
|
offset := mor.getOffset(topic, partition, block.time)
|
|
offsetResponse.AddTopicPartition(topic, partition, offset)
|
|
}
|
|
}
|
|
return offsetResponse
|
|
}
|
|
|
|
func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
|
|
partitions := mor.offsets[topic]
|
|
if partitions == nil {
|
|
mor.t.Errorf("missing topic: %s", topic)
|
|
}
|
|
times := partitions[partition]
|
|
if times == nil {
|
|
mor.t.Errorf("missing partition: %d", partition)
|
|
}
|
|
offset, ok := times[time]
|
|
if !ok {
|
|
mor.t.Errorf("missing time: %d", time)
|
|
}
|
|
return offset
|
|
}
|
|
|
|
// MockFetchResponse is a `FetchResponse` builder.
|
|
type MockFetchResponse struct {
|
|
messages map[string]map[int32]map[int64]Encoder
|
|
highWaterMarks map[string]map[int32]int64
|
|
t TestReporter
|
|
batchSize int
|
|
version int16
|
|
}
|
|
|
|
func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
|
|
return &MockFetchResponse{
|
|
messages: make(map[string]map[int32]map[int64]Encoder),
|
|
highWaterMarks: make(map[string]map[int32]int64),
|
|
t: t,
|
|
batchSize: batchSize,
|
|
}
|
|
}
|
|
|
|
func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
|
|
mfr.version = version
|
|
return mfr
|
|
}
|
|
|
|
func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
|
|
partitions := mfr.messages[topic]
|
|
if partitions == nil {
|
|
partitions = make(map[int32]map[int64]Encoder)
|
|
mfr.messages[topic] = partitions
|
|
}
|
|
messages := partitions[partition]
|
|
if messages == nil {
|
|
messages = make(map[int64]Encoder)
|
|
partitions[partition] = messages
|
|
}
|
|
messages[offset] = msg
|
|
return mfr
|
|
}
|
|
|
|
func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
|
|
partitions := mfr.highWaterMarks[topic]
|
|
if partitions == nil {
|
|
partitions = make(map[int32]int64)
|
|
mfr.highWaterMarks[topic] = partitions
|
|
}
|
|
partitions[partition] = offset
|
|
return mfr
|
|
}
|
|
|
|
func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
|
|
fetchRequest := reqBody.(*FetchRequest)
|
|
res := &FetchResponse{
|
|
Version: mfr.version,
|
|
}
|
|
for topic, partitions := range fetchRequest.blocks {
|
|
for partition, block := range partitions {
|
|
initialOffset := block.fetchOffset
|
|
offset := initialOffset
|
|
maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
|
|
for i := 0; i < mfr.batchSize && offset < maxOffset; {
|
|
msg := mfr.getMessage(topic, partition, offset)
|
|
if msg != nil {
|
|
res.AddMessage(topic, partition, nil, msg, offset)
|
|
i++
|
|
}
|
|
offset++
|
|
}
|
|
fb := res.GetBlock(topic, partition)
|
|
if fb == nil {
|
|
res.AddError(topic, partition, ErrNoError)
|
|
fb = res.GetBlock(topic, partition)
|
|
}
|
|
fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
|
|
partitions := mfr.messages[topic]
|
|
if partitions == nil {
|
|
return nil
|
|
}
|
|
messages := partitions[partition]
|
|
if messages == nil {
|
|
return nil
|
|
}
|
|
return messages[offset]
|
|
}
|
|
|
|
func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
|
|
partitions := mfr.messages[topic]
|
|
if partitions == nil {
|
|
return 0
|
|
}
|
|
messages := partitions[partition]
|
|
if messages == nil {
|
|
return 0
|
|
}
|
|
return len(messages)
|
|
}
|
|
|
|
func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
|
|
partitions := mfr.highWaterMarks[topic]
|
|
if partitions == nil {
|
|
return 0
|
|
}
|
|
return partitions[partition]
|
|
}
|
|
|
|
// MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
|
|
type MockConsumerMetadataResponse struct {
|
|
coordinators map[string]interface{}
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
|
|
return &MockConsumerMetadataResponse{
|
|
coordinators: make(map[string]interface{}),
|
|
t: t,
|
|
}
|
|
}
|
|
|
|
func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
|
|
mr.coordinators[group] = broker
|
|
return mr
|
|
}
|
|
|
|
func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
|
|
mr.coordinators[group] = kerror
|
|
return mr
|
|
}
|
|
|
|
func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*ConsumerMetadataRequest)
|
|
group := req.ConsumerGroup
|
|
res := &ConsumerMetadataResponse{}
|
|
v := mr.coordinators[group]
|
|
switch v := v.(type) {
|
|
case *MockBroker:
|
|
res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
|
|
case KError:
|
|
res.Err = v
|
|
}
|
|
return res
|
|
}
|
|
|
|
// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
|
|
type MockFindCoordinatorResponse struct {
|
|
groupCoordinators map[string]interface{}
|
|
transCoordinators map[string]interface{}
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
|
|
return &MockFindCoordinatorResponse{
|
|
groupCoordinators: make(map[string]interface{}),
|
|
transCoordinators: make(map[string]interface{}),
|
|
t: t,
|
|
}
|
|
}
|
|
|
|
func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
|
|
switch coordinatorType {
|
|
case CoordinatorGroup:
|
|
mr.groupCoordinators[group] = broker
|
|
case CoordinatorTransaction:
|
|
mr.transCoordinators[group] = broker
|
|
}
|
|
return mr
|
|
}
|
|
|
|
func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
|
|
switch coordinatorType {
|
|
case CoordinatorGroup:
|
|
mr.groupCoordinators[group] = kerror
|
|
case CoordinatorTransaction:
|
|
mr.transCoordinators[group] = kerror
|
|
}
|
|
return mr
|
|
}
|
|
|
|
func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*FindCoordinatorRequest)
|
|
res := &FindCoordinatorResponse{}
|
|
var v interface{}
|
|
switch req.CoordinatorType {
|
|
case CoordinatorGroup:
|
|
v = mr.groupCoordinators[req.CoordinatorKey]
|
|
case CoordinatorTransaction:
|
|
v = mr.transCoordinators[req.CoordinatorKey]
|
|
}
|
|
switch v := v.(type) {
|
|
case *MockBroker:
|
|
res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
|
|
case KError:
|
|
res.Err = v
|
|
}
|
|
return res
|
|
}
|
|
|
|
// MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
|
|
type MockOffsetCommitResponse struct {
|
|
errors map[string]map[string]map[int32]KError
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
|
|
return &MockOffsetCommitResponse{t: t}
|
|
}
|
|
|
|
func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
|
|
if mr.errors == nil {
|
|
mr.errors = make(map[string]map[string]map[int32]KError)
|
|
}
|
|
topics := mr.errors[group]
|
|
if topics == nil {
|
|
topics = make(map[string]map[int32]KError)
|
|
mr.errors[group] = topics
|
|
}
|
|
partitions := topics[topic]
|
|
if partitions == nil {
|
|
partitions = make(map[int32]KError)
|
|
topics[topic] = partitions
|
|
}
|
|
partitions[partition] = kerror
|
|
return mr
|
|
}
|
|
|
|
func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*OffsetCommitRequest)
|
|
group := req.ConsumerGroup
|
|
res := &OffsetCommitResponse{}
|
|
for topic, partitions := range req.blocks {
|
|
for partition := range partitions {
|
|
res.AddError(topic, partition, mr.getError(group, topic, partition))
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
|
|
topics := mr.errors[group]
|
|
if topics == nil {
|
|
return ErrNoError
|
|
}
|
|
partitions := topics[topic]
|
|
if partitions == nil {
|
|
return ErrNoError
|
|
}
|
|
kerror, ok := partitions[partition]
|
|
if !ok {
|
|
return ErrNoError
|
|
}
|
|
return kerror
|
|
}
|
|
|
|
// MockProduceResponse is a `ProduceResponse` builder.
|
|
type MockProduceResponse struct {
|
|
version int16
|
|
errors map[string]map[int32]KError
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
|
|
return &MockProduceResponse{t: t}
|
|
}
|
|
|
|
func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
|
|
mr.version = version
|
|
return mr
|
|
}
|
|
|
|
func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
|
|
if mr.errors == nil {
|
|
mr.errors = make(map[string]map[int32]KError)
|
|
}
|
|
partitions := mr.errors[topic]
|
|
if partitions == nil {
|
|
partitions = make(map[int32]KError)
|
|
mr.errors[topic] = partitions
|
|
}
|
|
partitions[partition] = kerror
|
|
return mr
|
|
}
|
|
|
|
func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*ProduceRequest)
|
|
res := &ProduceResponse{
|
|
Version: mr.version,
|
|
}
|
|
for topic, partitions := range req.records {
|
|
for partition := range partitions {
|
|
res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
|
|
partitions := mr.errors[topic]
|
|
if partitions == nil {
|
|
return ErrNoError
|
|
}
|
|
kerror, ok := partitions[partition]
|
|
if !ok {
|
|
return ErrNoError
|
|
}
|
|
return kerror
|
|
}
|
|
|
|
// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
|
|
type MockOffsetFetchResponse struct {
|
|
offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
|
|
error KError
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
|
|
return &MockOffsetFetchResponse{t: t}
|
|
}
|
|
|
|
func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
|
|
if mr.offsets == nil {
|
|
mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
|
|
}
|
|
topics := mr.offsets[group]
|
|
if topics == nil {
|
|
topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
|
|
mr.offsets[group] = topics
|
|
}
|
|
partitions := topics[topic]
|
|
if partitions == nil {
|
|
partitions = make(map[int32]*OffsetFetchResponseBlock)
|
|
topics[topic] = partitions
|
|
}
|
|
partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
|
|
return mr
|
|
}
|
|
|
|
func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
|
|
mr.error = kerror
|
|
return mr
|
|
}
|
|
|
|
func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*OffsetFetchRequest)
|
|
group := req.ConsumerGroup
|
|
res := &OffsetFetchResponse{Version: req.Version}
|
|
|
|
for topic, partitions := range mr.offsets[group] {
|
|
for partition, block := range partitions {
|
|
res.AddBlock(topic, partition, block)
|
|
}
|
|
}
|
|
|
|
if res.Version >= 2 {
|
|
res.Err = mr.error
|
|
}
|
|
return res
|
|
}
|
|
|
|
type MockCreateTopicsResponse struct {
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
|
|
return &MockCreateTopicsResponse{t: t}
|
|
}
|
|
|
|
func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*CreateTopicsRequest)
|
|
res := &CreateTopicsResponse{
|
|
Version: req.Version,
|
|
}
|
|
res.TopicErrors = make(map[string]*TopicError)
|
|
|
|
for topic := range req.TopicDetails {
|
|
if res.Version >= 1 && strings.HasPrefix(topic, "_") {
|
|
msg := "insufficient permissions to create topic with reserved prefix"
|
|
res.TopicErrors[topic] = &TopicError{
|
|
Err: ErrTopicAuthorizationFailed,
|
|
ErrMsg: &msg,
|
|
}
|
|
continue
|
|
}
|
|
res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
|
|
}
|
|
return res
|
|
}
|
|
|
|
type MockDeleteTopicsResponse struct {
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
|
|
return &MockDeleteTopicsResponse{t: t}
|
|
}
|
|
|
|
func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*DeleteTopicsRequest)
|
|
res := &DeleteTopicsResponse{}
|
|
res.TopicErrorCodes = make(map[string]KError)
|
|
|
|
for _, topic := range req.Topics {
|
|
res.TopicErrorCodes[topic] = ErrNoError
|
|
}
|
|
res.Version = int16(req.Version)
|
|
return res
|
|
}
|
|
|
|
type MockCreatePartitionsResponse struct {
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
|
|
return &MockCreatePartitionsResponse{t: t}
|
|
}
|
|
|
|
func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*CreatePartitionsRequest)
|
|
res := &CreatePartitionsResponse{}
|
|
res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
|
|
|
|
for topic := range req.TopicPartitions {
|
|
if strings.HasPrefix(topic, "_") {
|
|
msg := "insufficient permissions to create partition on topic with reserved prefix"
|
|
res.TopicPartitionErrors[topic] = &TopicPartitionError{
|
|
Err: ErrTopicAuthorizationFailed,
|
|
ErrMsg: &msg,
|
|
}
|
|
continue
|
|
}
|
|
res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
|
|
}
|
|
return res
|
|
}
|
|
|
|
type MockDeleteRecordsResponse struct {
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
|
|
return &MockDeleteRecordsResponse{t: t}
|
|
}
|
|
|
|
func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*DeleteRecordsRequest)
|
|
res := &DeleteRecordsResponse{}
|
|
res.Topics = make(map[string]*DeleteRecordsResponseTopic)
|
|
|
|
for topic, deleteRecordRequestTopic := range req.Topics {
|
|
partitions := make(map[int32]*DeleteRecordsResponsePartition)
|
|
for partition := range deleteRecordRequestTopic.PartitionOffsets {
|
|
partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
|
|
}
|
|
res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
|
|
}
|
|
return res
|
|
}
|
|
|
|
type MockDescribeConfigsResponse struct {
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
|
|
return &MockDescribeConfigsResponse{t: t}
|
|
}
|
|
|
|
func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*DescribeConfigsRequest)
|
|
res := &DescribeConfigsResponse{}
|
|
|
|
for _, r := range req.Resources {
|
|
var configEntries []*ConfigEntry
|
|
switch r.Type {
|
|
case TopicResource:
|
|
configEntries = append(configEntries,
|
|
&ConfigEntry{Name: "max.message.bytes",
|
|
Value: "1000000",
|
|
ReadOnly: false,
|
|
Default: true,
|
|
Sensitive: false,
|
|
}, &ConfigEntry{Name: "retention.ms",
|
|
Value: "5000",
|
|
ReadOnly: false,
|
|
Default: false,
|
|
Sensitive: false,
|
|
}, &ConfigEntry{Name: "password",
|
|
Value: "12345",
|
|
ReadOnly: false,
|
|
Default: false,
|
|
Sensitive: true,
|
|
})
|
|
res.Resources = append(res.Resources, &ResourceResponse{
|
|
Name: r.Name,
|
|
Configs: configEntries,
|
|
})
|
|
}
|
|
}
|
|
return res
|
|
}
|
|
|
|
type MockAlterConfigsResponse struct {
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
|
|
return &MockAlterConfigsResponse{t: t}
|
|
}
|
|
|
|
func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*AlterConfigsRequest)
|
|
res := &AlterConfigsResponse{}
|
|
|
|
for _, r := range req.Resources {
|
|
res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
|
|
Type: TopicResource,
|
|
ErrorMsg: "",
|
|
})
|
|
}
|
|
return res
|
|
}
|
|
|
|
type MockCreateAclsResponse struct {
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
|
|
return &MockCreateAclsResponse{t: t}
|
|
}
|
|
|
|
func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*CreateAclsRequest)
|
|
res := &CreateAclsResponse{}
|
|
|
|
for range req.AclCreations {
|
|
res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
|
|
}
|
|
return res
|
|
}
|
|
|
|
type MockListAclsResponse struct {
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
|
|
return &MockListAclsResponse{t: t}
|
|
}
|
|
|
|
func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*DescribeAclsRequest)
|
|
res := &DescribeAclsResponse{}
|
|
res.Err = ErrNoError
|
|
acl := &ResourceAcls{}
|
|
if req.ResourceName != nil {
|
|
acl.Resource.ResourceName = *req.ResourceName
|
|
}
|
|
acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
|
|
acl.Resource.ResourceType = req.ResourceType
|
|
|
|
host := "*"
|
|
if req.Host != nil {
|
|
host = *req.Host
|
|
}
|
|
|
|
principal := "User:test"
|
|
if req.Principal != nil {
|
|
principal = *req.Principal
|
|
}
|
|
|
|
permissionType := req.PermissionType
|
|
if permissionType == AclPermissionAny {
|
|
permissionType = AclPermissionAllow
|
|
}
|
|
|
|
acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})
|
|
res.ResourceAcls = append(res.ResourceAcls, acl)
|
|
res.Version = int16(req.Version)
|
|
return res
|
|
}
|
|
|
|
type MockSaslAuthenticateResponse struct {
|
|
t TestReporter
|
|
kerror KError
|
|
saslAuthBytes []byte
|
|
}
|
|
|
|
func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
|
|
return &MockSaslAuthenticateResponse{t: t}
|
|
}
|
|
|
|
func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
|
|
res := &SaslAuthenticateResponse{}
|
|
res.Err = msar.kerror
|
|
res.SaslAuthBytes = msar.saslAuthBytes
|
|
return res
|
|
}
|
|
|
|
func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
|
|
msar.kerror = kerror
|
|
return msar
|
|
}
|
|
|
|
func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
|
|
msar.saslAuthBytes = saslAuthBytes
|
|
return msar
|
|
}
|
|
|
|
type MockDeleteAclsResponse struct {
|
|
t TestReporter
|
|
}
|
|
|
|
type MockSaslHandshakeResponse struct {
|
|
enabledMechanisms []string
|
|
kerror KError
|
|
t TestReporter
|
|
}
|
|
|
|
func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
|
|
return &MockSaslHandshakeResponse{t: t}
|
|
}
|
|
|
|
func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
|
|
res := &SaslHandshakeResponse{}
|
|
res.Err = mshr.kerror
|
|
res.EnabledMechanisms = mshr.enabledMechanisms
|
|
return res
|
|
}
|
|
|
|
func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
|
|
mshr.kerror = kerror
|
|
return mshr
|
|
}
|
|
|
|
func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
|
|
mshr.enabledMechanisms = enabledMechanisms
|
|
return mshr
|
|
}
|
|
|
|
func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
|
|
return &MockDeleteAclsResponse{t: t}
|
|
}
|
|
|
|
func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
|
|
req := reqBody.(*DeleteAclsRequest)
|
|
res := &DeleteAclsResponse{}
|
|
|
|
for range req.Filters {
|
|
response := &FilterResponse{Err: ErrNoError}
|
|
response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
|
|
res.FilterResponses = append(res.FilterResponses, response)
|
|
}
|
|
res.Version = int16(req.Version)
|
|
return res
|
|
}
|
|
|
|
type MockDeleteGroupsResponse struct {
|
|
deletedGroups []string
|
|
}
|
|
|
|
func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
|
|
return &MockDeleteGroupsResponse{}
|
|
}
|
|
|
|
func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
|
|
m.deletedGroups = groups
|
|
return m
|
|
}
|
|
|
|
func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
|
|
resp := &DeleteGroupsResponse{
|
|
GroupErrorCodes: map[string]KError{},
|
|
}
|
|
for _, group := range m.deletedGroups {
|
|
resp.GroupErrorCodes[group] = ErrNoError
|
|
}
|
|
return resp
|
|
}
|