From 94957d3c23b51025e7f3a08c315c16192a1fdaa0 Mon Sep 17 00:00:00 2001 From: Jacopo Silvestro Date: Fri, 17 Jun 2016 10:39:19 +0200 Subject: [PATCH 1/3] Check the received value in producers (synch/asych) mocks agains a regexp --- async_producer_test.go | 4 +++ errors.go | 3 ++ examples/http_server/http_server.go | 4 +++ mocks/async_producer.go | 49 ++++++++++++++++++++++++----- mocks/async_producer_test.go | 13 ++++++++ mocks/mocks.go | 4 ++- mocks/sync_producer.go | 45 +++++++++++++++++++++----- mocks/sync_producer_test.go | 25 +++++++++++++++ utils.go | 9 ++++++ 9 files changed, 139 insertions(+), 17 deletions(-) diff --git a/async_producer_test.go b/async_producer_test.go index 517ef2a34..23acfb1e9 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -93,6 +93,10 @@ func (f flakyEncoder) Encode() ([]byte, error) { return []byte(TestMessage), nil } +func (f flakyEncoder) String() string { + return string(TestMessage) +} + func TestAsyncProducer(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) diff --git a/errors.go b/errors.go index a837087f1..a6d3064c4 100644 --- a/errors.go +++ b/errors.go @@ -37,6 +37,9 @@ var ErrShuttingDown = errors.New("kafka: message received by producer in process // ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max") +// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max +var ErrRegexpMatch = errors.New("kafka: message is larger than Consumer.Fetch.Max") + // PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that. type PacketEncodingError struct { diff --git a/examples/http_server/http_server.go b/examples/http_server/http_server.go index 03e47b6b2..d16130ebf 100644 --- a/examples/http_server/http_server.go +++ b/examples/http_server/http_server.go @@ -162,6 +162,10 @@ func (ale *accessLogEntry) Encode() ([]byte, error) { return ale.encoded, ale.err } +func (ale *accessLogEntry) String() string { + return fmt.Sprintf("%+v\n", ale) +} + func (s *Server) withAccessLog(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/mocks/async_producer.go b/mocks/async_producer.go index 6ccf1f145..40daf9bb7 100644 --- a/mocks/async_producer.go +++ b/mocks/async_producer.go @@ -1,6 +1,7 @@ package mocks import ( + "regexp" "sync" "github.com/Shopify/sarama" @@ -8,8 +9,10 @@ import ( // AsyncProducer implements sarama's Producer interface for testing purposes. // Before you can send messages to it's Input channel, you have to set expectations -// so it knows how to handle the input. This way you can easily test success and -// failure scenarios. +// so it knows how to handle the input; it returns and error if the numer of messages +// received is bigger then the number of expectations set. You can also set a +// regexp in each expectation so that the message value is matched against this +// regexp and an error is returned if the match fails. type AsyncProducer struct { l sync.Mutex t ErrorReporter @@ -52,6 +55,18 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer { } else { expectation := mp.expectations[0] mp.expectations = mp.expectations[1:] + if len(expectation.MatchPattern) != 0 { + matched, err := regexp.MatchString(expectation.MatchPattern, msg.Value.String()) + if err != nil { + mp.t.Errorf("Error while trying to match the input message with the expected pattern: " + err.Error()) + mp.l.Unlock() + panic(err.Error()) + } + if !matched { + mp.t.Errorf("Input value \"%s\" did not match expected pattern \"%s\"", msg.Value.String(), expectation.MatchPattern) + mp.errors <- &sarama.ProducerError{Err: errNoMatch, Msg: msg} + } + } if expectation.Result == errProduceSuccess { mp.lastOffset++ if config.Producer.Return.Successes { @@ -122,21 +137,39 @@ func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError { // Setting expectations //////////////////////////////////////////////// +// ExpectInputWithPatternAndSucceed sets an expectation on the mock producer that a message +// with value matching a given regexp will be provided on the input channel. The mock producer +// will first check the message value against the pattern. It will return an error if the matching +// fails or handle the message as if it produced successfully, i.e. it will make it available +// on the Successes channel if the Producer.Return.Successes setting is set to true. +func (mp *AsyncProducer) ExpectInputWithPatternAndSucceed(pattern string) { + mp.l.Lock() + defer mp.l.Unlock() + mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, MatchPattern: pattern}) +} + +// ExpectInputWithPatternAndFail sets an expectation on the mock producer that a message +// with value matching a given regexp will be provided on the input channel. The mock producer +// will first check the message value against the pattern. It will return an error if the matching +// fails or handle the message as if it failed to produce successfully. This means it will make +// a ProducerError available on the Errors channel. +func (mp *AsyncProducer) ExpectInputWithPatternAndFail(pattern string, err error) { + mp.l.Lock() + defer mp.l.Unlock() + mp.expectations = append(mp.expectations, &producerExpectation{Result: err, MatchPattern: pattern}) +} + // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided // on the input channel. The mock producer will handle the message as if it is produced successfully, // i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting // is set to true. func (mp *AsyncProducer) ExpectInputAndSucceed() { - mp.l.Lock() - defer mp.l.Unlock() - mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess}) + mp.ExpectInputWithPatternAndSucceed("") } // ExpectInputAndFail sets an expectation on the mock producer that a message will be provided // on the input channel. The mock producer will handle the message as if it failed to produce // successfully. This means it will make a ProducerError available on the Errors channel. func (mp *AsyncProducer) ExpectInputAndFail(err error) { - mp.l.Lock() - defer mp.l.Unlock() - mp.expectations = append(mp.expectations, &producerExpectation{Result: err}) + mp.ExpectInputWithPatternAndFail("", err) } diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go index 520bf58b9..572793dee 100644 --- a/mocks/async_producer_test.go +++ b/mocks/async_producer_test.go @@ -92,3 +92,16 @@ func TestProducerWithTooManyExpectations(t *testing.T) { t.Error("Expected to report an error") } } + +func TestProducerWithMatchPattern(t *testing.T) { + trm := newTestReporterMock() + mp := NewAsyncProducer(trm, nil) + mp.ExpectInputWithPatternAndSucceed("$tes") + mp.ExpectInputWithPatternAndFail("tes$", errNoMatch) + + mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + if err := mp.Close(); err != nil { + t.Error(err) + } +} diff --git a/mocks/mocks.go b/mocks/mocks.go index 96b79bc06..08b808312 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -29,12 +29,14 @@ var ( errProduceSuccess error = nil errOutOfExpectations = errors.New("No more expectations set on mock") errPartitionConsumerNotStarted = errors.New("The partition consumer was never started") + errNoMatch = errors.New("The input message value did not match with the expected pattern") ) const AnyOffset int64 = -1000 type producerExpectation struct { - Result error + Result error + MatchPattern string } type consumerExpectation struct { diff --git a/mocks/sync_producer.go b/mocks/sync_producer.go index fa86b245c..79e8ef045 100644 --- a/mocks/sync_producer.go +++ b/mocks/sync_producer.go @@ -1,6 +1,7 @@ package mocks import ( + "regexp" "sync" "github.com/Shopify/sarama" @@ -34,7 +35,9 @@ func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer { // SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation. // You have to set expectations on the mock producer before calling SendMessage, so it knows -// how to handle them. If there is no more remaining expectations when SendMessage is called, +// how to handle them. You can set a regexp in each expectation so that the message value +// is matched against this regexp and an error is returned if the match fails. +// If there is no more remaining expectation when SendMessage is called, // the mock producer will write an error to the test state object. func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { sp.l.Lock() @@ -43,7 +46,17 @@ func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int3 if len(sp.expectations) > 0 { expectation := sp.expectations[0] sp.expectations = sp.expectations[1:] - + if len(expectation.MatchPattern) != 0 { + matched, err := regexp.MatchString(expectation.MatchPattern, msg.Value.String()) + if err != nil { + sp.t.Errorf("Error while trying to match the input message with the expected pattern: " + err.Error()) + panic(err.Error()) + } + if !matched { + sp.t.Errorf("Input value \"%s\" did not match expected pattern \"%s\"", msg.Value.String(), expectation.MatchPattern) + return -1, -1, errNoMatch + } + } if expectation.Result == errProduceSuccess { sp.lastOffset++ msg.Offset = sp.lastOffset @@ -75,20 +88,36 @@ func (sp *SyncProducer) Close() error { // Setting expectations //////////////////////////////////////////////// +// ExpectSendMessageWithPatternAndSucceed sets an expectation on the mock producer that SendMessage +// will be called with a message value matching a given regexp. The mock producer will first check the +// message value against the pattern. It will return an error if the matching fails or handle +// the message as if it produced successfully, i.e. by returning a valid partition, and offset, and a nil error. +func (sp *SyncProducer) ExpectSendMessageWithPatternAndSucceed(pattern string) { + sp.l.Lock() + defer sp.l.Unlock() + sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, MatchPattern: pattern}) +} + +// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be +// called with a message value matching a given regexp. The mock producer will first check the +// message value against the pattern. It will return an error if the matching fails or handle +// the message as if it failed to produce successfully, i.e. by returning the provided error. +func (sp *SyncProducer) ExpectSendMessageWithPatternAndFail(pattern string, err error) { + sp.l.Lock() + defer sp.l.Unlock() + sp.expectations = append(sp.expectations, &producerExpectation{Result: err, MatchPattern: pattern}) +} + // ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be // called. The mock producer will handle the message as if it produced successfully, i.e. by // returning a valid partition, and offset, and a nil error. func (sp *SyncProducer) ExpectSendMessageAndSucceed() { - sp.l.Lock() - defer sp.l.Unlock() - sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess}) + sp.ExpectSendMessageWithPatternAndSucceed("") } // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be // called. The mock producer will handle the message as if it failed to produce // successfully, i.e. by returning the provided error. func (sp *SyncProducer) ExpectSendMessageAndFail(err error) { - sp.l.Lock() - defer sp.l.Unlock() - sp.expectations = append(sp.expectations, &producerExpectation{Result: err}) + sp.ExpectSendMessageWithPatternAndFail("", err) } diff --git a/mocks/sync_producer_test.go b/mocks/sync_producer_test.go index a674138e9..d10be050d 100644 --- a/mocks/sync_producer_test.go +++ b/mocks/sync_producer_test.go @@ -96,3 +96,28 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) { t.Error("Expected to report an error") } } + +func TestSyncProducerWithPattern(t *testing.T) { + trm := newTestReporterMock() + + sp := NewSyncProducer(trm, nil) + sp.ExpectSendMessageWithPatternAndSucceed("^tes") + sp.ExpectSendMessageWithPatternAndSucceed("^tes$") + + msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + if _, _, err := sp.SendMessage(msg); err != nil { + t.Error("No error expected on first SendMessage call", err) + } + msg = &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + if _, _, err := sp.SendMessage(msg); err != errNoMatch { + t.Error("errNoMatch expected on second SendMessage call, found:", err) + } + + if err := sp.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 1 { + t.Error("Expected to report an error") + } +} diff --git a/utils.go b/utils.go index 04ca88750..37c0bbbee 100644 --- a/utils.go +++ b/utils.go @@ -63,6 +63,7 @@ func safeAsyncClose(b *Broker) { type Encoder interface { Encode() ([]byte, error) Length() int + String() string } // make strings and byte slices encodable for convenience so they can be used as keys @@ -80,6 +81,10 @@ func (s StringEncoder) Length() int { return len(s) } +func (s StringEncoder) String() string { + return string(s) +} + // ByteEncoder implements the Encoder interface for Go byte slices so that they can be used // as the Key or Value in a ProducerMessage. type ByteEncoder []byte @@ -92,6 +97,10 @@ func (b ByteEncoder) Length() int { return len(b) } +func (s ByteEncoder) String() string { + return string(s) +} + // bufConn wraps a net.Conn with a buffer for reads to reduce the number of // reads that trigger syscalls. type bufConn struct { From 7aeedb0f4aa023f1661f2b705d91b37da304df46 Mon Sep 17 00:00:00 2001 From: Jacopo Silvestro Date: Fri, 24 Jun 2016 15:23:49 +0200 Subject: [PATCH 2/3] Address comments --- async_producer_test.go | 4 -- errors.go | 3 -- examples/http_server/http_server.go | 4 -- mocks/async_producer.go | 57 ++++++++++++++--------------- mocks/async_producer_test.go | 16 ++++++-- mocks/mocks.go | 22 +++++++++-- mocks/sync_producer.go | 50 ++++++++++++------------- mocks/sync_producer_test.go | 13 ++++--- utils.go | 9 ----- 9 files changed, 92 insertions(+), 86 deletions(-) diff --git a/async_producer_test.go b/async_producer_test.go index 23acfb1e9..517ef2a34 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -93,10 +93,6 @@ func (f flakyEncoder) Encode() ([]byte, error) { return []byte(TestMessage), nil } -func (f flakyEncoder) String() string { - return string(TestMessage) -} - func TestAsyncProducer(t *testing.T) { seedBroker := NewMockBroker(t, 1) leader := NewMockBroker(t, 2) diff --git a/errors.go b/errors.go index a6d3064c4..a837087f1 100644 --- a/errors.go +++ b/errors.go @@ -37,9 +37,6 @@ var ErrShuttingDown = errors.New("kafka: message received by producer in process // ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max var ErrMessageTooLarge = errors.New("kafka: message is larger than Consumer.Fetch.Max") -// ErrMessageTooLarge is returned when the next message to consume is larger than the configured Consumer.Fetch.Max -var ErrRegexpMatch = errors.New("kafka: message is larger than Consumer.Fetch.Max") - // PacketEncodingError is returned from a failure while encoding a Kafka packet. This can happen, for example, // if you try to encode a string over 2^15 characters in length, since Kafka's encoding rules do not permit that. type PacketEncodingError struct { diff --git a/examples/http_server/http_server.go b/examples/http_server/http_server.go index d16130ebf..03e47b6b2 100644 --- a/examples/http_server/http_server.go +++ b/examples/http_server/http_server.go @@ -162,10 +162,6 @@ func (ale *accessLogEntry) Encode() ([]byte, error) { return ale.encoded, ale.err } -func (ale *accessLogEntry) String() string { - return fmt.Sprintf("%+v\n", ale) -} - func (s *Server) withAccessLog(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { diff --git a/mocks/async_producer.go b/mocks/async_producer.go index 40daf9bb7..c32519cba 100644 --- a/mocks/async_producer.go +++ b/mocks/async_producer.go @@ -1,7 +1,6 @@ package mocks import ( - "regexp" "sync" "github.com/Shopify/sarama" @@ -11,8 +10,8 @@ import ( // Before you can send messages to it's Input channel, you have to set expectations // so it knows how to handle the input; it returns and error if the numer of messages // received is bigger then the number of expectations set. You can also set a -// regexp in each expectation so that the message value is matched against this -// regexp and an error is returned if the match fails. +// function in each expectation so that the message value is checked by this function +// and an error is returned if the match fails. type AsyncProducer struct { l sync.Mutex t ErrorReporter @@ -55,16 +54,16 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer { } else { expectation := mp.expectations[0] mp.expectations = mp.expectations[1:] - if len(expectation.MatchPattern) != 0 { - matched, err := regexp.MatchString(expectation.MatchPattern, msg.Value.String()) - if err != nil { - mp.t.Errorf("Error while trying to match the input message with the expected pattern: " + err.Error()) - mp.l.Unlock() - panic(err.Error()) - } - if !matched { - mp.t.Errorf("Input value \"%s\" did not match expected pattern \"%s\"", msg.Value.String(), expectation.MatchPattern) - mp.errors <- &sarama.ProducerError{Err: errNoMatch, Msg: msg} + if expectation.CheckFunction != nil { + if val, err := msg.Value.Encode(); err != nil { + mp.t.Errorf("Input message encoding failed: %s", err.Error()) + mp.errors <- &sarama.ProducerError{Err: err, Msg: msg} + } else { + err = expectation.CheckFunction(val) + if err != nil { + mp.t.Errorf("Check function returned an error: %s", err.Error()) + mp.errors <- &sarama.ProducerError{Err: err, Msg: msg} + } } } if expectation.Result == errProduceSuccess { @@ -137,26 +136,26 @@ func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError { // Setting expectations //////////////////////////////////////////////// -// ExpectInputWithPatternAndSucceed sets an expectation on the mock producer that a message -// with value matching a given regexp will be provided on the input channel. The mock producer -// will first check the message value against the pattern. It will return an error if the matching -// fails or handle the message as if it produced successfully, i.e. it will make it available -// on the Successes channel if the Producer.Return.Successes setting is set to true. -func (mp *AsyncProducer) ExpectInputWithPatternAndSucceed(pattern string) { +// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message +// will be provided on the input channel. The mock producer will call the given function to check +// the message value. If an error is returned it will be made available on the Errors channel +// otherwise the mock will handle the message as if it produced successfully, i.e. it will make +// it available on the Successes channel if the Producer.Return.Successes setting is set to true. +func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) { mp.l.Lock() defer mp.l.Unlock() - mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, MatchPattern: pattern}) + mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf}) } -// ExpectInputWithPatternAndFail sets an expectation on the mock producer that a message -// with value matching a given regexp will be provided on the input channel. The mock producer -// will first check the message value against the pattern. It will return an error if the matching -// fails or handle the message as if it failed to produce successfully. This means it will make -// a ProducerError available on the Errors channel. -func (mp *AsyncProducer) ExpectInputWithPatternAndFail(pattern string, err error) { +// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message +// will be provided on the input channel. The mock producer will first call the given function to +// check the message value. If an error is returned it will be made available on the Errors channel +// otherwise the mock will handle the message as if it failed to produce successfully. This means +// it will make a ProducerError available on the Errors channel. +func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) { mp.l.Lock() defer mp.l.Unlock() - mp.expectations = append(mp.expectations, &producerExpectation{Result: err, MatchPattern: pattern}) + mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf}) } // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided @@ -164,12 +163,12 @@ func (mp *AsyncProducer) ExpectInputWithPatternAndFail(pattern string, err error // i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting // is set to true. func (mp *AsyncProducer) ExpectInputAndSucceed() { - mp.ExpectInputWithPatternAndSucceed("") + mp.ExpectInputWithCheckerFunctionAndSucceed(nil) } // ExpectInputAndFail sets an expectation on the mock producer that a message will be provided // on the input channel. The mock producer will handle the message as if it failed to produce // successfully. This means it will make a ProducerError available on the Errors channel. func (mp *AsyncProducer) ExpectInputAndFail(err error) { - mp.ExpectInputWithPatternAndFail("", err) + mp.ExpectInputWithCheckerFunctionAndFail(nil, err) } diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go index 572793dee..e373142e6 100644 --- a/mocks/async_producer_test.go +++ b/mocks/async_producer_test.go @@ -2,6 +2,7 @@ package mocks import ( "fmt" + "strings" "testing" "github.com/Shopify/sarama" @@ -93,15 +94,24 @@ func TestProducerWithTooManyExpectations(t *testing.T) { } } -func TestProducerWithMatchPattern(t *testing.T) { +func TestProducerWithCheckerFunction(t *testing.T) { trm := newTestReporterMock() mp := NewAsyncProducer(trm, nil) - mp.ExpectInputWithPatternAndSucceed("$tes") - mp.ExpectInputWithPatternAndFail("tes$", errNoMatch) + mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$")) mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} if err := mp.Close(); err != nil { t.Error(err) } + + if len(mp.Errors()) != 1 { + t.Error("Expected to report an error") + } + + err1 := <-mp.Errors() + if !strings.HasPrefix(err1.Err.Error(), "No match") { + t.Error("Expected to report a value check error, found: ", err1.Err) + } } diff --git a/mocks/mocks.go b/mocks/mocks.go index 08b808312..58b5b64d6 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -15,6 +15,8 @@ package mocks import ( "errors" + "fmt" + "regexp" "github.com/Shopify/sarama" ) @@ -25,18 +27,32 @@ type ErrorReporter interface { Errorf(string, ...interface{}) } +type ValueChecker func(val []byte) error + +func generateRegexpChecker(re string) func([]byte) error { + return func(val []byte) error { + matched, err := regexp.MatchString(re, string(val)) + if err != nil { + return errors.New("Error while trying to match the input message with the expected pattern: " + err.Error()) + } + if !matched { + return fmt.Errorf("No match between input value \"%s\" and expected pattern \"%s\"", val, re) + } + return nil + } +} + var ( errProduceSuccess error = nil errOutOfExpectations = errors.New("No more expectations set on mock") errPartitionConsumerNotStarted = errors.New("The partition consumer was never started") - errNoMatch = errors.New("The input message value did not match with the expected pattern") ) const AnyOffset int64 = -1000 type producerExpectation struct { - Result error - MatchPattern string + Result error + CheckFunction ValueChecker } type consumerExpectation struct { diff --git a/mocks/sync_producer.go b/mocks/sync_producer.go index 79e8ef045..21b6ce0d1 100644 --- a/mocks/sync_producer.go +++ b/mocks/sync_producer.go @@ -1,7 +1,6 @@ package mocks import ( - "regexp" "sync" "github.com/Shopify/sarama" @@ -35,8 +34,8 @@ func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer { // SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation. // You have to set expectations on the mock producer before calling SendMessage, so it knows -// how to handle them. You can set a regexp in each expectation so that the message value -// is matched against this regexp and an error is returned if the match fails. +// how to handle them. You can set a function in each expectation so that the message value +// checked by this function and an error is returned if the match fails. // If there is no more remaining expectation when SendMessage is called, // the mock producer will write an error to the test state object. func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { @@ -46,15 +45,16 @@ func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int3 if len(sp.expectations) > 0 { expectation := sp.expectations[0] sp.expectations = sp.expectations[1:] - if len(expectation.MatchPattern) != 0 { - matched, err := regexp.MatchString(expectation.MatchPattern, msg.Value.String()) - if err != nil { - sp.t.Errorf("Error while trying to match the input message with the expected pattern: " + err.Error()) - panic(err.Error()) - } - if !matched { - sp.t.Errorf("Input value \"%s\" did not match expected pattern \"%s\"", msg.Value.String(), expectation.MatchPattern) - return -1, -1, errNoMatch + if expectation.CheckFunction != nil { + if val, err := msg.Value.Encode(); err != nil { + sp.t.Errorf("Input message encoding failed: %s", err.Error()) + return -1, -1, err + } else { + err := expectation.CheckFunction(val) + if err != nil { + sp.t.Errorf("Check function returned an error: %s", err.Error()) + return -1, -1, err + } } } if expectation.Result == errProduceSuccess { @@ -88,36 +88,36 @@ func (sp *SyncProducer) Close() error { // Setting expectations //////////////////////////////////////////////// -// ExpectSendMessageWithPatternAndSucceed sets an expectation on the mock producer that SendMessage -// will be called with a message value matching a given regexp. The mock producer will first check the -// message value against the pattern. It will return an error if the matching fails or handle -// the message as if it produced successfully, i.e. by returning a valid partition, and offset, and a nil error. -func (sp *SyncProducer) ExpectSendMessageWithPatternAndSucceed(pattern string) { +// ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage +// will be called. The mock producer will first call the given function to check the message value. +// It will cascade the error of the function, if any, or handle the message as if it produced +// successfully, i.e. by returning a valid partition, and offset, and a nil error. +func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) { sp.l.Lock() defer sp.l.Unlock() - sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, MatchPattern: pattern}) + sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf}) } // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be -// called with a message value matching a given regexp. The mock producer will first check the -// message value against the pattern. It will return an error if the matching fails or handle -// the message as if it failed to produce successfully, i.e. by returning the provided error. -func (sp *SyncProducer) ExpectSendMessageWithPatternAndFail(pattern string, err error) { +// called. The mock producer will first call the given function to check the message value. +// It will cascade the error of the function, if any, or handle the message as if it failed +// to produce successfully, i.e. by returning the provided error. +func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) { sp.l.Lock() defer sp.l.Unlock() - sp.expectations = append(sp.expectations, &producerExpectation{Result: err, MatchPattern: pattern}) + sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf}) } // ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be // called. The mock producer will handle the message as if it produced successfully, i.e. by // returning a valid partition, and offset, and a nil error. func (sp *SyncProducer) ExpectSendMessageAndSucceed() { - sp.ExpectSendMessageWithPatternAndSucceed("") + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(nil) } // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be // called. The mock producer will handle the message as if it failed to produce // successfully, i.e. by returning the provided error. func (sp *SyncProducer) ExpectSendMessageAndFail(err error) { - sp.ExpectSendMessageWithPatternAndFail("", err) + sp.ExpectSendMessageWithCheckerFunctionAndFail(nil, err) } diff --git a/mocks/sync_producer_test.go b/mocks/sync_producer_test.go index d10be050d..0fdc99877 100644 --- a/mocks/sync_producer_test.go +++ b/mocks/sync_producer_test.go @@ -1,6 +1,7 @@ package mocks import ( + "strings" "testing" "github.com/Shopify/sarama" @@ -97,20 +98,20 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) { } } -func TestSyncProducerWithPattern(t *testing.T) { +func TestSyncProducerWithCheckerFunction(t *testing.T) { trm := newTestReporterMock() sp := NewSyncProducer(trm, nil) - sp.ExpectSendMessageWithPatternAndSucceed("^tes") - sp.ExpectSendMessageWithPatternAndSucceed("^tes$") + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$")) msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} if _, _, err := sp.SendMessage(msg); err != nil { - t.Error("No error expected on first SendMessage call", err) + t.Error("No error expected on first SendMessage call, found: ", err) } msg = &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} - if _, _, err := sp.SendMessage(msg); err != errNoMatch { - t.Error("errNoMatch expected on second SendMessage call, found:", err) + if _, _, err := sp.SendMessage(msg); err == nil || !strings.HasPrefix(err.Error(), "No match") { + t.Error("Error during value check expected on second SendMessage call, found:", err) } if err := sp.Close(); err != nil { diff --git a/utils.go b/utils.go index 37c0bbbee..04ca88750 100644 --- a/utils.go +++ b/utils.go @@ -63,7 +63,6 @@ func safeAsyncClose(b *Broker) { type Encoder interface { Encode() ([]byte, error) Length() int - String() string } // make strings and byte slices encodable for convenience so they can be used as keys @@ -81,10 +80,6 @@ func (s StringEncoder) Length() int { return len(s) } -func (s StringEncoder) String() string { - return string(s) -} - // ByteEncoder implements the Encoder interface for Go byte slices so that they can be used // as the Key or Value in a ProducerMessage. type ByteEncoder []byte @@ -97,10 +92,6 @@ func (b ByteEncoder) Length() int { return len(b) } -func (s ByteEncoder) String() string { - return string(s) -} - // bufConn wraps a net.Conn with a buffer for reads to reduce the number of // reads that trigger syscalls. type bufConn struct { From b0701275b1a0ec482283665dfbca7b8470218149 Mon Sep 17 00:00:00 2001 From: Jacopo Silvestro Date: Wed, 6 Jul 2016 11:11:26 +0200 Subject: [PATCH 3/3] Address github CR comments --- mocks/async_producer.go | 2 +- mocks/mocks.go | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/mocks/async_producer.go b/mocks/async_producer.go index c32519cba..d1d9ba416 100644 --- a/mocks/async_producer.go +++ b/mocks/async_producer.go @@ -8,7 +8,7 @@ import ( // AsyncProducer implements sarama's Producer interface for testing purposes. // Before you can send messages to it's Input channel, you have to set expectations -// so it knows how to handle the input; it returns and error if the numer of messages +// so it knows how to handle the input; it returns an error if the number of messages // received is bigger then the number of expectations set. You can also set a // function in each expectation so that the message value is checked by this function // and an error is returned if the match fails. diff --git a/mocks/mocks.go b/mocks/mocks.go index 58b5b64d6..60955a3e0 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -27,8 +27,11 @@ type ErrorReporter interface { Errorf(string, ...interface{}) } +// ValueChecker is a function type to be set in each expectation of the producer mocks +// to check the value passed. type ValueChecker func(val []byte) error +// This function is used inside the mocks unit tests to generate ValueCheckers func generateRegexpChecker(re string) func([]byte) error { return func(val []byte) error { matched, err := regexp.MatchString(re, string(val))