diff --git a/processor.go b/processor.go index c13a2df6..7e07e6f4 100644 --- a/processor.go +++ b/processor.go @@ -3,6 +3,7 @@ package goka import ( "errors" "fmt" + "log" "runtime/debug" "sync" "time" @@ -684,13 +685,18 @@ func (g *Processor) process(msg *message, st storage.Storage, wg *sync.WaitGroup // get stream subcription codec := g.graph.codec(msg.Topic) if codec == nil { - wg.Done() return fmt.Errorf("cannot handle topic %s", msg.Topic) } + + // drop nil messages + if msg.Data == nil { + log.Printf("dropping nil message for key %s from %s/%d", msg.Key, msg.Topic, msg.Partition) + return nil + } + // decode message m, err := codec.Decode(msg.Data) if err != nil { - wg.Done() return fmt.Errorf("error decoding message for key %s from %s/%d: %v", msg.Key, msg.Topic, msg.Partition, err) } @@ -698,7 +704,6 @@ func (g *Processor) process(msg *message, st storage.Storage, wg *sync.WaitGroup defer ctx.finish() // execute even in case of panic cb := g.graph.callback(msg.Topic) if cb == nil { - wg.Done() return fmt.Errorf("error processing message for key %s from %s/%d: %v", msg.Key, msg.Topic, msg.Partition, err) } cb(ctx, m) diff --git a/processor_test.go b/processor_test.go index 15519320..d7767ab2 100644 --- a/processor_test.go +++ b/processor_test.go @@ -181,7 +181,7 @@ func TestProcessor_process(t *testing.T) { // no emits consumer.EXPECT().Commit("sometopic", int32(1), int64(123)) - msg := &message{Topic: "sometopic", Partition: 1, Offset: 123} + msg := &message{Topic: "sometopic", Partition: 1, Offset: 123, Data: []byte("something")} err := p.process(msg, st, &wg) ensure.Nil(t, err) @@ -191,7 +191,7 @@ func TestProcessor_process(t *testing.T) { producer.EXPECT().Emit("anothertopic", "key", []byte("message")).Return(promise), consumer.EXPECT().Commit("sometopic", int32(1), int64(123)), ) - msg = &message{Topic: "sometopic", Partition: 1, Offset: 123} + msg = &message{Topic: "sometopic", Partition: 1, Offset: 123, Data: []byte("something")} p.graph.callbacks["sometopic"] = func(ctx Context, msg interface{}) { ctx.Emit("anothertopic", "key", "message") } @@ -208,7 +208,7 @@ func TestProcessor_process(t *testing.T) { st.EXPECT().SetOffset(int64(322)), consumer.EXPECT().Commit("sometopic", int32(1), int64(123)), ) - msg = &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123} + msg = &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123, Data: []byte("something")} p.graph.callbacks["sometopic"] = func(ctx Context, msg interface{}) { ctx.SetValue("message") } @@ -258,7 +258,7 @@ func TestProcessor_processFail(t *testing.T) { consumer.EXPECT().Close().Do(func() { close(p.done) }), producer.EXPECT().Close(), ) - msg := &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123} + msg := &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123, Data: []byte("something")} p.graph.callbacks["sometopic"] = func(ctx Context, msg interface{}) { ctx.SetValue("message") } @@ -281,7 +281,7 @@ func TestProcessor_processFail(t *testing.T) { consumer.EXPECT().Close().Do(func() { close(p.done) }), producer.EXPECT().Close(), ) - msg = &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123} + msg = &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123, Data: []byte("something")} p.graph.callbacks["sometopic"] = func(ctx Context, msg interface{}) { ctx.SetValue("message") } @@ -305,7 +305,7 @@ func TestProcessor_processFail(t *testing.T) { consumer.EXPECT().Close().Do(func() { close(p.done) }), producer.EXPECT().Close(), ) - msg = &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123} + msg = &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123, Data: []byte("something")} p.graph.callbacks["sometopic"] = func(ctx Context, msg interface{}) { ctx.SetValue("message") } @@ -727,6 +727,7 @@ func TestProcessor_Start(t *testing.T) { Partition: 1, Offset: 1, Key: "key", + Value: value, } err = syncWith(t, ch, 1) // with partition 1 ensure.Nil(t, err) @@ -899,6 +900,7 @@ func TestProcessor_StartWithTable(t *testing.T) { Partition: 1, Offset: 1, Key: "key", + Value: value, } err = syncWith(t, ch, 1) ensure.Nil(t, err)