Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(storagenode): ignore context error while checking to interleave of Append RPC errors #504

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions internal/storagenode/logstream/append.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logstream

import (
"context"
"errors"
"sync"
"time"

Expand Down Expand Up @@ -56,23 +57,44 @@ func (at *AppendTask) WaitForCompletion(ctx context.Context) (res []snpb.AppendR
return nil, at.err
}

// Append batch requests can succeed partially. In case of partial failure,
// failed log entries must be sequential. That is, suffixes of the batch
// can be failed to append, whose length can be from zero to the full size.
hasNonContextErr := false
res = make([]snpb.AppendResult, at.dataBatchLen)
for i := range at.apc.awgs {
cerr := at.apc.awgs[i].wait(ctx)
if cerr != nil {
if !hasNonContextErr {
hasNonContextErr = !errors.Is(cerr, context.Canceled) && !errors.Is(cerr, context.DeadlineExceeded)
}

res[i].Error = cerr.Error()
if err == nil {
err = cerr
}
continue
}
if err != nil {

// It has not failed yet.
if err == nil {
res[i].Meta.TopicID = at.lse.tpid
res[i].Meta.LogStreamID = at.lse.lsid
res[i].Meta.GLSN = at.apc.awgs[i].glsn
res[i].Meta.LLSN = at.apc.awgs[i].llsn
at.apc.awgs[i].release()
continue
}

// It panics when the batch's success and failure are interleaved.
// However, context errors caused by clients can be ignored since
// the batch can succeed in the storage node, although clients
// canceled it.
// Once the codebase stabilizes, it is planned to be removed.
if hasNonContextErr {
hungryjang marked this conversation as resolved.
Show resolved Hide resolved
at.lse.logger.Panic("Results of batch requests of Append RPC must not be interleaved with success and failure", zap.Error(err))
}
res[i].Meta.TopicID = at.lse.tpid
res[i].Meta.LogStreamID = at.lse.lsid
res[i].Meta.GLSN = at.apc.awgs[i].glsn
res[i].Meta.LLSN = at.apc.awgs[i].llsn
res[i].Error = err.Error()
at.apc.awgs[i].release()
}
if res[0].Meta.GLSN.Invalid() {
Expand Down
3 changes: 3 additions & 0 deletions internal/storagenode/logstream/append_waitgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ func (awg *appendWaitGroup) wait(ctx context.Context) error {
if awg == nil {
return nil
}
if err := ctx.Err(); err != nil {
return err
}
err := awg.wwg.wait(ctx)
if err != nil {
return err
Expand Down