404 lines
11 KiB
Go
404 lines
11 KiB
Go
package sarama
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"reflect"
|
|
"strconv"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/davecgh/go-spew/spew"
|
|
)
|
|
|
|
const (
|
|
expectationTimeout = 500 * time.Millisecond
|
|
)
|
|
|
|
type GSSApiHandlerFunc func([]byte) []byte
|
|
|
|
type requestHandlerFunc func(req *request) (res encoder)
|
|
|
|
// RequestNotifierFunc is invoked when a mock broker processes a request successfully
|
|
// and will provides the number of bytes read and written.
|
|
type RequestNotifierFunc func(bytesRead, bytesWritten int)
|
|
|
|
// MockBroker is a mock Kafka broker that is used in unit tests. It is exposed
|
|
// to facilitate testing of higher level or specialized consumers and producers
|
|
// built on top of Sarama. Note that it does not 'mimic' the Kafka API protocol,
|
|
// but rather provides a facility to do that. It takes care of the TCP
|
|
// transport, request unmarshaling, response marshaling, and makes it the test
|
|
// writer responsibility to program correct according to the Kafka API protocol
|
|
// MockBroker behaviour.
|
|
//
|
|
// MockBroker is implemented as a TCP server listening on a kernel-selected
|
|
// localhost port that can accept many connections. It reads Kafka requests
|
|
// from that connection and returns responses programmed by the SetHandlerByMap
|
|
// function. If a MockBroker receives a request that it has no programmed
|
|
// response for, then it returns nothing and the request times out.
|
|
//
|
|
// A set of MockRequest builders to define mappings used by MockBroker is
|
|
// provided by Sarama. But users can develop MockRequests of their own and use
|
|
// them along with or instead of the standard ones.
|
|
//
|
|
// When running tests with MockBroker it is strongly recommended to specify
|
|
// a timeout to `go test` so that if the broker hangs waiting for a response,
|
|
// the test panics.
|
|
//
|
|
// It is not necessary to prefix message length or correlation ID to your
|
|
// response bytes, the server does that automatically as a convenience.
|
|
type MockBroker struct {
|
|
brokerID int32
|
|
port int32
|
|
closing chan none
|
|
stopper chan none
|
|
expectations chan encoder
|
|
listener net.Listener
|
|
t TestReporter
|
|
latency time.Duration
|
|
handler requestHandlerFunc
|
|
notifier RequestNotifierFunc
|
|
history []RequestResponse
|
|
lock sync.Mutex
|
|
gssApiHandler GSSApiHandlerFunc
|
|
}
|
|
|
|
// RequestResponse represents a Request/Response pair processed by MockBroker.
|
|
type RequestResponse struct {
|
|
Request protocolBody
|
|
Response encoder
|
|
}
|
|
|
|
// SetLatency makes broker pause for the specified period every time before
|
|
// replying.
|
|
func (b *MockBroker) SetLatency(latency time.Duration) {
|
|
b.latency = latency
|
|
}
|
|
|
|
// SetHandlerByMap defines mapping of Request types to MockResponses. When a
|
|
// request is received by the broker, it looks up the request type in the map
|
|
// and uses the found MockResponse instance to generate an appropriate reply.
|
|
// If the request type is not found in the map then nothing is sent.
|
|
func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
|
|
b.setHandler(func(req *request) (res encoder) {
|
|
reqTypeName := reflect.TypeOf(req.body).Elem().Name()
|
|
mockResponse := handlerMap[reqTypeName]
|
|
if mockResponse == nil {
|
|
return nil
|
|
}
|
|
return mockResponse.For(req.body)
|
|
})
|
|
}
|
|
|
|
// SetNotifier set a function that will get invoked whenever a request has been
|
|
// processed successfully and will provide the number of bytes read and written
|
|
func (b *MockBroker) SetNotifier(notifier RequestNotifierFunc) {
|
|
b.lock.Lock()
|
|
b.notifier = notifier
|
|
b.lock.Unlock()
|
|
}
|
|
|
|
// BrokerID returns broker ID assigned to the broker.
|
|
func (b *MockBroker) BrokerID() int32 {
|
|
return b.brokerID
|
|
}
|
|
|
|
// History returns a slice of RequestResponse pairs in the order they were
|
|
// processed by the broker. Note that in case of multiple connections to the
|
|
// broker the order expected by a test can be different from the order recorded
|
|
// in the history, unless some synchronization is implemented in the test.
|
|
func (b *MockBroker) History() []RequestResponse {
|
|
b.lock.Lock()
|
|
history := make([]RequestResponse, len(b.history))
|
|
copy(history, b.history)
|
|
b.lock.Unlock()
|
|
return history
|
|
}
|
|
|
|
// Port returns the TCP port number the broker is listening for requests on.
|
|
func (b *MockBroker) Port() int32 {
|
|
return b.port
|
|
}
|
|
|
|
// Addr returns the broker connection string in the form "<address>:<port>".
|
|
func (b *MockBroker) Addr() string {
|
|
return b.listener.Addr().String()
|
|
}
|
|
|
|
// Close terminates the broker blocking until it stops internal goroutines and
|
|
// releases all resources.
|
|
func (b *MockBroker) Close() {
|
|
close(b.expectations)
|
|
if len(b.expectations) > 0 {
|
|
buf := bytes.NewBufferString(fmt.Sprintf("mockbroker/%d: not all expectations were satisfied! Still waiting on:\n", b.BrokerID()))
|
|
for e := range b.expectations {
|
|
_, _ = buf.WriteString(spew.Sdump(e))
|
|
}
|
|
b.t.Error(buf.String())
|
|
}
|
|
close(b.closing)
|
|
<-b.stopper
|
|
}
|
|
|
|
// setHandler sets the specified function as the request handler. Whenever
|
|
// a mock broker reads a request from the wire it passes the request to the
|
|
// function and sends back whatever the handler function returns.
|
|
func (b *MockBroker) setHandler(handler requestHandlerFunc) {
|
|
b.lock.Lock()
|
|
b.handler = handler
|
|
b.lock.Unlock()
|
|
}
|
|
|
|
func (b *MockBroker) serverLoop() {
|
|
defer close(b.stopper)
|
|
var err error
|
|
var conn net.Conn
|
|
|
|
go func() {
|
|
<-b.closing
|
|
err := b.listener.Close()
|
|
if err != nil {
|
|
b.t.Error(err)
|
|
}
|
|
}()
|
|
|
|
wg := &sync.WaitGroup{}
|
|
i := 0
|
|
for conn, err = b.listener.Accept(); err == nil; conn, err = b.listener.Accept() {
|
|
wg.Add(1)
|
|
go b.handleRequests(conn, i, wg)
|
|
i++
|
|
}
|
|
wg.Wait()
|
|
Logger.Printf("*** mockbroker/%d: listener closed, err=%v", b.BrokerID(), err)
|
|
}
|
|
|
|
func (b *MockBroker) SetGSSAPIHandler(handler GSSApiHandlerFunc) {
|
|
b.gssApiHandler = handler
|
|
}
|
|
|
|
func (b *MockBroker) readToBytes(r io.Reader) ([]byte, error) {
|
|
var (
|
|
bytesRead int
|
|
lengthBytes = make([]byte, 4)
|
|
)
|
|
|
|
if _, err := io.ReadFull(r, lengthBytes); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
bytesRead += len(lengthBytes)
|
|
length := int32(binary.BigEndian.Uint32(lengthBytes))
|
|
|
|
if length <= 4 || length > MaxRequestSize {
|
|
return nil, PacketDecodingError{fmt.Sprintf("message of length %d too large or too small", length)}
|
|
}
|
|
|
|
encodedReq := make([]byte, length)
|
|
if _, err := io.ReadFull(r, encodedReq); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
bytesRead += len(encodedReq)
|
|
|
|
fullBytes := append(lengthBytes, encodedReq...)
|
|
|
|
return fullBytes, nil
|
|
}
|
|
|
|
func (b *MockBroker) isGSSAPI(buffer []byte) bool {
|
|
return buffer[4] == 0x60 || bytes.Equal(buffer[4:6], []byte{0x05, 0x04})
|
|
}
|
|
|
|
func (b *MockBroker) handleRequests(conn net.Conn, idx int, wg *sync.WaitGroup) {
|
|
defer wg.Done()
|
|
defer func() {
|
|
_ = conn.Close()
|
|
}()
|
|
Logger.Printf("*** mockbroker/%d/%d: connection opened", b.BrokerID(), idx)
|
|
var err error
|
|
|
|
abort := make(chan none)
|
|
defer close(abort)
|
|
go func() {
|
|
select {
|
|
case <-b.closing:
|
|
_ = conn.Close()
|
|
case <-abort:
|
|
}
|
|
}()
|
|
|
|
resHeader := make([]byte, 8)
|
|
var bytesWritten int
|
|
var bytesRead int
|
|
for {
|
|
|
|
buffer, err := b.readToBytes(conn)
|
|
if err != nil {
|
|
Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(buffer))
|
|
b.serverError(err)
|
|
break
|
|
}
|
|
|
|
bytesWritten = 0
|
|
if !b.isGSSAPI(buffer) {
|
|
|
|
req, br, err := decodeRequest(bytes.NewReader(buffer))
|
|
bytesRead = br
|
|
if err != nil {
|
|
Logger.Printf("*** mockbroker/%d/%d: invalid request: err=%+v, %+v", b.brokerID, idx, err, spew.Sdump(req))
|
|
b.serverError(err)
|
|
break
|
|
}
|
|
|
|
if b.latency > 0 {
|
|
time.Sleep(b.latency)
|
|
}
|
|
|
|
b.lock.Lock()
|
|
res := b.handler(req)
|
|
b.history = append(b.history, RequestResponse{req.body, res})
|
|
b.lock.Unlock()
|
|
|
|
if res == nil {
|
|
Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(req))
|
|
continue
|
|
}
|
|
Logger.Printf("*** mockbroker/%d/%d: served %v -> %v", b.brokerID, idx, req, res)
|
|
|
|
encodedRes, err := encode(res, nil)
|
|
if err != nil {
|
|
b.serverError(err)
|
|
break
|
|
}
|
|
if len(encodedRes) == 0 {
|
|
b.lock.Lock()
|
|
if b.notifier != nil {
|
|
b.notifier(bytesRead, 0)
|
|
}
|
|
b.lock.Unlock()
|
|
continue
|
|
}
|
|
|
|
binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
|
|
binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
|
|
if _, err = conn.Write(resHeader); err != nil {
|
|
b.serverError(err)
|
|
break
|
|
}
|
|
if _, err = conn.Write(encodedRes); err != nil {
|
|
b.serverError(err)
|
|
break
|
|
}
|
|
bytesWritten = len(resHeader) + len(encodedRes)
|
|
|
|
} else {
|
|
// GSSAPI is not part of kafka protocol, but is supported for authentication proposes.
|
|
// Don't support history for this kind of request as is only used for test GSSAPI authentication mechanism
|
|
b.lock.Lock()
|
|
res := b.gssApiHandler(buffer)
|
|
b.lock.Unlock()
|
|
if res == nil {
|
|
Logger.Printf("*** mockbroker/%d/%d: ignored %v", b.brokerID, idx, spew.Sdump(buffer))
|
|
continue
|
|
}
|
|
if _, err = conn.Write(res); err != nil {
|
|
b.serverError(err)
|
|
break
|
|
}
|
|
bytesWritten = len(res)
|
|
}
|
|
|
|
b.lock.Lock()
|
|
if b.notifier != nil {
|
|
b.notifier(bytesRead, bytesWritten)
|
|
}
|
|
b.lock.Unlock()
|
|
|
|
}
|
|
Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
|
|
}
|
|
|
|
func (b *MockBroker) defaultRequestHandler(req *request) (res encoder) {
|
|
select {
|
|
case res, ok := <-b.expectations:
|
|
if !ok {
|
|
return nil
|
|
}
|
|
return res
|
|
case <-time.After(expectationTimeout):
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (b *MockBroker) serverError(err error) {
|
|
isConnectionClosedError := false
|
|
if _, ok := err.(*net.OpError); ok {
|
|
isConnectionClosedError = true
|
|
} else if err == io.EOF {
|
|
isConnectionClosedError = true
|
|
} else if err.Error() == "use of closed network connection" {
|
|
isConnectionClosedError = true
|
|
}
|
|
|
|
if isConnectionClosedError {
|
|
return
|
|
}
|
|
|
|
b.t.Errorf(err.Error())
|
|
}
|
|
|
|
// NewMockBroker launches a fake Kafka broker. It takes a TestReporter as provided by the
|
|
// test framework and a channel of responses to use. If an error occurs it is
|
|
// simply logged to the TestReporter and the broker exits.
|
|
func NewMockBroker(t TestReporter, brokerID int32) *MockBroker {
|
|
return NewMockBrokerAddr(t, brokerID, "localhost:0")
|
|
}
|
|
|
|
// NewMockBrokerAddr behaves like newMockBroker but listens on the address you give
|
|
// it rather than just some ephemeral port.
|
|
func NewMockBrokerAddr(t TestReporter, brokerID int32, addr string) *MockBroker {
|
|
listener, err := net.Listen("tcp", addr)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
return NewMockBrokerListener(t, brokerID, listener)
|
|
}
|
|
|
|
// NewMockBrokerListener behaves like newMockBrokerAddr but accepts connections on the listener specified.
|
|
func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener) *MockBroker {
|
|
var err error
|
|
|
|
broker := &MockBroker{
|
|
closing: make(chan none),
|
|
stopper: make(chan none),
|
|
t: t,
|
|
brokerID: brokerID,
|
|
expectations: make(chan encoder, 512),
|
|
listener: listener,
|
|
}
|
|
broker.handler = broker.defaultRequestHandler
|
|
|
|
Logger.Printf("*** mockbroker/%d listening on %s\n", brokerID, broker.listener.Addr().String())
|
|
_, portStr, err := net.SplitHostPort(broker.listener.Addr().String())
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
tmp, err := strconv.ParseInt(portStr, 10, 32)
|
|
if err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
broker.port = int32(tmp)
|
|
|
|
go broker.serverLoop()
|
|
|
|
return broker
|
|
}
|
|
|
|
func (b *MockBroker) Returns(e encoder) {
|
|
b.expectations <- e
|
|
}
|