Skip to content

Commit

Permalink
fix: otkafka processor error (close #135) (#136)
Browse files Browse the repository at this point in the history
* Update go.yml

* fix: close error

* test: length not zero
  • Loading branch information
GGXXLL authored May 25, 2021
1 parent a39ef2b commit 0c6f15c
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 98 deletions.
40 changes: 16 additions & 24 deletions otkafka/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package processor

import (
"context"
"sync"
"time"

"github.com/DoNewsCode/core/di"
Expand Down Expand Up @@ -109,7 +108,6 @@ func (e *Processor) addHandler(h Handler) error {
reader: reader,
handleFunc: h.Handle,
info: h.Info(),
once: sync.Once{},
}

batchHandler, isBatchHandler := h.(BatchHandler)
Expand Down Expand Up @@ -152,7 +150,6 @@ func (e *Processor) ProvideRunGroup(group *run.Group) {
return handler.read(ctx)
}, func(err error) {
cancel()
handler.close()
})
}

Expand All @@ -161,7 +158,6 @@ func (e *Processor) ProvideRunGroup(group *run.Group) {
return handler.handle(ctx)
}, func(err error) {
cancel()
handler.close()
})
}

Expand All @@ -171,11 +167,9 @@ func (e *Processor) ProvideRunGroup(group *run.Group) {
return handler.batch(ctx)
}, func(err error) {
cancel()
handler.close()
})
}
}

}

group.Add(func() error {
Expand All @@ -189,6 +183,13 @@ func (e *Processor) ProvideRunGroup(group *run.Group) {

}

// ProvideCloser implements container.CloserProvider for the Module.
// Presumably invoked by the container once at shutdown; it replaces the
// per-run-group interrupt-callback handler.close() calls removed by this
// commit (see the deleted cancel()/handler.close() pairs above), so each
// handler's channels and ticker are released from a single shutdown path.
func (e *Processor) ProvideCloser() {
// Close every registered handler; close() frees msgCh/batchCh and stops
// the batch ticker (see handler.close below).
for _, h := range e.handlers {
h.close()
}
}

// handler private processor
// todo It's a bit messy
type handler struct {
Expand All @@ -199,7 +200,6 @@ type handler struct {
batchFunc BatchFunc
info *Info
ticker *time.Ticker
once sync.Once
}

// read fetch message from kafka
Expand Down Expand Up @@ -294,29 +294,21 @@ func (h *handler) batch(ctx context.Context) error {
return err
}
case <-ctx.Done():
for v := range h.batchCh {
appendData(v)
}
if err := doFunc(); err != nil {
return err
}
return ctx.Err()
}
}
}

// close releases the handler's resources: its message channel, batch
// channel, and batch ticker.
// NOTE(review): this span is a unified-diff rendering with the +/- markers
// stripped — the h.once.Do(...) section is the PRE-commit (removed) body,
// and the plain if-blocks after it are the replacement added by this commit;
// the two did not coexist in the real file. The sync.Once guard apparently
// became unnecessary because close() is now called only from ProvideCloser
// (one shutdown path) instead of from every run-group interrupt callback —
// TODO confirm against the full post-commit source.
func (h *handler) close() {
// --- removed lines (old version, once-guarded) ---
h.once.Do(func() {
if h.msgCh != nil {
close(h.msgCh)
}
if h.batchCh != nil {
close(h.batchCh)
}
if h.ticker != nil {
h.ticker.Stop()
}
})
// --- added lines (new version, unguarded) ---
// nil checks: only the channels the handler actually uses were created,
// and closing a nil channel would panic.
if h.msgCh != nil {
close(h.msgCh)
}
if h.batchCh != nil {
close(h.batchCh)
}
// ticker exists only for batch handlers with an interval configured.
if h.ticker != nil {
h.ticker.Stop()
}
}

func (h *handler) commit(messages ...kafka.Message) error {
Expand Down
92 changes: 18 additions & 74 deletions otkafka/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func TestProcessor(t *testing.T) {
core.WithInline("cron.disable", "true"),
core.WithInline("log.level", "none"),
)
defer c.Shutdown()

c.ProvideEssentials()
c.Provide(otkafka.Providers())
Expand All @@ -213,102 +214,42 @@ func TestProcessor(t *testing.T) {
},
})
c.AddModuleFunc(New)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

