diff --git a/mocks/async_producer.go b/mocks/async_producer.go index 6ccf1f145..d1d9ba416 100644 --- a/mocks/async_producer.go +++ b/mocks/async_producer.go @@ -8,8 +8,10 @@ import ( // AsyncProducer implements sarama's Producer interface for testing purposes. // Before you can send messages to it's Input channel, you have to set expectations -// so it knows how to handle the input. This way you can easily test success and -// failure scenarios. +// so it knows how to handle the input; it returns an error if the number of messages +// received is bigger then the number of expectations set. You can also set a +// function in each expectation so that the message value is checked by this function +// and an error is returned if the match fails. type AsyncProducer struct { l sync.Mutex t ErrorReporter @@ -52,6 +54,18 @@ func NewAsyncProducer(t ErrorReporter, config *sarama.Config) *AsyncProducer { } else { expectation := mp.expectations[0] mp.expectations = mp.expectations[1:] + if expectation.CheckFunction != nil { + if val, err := msg.Value.Encode(); err != nil { + mp.t.Errorf("Input message encoding failed: %s", err.Error()) + mp.errors <- &sarama.ProducerError{Err: err, Msg: msg} + } else { + err = expectation.CheckFunction(val) + if err != nil { + mp.t.Errorf("Check function returned an error: %s", err.Error()) + mp.errors <- &sarama.ProducerError{Err: err, Msg: msg} + } + } + } if expectation.Result == errProduceSuccess { mp.lastOffset++ if config.Producer.Return.Successes { @@ -122,21 +136,39 @@ func (mp *AsyncProducer) Errors() <-chan *sarama.ProducerError { // Setting expectations //////////////////////////////////////////////// +// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message +// will be provided on the input channel. The mock producer will call the given function to check +// the message value. If an error is returned it will be made available on the Errors channel +// otherwise the mock will handle the message as if it produced successfully, i.e. it will make +// it available on the Successes channel if the Producer.Return.Successes setting is set to true. +func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndSucceed(cf ValueChecker) { + mp.l.Lock() + defer mp.l.Unlock() + mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf}) +} + +// ExpectInputWithCheckerFunctionAndSucceed sets an expectation on the mock producer that a message +// will be provided on the input channel. The mock producer will first call the given function to +// check the message value. If an error is returned it will be made available on the Errors channel +// otherwise the mock will handle the message as if it failed to produce successfully. This means +// it will make a ProducerError available on the Errors channel. +func (mp *AsyncProducer) ExpectInputWithCheckerFunctionAndFail(cf ValueChecker, err error) { + mp.l.Lock() + defer mp.l.Unlock() + mp.expectations = append(mp.expectations, &producerExpectation{Result: err, CheckFunction: cf}) +} + // ExpectInputAndSucceed sets an expectation on the mock producer that a message will be provided // on the input channel. The mock producer will handle the message as if it is produced successfully, // i.e. it will make it available on the Successes channel if the Producer.Return.Successes setting // is set to true. func (mp *AsyncProducer) ExpectInputAndSucceed() { - mp.l.Lock() - defer mp.l.Unlock() - mp.expectations = append(mp.expectations, &producerExpectation{Result: errProduceSuccess}) + mp.ExpectInputWithCheckerFunctionAndSucceed(nil) } // ExpectInputAndFail sets an expectation on the mock producer that a message will be provided // on the input channel. The mock producer will handle the message as if it failed to produce // successfully. This means it will make a ProducerError available on the Errors channel. func (mp *AsyncProducer) ExpectInputAndFail(err error) { - mp.l.Lock() - defer mp.l.Unlock() - mp.expectations = append(mp.expectations, &producerExpectation{Result: err}) + mp.ExpectInputWithCheckerFunctionAndFail(nil, err) } diff --git a/mocks/async_producer_test.go b/mocks/async_producer_test.go index 520bf58b9..e373142e6 100644 --- a/mocks/async_producer_test.go +++ b/mocks/async_producer_test.go @@ -2,6 +2,7 @@ package mocks import ( "fmt" + "strings" "testing" "github.com/Shopify/sarama" @@ -92,3 +93,25 @@ func TestProducerWithTooManyExpectations(t *testing.T) { t.Error("Expected to report an error") } } + +func TestProducerWithCheckerFunction(t *testing.T) { + trm := newTestReporterMock() + mp := NewAsyncProducer(trm, nil) + mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + mp.ExpectInputWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$")) + + mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + mp.Input() <- &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + if err := mp.Close(); err != nil { + t.Error(err) + } + + if len(mp.Errors()) != 1 { + t.Error("Expected to report an error") + } + + err1 := <-mp.Errors() + if !strings.HasPrefix(err1.Err.Error(), "No match") { + t.Error("Expected to report a value check error, found: ", err1.Err) + } +} diff --git a/mocks/mocks.go b/mocks/mocks.go index 96b79bc06..60955a3e0 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -15,6 +15,8 @@ package mocks import ( "errors" + "fmt" + "regexp" "github.com/Shopify/sarama" ) @@ -25,6 +27,24 @@ type ErrorReporter interface { Errorf(string, ...interface{}) } +// ValueChecker is a function type to be set in each expectation of the producer mocks +// to check the value passed. +type ValueChecker func(val []byte) error + +// This function is used inside the mocks unit tests to generate ValueCheckers +func generateRegexpChecker(re string) func([]byte) error { + return func(val []byte) error { + matched, err := regexp.MatchString(re, string(val)) + if err != nil { + return errors.New("Error while trying to match the input message with the expected pattern: " + err.Error()) + } + if !matched { + return fmt.Errorf("No match between input value \"%s\" and expected pattern \"%s\"", val, re) + } + return nil + } +} + var ( errProduceSuccess error = nil errOutOfExpectations = errors.New("No more expectations set on mock") @@ -34,7 +54,8 @@ var ( const AnyOffset int64 = -1000 type producerExpectation struct { - Result error + Result error + CheckFunction ValueChecker } type consumerExpectation struct { diff --git a/mocks/sync_producer.go b/mocks/sync_producer.go index 7541841f6..2ac7b5c32 100644 --- a/mocks/sync_producer.go +++ b/mocks/sync_producer.go @@ -34,7 +34,9 @@ func NewSyncProducer(t ErrorReporter, config *sarama.Config) *SyncProducer { // SendMessage corresponds with the SendMessage method of sarama's SyncProducer implementation. // You have to set expectations on the mock producer before calling SendMessage, so it knows -// how to handle them. If there is no more remaining expectations when SendMessage is called, +// how to handle them. You can set a function in each expectation so that the message value +// checked by this function and an error is returned if the match fails. +// If there is no more remaining expectation when SendMessage is called, // the mock producer will write an error to the test state object. func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) { sp.l.Lock() @@ -43,7 +45,18 @@ func (sp *SyncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int3 if len(sp.expectations) > 0 { expectation := sp.expectations[0] sp.expectations = sp.expectations[1:] - + if expectation.CheckFunction != nil { + if val, err := msg.Value.Encode(); err != nil { + sp.t.Errorf("Input message encoding failed: %s", err.Error()) + return -1, -1, err + } else { + err := expectation.CheckFunction(val) + if err != nil { + sp.t.Errorf("Check function returned an error: %s", err.Error()) + return -1, -1, err + } + } + } if expectation.Result == errProduceSuccess { sp.lastOffset++ msg.Offset = sp.lastOffset @@ -100,20 +113,36 @@ func (sp *SyncProducer) Close() error { // Setting expectations //////////////////////////////////////////////// +// ExpectSendMessageWithCheckerFunctionAndSucceed sets an expectation on the mock producer that SendMessage +// will be called. The mock producer will first call the given function to check the message value. +// It will cascade the error of the function, if any, or handle the message as if it produced +// successfully, i.e. by returning a valid partition, and offset, and a nil error. +func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndSucceed(cf ValueChecker) { + sp.l.Lock() + defer sp.l.Unlock() + sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess, CheckFunction: cf}) +} + +// ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be +// called. The mock producer will first call the given function to check the message value. +// It will cascade the error of the function, if any, or handle the message as if it failed +// to produce successfully, i.e. by returning the provided error. +func (sp *SyncProducer) ExpectSendMessageWithCheckerFunctionAndFail(cf ValueChecker, err error) { + sp.l.Lock() + defer sp.l.Unlock() + sp.expectations = append(sp.expectations, &producerExpectation{Result: err, CheckFunction: cf}) +} + // ExpectSendMessageAndSucceed sets an expectation on the mock producer that SendMessage will be // called. The mock producer will handle the message as if it produced successfully, i.e. by // returning a valid partition, and offset, and a nil error. func (sp *SyncProducer) ExpectSendMessageAndSucceed() { - sp.l.Lock() - defer sp.l.Unlock() - sp.expectations = append(sp.expectations, &producerExpectation{Result: errProduceSuccess}) + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(nil) } // ExpectSendMessageAndFail sets an expectation on the mock producer that SendMessage will be // called. The mock producer will handle the message as if it failed to produce // successfully, i.e. by returning the provided error. func (sp *SyncProducer) ExpectSendMessageAndFail(err error) { - sp.l.Lock() - defer sp.l.Unlock() - sp.expectations = append(sp.expectations, &producerExpectation{Result: err}) + sp.ExpectSendMessageWithCheckerFunctionAndFail(nil, err) } diff --git a/mocks/sync_producer_test.go b/mocks/sync_producer_test.go index a674138e9..0fdc99877 100644 --- a/mocks/sync_producer_test.go +++ b/mocks/sync_producer_test.go @@ -1,6 +1,7 @@ package mocks import ( + "strings" "testing" "github.com/Shopify/sarama" @@ -96,3 +97,28 @@ func TestSyncProducerWithTooFewExpectations(t *testing.T) { t.Error("Expected to report an error") } } + +func TestSyncProducerWithCheckerFunction(t *testing.T) { + trm := newTestReporterMock() + + sp := NewSyncProducer(trm, nil) + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes")) + sp.ExpectSendMessageWithCheckerFunctionAndSucceed(generateRegexpChecker("^tes$")) + + msg := &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + if _, _, err := sp.SendMessage(msg); err != nil { + t.Error("No error expected on first SendMessage call, found: ", err) + } + msg = &sarama.ProducerMessage{Topic: "test", Value: sarama.StringEncoder("test")} + if _, _, err := sp.SendMessage(msg); err == nil || !strings.HasPrefix(err.Error(), "No match") { + t.Error("Error during value check expected on second SendMessage call, found:", err) + } + + if err := sp.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 1 { + t.Error("Expected to report an error") + } +}