diff --git a/internal/storagenode/logstream/append.go b/internal/storagenode/logstream/append.go index 61ffca3d5..92fa1dd07 100644 --- a/internal/storagenode/logstream/append.go +++ b/internal/storagenode/logstream/append.go @@ -2,6 +2,7 @@ package logstream import ( "context" + "errors" "sync" "time" @@ -56,23 +57,44 @@ func (at *AppendTask) WaitForCompletion(ctx context.Context) (res []snpb.AppendR return nil, at.err } + // Append batch requests can succeed partially. In case of partial failure, + // failed log entries must be sequential. That is, suffixes of the batch + // can be failed to append, whose length can be from zero to the full size. + hasNonContextErr := false res = make([]snpb.AppendResult, at.dataBatchLen) for i := range at.apc.awgs { cerr := at.apc.awgs[i].wait(ctx) if cerr != nil { + if !hasNonContextErr { + hasNonContextErr = !errors.Is(cerr, context.Canceled) && !errors.Is(cerr, context.DeadlineExceeded) + } + res[i].Error = cerr.Error() if err == nil { err = cerr } continue } - if err != nil { + + // It has not failed yet. + if err == nil { + res[i].Meta.TopicID = at.lse.tpid + res[i].Meta.LogStreamID = at.lse.lsid + res[i].Meta.GLSN = at.apc.awgs[i].glsn + res[i].Meta.LLSN = at.apc.awgs[i].llsn + at.apc.awgs[i].release() + continue + } + + // It panics when the batch's success and failure are interleaved. + // However, context errors caused by clients can be ignored since + // the batch can succeed in the storage node, although clients + // canceled it. + // Once the codebase stabilizes, it is planned to be removed. + if hasNonContextErr { at.lse.logger.Panic("Results of batch requests of Append RPC must not be interleaved with success and failure", zap.Error(err)) } - res[i].Meta.TopicID = at.lse.tpid - res[i].Meta.LogStreamID = at.lse.lsid - res[i].Meta.GLSN = at.apc.awgs[i].glsn - res[i].Meta.LLSN = at.apc.awgs[i].llsn + res[i].Error = err.Error() at.apc.awgs[i].release() } if res[0].Meta.GLSN.Invalid() { diff --git a/internal/storagenode/logstream/append_waitgroup.go b/internal/storagenode/logstream/append_waitgroup.go index 23966b8e0..9f7c3e624 100644 --- a/internal/storagenode/logstream/append_waitgroup.go +++ b/internal/storagenode/logstream/append_waitgroup.go @@ -147,6 +147,9 @@ func (awg *appendWaitGroup) wait(ctx context.Context) error { if awg == nil { return nil } + if err := ctx.Err(); err != nil { + return err + } err := awg.wwg.wait(ctx) if err != nil { return err