From 01d071abe7d088cca498ca8f669bdf2a32e6ca60 Mon Sep 17 00:00:00 2001 From: Alex Shtin Date: Tue, 8 Feb 2022 19:49:16 -0800 Subject: [PATCH] Fix conflict error handling (#2469) --- common/persistence/cassandra/errors.go | 76 +++++++++++-------- common/persistence/cassandra/errors_test.go | 24 +++--- .../cassandra/mutable_state_store.go | 30 ++++---- service/history/workflow/transaction_impl.go | 63 ++++++++------- 4 files changed, 105 insertions(+), 88 deletions(-) diff --git a/common/persistence/cassandra/errors.go b/common/persistence/cassandra/errors.go index 5eb8fb43d5e..94653836f39 100644 --- a/common/persistence/cassandra/errors.go +++ b/common/persistence/cassandra/errors.go @@ -57,52 +57,62 @@ type ( ) func convertErrors( - record map[string]interface{}, - iter gocql.Iter, + conflictRecord map[string]interface{}, + conflictIter gocql.Iter, requestShardID int32, requestRangeID int64, requestCurrentRunID string, requestExecutionCASConditions []executionCASCondition, ) error { - records := []map[string]interface{}{record} + conflictRecords := []map[string]interface{}{conflictRecord} errors := extractErrors( - record, + conflictRecord, requestShardID, requestRangeID, requestCurrentRunID, requestExecutionCASConditions, ) - record = make(map[string]interface{}) - for iter.MapScan(record) { - records = append(records, record) + conflictRecord = make(map[string]interface{}) + for conflictIter.MapScan(conflictRecord) { + if conflictRecord["[applied]"].(bool) { + // Should never happen. All records in batch should have [applied]=false. + continue + } + + conflictRecords = append(conflictRecords, conflictRecord) errors = append(errors, extractErrors( - record, + conflictRecord, requestShardID, requestRangeID, requestCurrentRunID, requestExecutionCASConditions, )...) - record = make(map[string]interface{}) + conflictRecord = make(map[string]interface{}) } - errors = sortErrors(errors) if len(errors) == 0 { + // This means that extractErrors wasn't able to extract error from the conflicting records. + // Most likely record to update is not found in the DB by WHERE clause and is NOT in conflictRecords slice. + // Unfortunately, there is no way to get the missing record w/o extra call to DB. + // Most likely it is current workflow execution record. return &p.ConditionFailedError{ - Msg: fmt.Sprintf("Encounter unknown error: shard ID: %v, range ID: %v, error: %v", + Msg: fmt.Sprintf("Encounter unknown condition update error: shard ID: %v, range ID: %v, possibly conflicting records:%v", requestShardID, requestRangeID, - printRecords(records), + printRecords(conflictRecords), ), } } + + errors = sortErrors(errors) return errors[0] } func extractErrors( - record map[string]interface{}, + conflictRecord map[string]interface{}, requestShardID int32, requestRangeID int64, requestCurrentRunID string, @@ -112,7 +122,7 @@ func extractErrors( var errors []error if err := extractShardOwnershipLostError( - record, + conflictRecord, requestShardID, requestRangeID, ); err != nil { @@ -120,7 +130,7 @@ func extractErrors( } if err := extractCurrentWorkflowConflictError( - record, + conflictRecord, requestCurrentRunID, ); err != nil { errors = append(errors, err) @@ -128,7 +138,7 @@ func extractErrors( for _, condition := range requestExecutionCASConditions { if err := extractWorkflowConflictError( - record, + conflictRecord, condition.runID, condition.dbVersion, condition.nextEventID, @@ -158,11 +168,11 @@ func sortErrors( } func extractShardOwnershipLostError( - record map[string]interface{}, + conflictRecord map[string]interface{}, requestShardID int32, requestRangeID int64, ) error { - rowType, ok := record["type"].(int) + rowType, ok := conflictRecord["type"].(int) if !ok { // this case should not happen, maybe panic? return nil @@ -171,7 +181,7 @@ func extractShardOwnershipLostError( return nil } - actualRangeID := record["range_id"].(int64) + actualRangeID := conflictRecord["range_id"].(int64) if actualRangeID != requestRangeID { return &p.ShardOwnershipLostError{ ShardID: requestShardID, @@ -185,10 +195,10 @@ func extractShardOwnershipLostError( } func extractCurrentWorkflowConflictError( - record map[string]interface{}, + conflictRecord map[string]interface{}, requestCurrentRunID string, ) error { - rowType, ok := record["type"].(int) + rowType, ok := conflictRecord["type"].(int) if !ok { // this case should not happen, maybe panic? return nil @@ -196,14 +206,14 @@ func extractCurrentWorkflowConflictError( if rowType != rowTypeExecution { return nil } - if runID := gocql.UUIDToString(record["run_id"]); runID != permanentRunID { + if runID := gocql.UUIDToString(conflictRecord["run_id"]); runID != permanentRunID { return nil } - actualCurrentRunID := gocql.UUIDToString(record["current_run_id"]) + actualCurrentRunID := gocql.UUIDToString(conflictRecord["current_run_id"]) if actualCurrentRunID != requestCurrentRunID { - binary, _ := record["execution_state"].([]byte) - encoding, _ := record["execution_state_encoding"].(string) + binary, _ := conflictRecord["execution_state"].([]byte) + encoding, _ := conflictRecord["execution_state_encoding"].(string) executionState := &persistencespb.WorkflowExecutionState{} if state, err := serialization.WorkflowExecutionStateFromBlob( binary, @@ -213,12 +223,12 @@ func extractCurrentWorkflowConflictError( } // if err != nil, this means execution state cannot be parsed, just use default values - lastWriteVersion, _ := record["workflow_last_write_version"].(int64) + lastWriteVersion, _ := conflictRecord["workflow_last_write_version"].(int64) // TODO maybe assert actualCurrentRunID == executionState.RunId ? return &p.CurrentWorkflowConditionFailedError{ - Msg: fmt.Sprintf("Encounter concurrent workflow error, request run ID: %v, actual run ID: %v", + Msg: fmt.Sprintf("Encounter current workflow error, request run ID: %v, actual run ID: %v", requestCurrentRunID, actualCurrentRunID, ), @@ -233,12 +243,12 @@ func extractCurrentWorkflowConflictError( } func extractWorkflowConflictError( - record map[string]interface{}, + conflictRecord map[string]interface{}, requestRunID string, requestDBVersion int64, requestNextEventID int64, // TODO deprecate this variable once DB version comparison is the default ) error { - rowType, ok := record["type"].(int) + rowType, ok := conflictRecord["type"].(int) if !ok { // this case should not happen, maybe panic? return nil @@ -246,12 +256,12 @@ func extractWorkflowConflictError( if rowType != rowTypeExecution { return nil } - if runID := gocql.UUIDToString(record["run_id"]); runID != requestRunID { + if runID := gocql.UUIDToString(conflictRecord["run_id"]); runID != requestRunID { return nil } - actualNextEventID, _ := record["next_event_id"].(int64) - actualDBVersion, _ := record["db_version"].(int64) + actualNextEventID, _ := conflictRecord["next_event_id"].(int64) + actualDBVersion, _ := conflictRecord["db_record_version"].(int64) // TODO remove this block once DB version comparison is the default if requestDBVersion == 0 { @@ -270,7 +280,7 @@ func extractWorkflowConflictError( if actualDBVersion != requestDBVersion { return &p.WorkflowConditionFailedError{ - Msg: fmt.Sprintf("Encounter workflow db version mismatch, request db version ID: %v, actual db version ID: %v", + Msg: fmt.Sprintf("Encounter workflow db version mismatch, request db version: %v, actual db version: %v", requestDBVersion, actualDBVersion, ), diff --git a/common/persistence/cassandra/errors_test.go b/common/persistence/cassandra/errors_test.go index c21e04505ea..3c71c3cb4c5 100644 --- a/common/persistence/cassandra/errors_test.go +++ b/common/persistence/cassandra/errors_test.go @@ -244,23 +244,23 @@ func (s *cassandraErrorsSuite) TestExtractWorkflowConflictError_Failed() { s.NoError(err) err = extractWorkflowConflictError(map[string]interface{}{ - "type": rowTypeShard, - "run_id": gocql.UUID(runID), - "db_version": dbVersion, + "type": rowTypeShard, + "run_id": gocql.UUID(runID), + "db_record_version": dbVersion, }, runID.String(), dbVersion+1, rand.Int63()) s.NoError(err) err = extractWorkflowConflictError(map[string]interface{}{ - "type": rowTypeExecution, - "run_id": gocql.UUID([16]byte{}), - "db_version": dbVersion, + "type": rowTypeExecution, + "run_id": gocql.UUID([16]byte{}), + "db_record_version": dbVersion, }, runID.String(), dbVersion+1, rand.Int63()) s.NoError(err) err = extractWorkflowConflictError(map[string]interface{}{ - "type": rowTypeExecution, - "run_id": gocql.UUID(runID), - "db_version": dbVersion, + "type": rowTypeExecution, + "run_id": gocql.UUID(runID), + "db_record_version": dbVersion, }, runID.String(), dbVersion, rand.Int63()) s.NoError(err) } @@ -269,9 +269,9 @@ func (s *cassandraErrorsSuite) TestExtractWorkflowConflictError_Success() { runID := uuid.New() dbVersion := rand.Int63() + 1 record := map[string]interface{}{ - "type": rowTypeExecution, - "run_id": gocql.UUID(runID), - "db_version": dbVersion, + "type": rowTypeExecution, + "run_id": gocql.UUID(runID), + "db_record_version": dbVersion, } err := extractWorkflowConflictError(record, runID.String(), dbVersion+1, rand.Int63()) diff --git a/common/persistence/cassandra/mutable_state_store.go b/common/persistence/cassandra/mutable_state_store.go index 791162ccaed..07550ceed52 100644 --- a/common/persistence/cassandra/mutable_state_store.go +++ b/common/persistence/cassandra/mutable_state_store.go @@ -451,19 +451,19 @@ func (d *MutableStateStore) CreateWorkflowExecution( request.RangeID, ) - record := make(map[string]interface{}) - applied, iter, err := d.Session.MapExecuteBatchCAS(batch, record) + conflictRecord := make(map[string]interface{}) + applied, conflictIter, err := d.Session.MapExecuteBatchCAS(batch, conflictRecord) if err != nil { return nil, gocql.ConvertError("CreateWorkflowExecution", err) } defer func() { - _ = iter.Close() + _ = conflictIter.Close() }() if !applied { return nil, convertErrors( - record, - iter, + conflictRecord, + conflictIter, shardID, request.RangeID, requestCurrentRunID, @@ -670,19 +670,19 @@ func (d *MutableStateStore) UpdateWorkflowExecution( request.RangeID, ) - record := make(map[string]interface{}) - applied, iter, err := d.Session.MapExecuteBatchCAS(batch, record) + conflictRecord := make(map[string]interface{}) + applied, conflictIter, err := d.Session.MapExecuteBatchCAS(batch, conflictRecord) if err != nil { return gocql.ConvertError("UpdateWorkflowExecution", err) } defer func() { - _ = iter.Close() + _ = conflictIter.Close() }() if !applied { return convertErrors( - record, - iter, + conflictRecord, + conflictIter, request.ShardID, request.RangeID, updateWorkflow.ExecutionState.RunId, @@ -819,13 +819,13 @@ func (d *MutableStateStore) ConflictResolveWorkflowExecution( request.RangeID, ) - record := make(map[string]interface{}) - applied, iter, err := d.Session.MapExecuteBatchCAS(batch, record) + conflictRecord := make(map[string]interface{}) + applied, conflictIter, err := d.Session.MapExecuteBatchCAS(batch, conflictRecord) if err != nil { return gocql.ConvertError("ConflictResolveWorkflowExecution", err) } defer func() { - _ = iter.Close() + _ = conflictIter.Close() }() if !applied { @@ -846,8 +846,8 @@ func (d *MutableStateStore) ConflictResolveWorkflowExecution( }) } return convertErrors( - record, - iter, + conflictRecord, + conflictIter, request.ShardID, request.RangeID, currentRunID, diff --git a/service/history/workflow/transaction_impl.go b/service/history/workflow/transaction_impl.go index 5fdf34163c5..8d2618991ea 100644 --- a/service/history/workflow/transaction_impl.go +++ b/service/history/workflow/transaction_impl.go @@ -472,41 +472,44 @@ func updateWorkflowExecutionWithRetry( op, PersistenceOperationRetryPolicy, common.IsPersistenceTransientError, ) - switch err.(type) { - case nil: - if namespaceEntry, err := shard.GetNamespaceRegistry().GetNamespaceByID( - namespace.ID(request.UpdateWorkflowMutation.ExecutionInfo.NamespaceId), - ); err == nil { - emitMutationMetrics( - shard, - namespaceEntry, - &resp.UpdateMutableStateStats, - resp.NewMutableStateStats, - ) - emitCompletionMetrics( - shard, - namespaceEntry, - mutationToCompletionMetric(&request.UpdateWorkflowMutation), - snapshotToCompletionMetric(request.NewWorkflowSnapshot), - ) - } - return resp, nil - case *persistence.CurrentWorkflowConditionFailedError, - *persistence.WorkflowConditionFailedError, - *persistence.ConditionFailedError: - // TODO get rid of ErrConflict - return nil, consts.ErrConflict - default: + if err != nil { shard.GetLogger().Error( - "Persistent store operation Failure", + "Update workflow execution operation failed.", tag.WorkflowNamespaceID(request.UpdateWorkflowMutation.ExecutionInfo.NamespaceId), tag.WorkflowID(request.UpdateWorkflowMutation.ExecutionInfo.WorkflowId), tag.WorkflowRunID(request.UpdateWorkflowMutation.ExecutionState.RunId), tag.StoreOperationUpdateWorkflowExecution, tag.Error(err), ) - return nil, err + switch err.(type) { + case *persistence.CurrentWorkflowConditionFailedError, + *persistence.WorkflowConditionFailedError, + *persistence.ConditionFailedError: + // TODO get rid of ErrConflict + return nil, consts.ErrConflict + default: + return nil, err + } } + + if namespaceEntry, err := shard.GetNamespaceRegistry().GetNamespaceByID( + namespace.ID(request.UpdateWorkflowMutation.ExecutionInfo.NamespaceId), + ); err == nil { + emitMutationMetrics( + shard, + namespaceEntry, + &resp.UpdateMutableStateStats, + resp.NewMutableStateStats, + ) + emitCompletionMetrics( + shard, + namespaceEntry, + mutationToCompletionMetric(&request.UpdateWorkflowMutation), + snapshotToCompletionMetric(request.NewWorkflowSnapshot), + ) + } + + return resp, nil } func NotifyWorkflowSnapshotTasks( @@ -727,6 +730,10 @@ func emitCompletionMetrics( } func operationPossiblySucceeded(err error) bool { + if err == consts.ErrConflict { + return false + } + switch err.(type) { case *persistence.CurrentWorkflowConditionFailedError, *persistence.WorkflowConditionFailedError, @@ -736,7 +743,7 @@ func operationPossiblySucceeded(err error) bool { *persistence.TransactionSizeLimitError, *serviceerror.ResourceExhausted, *serviceerror.NotFound: - // Persistence failure that means the write was definitely not committed: + // Persistence failure that means that write was definitely not committed. return false default: return true