Skip to content

Commit

Permalink
fix: single Handler's message missing commit
Browse files Browse the repository at this point in the history
  • Loading branch information
GGXXLL committed May 19, 2021
1 parent e591f09 commit bd8393c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 28 deletions.
31 changes: 17 additions & 14 deletions otkafka/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,27 +100,23 @@ func (e *Processor) addHandler(h Handler) error {
return err
}

if reader.Config().GroupID == ""{
return errors.New("")
}

batchFunc := func(ctx context.Context, data []interface{}) error {
return nil
}
batchHandler, isBatchHandler := h.(BatchHandler)
if isBatchHandler {
batchFunc = batchHandler.Batch
if reader.Config().GroupID == "" {
return errors.New("kafka reader must ")
}

var hd = &handler{
msgCh: make(chan *kafka.Message, h.Info().chanSize()),
reader: reader,
handleFunc: h.Handle,
batchFunc: batchFunc,
info: h.Info(),
once: sync.Once{},
batchCh: make(chan *batchInfo, h.Info().chanSize()),
ticker: time.NewTicker(h.Info().autoBatchInterval()),
}

batchHandler, isBatchHandler := h.(BatchHandler)
if isBatchHandler {
hd.batchFunc = batchHandler.Batch
hd.batchCh = make(chan *batchInfo, h.Info().chanSize())
hd.ticker = time.NewTicker(h.Info().autoBatchInterval())
}

e.handlers = append(e.handlers, hd)
Expand Down Expand Up @@ -234,7 +230,14 @@ func (h *handler) handle(ctx context.Context) error {
if err != nil {
return err
}
h.batchCh <- &batchInfo{message: msg, data: v}
if h.batchCh != nil {
h.batchCh <- &batchInfo{message: msg, data: v}
}
if h.batchFunc == nil {
if err := h.commit(*msg); err != nil {
return err
}
}
case <-ctx.Done():
return ctx.Err()
}
Expand Down
28 changes: 14 additions & 14 deletions otkafka/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,22 +166,22 @@ func TestProcessor(t *testing.T) {
core.WithInline("kafka.reader.A.brokers", envDefaultKafkaAddrs),
core.WithInline("kafka.reader.A.topic", "processor"),
core.WithInline("kafka.reader.A.groupID", "testA"),
core.WithInline("kafka.reader.A.startOffset", kafka.LastOffset),
core.WithInline("kafka.reader.A.startOffset", kafka.FirstOffset),

core.WithInline("kafka.reader.B.brokers", envDefaultKafkaAddrs),
core.WithInline("kafka.reader.B.topic", "processor"),
core.WithInline("kafka.reader.B.groupID", "testB"),
core.WithInline("kafka.reader.B.startOffset", kafka.LastOffset),
core.WithInline("kafka.reader.B.startOffset", kafka.FirstOffset),

core.WithInline("kafka.reader.C.brokers", envDefaultKafkaAddrs),
core.WithInline("kafka.reader.C.topic", "processor"),
core.WithInline("kafka.reader.C.groupID", "testC"),
core.WithInline("kafka.reader.C.startOffset", kafka.LastOffset),
core.WithInline("kafka.reader.C.startOffset", kafka.FirstOffset),

core.WithInline("kafka.reader.D.brokers", envDefaultKafkaAddrs),
core.WithInline("kafka.reader.D.topic", "processor"),
core.WithInline("kafka.reader.D.groupID", "testD"),
core.WithInline("kafka.reader.D.startOffset", kafka.LastOffset),
core.WithInline("kafka.reader.D.startOffset", kafka.FirstOffset),

core.WithInline("http.disable", "true"),
core.WithInline("grpc.disable", "true"),
Expand All @@ -191,10 +191,10 @@ func TestProcessor(t *testing.T) {

c.ProvideEssentials()
c.Provide(otkafka.Providers())
handlerA := &testHandlerA{make(chan *testData, 10)}
handlerB := &testHandlerB{make(chan *testData, 10)}
handlerC := &testHandlerC{make(chan *testData, 10)}
handlerD := &testHandlerD{make(chan *testData, 10)}
handlerA := &testHandlerA{make(chan *testData, 100)}
handlerB := &testHandlerB{make(chan *testData, 100)}
handlerC := &testHandlerC{make(chan *testData, 100)}
handlerD := &testHandlerD{make(chan *testData, 100)}
defer func() {
close(handlerA.data)
close(handlerB.data)
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestProcessorGracefulShutdown(t *testing.T) {
core.WithInline("kafka.reader.A.brokers", envDefaultKafkaAddrs),
core.WithInline("kafka.reader.A.topic", "processor"),
core.WithInline("kafka.reader.A.groupID", "testE"),
core.WithInline("kafka.reader.A.startOffset", kafka.LastOffset),
core.WithInline("kafka.reader.A.startOffset", kafka.FirstOffset),

core.WithInline("http.disable", "true"),
core.WithInline("grpc.disable", "true"),
Expand All @@ -245,7 +245,7 @@ func TestProcessorGracefulShutdown(t *testing.T) {
c.ProvideEssentials()
c.Provide(otkafka.Providers())

handlerA := &testHandlerA{make(chan *testData, 10)}
handlerA := &testHandlerA{make(chan *testData, 100)}
defer close(handlerA.data)
c.Provide(di.Deps{
func() Out {
Expand Down Expand Up @@ -297,7 +297,7 @@ func TestProcessorBatchInterval(t *testing.T) {
core.WithInline("kafka.reader.default.brokers", envDefaultKafkaAddrs),
core.WithInline("kafka.reader.default.topic", "processor"),
core.WithInline("kafka.reader.default.groupID", "testF"),
core.WithInline("kafka.reader.default.startOffset", kafka.LastOffset),
core.WithInline("kafka.reader.default.startOffset", kafka.FirstOffset),

core.WithInline("http.disable", "true"),
core.WithInline("grpc.disable", "true"),
Expand All @@ -307,7 +307,7 @@ func TestProcessorBatchInterval(t *testing.T) {
c.ProvideEssentials()
c.Provide(otkafka.Providers())

handler := &testHandlerE{make(chan *testData, 10)}
handler := &testHandlerE{make(chan *testData, 100)}
defer close(handler.data)
c.Provide(di.Deps{
func() Out {
Expand Down Expand Up @@ -357,7 +357,7 @@ func TestProcessorBatchError(t *testing.T) {
core.WithInline("kafka.reader.default.brokers", envDefaultKafkaAddrs),
core.WithInline("kafka.reader.default.topic", "processor"),
core.WithInline("kafka.reader.default.groupID", "testG"),
core.WithInline("kafka.reader.default.startOffset", kafka.LastOffset),
core.WithInline("kafka.reader.default.startOffset", kafka.FirstOffset),

core.WithInline("http.disable", "true"),
core.WithInline("grpc.disable", "true"),
Expand All @@ -367,7 +367,7 @@ func TestProcessorBatchError(t *testing.T) {
c.ProvideEssentials()
c.Provide(otkafka.Providers())

handler := &testHandlerF{make(chan *testData, 10)}
handler := &testHandlerF{make(chan *testData, 100)}
c.Provide(di.Deps{
func() Out {
return NewOut(
Expand Down

0 comments on commit bd8393c

Please sign in to comment.