Skip to content

Commit

Permalink
feat(client): can call Close at callback in LogStreamAppender
Browse files Browse the repository at this point in the history
  • Loading branch information
ijsong committed Jun 12, 2023
1 parent 8765e09 commit 869135d
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 25 deletions.
1 change: 1 addition & 0 deletions pkg/varlog/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,5 +198,6 @@ func (v *logImpl) Close() (err error) {
err = e
}
v.closed.Store(true)
v.runner.Stop()
return
}
35 changes: 26 additions & 9 deletions pkg/varlog/log_stream_appender.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ type LogStreamAppender interface {
// calling AppendBatch will fail immediately. If AppendBatch still waits
// for room of pipeline, Close will be blocked. It also waits for all
// pending callbacks to be called.
// It's important for users to avoid calling Close within the callback
// function, as it may cause indefinite blocking.
Close()
}

Expand All @@ -52,6 +50,7 @@ type cbQueueEntry struct {
cb BatchCallback
data [][]byte
err error
meta []varlogpb.LogEntryMeta
}

func newCallbackQueueEntry() *cbQueueEntry {
Expand All @@ -77,6 +76,7 @@ type logStreamAppender struct {
sema chan struct{}
sq chan *cbQueueEntry
rq chan *cbQueueEntry
cq chan *cbQueueEntry
wg sync.WaitGroup
closed struct {
xsync.RBMutex
Expand Down Expand Up @@ -116,11 +116,18 @@ func (v *logImpl) newLogStreamAppender(ctx context.Context, tpid types.TopicID,
sema: make(chan struct{}, cfg.pipelineSize),
sq: make(chan *cbQueueEntry, cfg.pipelineSize),
rq: make(chan *cbQueueEntry, cfg.pipelineSize),
cq: make(chan *cbQueueEntry, cfg.pipelineSize),
cancelFunc: cancelFunc,
causeFunc: func() error {
return context.Cause(ctx)
},
}
_, err = v.runner.Run(func(context.Context) {
lsa.callbackLoop()
})
if err != nil {
return nil, fmt.Errorf("client: %w", err)
}
lsa.wg.Add(2)
go lsa.sendLoop()
go lsa.recvLoop()
Expand Down Expand Up @@ -187,15 +194,16 @@ func (lsa *logStreamAppender) sendLoop() {
}

func (lsa *logStreamAppender) recvLoop() {
defer lsa.wg.Done()
defer func() {
close(lsa.cq)
lsa.wg.Done()
}()

var err error
var meta []varlogpb.LogEntryMeta
var cb BatchCallback
rsp := &snpb.AppendResponse{}

for qe := range lsa.rq {
meta = nil
var err error
var meta []varlogpb.LogEntryMeta

err = qe.err
if err != nil {
goto Call
Expand All @@ -217,18 +225,27 @@ func (lsa *logStreamAppender) recvLoop() {
break
}
Call:
qe.meta = meta
if err != nil {
if cause := lsa.causeFunc(); cause != nil {
err = cause
}
qe.err = err
}
lsa.cq <- qe
}
}

func (lsa *logStreamAppender) callbackLoop() {
for qe := range lsa.cq {
var cb BatchCallback
if qe.cb != nil {
cb = qe.cb
} else {
cb = lsa.defaultBatchCallback
}
if cb != nil {
cb(meta, err)
cb(qe.meta, qe.err)
}
<-lsa.sema
}
Expand Down
54 changes: 38 additions & 16 deletions tests/it/cluster/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -733,12 +733,12 @@ func TestLogStreamAppender(t *testing.T) {
name: "CloseAfterProcessingCallbacks",
testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) {
var (
llsn types.LLSN
glsn types.GLSN
called atomic.Int32
llsn types.LLSN
glsn types.GLSN
wg sync.WaitGroup
)
cb := func(metas []varlogpb.LogEntryMeta, err error) {
called.Add(1)
defer wg.Done()
assert.NoError(t, err)
assert.Len(t, metas, batchSize)
assert.Less(t, llsn, metas[batchSize-1].LLSN)
Expand All @@ -750,10 +750,11 @@ func TestLogStreamAppender(t *testing.T) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid, varlog.WithPipelineSize(pipelineSize))
require.NoError(t, err)
defer func() {
wg.Wait()
lsa.Close()
require.EqualValues(t, calls, called.Load())
}()

wg.Add(calls)
for i := 0; i < calls; i++ {
data := make([][]byte, batchSize)
for j := 0; j < batchSize; j++ {
Expand All @@ -762,21 +763,18 @@ func TestLogStreamAppender(t *testing.T) {
err := lsa.AppendBatch(data, cb)
require.NoError(t, err)
}
require.Eventually(t, func() bool {
return called.Load() == calls
}, 5*time.Second, 100*time.Millisecond)
},
},
{
name: "CloseWhileProcessingCallbacks",
testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) {
var (
llsn types.LLSN
glsn types.GLSN
called atomic.Int32
llsn types.LLSN
glsn types.GLSN
wg sync.WaitGroup
)
cb := func(metas []varlogpb.LogEntryMeta, err error) {
called.Add(1)
defer wg.Done()
if err != nil {
assert.Equal(t, varlog.ErrClosed, err)
return
Expand All @@ -793,9 +791,10 @@ func TestLogStreamAppender(t *testing.T) {
require.NoError(t, err)
defer func() {
lsa.Close()
require.EqualValues(t, calls, called.Load())
wg.Wait()
}()

wg.Add(calls)
for i := 0; i < calls; i++ {
data := make([][]byte, batchSize)
for j := 0; j < batchSize; j++ {
Expand All @@ -804,9 +803,32 @@ func TestLogStreamAppender(t *testing.T) {
err := lsa.AppendBatch(data, cb)
require.NoError(t, err)
}
require.Eventually(t, func() bool {
return called.Load() > 0
}, time.Second, 10*time.Millisecond)
},
},
{
name: "CloseInCallback",
testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) {
lsa, err := vcli.NewLogStreamAppender(tpid, lsid, varlog.WithPipelineSize(pipelineSize))
require.NoError(t, err)
defer func() {
lsa.Close()
}()

var wg sync.WaitGroup
dataBatch := [][]byte{[]byte("foo")}
wg.Add(1)
err = lsa.AppendBatch(dataBatch, func(_ []varlogpb.LogEntryMeta, err error) {
defer wg.Done()
assert.NoError(t, err)
lsa.Close()
})
require.NoError(t, err)
wg.Wait()

err = lsa.AppendBatch(dataBatch, func([]varlogpb.LogEntryMeta, error) {
assert.Fail(t, "unexpected callback")
})
require.Error(t, err)
},
},
{
Expand Down

0 comments on commit 869135d

Please sign in to comment.