diff --git a/pkg/varlog/log.go b/pkg/varlog/log.go index e4f7747f4..ebc76073a 100644 --- a/pkg/varlog/log.go +++ b/pkg/varlog/log.go @@ -198,5 +198,6 @@ func (v *logImpl) Close() (err error) { err = e } v.closed.Store(true) + v.runner.Stop() return } diff --git a/pkg/varlog/log_stream_appender.go b/pkg/varlog/log_stream_appender.go index 689aca814..cae11e0c3 100644 --- a/pkg/varlog/log_stream_appender.go +++ b/pkg/varlog/log_stream_appender.go @@ -39,8 +39,6 @@ type LogStreamAppender interface { // calling AppendBatch will fail immediately. If AppendBatch still waits // for room of pipeline, Close will be blocked. It also waits for all // pending callbacks to be called. - // It's important for users to avoid calling Close within the callback - // function, as it may cause indefinite blocking. Close() } @@ -52,6 +50,7 @@ type cbQueueEntry struct { cb BatchCallback data [][]byte err error + meta []varlogpb.LogEntryMeta } func newCallbackQueueEntry() *cbQueueEntry { @@ -77,6 +76,7 @@ type logStreamAppender struct { sema chan struct{} sq chan *cbQueueEntry rq chan *cbQueueEntry + cq chan *cbQueueEntry wg sync.WaitGroup closed struct { xsync.RBMutex @@ -116,11 +116,18 @@ func (v *logImpl) newLogStreamAppender(ctx context.Context, tpid types.TopicID, sema: make(chan struct{}, cfg.pipelineSize), sq: make(chan *cbQueueEntry, cfg.pipelineSize), rq: make(chan *cbQueueEntry, cfg.pipelineSize), + cq: make(chan *cbQueueEntry, cfg.pipelineSize), cancelFunc: cancelFunc, causeFunc: func() error { return context.Cause(ctx) }, } + _, err = v.runner.Run(func(context.Context) { + lsa.callbackLoop() + }) + if err != nil { + return nil, fmt.Errorf("client: %w", err) + } lsa.wg.Add(2) go lsa.sendLoop() go lsa.recvLoop() @@ -187,15 +194,16 @@ func (lsa *logStreamAppender) sendLoop() { } func (lsa *logStreamAppender) recvLoop() { - defer lsa.wg.Done() + defer func() { + close(lsa.cq) + lsa.wg.Done() + }() - var err error - var meta []varlogpb.LogEntryMeta - var cb BatchCallback rsp := &snpb.AppendResponse{} - for qe := range lsa.rq { - meta = nil + var err error + var meta []varlogpb.LogEntryMeta + err = qe.err if err != nil { goto Call @@ -217,18 +225,27 @@ func (lsa *logStreamAppender) recvLoop() { break } Call: + qe.meta = meta if err != nil { if cause := lsa.causeFunc(); cause != nil { err = cause } + qe.err = err } + lsa.cq <- qe + } +} + +func (lsa *logStreamAppender) callbackLoop() { + for qe := range lsa.cq { + var cb BatchCallback if qe.cb != nil { cb = qe.cb } else { cb = lsa.defaultBatchCallback } if cb != nil { - cb(meta, err) + cb(qe.meta, qe.err) } <-lsa.sema } diff --git a/tests/it/cluster/client_test.go b/tests/it/cluster/client_test.go index 0ca154762..4a4dde56e 100644 --- a/tests/it/cluster/client_test.go +++ b/tests/it/cluster/client_test.go @@ -733,12 +733,12 @@ func TestLogStreamAppender(t *testing.T) { name: "CloseAfterProcessingCallbacks", testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { var ( - llsn types.LLSN - glsn types.GLSN - called atomic.Int32 + llsn types.LLSN + glsn types.GLSN + wg sync.WaitGroup ) cb := func(metas []varlogpb.LogEntryMeta, err error) { - called.Add(1) + defer wg.Done() assert.NoError(t, err) assert.Len(t, metas, batchSize) assert.Less(t, llsn, metas[batchSize-1].LLSN) @@ -750,10 +750,11 @@ func TestLogStreamAppender(t *testing.T) { lsa, err := vcli.NewLogStreamAppender(tpid, lsid, varlog.WithPipelineSize(pipelineSize)) require.NoError(t, err) defer func() { + wg.Wait() lsa.Close() - require.EqualValues(t, calls, called.Load()) }() + wg.Add(calls) for i := 0; i < calls; i++ { data := make([][]byte, batchSize) for j := 0; j < batchSize; j++ { @@ -762,21 +763,18 @@ func TestLogStreamAppender(t *testing.T) { err := lsa.AppendBatch(data, cb) require.NoError(t, err) } - require.Eventually(t, func() bool { - return called.Load() == calls - }, 5*time.Second, 100*time.Millisecond) }, }, { name: "CloseWhileProcessingCallbacks", testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { var ( - llsn types.LLSN - glsn types.GLSN - called atomic.Int32 + llsn types.LLSN + glsn types.GLSN + wg sync.WaitGroup ) cb := func(metas []varlogpb.LogEntryMeta, err error) { - called.Add(1) + defer wg.Done() if err != nil { assert.Equal(t, varlog.ErrClosed, err) return @@ -793,9 +791,10 @@ func TestLogStreamAppender(t *testing.T) { require.NoError(t, err) defer func() { lsa.Close() - require.EqualValues(t, calls, called.Load()) + wg.Wait() }() + wg.Add(calls) for i := 0; i < calls; i++ { data := make([][]byte, batchSize) for j := 0; j < batchSize; j++ { @@ -804,9 +803,32 @@ func TestLogStreamAppender(t *testing.T) { err := lsa.AppendBatch(data, cb) require.NoError(t, err) } - require.Eventually(t, func() bool { - return called.Load() > 0 - }, time.Second, 10*time.Millisecond) + }, + }, + { + name: "CloseInCallback", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + lsa, err := vcli.NewLogStreamAppender(tpid, lsid, varlog.WithPipelineSize(pipelineSize)) + require.NoError(t, err) + defer func() { + lsa.Close() + }() + + var wg sync.WaitGroup + dataBatch := [][]byte{[]byte("foo")} + wg.Add(1) + err = lsa.AppendBatch(dataBatch, func(_ []varlogpb.LogEntryMeta, err error) { + defer wg.Done() + assert.NoError(t, err) + lsa.Close() + }) + require.NoError(t, err) + wg.Wait() + + err = lsa.AppendBatch(dataBatch, func([]varlogpb.LogEntryMeta, error) { + assert.Fail(t, "unexpected callback") + }) + require.Error(t, err) }, }, {