Skip to content

Commit

Permalink
fix(storagenode): ignore context error while checking to interleave o…
Browse files Browse the repository at this point in the history
…f Append RPC errors

Append batch requests can succeed partially. In case of partial failure, failed log entries must be
sequential. That is, suffixes of the batch can be failed to append, whose length can be from zero to
the full size.

Our codebase panics when the batch's success and failure are interleaved. This PR ignores context
errors caused by clients since the batch can succeed in the storage node, although clients canceled
it.
  • Loading branch information
ijsong committed Jul 31, 2023
1 parent 30168ec commit 04d1052
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 5 deletions.
32 changes: 27 additions & 5 deletions internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logstream

import (
"context"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -56,23 +57,44 @@ func (at *AppendTask) WaitForCompletion(ctx context.Context) (res []snpb.AppendR
return nil, at.err
}

// Append batch requests can succeed partially. In case of partial failure,
// failed log entries must be sequential. That is, suffixes of the batch
// can be failed to append, whose length can be from zero to the full size.
hasNonContextErr := false
res = make([]snpb.AppendResult, at.dataBatchLen)
for i := range at.apc.awgs {
cerr := at.apc.awgs[i].wait(ctx)
if cerr != nil {
if !hasNonContextErr {
hasNonContextErr = !errors.Is(cerr, context.Canceled) && !errors.Is(cerr, context.DeadlineExceeded)
}

res[i].Error = cerr.Error()
if err == nil {
err = cerr
}
continue
}
if err != nil {

// It has not failed yet.
if err == nil {
res[i].Meta.TopicID = at.lse.tpid
res[i].Meta.LogStreamID = at.lse.lsid
res[i].Meta.GLSN = at.apc.awgs[i].glsn
res[i].Meta.LLSN = at.apc.awgs[i].llsn
at.apc.awgs[i].release()
continue
}

// It panics when the batch's success and failure are interleaved.
// However, context errors caused by clients can be ignored since
// the batch can succeed in the storage node, although clients
// canceled it.
// Once the codebase stabilizes, it is planned to be removed.
if hasNonContextErr {
at.lse.logger.Panic("Results of batch requests of Append RPC must not be interleaved with success and failure", zap.Error(err))
}
res[i].Meta.TopicID = at.lse.tpid
res[i].Meta.LogStreamID = at.lse.lsid
res[i].Meta.GLSN = at.apc.awgs[i].glsn
res[i].Meta.LLSN = at.apc.awgs[i].llsn
res[i].Error = err.Error()
at.apc.awgs[i].release()
}
if res[0].Meta.GLSN.Invalid() {
Expand Down
3 changes: 3 additions & 0 deletions internal/storagenode/logstream/append_waitgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ func (awg *appendWaitGroup) wait(ctx context.Context) error {
if awg == nil {
return nil
}
if err := ctx.Err(); err != nil {
return err
}
err := awg.wwg.wait(ctx)
if err != nil {
return err
Expand Down

0 comments on commit 04d1052

Please sign in to comment.