diff --git a/pkg/varlog/config.go b/pkg/varlog/config.go index 83f1959be..5c6032503 100644 --- a/pkg/varlog/config.go +++ b/pkg/varlog/config.go @@ -4,6 +4,7 @@ import ( "context" "time" + "github.com/kakao/varlog/internal/storagenode" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/proto/varlogpb" ) @@ -97,8 +98,8 @@ func WithTimeout(timeout time.Duration) AdminCallOption { const ( defaultPipelineSize = 2 - minPipelineSize = 1 - maxPipelineSize = 8 + minPipelineSize = storagenode.MinAppendPipelineSize + maxPipelineSize = storagenode.MaxAppendPipelineSize ) type logStreamAppenderConfig struct { diff --git a/pkg/varlog/log_stream_appender.go b/pkg/varlog/log_stream_appender.go index 8a4135c29..689aca814 100644 --- a/pkg/varlog/log_stream_appender.go +++ b/pkg/varlog/log_stream_appender.go @@ -30,6 +30,7 @@ type LogStreamAppender interface { // The only error from the AppendBatch is ErrClosed, which is returned when // the LogStreamAppender is already closed. It returns nil even if the // underlying stream is disconnected and notifies errors via callback. + // // It is safe to have multiple goroutines calling AppendBatch // simultaneously, but the order between them is not guaranteed. AppendBatch(dataBatch [][]byte, callback BatchCallback) error @@ -48,8 +49,9 @@ type LogStreamAppender interface { type BatchCallback func([]varlogpb.LogEntryMeta, error) type cbQueueEntry struct { - cb BatchCallback - err error + cb BatchCallback + data [][]byte + err error } func newCallbackQueueEntry() *cbQueueEntry { @@ -71,8 +73,10 @@ type logStreamAppender struct { logStreamAppenderConfig stream snpb.LogIO_AppendClient cancelFunc context.CancelCauseFunc + causeFunc func() error sema chan struct{} - cbq chan *cbQueueEntry + sq chan *cbQueueEntry + rq chan *cbQueueEntry wg sync.WaitGroup closed struct { xsync.RBMutex @@ -110,10 +114,15 @@ func (v *logImpl) newLogStreamAppender(ctx context.Context, tpid types.TopicID, logStreamAppenderConfig: cfg, stream: stream, sema: make(chan struct{}, cfg.pipelineSize), - cbq: make(chan *cbQueueEntry, cfg.pipelineSize), + sq: make(chan *cbQueueEntry, cfg.pipelineSize), + rq: make(chan *cbQueueEntry, cfg.pipelineSize), cancelFunc: cancelFunc, + causeFunc: func() error { + return context.Cause(ctx) + }, } - lsa.wg.Add(1) + lsa.wg.Add(2) + go lsa.sendLoop() go lsa.recvLoop() return lsa, nil } @@ -128,23 +137,14 @@ func (lsa *logStreamAppender) AppendBatch(dataBatch [][]byte, callback BatchCall lsa.sema <- struct{}{} qe := newCallbackQueueEntry() + qe.data = dataBatch qe.cb = callback - - err := lsa.stream.Send(&snpb.AppendRequest{ - TopicID: lsa.tpid, - LogStreamID: lsa.lsid, - Payload: dataBatch, - }) - if err != nil { - _ = lsa.stream.CloseSend() - qe.err = err - } - lsa.cbq <- qe + lsa.sq <- qe return nil } func (lsa *logStreamAppender) Close() { - lsa.cancelFunc(nil) + lsa.cancelFunc(ErrClosed) lsa.closed.Lock() defer lsa.closed.Unlock() @@ -153,10 +153,39 @@ func (lsa *logStreamAppender) Close() { } lsa.closed.value = true - close(lsa.cbq) + close(lsa.sq) lsa.wg.Wait() } +func (lsa *logStreamAppender) sendLoop() { + defer func() { + close(lsa.rq) + lsa.wg.Done() + }() + + var sendErr error + req := &snpb.AppendRequest{ + TopicID: lsa.tpid, + LogStreamID: lsa.lsid, + } + for qe := range lsa.sq { + if sendErr == nil { + req.Payload = qe.data + sendErr = lsa.stream.Send(req) + if sendErr != nil { + if cause := lsa.causeFunc(); cause != nil { + sendErr = cause + } + _ = lsa.stream.CloseSend() + } + } + if sendErr != nil { + qe.err = sendErr + } + lsa.rq <- qe + } +} + func (lsa *logStreamAppender) recvLoop() { defer lsa.wg.Done() @@ -165,7 +194,7 @@ func (lsa *logStreamAppender) recvLoop() { var cb BatchCallback rsp := &snpb.AppendResponse{} - for qe := range lsa.cbq { + for qe := range lsa.rq { meta = nil err = qe.err if err != nil { @@ -188,6 +217,11 @@ func (lsa *logStreamAppender) recvLoop() { break } Call: + if err != nil { + if cause := lsa.causeFunc(); cause != nil { + err = cause + } + } if qe.cb != nil { cb = qe.cb } else { diff --git a/tests/it/cluster/client_test.go b/tests/it/cluster/client_test.go index 095f57457..0ca154762 100644 --- a/tests/it/cluster/client_test.go +++ b/tests/it/cluster/client_test.go @@ -12,8 +12,6 @@ import ( . "github.com/smartystreets/goconvey/convey" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" "github.com/kakao/varlog/pkg/types" "github.com/kakao/varlog/pkg/util/testutil" @@ -780,7 +778,7 @@ func TestLogStreamAppender(t *testing.T) { cb := func(metas []varlogpb.LogEntryMeta, err error) { called.Add(1) if err != nil { - assert.Equal(t, codes.Canceled, status.Code(err)) + assert.Equal(t, varlog.ErrClosed, err) return } assert.NoError(t, err) @@ -833,6 +831,37 @@ func TestLogStreamAppender(t *testing.T) { }, 5*time.Second, 100*time.Millisecond) }, }, + { + name: "ConcurrentAppendBatch", + testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) { + lsa, err := vcli.NewLogStreamAppender(tpid, lsid, varlog.WithPipelineSize(pipelineSize)) + require.NoError(t, err) + defer func() { + lsa.Close() + }() + + var wg sync.WaitGroup + wg.Add(calls * 2) + expected := 0 + dataBatch := [][]byte{[]byte("foo")} + cb := func(metas []varlogpb.LogEntryMeta, err error) { + defer wg.Done() + assert.NoError(t, err) + assert.Len(t, metas, 1) + expected++ + assert.EqualValues(t, expected, metas[0].LLSN) + assert.EqualValues(t, expected, metas[0].GLSN) + } + for i := 0; i < calls; i++ { + go func() { + defer wg.Done() + err := lsa.AppendBatch(dataBatch, cb) + require.NoError(t, err) + }() + } + wg.Wait() + }, + }, } for _, tc := range tcs {