Skip to content

Commit

Permalink
Merge pull request #39 from lovoo/ignore-nil-message
Browse files Browse the repository at this point in the history
Ignore nil message
  • Loading branch information
db7 authored Sep 29, 2017
2 parents f475664 + c99f936 commit 813e822
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
11 changes: 8 additions & 3 deletions processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package goka
import (
"errors"
"fmt"
"log"
"runtime/debug"
"sync"
"time"
Expand Down Expand Up @@ -684,21 +685,25 @@ func (g *Processor) process(msg *message, st storage.Storage, wg *sync.WaitGroup
// get stream subcription
codec := g.graph.codec(msg.Topic)
if codec == nil {
wg.Done()
return fmt.Errorf("cannot handle topic %s", msg.Topic)
}

// drop nil messages
if msg.Data == nil {
log.Printf("dropping nil message for key %s from %s/%d", msg.Key, msg.Topic, msg.Partition)
return nil
}

// decode message
m, err := codec.Decode(msg.Data)
if err != nil {
wg.Done()
return fmt.Errorf("error decoding message for key %s from %s/%d: %v", msg.Key, msg.Topic, msg.Partition, err)
}

ctx.start()
defer ctx.finish() // execute even in case of panic
cb := g.graph.callback(msg.Topic)
if cb == nil {
wg.Done()
return fmt.Errorf("error processing message for key %s from %s/%d: %v", msg.Key, msg.Topic, msg.Partition, err)
}
cb(ctx, m)
Expand Down
14 changes: 8 additions & 6 deletions processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestProcessor_process(t *testing.T) {

// no emits
consumer.EXPECT().Commit("sometopic", int32(1), int64(123))
msg := &message{Topic: "sometopic", Partition: 1, Offset: 123}
msg := &message{Topic: "sometopic", Partition: 1, Offset: 123, Data: []byte("something")}
err := p.process(msg, st, &wg)
ensure.Nil(t, err)

Expand All @@ -191,7 +191,7 @@ func TestProcessor_process(t *testing.T) {
producer.EXPECT().Emit("anothertopic", "key", []byte("message")).Return(promise),
consumer.EXPECT().Commit("sometopic", int32(1), int64(123)),
)
msg = &message{Topic: "sometopic", Partition: 1, Offset: 123}
msg = &message{Topic: "sometopic", Partition: 1, Offset: 123, Data: []byte("something")}
p.graph.callbacks["sometopic"] = func(ctx Context, msg interface{}) {
ctx.Emit("anothertopic", "key", "message")
}
Expand All @@ -208,7 +208,7 @@ func TestProcessor_process(t *testing.T) {
st.EXPECT().SetOffset(int64(322)),
consumer.EXPECT().Commit("sometopic", int32(1), int64(123)),
)
msg = &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123}
msg = &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123, Data: []byte("something")}
p.graph.callbacks["sometopic"] = func(ctx Context, msg interface{}) {
ctx.SetValue("message")
}
Expand Down Expand Up @@ -258,7 +258,7 @@ func TestProcessor_processFail(t *testing.T) {
consumer.EXPECT().Close().Do(func() { close(p.done) }),
producer.EXPECT().Close(),
)
msg := &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123}
msg := &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123, Data: []byte("something")}
p.graph.callbacks["sometopic"] = func(ctx Context, msg interface{}) {
ctx.SetValue("message")
}
Expand All @@ -281,7 +281,7 @@ func TestProcessor_processFail(t *testing.T) {
consumer.EXPECT().Close().Do(func() { close(p.done) }),
producer.EXPECT().Close(),
)
msg = &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123}
msg = &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123, Data: []byte("something")}
p.graph.callbacks["sometopic"] = func(ctx Context, msg interface{}) {
ctx.SetValue("message")
}
Expand All @@ -305,7 +305,7 @@ func TestProcessor_processFail(t *testing.T) {
consumer.EXPECT().Close().Do(func() { close(p.done) }),
producer.EXPECT().Close(),
)
msg = &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123}
msg = &message{Topic: "sometopic", Key: "key", Partition: 1, Offset: 123, Data: []byte("something")}
p.graph.callbacks["sometopic"] = func(ctx Context, msg interface{}) {
ctx.SetValue("message")
}
Expand Down Expand Up @@ -727,6 +727,7 @@ func TestProcessor_Start(t *testing.T) {
Partition: 1,
Offset: 1,
Key: "key",
Value: value,
}
err = syncWith(t, ch, 1) // with partition 1
ensure.Nil(t, err)
Expand Down Expand Up @@ -899,6 +900,7 @@ func TestProcessor_StartWithTable(t *testing.T) {
Partition: 1,
Offset: 1,
Key: "key",
Value: value,
}
err = syncWith(t, ch, 1)
ensure.Nil(t, err)
Expand Down

0 comments on commit 813e822

Please sign in to comment.