Skip to content

Commit

Permalink
Handle history not found error when archiving history (#2465)
Browse files Browse the repository at this point in the history
  • Loading branch information
yycptt authored Feb 5, 2022
1 parent 89c17cc commit 8c81dbc
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 81 deletions.
16 changes: 9 additions & 7 deletions common/archiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ Create a new directory in the `archiver` folder. The structure should look like

```go
type HistoryArchiver interface {
// Archive is used to archive a workflow history. When the context expires the method should stop trying to archive.
// Implementors are free to archive however they want, including implementing retries of sub-operations. The URI defines
// the resource that histories should be archived into. The implementor gets to determine how to interpret the URI.
// The Archive method may or may not be automatically retried by the caller. The ArchiveOptions are used
// to interact with these retries including giving the implementor the ability to cancel retries and record progress
// between retry attempts.
// This method will be invoked after a workflow passes its retention period.
// Archive is used to archive a workflow history. When the context expires the method should stop trying to archive.
// Implementors are free to archive however they want, including implementing retries of sub-operations. The URI defines
// the resource that histories should be archived into. The implementor gets to determine how to interpret the URI.
// The Archive method may or may not be automatically retried by the caller. The ArchiveOptions are used
// to interact with these retries including giving the implementor the ability to cancel retries and record progress
// between retry attempts.
// This method will be invoked after a workflow passes its retention period.
// This method may be invoked multiple times for the same workflow, potentially concurrently;
// implementations should correctly handle the case where the workflow no longer exists and return a nil error.
Archive(context.Context, URI, *ArchiveHistoryRequest, ...ArchiveOption) error

// Get is used to access an archived history. When context expires method should stop trying to fetch history.
Expand Down
2 changes: 2 additions & 0 deletions common/archiver/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
ArchiveNonRetryableErrorMsg = "Archive method encountered an non-retryable error."
// ArchiveTransientErrorMsg is the log message when the Archive() method encounters a transient error
ArchiveTransientErrorMsg = "Archive method encountered a transient error."
// ArchiveSkippedInfoMsg is the log message when the Archive() method encounters a not found error
ArchiveSkippedInfoMsg = "Archive method encountered not found error and skipped the archival"

// ErrReasonInvalidURI is the error reason for invalid URI
ErrReasonInvalidURI = "URI is invalid"
Expand Down
14 changes: 12 additions & 2 deletions common/archiver/filestore/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/codec"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
)

Expand Down Expand Up @@ -152,10 +153,19 @@ func (h *historyArchiver) Archive(
for historyIterator.HasNext() {
historyBlob, err := getNextHistoryBlob(ctx, historyIterator)
if err != nil {
if common.IsNotFoundError(err) {
// workflow history no longer exists, possibly due to a duplicated archival signal;
// this may happen even in the middle of iterating history as two archival signals
// can be processed concurrently.
logger.Info(archiver.ArchiveSkippedInfoMsg)
return nil
}

logger = log.With(logger, tag.ArchivalArchiveFailReason(archiver.ErrReasonReadHistory), tag.Error(err))
if !common.IsPersistenceTransientError(err) {
logger.Error(archiver.ArchiveNonRetryableErrorMsg, tag.ArchivalArchiveFailReason(archiver.ErrReasonReadHistory), tag.Error(err))
logger.Error(archiver.ArchiveNonRetryableErrorMsg)
} else {
logger.Error(archiver.ArchiveTransientErrorMsg, tag.ArchivalArchiveFailReason(archiver.ErrReasonReadHistory), tag.Error(err))
logger.Error(archiver.ArchiveTransientErrorMsg)
}
return err
}
Expand Down
41 changes: 41 additions & 0 deletions common/archiver/filestore/historyArchiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,47 @@ func (s *historyArchiverSuite) TestArchive_Fail_NonRetryableErrorOption() {
s.Equal(nonRetryableErr, err)
}

// TestArchive_Skip verifies that Archive returns a nil error when the history
// iterator yields one blob and then reports a NotFound error on the next read.
func (s *historyArchiverSuite) TestArchive_Skip() {
	ctrl := gomock.NewController(s.T())
	defer ctrl.Finish()

	// The first Next call hands back a single-event blob that is explicitly
	// marked as not being the last one.
	firstEvent := &historypb.HistoryEvent{
		EventId:   common.FirstEventID,
		EventTime: timestamp.TimePtr(time.Now().UTC()),
		Version:   testCloseFailoverVersion,
	}
	firstBlob := &archiverspb.HistoryBlob{
		Header: &archiverspb.HistoryBlobHeader{
			IsLast: false,
		},
		Body: []*historypb.History{
			{Events: []*historypb.HistoryEvent{firstEvent}},
		},
	}

	// The second Next call fails with a NotFound service error.
	notFoundErr := &serviceerror.NotFound{Message: "workflow not found"}

	iter := archiver.NewMockHistoryIterator(ctrl)
	gomock.InOrder(
		iter.EXPECT().HasNext().Return(true),
		iter.EXPECT().Next().Return(firstBlob, nil),
		iter.EXPECT().HasNext().Return(true),
		iter.EXPECT().Next().Return(nil, notFoundErr),
	)

	req := &archiver.ArchiveHistoryRequest{
		NamespaceID:          testNamespaceID,
		Namespace:            testNamespace,
		WorkflowID:           testWorkflowID,
		RunID:                testRunID,
		BranchToken:          testBranchToken,
		NextEventID:          testNextEventID,
		CloseFailoverVersion: testCloseFailoverVersion,
	}

	// Archive is expected to complete without surfacing the NotFound error.
	s.NoError(s.newTestHistoryArchiver(iter).Archive(context.Background(), s.testArchivalURI, req))
}

func (s *historyArchiverSuite) TestArchive_Success() {
mockCtrl := gomock.NewController(s.T())
defer mockCtrl.Finish()
Expand Down
16 changes: 13 additions & 3 deletions common/archiver/gcloud/historyArchiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/codec"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/metrics"
"go.temporal.io/server/common/persistence"
Expand Down Expand Up @@ -151,13 +152,22 @@ func (h *historyArchiver) Archive(ctx context.Context, URI archiver.URI, request
for historyIterator.HasNext() {
part := progress.CurrentPageNumber
historyBlob, err := getNextHistoryBlob(ctx, historyIterator)

if err != nil {
if common.IsNotFoundError(err) {
// workflow history no longer exists, possibly due to a duplicated archival signal;
// this may happen even in the middle of iterating history as two archival signals
// can be processed concurrently.
logger.Info(archiver.ArchiveSkippedInfoMsg)
scope.IncCounter(metrics.HistoryArchiverDuplicateArchivalsCount)
return nil
}

logger = log.With(logger, tag.ArchivalArchiveFailReason(archiver.ErrReasonReadHistory), tag.Error(err))
if !common.IsPersistenceTransientError(err) {
logger.Error(archiver.ArchiveNonRetryableErrorMsg, tag.ArchivalArchiveFailReason(archiver.ErrReasonReadHistory), tag.Error(err))
logger.Error(archiver.ArchiveNonRetryableErrorMsg)
return errUploadNonRetryable
}
logger.Error(archiver.ArchiveTransientErrorMsg, tag.ArchivalArchiveFailReason(archiver.ErrReasonReadHistory), tag.Error(err))
logger.Error(archiver.ArchiveTransientErrorMsg)
return err
}

Expand Down
Loading

0 comments on commit 8c81dbc

Please sign in to comment.