feat(client): add call timeout to log stream appender #474

Merged 1 commit on Jun 16, 2023
16 changes: 16 additions & 0 deletions pkg/varlog/config.go
@@ -107,6 +107,7 @@ type logStreamAppenderConfig struct {
tpid types.TopicID
lsid types.LogStreamID
pipelineSize int
callTimeout time.Duration
}

func newLogStreamAppenderConfig(opts []LogStreamAppenderOption) logStreamAppenderConfig {
@@ -164,3 +165,18 @@ func WithDefaultBatchCallback(defaultBatchCallback BatchCallback) LogStreamAppen
cfg.defaultBatchCallback = defaultBatchCallback
})
}

// WithCallTimeout configures a timeout for each AppendBatch call. If the
// timeout elapses, both AppendBatch and the registered callback may fail with
// an ErrCallTimeout error.
//
// ErrCallTimeout may be returned in the following scenarios:
// - Waiting too long for a free slot because the pipeline is full.
// - Sending an RPC request to the varlog is blocked for too long.
// - Receiving an RPC response from the varlog is blocked for too long.
// - A user-provided callback takes too long to run.
func WithCallTimeout(callTimeout time.Duration) LogStreamAppenderOption {
return newFuncLogStreamAppenderOption(func(cfg *logStreamAppenderConfig) {
cfg.callTimeout = callTimeout
})
}
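Usage note (not part of the diff): a minimal sketch of how a client might enable the new option, assuming an existing topic and log stream. `appendWithTimeout` is a hypothetical helper, the import paths are assumed, and the pipeline size and timeout values are illustrative only.

```go
package example

import (
	"errors"
	"time"

	"github.com/kakao/varlog/pkg/types" // import paths assumed; adjust to the module layout
	"github.com/kakao/varlog/pkg/varlog"
	"github.com/kakao/varlog/proto/varlogpb"
)

// appendWithTimeout appends one batch. A call blocked on a full pipeline
// returns ErrCallTimeout directly, while a blocked send/receive is reported
// to the callback as ErrCallTimeout.
func appendWithTimeout(vcli varlog.Log, tpid types.TopicID, lsid types.LogStreamID) error {
	lsa, err := vcli.NewLogStreamAppender(tpid, lsid,
		varlog.WithPipelineSize(8),            // illustrative pipeline size
		varlog.WithCallTimeout(3*time.Second), // fail calls blocked for longer than 3s
	)
	if err != nil {
		return err
	}
	defer lsa.Close()

	return lsa.AppendBatch([][]byte{[]byte("hello")}, func(_ []varlogpb.LogEntryMeta, cberr error) {
		if errors.Is(cberr, varlog.ErrCallTimeout) {
			// The batch did not complete within the call timeout.
		}
	})
}
```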
5 changes: 4 additions & 1 deletion pkg/varlog/errors.go
@@ -2,4 +2,7 @@ package varlog

import "errors"

var ErrClosed = errors.New("client: closed")
var (
ErrClosed = errors.New("client: closed")
ErrCallTimeout = errors.New("client: call timeout")
)
115 changes: 84 additions & 31 deletions pkg/varlog/log_stream_appender.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/puzpuzpuz/xsync/v2"

@@ -20,16 +21,20 @@ type LogStreamAppender interface {
// Users can call this method without being blocked until the pipeline of
// the LogStreamAppender is full. If the pipeline of the LogStreamAppender
// is already full, it may become blocked. However, the process will
// continue once a response is received from the storage node.
// continue once a response is received from the storage node. If the call
// blocks for longer than a timeout configured via WithCallTimeout, it may
// fail with ErrCallTimeout.
//
// On completion of AppendBatch, the argument callback provided by users
// will be invoked. All callback functions registered to the same
// LogStreamAppender will be called by the same goroutine sequentially.
// Therefore, the callback should be lightweight. If heavy work is
// necessary for the callback, it would be better to use separate worker
// goroutines.
// The only error from the AppendBatch is ErrClosed, which is returned when
// the LogStreamAppender is already closed. It returns nil even if the
// underlying stream is disconnected and notifies errors via callback.
// Once the stream in the LogStreamAppender is either done or broken,
// AppendBatch returns an error. It returns ErrClosed when the
// LogStreamAppender is closed and ErrCallTimeout when the call timeout
// expires.
//
// It is safe to have multiple goroutines calling AppendBatch
// simultaneously, but the order between them is not guaranteed.
@@ -47,10 +52,11 @@ type LogStreamAppender interface {
type BatchCallback func([]varlogpb.LogEntryMeta, error)

type cbQueueEntry struct {
cb BatchCallback
data [][]byte
err error
meta []varlogpb.LogEntryMeta
cb BatchCallback
data [][]byte
err error
meta []varlogpb.LogEntryMeta
expireTime time.Time
}

func newCallbackQueueEntry() *cbQueueEntry {
@@ -141,11 +147,28 @@ func (lsa *logStreamAppender) AppendBatch(dataBatch [][]byte, callback BatchCall
return ErrClosed
}

lsa.sema <- struct{}{}
if err := lsa.causeFunc(); err != nil {
return err
}

now := time.Now()
if lsa.callTimeout > 0 {
timer := time.NewTimer(lsa.callTimeout)
defer timer.Stop()
select {
case lsa.sema <- struct{}{}:
case <-timer.C:
lsa.cancelFunc(ErrCallTimeout)
return ErrCallTimeout
}
} else {
lsa.sema <- struct{}{}
}

qe := newCallbackQueueEntry()
qe.data = dataBatch
qe.cb = callback
qe.expireTime = now.Add(lsa.callTimeout)
lsa.sq <- qe
return nil
}
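Review note (not part of the diff): the hunk above bounds the wait on the pipeline semaphore with a timer, so a caller stuck behind a full pipeline fails instead of blocking forever. A standalone sketch of that pattern, with hypothetical names (`acquire`, `sem`) and an assumed `ErrCallTimeout` sentinel:

```go
// acquire takes one slot from sem, giving up after timeout.
// A non-positive timeout means "block until a slot is free".
func acquire(sem chan struct{}, timeout time.Duration) error {
	if timeout <= 0 {
		sem <- struct{}{}
		return nil
	}
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case sem <- struct{}{}: // a pipeline slot became available in time
		return nil
	case <-timer.C: // the timeout elapsed first
		return ErrCallTimeout // assumed sentinel, mirroring pkg/varlog's ErrCallTimeout
	}
}
```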
@@ -178,7 +201,20 @@ func (lsa *logStreamAppender) sendLoop() {
for qe := range lsa.sq {
if sendErr == nil {
req.Payload = qe.data

var wg sync.WaitGroup
var watchdog *time.Timer
if lsa.callTimeout > 0 {
wg.Add(1)
watchdog = time.AfterFunc(time.Until(qe.expireTime), func() {
defer wg.Done()
lsa.cancelFunc(ErrCallTimeout)
})
}
sendErr = lsa.stream.Send(req)
if watchdog != nil && !watchdog.Stop() {
wg.Wait()
}
if sendErr != nil {
if cause := lsa.causeFunc(); cause != nil {
sendErr = cause
@@ -199,38 +235,55 @@ func (lsa *logStreamAppender) recvLoop() {
lsa.wg.Done()
}()

var recvErr error
rsp := &snpb.AppendResponse{}
for qe := range lsa.rq {
var err error
var meta []varlogpb.LogEntryMeta
if recvErr == nil {
var wg sync.WaitGroup
var watchdog *time.Timer
var meta []varlogpb.LogEntryMeta

err = qe.err
if err != nil {
goto Call
}
recvErr = qe.err
if recvErr != nil {
goto Call
}

rsp.Reset()
err = lsa.stream.RecvMsg(rsp)
if err != nil {
goto Call
}
if lsa.callTimeout > 0 {
wg.Add(1)
watchdog = time.AfterFunc(time.Until(qe.expireTime), func() {
defer wg.Done()
lsa.cancelFunc(ErrCallTimeout)
})
}

rsp.Reset()
recvErr = lsa.stream.RecvMsg(rsp)
if watchdog != nil && !watchdog.Stop() {
wg.Wait()
}
if recvErr != nil {
goto Call
}

meta = make([]varlogpb.LogEntryMeta, len(rsp.Results))
for idx, res := range rsp.Results {
if len(res.Error) == 0 {
meta[idx] = res.Meta
continue
meta = make([]varlogpb.LogEntryMeta, len(rsp.Results))
for idx, res := range rsp.Results {
if len(res.Error) == 0 {
meta[idx] = res.Meta
continue
}
recvErr = errors.New(res.Error)
lsa.cancelFunc(recvErr)
break
}
err = errors.New(res.Error)
break
qe.meta = meta
}

Call:
qe.meta = meta
if err != nil {
if recvErr != nil {
if cause := lsa.causeFunc(); cause != nil {
err = cause
recvErr = cause
}
qe.err = err
qe.err = recvErr
}
lsa.cq <- qe
}
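Review note (not part of the diff): sendLoop and recvLoop wrap each blocking stream call in a `time.AfterFunc` watchdog that cancels the stream once the entry's deadline passes, and a `sync.WaitGroup` ensures a fired watchdog has finished before the result is handled. A minimal sketch of that pattern with hypothetical names (`callWithWatchdog`, `cancel`, `blockingCall`):

```go
// callWithWatchdog runs blockingCall and, if it has not returned by the
// deadline, invokes cancel(err) to unblock it. A zero deadline disables the
// watchdog entirely.
func callWithWatchdog(deadline time.Time, cancel func(error), blockingCall func() error) error {
	var wg sync.WaitGroup
	var watchdog *time.Timer
	if !deadline.IsZero() {
		wg.Add(1)
		watchdog = time.AfterFunc(time.Until(deadline), func() {
			defer wg.Done()
			cancel(ErrCallTimeout) // assumed sentinel
		})
	}
	err := blockingCall()
	if watchdog != nil && !watchdog.Stop() {
		// Stop returned false: the watchdog already fired (or is firing),
		// so wait until the cancellation has fully taken effect.
		wg.Wait()
	}
	return err
}
```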
74 changes: 74 additions & 0 deletions tests/it/cluster/client_test.go
@@ -884,6 +884,80 @@ func TestLogStreamAppender(t *testing.T) {
wg.Wait()
},
},
{
name: "CallTimeoutCausedBySemaphore",
testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) {
const (
callTimeout = 500 * time.Millisecond
blockedPipelineSize = 1
)

lsa, err := vcli.NewLogStreamAppender(tpid, lsid,
varlog.WithPipelineSize(blockedPipelineSize),
varlog.WithCallTimeout(callTimeout),
)
require.NoError(t, err)
defer lsa.Close()

var wg sync.WaitGroup
dataBatch := [][]byte{[]byte("foo")}

wg.Add(1)
err = lsa.AppendBatch(dataBatch, func(_ []varlogpb.LogEntryMeta, err error) {
defer wg.Done()
assert.NoError(t, err)
time.Sleep(callTimeout * 2)
})
require.NoError(t, err)

err = lsa.AppendBatch(dataBatch, func([]varlogpb.LogEntryMeta, error) {
assert.Fail(t, "unexpected callback")
})
require.Error(t, err)
require.Equal(t, varlog.ErrCallTimeout, err)

wg.Wait()
},
},
{
name: "CallTimeoutCausedSlowCallback",
testf: func(t *testing.T, tpid types.TopicID, lsid types.LogStreamID, vcli varlog.Log) {
const callTimeout = 500 * time.Millisecond

lsa, err := vcli.NewLogStreamAppender(tpid, lsid,
varlog.WithCallTimeout(callTimeout),
varlog.WithPipelineSize(5),
)
require.NoError(t, err)
defer lsa.Close()

var wg sync.WaitGroup
dataBatch := [][]byte{[]byte("foo")}

var failfast atomic.Bool
for err == nil {
wg.Add(1)
err = lsa.AppendBatch(dataBatch, func(_ []varlogpb.LogEntryMeta, cerr error) {
defer wg.Done()
if cerr != nil {
assert.Equal(t, varlog.ErrCallTimeout, cerr)
failfast.Store(true)
return
}
time.Sleep(callTimeout * 2)
})
if err == nil {
require.True(t, failfast.CompareAndSwap(false, false))
} else {
wg.Done()
require.Equal(t, varlog.ErrCallTimeout, err)
}
time.Sleep(callTimeout)
}

wg.Wait()
},
},
}

for _, tc := range tcs {