Skip to content

Commit

Permalink
Update shard context to reduce DB calls for closed shards (#4547)
Browse files Browse the repository at this point in the history
  • Loading branch information
Shaddoll authored Oct 7, 2021
1 parent f2f859b commit 384a3a4
Showing 1 changed file with 34 additions and 0 deletions.
34 changes: 34 additions & 0 deletions service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,10 @@ func (s *contextImpl) CreateWorkflowExecution(
ctx context.Context,
request *persistence.CreateWorkflowExecutionRequest,
) (*persistence.CreateWorkflowExecutionResponse, error) {
if s.isClosed() {
return nil, ErrShardClosed
}

ctx, cancel, err := s.ensureMinContextTimeout(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -623,6 +627,9 @@ func (s *contextImpl) CreateWorkflowExecution(

Create_Loop:
for attempt := 0; attempt < conditionalRetryCount && ctx.Err() == nil; attempt++ {
if s.isClosed() {
return nil, ErrShardClosed
}
currentRangeID := s.getRangeID()
request.RangeID = currentRangeID

Expand Down Expand Up @@ -690,6 +697,10 @@ func (s *contextImpl) UpdateWorkflowExecution(
ctx context.Context,
request *persistence.UpdateWorkflowExecutionRequest,
) (*persistence.UpdateWorkflowExecutionResponse, error) {
if s.isClosed() {
return nil, ErrShardClosed
}

ctx, cancel, err := s.ensureMinContextTimeout(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -740,6 +751,9 @@ func (s *contextImpl) UpdateWorkflowExecution(

Update_Loop:
for attempt := 0; attempt < conditionalRetryCount && ctx.Err() == nil; attempt++ {
if s.isClosed() {
return nil, ErrShardClosed
}
currentRangeID := s.getRangeID()
request.RangeID = currentRangeID

Expand Down Expand Up @@ -800,6 +814,10 @@ func (s *contextImpl) ConflictResolveWorkflowExecution(
ctx context.Context,
request *persistence.ConflictResolveWorkflowExecutionRequest,
) (*persistence.ConflictResolveWorkflowExecutionResponse, error) {
if s.isClosed() {
return nil, ErrShardClosed
}

ctx, cancel, err := s.ensureMinContextTimeout(ctx)
if err != nil {
return nil, err
Expand Down Expand Up @@ -863,6 +881,9 @@ func (s *contextImpl) ConflictResolveWorkflowExecution(

Conflict_Resolve_Loop:
for attempt := 0; attempt < conditionalRetryCount && ctx.Err() == nil; attempt++ {
if s.isClosed() {
return nil, ErrShardClosed
}
currentRangeID := s.getRangeID()
request.RangeID = currentRangeID
resp, err := s.executionManager.ConflictResolveWorkflowExecution(ctx, request)
Expand Down Expand Up @@ -940,6 +961,9 @@ func (s *contextImpl) AppendHistoryV2Events(
domainID string,
execution types.WorkflowExecution,
) (int, error) {
if s.isClosed() {
return 0, ErrShardClosed
}

domainName, err := s.GetDomainCache().GetDomainName(domainID)
if err != nil {
Expand Down Expand Up @@ -1052,6 +1076,10 @@ func (s *contextImpl) renewRangeLocked(isStealing bool) error {
var attempt int32
Retry_Loop:
for attempt = 0; attempt < conditionalRetryCount; attempt++ {
if s.isClosed() {
err = ErrShardClosed
break Retry_Loop
}
err = s.GetShardManager().UpdateShard(context.Background(), &persistence.UpdateShardRequest{
ShardInfo: updatedShardInfo,
PreviousRangeID: s.shardInfo.RangeID})
Expand Down Expand Up @@ -1370,6 +1398,9 @@ func (s *contextImpl) ReplicateFailoverMarkers(
ctx context.Context,
markers []*persistence.FailoverMarkerTask,
) error {
if s.isClosed() {
return ErrShardClosed
}

tasks := make([]persistence.Task, 0, len(markers))
for _, marker := range markers {
Expand All @@ -1391,6 +1422,9 @@ func (s *contextImpl) ReplicateFailoverMarkers(
var err error
Retry_Loop:
for attempt := int32(0); attempt < conditionalRetryCount && ctx.Err() == nil; attempt++ {
if s.isClosed() {
return ErrShardClosed
}
err = s.executionManager.CreateFailoverMarkerTasks(
ctx,
&persistence.CreateFailoverMarkersRequest{
Expand Down

0 comments on commit 384a3a4

Please sign in to comment.