err := c.Serve(ctx)
if err != nil {
t.Error(err)
}

var messageCount = 4

assert.Equal(t, messageCount, len(handlerA.data))
assert.Equal(t, messageCount, len(handlerB.data))
assert.Equal(t, messageCount, len(handlerC.data))
assert.Equal(t, messageCount, len(handlerD.data))
}

func TestProcessorGracefulShutdown(t *testing.T) {
c := core.New(
core.WithInline("kafka.reader.A.brokers", envDefaultKafkaAddrs),
core.WithInline("kafka.reader.A.topic", "processor"),
core.WithInline("kafka.reader.A.groupID", "testE"),
core.WithInline("kafka.reader.A.startOffset", kafka.FirstOffset),

core.WithInline("http.disable", "true"),
core.WithInline("grpc.disable", "true"),
core.WithInline("cron.disable", "true"),
core.WithInline("log.level", "none"),
)
c.ProvideEssentials()
c.Provide(otkafka.Providers())

handlerA := &testHandlerA{make(chan *testData, 100)}
defer close(handlerA.data)
c.Provide(di.Deps{
func() Out {
return NewOut(
handlerA,
)
},
})

c.AddModuleFunc(New)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
g := sync.WaitGroup{}

g.Add(1)
go func() {
err := c.Serve(ctx)
if err != nil {
t.Error(err)
}
g.Done()
}()

g.Add(1)

go func() {
defer g.Done()
count := 0
for {
select {
case <-ctx.Done():
return
case <-handlerA.data:
count++
if count >= 3 {
cancel()
return
}
}
}
}()
g.Wait()

assert.Equal(t, 1, len(handlerA.data))
assert.NotZero(t, len(handlerA.data))
assert.NotZero(t, len(handlerB.data))
assert.Equal(t, 0, len(handlerA.data)%3)
assert.Equal(t, 0, len(handlerB.data)%3)
assert.Equal(t, 4, len(handlerC.data))
assert.Equal(t, 4, len(handlerD.data))
}

func TestProcessorBatchInterval(t *testing.T) {
c := core.New(
core.WithInline("kafka.reader.default.brokers", envDefaultKafkaAddrs),
core.WithInline("kafka.reader.default.topic", "processor"),
core.WithInline("kafka.reader.default.groupID", "testF"),
core.WithInline("kafka.reader.default.groupID", "testE"),
core.WithInline("kafka.reader.default.startOffset", kafka.FirstOffset),

core.WithInline("http.disable", "true"),
core.WithInline("grpc.disable", "true"),
core.WithInline("cron.disable", "true"),
core.WithInline("log.level", "none"),
)
defer c.Shutdown()
c.ProvideEssentials()
c.Provide(otkafka.Providers())

handler := &testHandlerE{make(chan *testData, 100)}
defer close(handler.data)
defer func() {
close(handler.data)
}()

c.Provide(di.Deps{
func() Out {
return NewOut(
Expand All @@ -318,7 +259,6 @@ func TestProcessorBatchInterval(t *testing.T) {
})

c.AddModuleFunc(New)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()

Expand Down Expand Up @@ -356,18 +296,22 @@ func TestProcessorBatchError(t *testing.T) {
c := core.New(
core.WithInline("kafka.reader.default.brokers", envDefaultKafkaAddrs),
core.WithInline("kafka.reader.default.topic", "processor"),
core.WithInline("kafka.reader.default.groupID", "testG"),
core.WithInline("kafka.reader.default.groupID", "testF"),
core.WithInline("kafka.reader.default.startOffset", kafka.FirstOffset),

core.WithInline("http.disable", "true"),
core.WithInline("grpc.disable", "true"),
core.WithInline("cron.disable", "true"),
core.WithInline("log.level", "none"),
)
defer c.Shutdown()
c.ProvideEssentials()
c.Provide(otkafka.Providers())

handler := &testHandlerF{make(chan *testData, 100)}
defer func() {
close(handler.data)
}()
c.Provide(di.Deps{
func() Out {
return NewOut(
Expand Down

0 comments on commit 0c6f15c

Please sign in to comment.