Skip to content

Commit

Permalink
feat(client): support goroutine-safety of pkg/var log.(LogStream Appe…
Browse files Browse the repository at this point in the history
…nder).Append Batch

This PR makes `kg/var log.(LogStream Appender).Append Batch` goroutine-safe.
  • Loading branch information
ijsong committed Jun 11, 2023
1 parent 4695075 commit 8765e09
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 24 deletions.
5 changes: 3 additions & 2 deletions pkg/varlog/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/kakao/varlog/internal/storagenode"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/proto/varlogpb"
)
Expand Down Expand Up @@ -97,8 +98,8 @@ func WithTimeout(timeout time.Duration) AdminCallOption {

const (
defaultPipelineSize = 2
minPipelineSize = 1
maxPipelineSize = 8
minPipelineSize = storagenode.MinAppendPipelineSize
maxPipelineSize = storagenode.MaxAppendPipelineSize
)

type logStreamAppenderConfig struct {
Expand Down
72 changes: 53 additions & 19 deletions pkg/varlog/log_stream_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type LogStreamAppender interface {
// The only error from the AppendBatch is ErrClosed, which is returned when
// the LogStreamAppender is already closed. It returns nil even if the
// underlying stream is disconnected and notifies errors via callback.
//
// It is safe to have multiple goroutines calling AppendBatch
// simultaneously, but the order between them is not guaranteed.
AppendBatch(dataBatch [][]byte, callback BatchCallback) error
Expand All @@ -48,8 +49,9 @@ type LogStreamAppender interface {
type BatchCallback func([]varlogpb.LogEntryMeta, error)

type cbQueueEntry struct {
cb BatchCallback
err error
cb BatchCallback
data [][]byte
err error
}

func newCallbackQueueEntry() *cbQueueEntry {
Expand All @@ -71,8 +73,10 @@ type logStreamAppender struct {
logStreamAppenderConfig
stream snpb.LogIO_AppendClient
cancelFunc context.CancelCauseFunc
causeFunc func() error
sema chan struct{}
cbq chan *cbQueueEntry
sq chan *cbQueueEntry
rq chan *cbQueueEntry
wg sync.WaitGroup
closed struct {
xsync.RBMutex
Expand Down Expand Up @@ -110,10 +114,15 @@ func (v *logImpl) newLogStreamAppender(ctx context.Context, tpid types.TopicID,
logStreamAppenderConfig: cfg,
stream: stream,
sema: make(chan struct{}, cfg.pipelineSize),
cbq: make(chan *cbQueueEntry, cfg.pipelineSize),
sq: make(chan *cbQueueEntry, cfg.pipelineSize),
rq: make(chan *cbQueueEntry, cfg.pipelineSize),
cancelFunc: cancelFunc,
causeFunc: func() error {
return context.Cause(ctx)
},
}
lsa.wg.Add(1)
lsa.wg.Add(2)
go lsa.sendLoop()
go lsa.recvLoop()
return lsa, nil
}
Expand All @@ -128,23 +137,14 @@ func (lsa *logStreamAppender) AppendBatch(dataBatch [][]byte, callback BatchCall
lsa.sema <- struct{}{}

qe := newCallbackQueueEntry()
qe.data = dataBatch
qe.cb = callback

err := lsa.stream.Send(&snpb.AppendRequest{
TopicID: lsa.tpid,
LogStreamID: lsa.lsid,
Payload: dataBatch,
})
if err != nil {
_ = lsa.stream.CloseSend()
qe.err = err
}
lsa.cbq <- qe
lsa.sq <- qe
return nil
}

func (lsa *logStreamAppender) Close() {
lsa.cancelFunc(nil)
lsa.cancelFunc(ErrClosed)

lsa.closed.Lock()
defer lsa.closed.Unlock()
Expand All @@ -153,10 +153,39 @@ func (lsa *logStreamAppender) Close() {
}
lsa.closed.value = true

close(lsa.cbq)
close(lsa.sq)
lsa.wg.Wait()
}

func (lsa *logStreamAppender) sendLoop() {
defer func() {
close(lsa.rq)
lsa.wg.Done()
}()

var sendErr error
req := &snpb.AppendRequest{
TopicID: lsa.tpid,
LogStreamID: lsa.lsid,
}
for qe := range lsa.sq {
if sendErr == nil {
req.Payload = qe.data
sendErr = lsa.stream.Send(req)
if sendErr != nil {
if cause := lsa.causeFunc(); cause != nil {
sendErr = cause
}
_ = lsa.stream.CloseSend()
}
}
if sendErr != nil {
qe.err = sendErr
}
lsa.rq <- qe
}
}

func (lsa *logStreamAppender) recvLoop() {
defer lsa.wg.Done()

Expand All @@ -165,7 +194,7 @@ func (lsa *logStreamAppender) recvLoop() {
var cb BatchCallback
rsp := &snpb.AppendResponse{}

for qe := range lsa.cbq {
for qe := range lsa.rq {
meta = nil
err = qe.err
if err != nil {
Expand All @@ -188,6 +217,11 @@ func (lsa *logStreamAppender) recvLoop() {
break
}
Call:
if err != nil {
if cause := lsa.causeFunc(); cause != nil {
err = cause
}
}
if qe.cb != nil {
cb = qe.cb
} else {
Expand Down
35 changes: 32 additions & 3 deletions tests/it/cluster/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
. "github.com/smartystreets/goconvey/convey"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/testutil"
Expand Down Expand Up @@ -780,7 +778,7 @@ func TestLogStreamAppender(t *testing.T) {
cb := func(metas []varlogpb.LogEntryMeta, err error) {
called.Add(1)
if err != nil {
assert.Equal(t, codes.Canceled, status.Code(err))
assert.Equal(t, varlog.ErrClosed, err)
return
}
assert.NoError(t, err)
Expand Down Expand Up @@ -833,6 +831,37 @@ func TestLogStreamAppender(t *testing.T) {
}, 5*time.Second, 100*time.Millisecond)
},
},
{
name: "ConcurrentAppendBatch",
testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid, varlog.WithPipelineSize(pipelineSize))
require.NoError(t, err)
defer func() {
lsa.Close()
}()

var wg sync.WaitGroup
wg.Add(calls * 2)
expected := 0
dataBatch := [][]byte{[]byte("foo")}
cb := func(metas []varlogpb.LogEntryMeta, err error) {
defer wg.Done()
assert.NoError(t, err)
assert.Len(t, metas, 1)
expected++
assert.EqualValues(t, expected, metas[0].LLSN)
assert.EqualValues(t, expected, metas[0].GLSN)
}
for i := 0; i < calls; i++ {
go func() {
defer wg.Done()
err := lsa.AppendBatch(dataBatch, cb)
require.NoError(t, err)
}()
}
wg.Wait()
},
},
}

for _, tc := range tcs {
Expand Down

0 comments on commit 8765e09

Please sign in to comment.