Skip to content

Commit

Permalink
Reduce SA bloat on GetVersion (#1056)
Browse files Browse the repository at this point in the history
Reduce SA bloat on GetVersion
  • Loading branch information
Quinn-With-Two-Ns authored Mar 9, 2023
1 parent 3ce152b commit e047c84
Show file tree
Hide file tree
Showing 14 changed files with 14,775 additions and 40 deletions.
52 changes: 36 additions & 16 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ type (
*naiveCommandStateMachine
}

versionMarker struct {
changeID string
searchAttrUpdated bool
}

commandsHelper struct {
nextCommandEventID int64
orderedCommands *list.List
Expand All @@ -148,7 +153,7 @@ type (
scheduledEventIDToActivityID map[int64]string
scheduledEventIDToCancellationID map[int64]string
scheduledEventIDToSignalID map[int64]string
versionMarkerLookup map[int64]string
versionMarkerLookup map[int64]versionMarker
commandsCancelledDuringWFCancellation int64
workflowExecutionIsCancelling bool

Expand Down Expand Up @@ -221,13 +226,14 @@ const (
localActivityMarkerName = "LocalActivity"
mutableSideEffectMarkerName = "MutableSideEffect"

sideEffectMarkerIDName = "side-effect-id"
sideEffectMarkerDataName = "data"
versionMarkerChangeIDName = "change-id"
versionMarkerDataName = "version"
localActivityMarkerDataName = "data"
localActivityResultName = "result"
mutableSideEffectCallCounterName = "mutable-side-effect-call-counter"
sideEffectMarkerIDName = "side-effect-id"
sideEffectMarkerDataName = "data"
versionMarkerChangeIDName = "change-id"
versionMarkerDataName = "version"
versionSearchAttributeUpdatedName = "version-search-attribute-updated"
localActivityMarkerDataName = "data"
localActivityResultName = "result"
mutableSideEffectCallCounterName = "mutable-side-effect-call-counter"
)

func (d commandState) String() string {
Expand Down Expand Up @@ -884,7 +890,7 @@ func newCommandsHelper() *commandsHelper {
scheduledEventIDToActivityID: make(map[int64]string),
scheduledEventIDToCancellationID: make(map[int64]string),
scheduledEventIDToSignalID: make(map[int64]string),
versionMarkerLookup: make(map[int64]string),
versionMarkerLookup: make(map[int64]versionMarker),
commandsCancelledDuringWFCancellation: 0,
}
}
Expand Down Expand Up @@ -917,15 +923,18 @@ func (h *commandsHelper) getNextID() int64 {
}

func (h *commandsHelper) incrementNextCommandEventIDIfVersionMarker() {
_, ok := h.versionMarkerLookup[h.nextCommandEventID]
marker, ok := h.versionMarkerLookup[h.nextCommandEventID]
for ok {
// Remove the marker from the lookup map and increment nextCommandEventID by 2 because call to GetVersion
// results in 2 events in the history. One is GetVersion marker event for changeID and change version, other
// results in 1 or 2 events in the history. One is GetVersion marker event for changeID and change version, other
// is UpsertSearchableAttributes to keep track of executions using particular version of code.
delete(h.versionMarkerLookup, h.nextCommandEventID)
h.incrementNextCommandEventID()
h.incrementNextCommandEventID()
_, ok = h.versionMarkerLookup[h.nextCommandEventID]
// UpsertSearchableAttributes may not have been written if the search attribute was too large.
if marker.searchAttrUpdated {
h.incrementNextCommandEventID()
}
marker, ok = h.versionMarkerLookup[h.nextCommandEventID]
}
}

Expand Down Expand Up @@ -1071,7 +1080,7 @@ func (h *commandsHelper) getActivityAndScheduledEventIDs(event *historypb.Histor
return activityID, scheduledEventID
}

func (h *commandsHelper) recordVersionMarker(changeID string, version Version, dc converter.DataConverter) commandStateMachine {
func (h *commandsHelper) recordVersionMarker(changeID string, version Version, dc converter.DataConverter, searchAttributeWasUpdated bool) commandStateMachine {
markerID := fmt.Sprintf("%v_%v", versionMarkerName, changeID)

changeIDPayload, err := dc.ToPayloads(changeID)
Expand All @@ -1092,12 +1101,20 @@ func (h *commandsHelper) recordVersionMarker(changeID string, version Version, d
},
}

if !searchAttributeWasUpdated {
searchAttributeWasUpdatedPayload, err := dc.ToPayloads(searchAttributeWasUpdated)
if err != nil {
panic(err)
}
recordMarker.Details[versionSearchAttributeUpdatedName] = searchAttributeWasUpdatedPayload
}

command := h.newMarkerCommandStateMachine(markerID, recordMarker)
h.addCommand(command)
return command
}

func (h *commandsHelper) handleVersionMarker(eventID int64, changeID string) {
func (h *commandsHelper) handleVersionMarker(eventID int64, changeID string, searchAttrUpdated bool) {
if _, ok := h.versionMarkerLookup[eventID]; ok {
panicMsg := fmt.Sprintf("marker event already exists for eventID in lookup: eventID: %v, changeID: %v",
eventID, changeID)
Expand All @@ -1107,7 +1124,10 @@ func (h *commandsHelper) handleVersionMarker(eventID int64, changeID string) {
// During processing of a workflow task we reorder all GetVersion markers and process them first.
// Keep track of all GetVersion marker events during the processing of workflow task so we can
// generate correct eventIDs for other events during replay.
h.versionMarkerLookup[eventID] = changeID
h.versionMarkerLookup[eventID] = versionMarker{
changeID: changeID,
searchAttrUpdated: searchAttrUpdated,
}
}

func (h *commandsHelper) recordSideEffectMarker(sideEffectID int64, data *commonpb.Payloads, dc converter.DataConverter) commandStateMachine {
Expand Down
37 changes: 33 additions & 4 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
historypb "go.temporal.io/api/history/v1"
protocolpb "go.temporal.io/api/protocol/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common"
Expand All @@ -51,7 +52,8 @@ import (
)

const (
queryResultSizeLimit = 2000000 // 2MB
queryResultSizeLimit = 2000000 // 2MB
changeVersionSearchAttrSizeLimit = 2048
)

// Assert that structs do indeed implement the interfaces
Expand Down Expand Up @@ -151,6 +153,7 @@ type (
failureConverter converter.FailureConverter
contextPropagators []ContextPropagator
deadlockDetectionTimeout time.Duration
sdkFlags *sdkFlags

protocols *protocol.Registry
}
Expand Down Expand Up @@ -201,6 +204,7 @@ func newWorkflowExecutionEventHandler(
failureConverter converter.FailureConverter,
contextPropagators []ContextPropagator,
deadlockDetectionTimeout time.Duration,
capabilities *workflowservice.GetSystemInfoResponse_Capabilities,
) workflowExecutionEventHandler {
context := &workflowEnvironmentImpl{
workflowInfo: workflowInfo,
Expand All @@ -220,6 +224,7 @@ func newWorkflowExecutionEventHandler(
deadlockDetectionTimeout: deadlockDetectionTimeout,
protocols: protocol.NewRegistry(),
mutableSideEffectCallCounter: make(map[string]int),
sdkFlags: newSDKFlags(capabilities),
}
context.logger = ilog.NewReplayLogger(
log.With(logger,
Expand Down Expand Up @@ -728,8 +733,26 @@ func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, max
// GetVersion for changeID is called first time (non-replay mode), generate a marker command for it.
// Also upsert search attributes to enable ability to search by changeVersion.
version = maxSupported
wc.commandsHelper.recordVersionMarker(changeID, version, wc.GetDataConverter())
_ = wc.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions))
changeVersionSA := createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions)
attr, err := validateAndSerializeSearchAttributes(changeVersionSA)
if err != nil {
wc.logger.Warn(fmt.Sprintf("Failed to seralize %s search attribute with: %v", TemporalChangeVersion, err))
} else {
// Server has a limit for the max size of a single search attribute value. If we exceed the default limit
// do not try to upsert as it will cause the workflow to fail.
updateSearchAttribute := true
if wc.sdkFlags.tryUse(SDKFlagLimitChangeVersionSASize, !wc.isReplay) && len(attr.IndexedFields[TemporalChangeVersion].GetData()) >= changeVersionSearchAttrSizeLimit {
wc.logger.Warn(fmt.Sprintf("Serialized size of %s search attribute update would "+
"exceed the maximum value size. Skipping this upsert. Be aware that your "+
"visibility records will not include the following patch: %s", TemporalChangeVersion, getChangeVersion(changeID, version)),
)
updateSearchAttribute = false
}
wc.commandsHelper.recordVersionMarker(changeID, version, wc.GetDataConverter(), updateSearchAttribute)
if updateSearchAttribute {
_ = wc.UpsertSearchAttributes(changeVersionSA)
}
}
}

validateVersion(changeID, version, minSupported, maxSupported)
Expand Down Expand Up @@ -1324,12 +1347,18 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded(
if versionPayload, ok := attributes.GetDetails()[versionMarkerDataName]; !ok {
err = fmt.Errorf("key %q: %w", versionMarkerDataName, ErrMissingMarkerDataKey)
} else {
// versionSearchAttributeUpdatedName is optional and was only added later so do not expect all version
// markers to have this.
searchAttrUpdated := true
if searchAttrUpdatedPayload, ok := attributes.GetDetails()[versionSearchAttributeUpdatedName]; ok {
_ = weh.dataConverter.FromPayloads(searchAttrUpdatedPayload, &searchAttrUpdated)
}
var changeID string
_ = weh.dataConverter.FromPayloads(changeIDPayload, &changeID)
var version Version
_ = weh.dataConverter.FromPayloads(versionPayload, &version)
weh.changeVersions[changeID] = version
weh.commandsHelper.handleVersionMarker(eventID, changeID)
weh.commandsHelper.handleVersionMarker(eventID, changeID, searchAttrUpdated)
}
}
case localActivityMarkerName:
Expand Down
107 changes: 107 additions & 0 deletions internal/internal_flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// The MIT License
//
// Copyright (c) 2023 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
"math"

"go.temporal.io/api/workflowservice/v1"
)

// sdkFlag represents a flag used to help version the sdk internally to make breaking changes
// in workflow logic.
type sdkFlag uint32

const (
SDKFlagUnset sdkFlag = 0
// LimitChangeVersionSASize will limit the search attribute size of TemporalChangeVersion to 2048 when
// calling GetVersion. If the limit is exceeded the search attribute is not updated.
SDKFlagLimitChangeVersionSASize = 1
SDKFlagUnknown = math.MaxUint32
)

func sdkFlagFromUint(value uint32) sdkFlag {
switch value {
case uint32(SDKFlagUnset):
return SDKFlagUnset
case uint32(SDKFlagLimitChangeVersionSASize):
return SDKFlagLimitChangeVersionSASize
default:
return SDKFlagUnknown
}
}

func (f sdkFlag) isValid() bool {
return f != SDKFlagUnset && f != SDKFlagUnknown
}

// sdkFlags represents all the flags that are currently set in a workflow execution.
type sdkFlags struct {
capabilities *workflowservice.GetSystemInfoResponse_Capabilities
// Flags that have been recieved from the server
currentFlags map[sdkFlag]bool
// Flags that have been set this WFT that have not been sent to the server.
// Keep track of them sepratly so we know what to send to the server.
newFlags map[sdkFlag]bool
}

func newSDKFlags(capabilities *workflowservice.GetSystemInfoResponse_Capabilities) *sdkFlags {
return &sdkFlags{
capabilities: capabilities,
currentFlags: make(map[sdkFlag]bool),
newFlags: make(map[sdkFlag]bool),
}
}

// tryUse returns true if this flag may currently be used. If record is true, always returns
// true and records the flag as being used.
func (sf *sdkFlags) tryUse(flag sdkFlag, record bool) bool {
if !sf.capabilities.GetSdkMetadata() {
return false
}

if record && !sf.currentFlags[flag] {
// Only set new flags
sf.newFlags[flag] = true
return true
} else {
return sf.currentFlags[flag]
}
}

// markSDKFlagsSent marks all sdk flags as sent to the server.
func (sf *sdkFlags) markSDKFlagsSent() {
for flag := range sf.newFlags {
sf.currentFlags[flag] = true
}
sf.newFlags = make(map[sdkFlag]bool)
}

// gatherNewSDKFlags returns all sdkFlags set since the last call to markSDKFlagsSent.
func (sf *sdkFlags) gatherNewSDKFlags() []sdkFlag {
flags := make([]sdkFlag, 0, len(sf.newFlags))
for flag := range sf.newFlags {
flags = append(flags, flag)
}
return flags
}
Loading

0 comments on commit e047c84

Please sign in to comment.