Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce SA bloat on GetVersion #1056

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 36 additions & 16 deletions internal/internal_command_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ type (
*naiveCommandStateMachine
}

// versionMarker is the value stored per event ID in commandsHelper.versionMarkerLookup for a
// GetVersion marker event.
versionMarker struct {
// changeID is the change ID that was passed to handleVersionMarker for this marker event.
changeID string
// searchAttrUpdated is consulted by incrementNextCommandEventIDIfVersionMarker: when true an
// additional event ID is skipped for the search attribute upsert that accompanies the marker.
searchAttrUpdated bool
}

commandsHelper struct {
nextCommandEventID int64
orderedCommands *list.List
Expand All @@ -148,7 +153,7 @@ type (
scheduledEventIDToActivityID map[int64]string
scheduledEventIDToCancellationID map[int64]string
scheduledEventIDToSignalID map[int64]string
versionMarkerLookup map[int64]string
versionMarkerLookup map[int64]versionMarker
commandsCancelledDuringWFCancellation int64
workflowExecutionIsCancelling bool

Expand Down Expand Up @@ -221,13 +226,14 @@ const (
localActivityMarkerName = "LocalActivity"
mutableSideEffectMarkerName = "MutableSideEffect"

sideEffectMarkerIDName = "side-effect-id"
sideEffectMarkerDataName = "data"
versionMarkerChangeIDName = "change-id"
versionMarkerDataName = "version"
localActivityMarkerDataName = "data"
localActivityResultName = "result"
mutableSideEffectCallCounterName = "mutable-side-effect-call-counter"
sideEffectMarkerIDName = "side-effect-id"
sideEffectMarkerDataName = "data"
versionMarkerChangeIDName = "change-id"
versionMarkerDataName = "version"
versionSearchAttributeUpdatedName = "version-search-attribute-updated"
localActivityMarkerDataName = "data"
localActivityResultName = "result"
mutableSideEffectCallCounterName = "mutable-side-effect-call-counter"
)

func (d commandState) String() string {
Expand Down Expand Up @@ -884,7 +890,7 @@ func newCommandsHelper() *commandsHelper {
scheduledEventIDToActivityID: make(map[int64]string),
scheduledEventIDToCancellationID: make(map[int64]string),
scheduledEventIDToSignalID: make(map[int64]string),
versionMarkerLookup: make(map[int64]string),
versionMarkerLookup: make(map[int64]versionMarker),
commandsCancelledDuringWFCancellation: 0,
}
}
Expand Down Expand Up @@ -917,15 +923,18 @@ func (h *commandsHelper) getNextID() int64 {
}

// incrementNextCommandEventIDIfVersionMarker advances nextCommandEventID past every GetVersion
// marker registered in versionMarkerLookup at the current position. It keeps going while the
// position it lands on is itself a registered version marker.
func (h *commandsHelper) incrementNextCommandEventIDIfVersionMarker() {
	marker, ok := h.versionMarkerLookup[h.nextCommandEventID]
	for ok {
		// Remove the marker from the lookup map and increment nextCommandEventID by 1 or 2 because a call to
		// GetVersion results in 1 or 2 events in the history. One is the GetVersion marker event for changeID
		// and change version, the other is UpsertSearchableAttributes to keep track of executions using a
		// particular version of code.
		delete(h.versionMarkerLookup, h.nextCommandEventID)
		h.incrementNextCommandEventID()
		// UpsertSearchableAttributes may not have been written if the search attribute was too large.
		if marker.searchAttrUpdated {
			h.incrementNextCommandEventID()
		}
		marker, ok = h.versionMarkerLookup[h.nextCommandEventID]
	}
}

Expand Down Expand Up @@ -1071,7 +1080,7 @@ func (h *commandsHelper) getActivityAndScheduledEventIDs(event *historypb.Histor
return activityID, scheduledEventID
}

func (h *commandsHelper) recordVersionMarker(changeID string, version Version, dc converter.DataConverter) commandStateMachine {
func (h *commandsHelper) recordVersionMarker(changeID string, version Version, dc converter.DataConverter, searchAttributeWasUpdated bool) commandStateMachine {
markerID := fmt.Sprintf("%v_%v", versionMarkerName, changeID)

changeIDPayload, err := dc.ToPayloads(changeID)
Expand All @@ -1092,12 +1101,20 @@ func (h *commandsHelper) recordVersionMarker(changeID string, version Version, d
},
}

if !searchAttributeWasUpdated {
searchAttributeWasUpdatedPayload, err := dc.ToPayloads(searchAttributeWasUpdated)
if err != nil {
panic(err)
}
recordMarker.Details[versionSearchAttributeUpdatedName] = searchAttributeWasUpdatedPayload
}

command := h.newMarkerCommandStateMachine(markerID, recordMarker)
h.addCommand(command)
return command
}

func (h *commandsHelper) handleVersionMarker(eventID int64, changeID string) {
func (h *commandsHelper) handleVersionMarker(eventID int64, changeID string, searchAttrUpdated bool) {
if _, ok := h.versionMarkerLookup[eventID]; ok {
panicMsg := fmt.Sprintf("marker event already exists for eventID in lookup: eventID: %v, changeID: %v",
eventID, changeID)
Expand All @@ -1107,7 +1124,10 @@ func (h *commandsHelper) handleVersionMarker(eventID int64, changeID string) {
// During processing of a workflow task we reorder all GetVersion markers and process them first.
// Keep track of all GetVersion marker events during the processing of workflow task so we can
// generate correct eventIDs for other events during replay.
h.versionMarkerLookup[eventID] = changeID
h.versionMarkerLookup[eventID] = versionMarker{
changeID: changeID,
searchAttrUpdated: searchAttrUpdated,
}
}

func (h *commandsHelper) recordSideEffectMarker(sideEffectID int64, data *commonpb.Payloads, dc converter.DataConverter) commandStateMachine {
Expand Down
37 changes: 33 additions & 4 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
historypb "go.temporal.io/api/history/v1"
protocolpb "go.temporal.io/api/protocol/v1"
taskqueuepb "go.temporal.io/api/taskqueue/v1"
"go.temporal.io/api/workflowservice/v1"

"go.temporal.io/sdk/converter"
"go.temporal.io/sdk/internal/common"
Expand All @@ -51,7 +52,8 @@ import (
)

const (
	// queryResultSizeLimit is the size limit applied to query results.
	queryResultSizeLimit = 2000000 // 2MB
	// changeVersionSearchAttrSizeLimit is the threshold, compared against the length of the serialized
	// TemporalChangeVersion search attribute data, at or above which GetVersion skips the upsert.
	changeVersionSearchAttrSizeLimit = 2048
)

// Assert that structs do indeed implement the interfaces
Expand Down Expand Up @@ -151,6 +153,7 @@ type (
failureConverter converter.FailureConverter
contextPropagators []ContextPropagator
deadlockDetectionTimeout time.Duration
sdkFlags *sdkFlags

protocols *protocol.Registry
}
Expand Down Expand Up @@ -201,6 +204,7 @@ func newWorkflowExecutionEventHandler(
failureConverter converter.FailureConverter,
contextPropagators []ContextPropagator,
deadlockDetectionTimeout time.Duration,
capabilities *workflowservice.GetSystemInfoResponse_Capabilities,
) workflowExecutionEventHandler {
context := &workflowEnvironmentImpl{
workflowInfo: workflowInfo,
Expand All @@ -220,6 +224,7 @@ func newWorkflowExecutionEventHandler(
deadlockDetectionTimeout: deadlockDetectionTimeout,
protocols: protocol.NewRegistry(),
mutableSideEffectCallCounter: make(map[string]int),
sdkFlags: newSDKFlags(capabilities),
}
context.logger = ilog.NewReplayLogger(
log.With(logger,
Expand Down Expand Up @@ -728,8 +733,26 @@ func (wc *workflowEnvironmentImpl) GetVersion(changeID string, minSupported, max
// GetVersion for changeID is called first time (non-replay mode), generate a marker command for it.
// Also upsert search attributes to enable ability to search by changeVersion.
version = maxSupported
wc.commandsHelper.recordVersionMarker(changeID, version, wc.GetDataConverter())
_ = wc.UpsertSearchAttributes(createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions))
changeVersionSA := createSearchAttributesForChangeVersion(changeID, version, wc.changeVersions)
attr, err := validateAndSerializeSearchAttributes(changeVersionSA)
Quinn-With-Two-Ns marked this conversation as resolved.
Show resolved Hide resolved
Quinn-With-Two-Ns marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
wc.logger.Warn(fmt.Sprintf("Failed to seralize %s search attribute with: %v", TemporalChangeVersion, err))
} else {
// Server has a limit for the max size of a single search attribute value. If we exceed the default limit
// do not try to upsert as it will cause the workflow to fail.
updateSearchAttribute := true
if wc.sdkFlags.tryUse(SDKFlagLimitChangeVersionSASize, !wc.isReplay) && len(attr.IndexedFields[TemporalChangeVersion].GetData()) >= changeVersionSearchAttrSizeLimit {
wc.logger.Warn(fmt.Sprintf("Serialized size of %s search attribute update would "+
"exceed the maximum value size. Skipping this upsert. Be aware that your "+
"visibility records will not include the following patch: %s", TemporalChangeVersion, getChangeVersion(changeID, version)),
)
updateSearchAttribute = false
}
wc.commandsHelper.recordVersionMarker(changeID, version, wc.GetDataConverter(), updateSearchAttribute)
if updateSearchAttribute {
_ = wc.UpsertSearchAttributes(changeVersionSA)
}
}
}

validateVersion(changeID, version, minSupported, maxSupported)
Expand Down Expand Up @@ -1324,12 +1347,18 @@ func (weh *workflowExecutionEventHandlerImpl) handleMarkerRecorded(
if versionPayload, ok := attributes.GetDetails()[versionMarkerDataName]; !ok {
err = fmt.Errorf("key %q: %w", versionMarkerDataName, ErrMissingMarkerDataKey)
} else {
// versionSearchAttributeUpdatedName is optional and was only added later so do not expect all version
// markers to have this.
searchAttrUpdated := true
if searchAttrUpdatedPayload, ok := attributes.GetDetails()[versionSearchAttributeUpdatedName]; ok {
_ = weh.dataConverter.FromPayloads(searchAttrUpdatedPayload, &searchAttrUpdated)
}
var changeID string
_ = weh.dataConverter.FromPayloads(changeIDPayload, &changeID)
var version Version
_ = weh.dataConverter.FromPayloads(versionPayload, &version)
weh.changeVersions[changeID] = version
weh.commandsHelper.handleVersionMarker(eventID, changeID)
weh.commandsHelper.handleVersionMarker(eventID, changeID, searchAttrUpdated)
}
}
case localActivityMarkerName:
Expand Down
107 changes: 107 additions & 0 deletions internal/internal_flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// The MIT License
//
// Copyright (c) 2023 Temporal Technologies Inc. All rights reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package internal

import (
"math"

"go.temporal.io/api/workflowservice/v1"
)

// sdkFlag represents a flag used to help version the sdk internally to make breaking changes
// in workflow logic.
type sdkFlag uint32

// Every constant carries the sdkFlag type explicitly. A constant declared with its own expression
// but no type would be an untyped integer constant instead of an sdkFlag, and math.MaxUint32 does
// not fit in an int on 32-bit platforms.
const (
	// SDKFlagUnset is the zero value; isValid reports false for it.
	SDKFlagUnset sdkFlag = 0
	// SDKFlagLimitChangeVersionSASize will limit the search attribute size of TemporalChangeVersion to 2048 when
	// calling GetVersion. If the limit is exceeded the search attribute is not updated.
	SDKFlagLimitChangeVersionSASize sdkFlag = 1
	// SDKFlagUnknown is the value sdkFlagFromUint returns for any number it does not recognize;
	// isValid reports false for it.
	SDKFlagUnknown sdkFlag = math.MaxUint32
)

// sdkFlagFromUint maps a raw numeric value onto the sdkFlag it names. Values that do not
// correspond to a known flag are reported as SDKFlagUnknown.
func sdkFlagFromUint(value uint32) sdkFlag {
	switch flag := sdkFlag(value); flag {
	case SDKFlagUnset, SDKFlagLimitChangeVersionSASize:
		// Recognized value: hand it back as the matching flag.
		return flag
	default:
		return SDKFlagUnknown
	}
}

// isValid reports whether f is a concrete flag, i.e. neither SDKFlagUnset nor SDKFlagUnknown.
func (f sdkFlag) isValid() bool {
	switch f {
	case SDKFlagUnset, SDKFlagUnknown:
		return false
	}
	return true
}

// sdkFlags represents all the flags that are currently set in a workflow execution.
type sdkFlags struct {
// capabilities is checked by tryUse: when its GetSdkMetadata() is false no flag may be used.
capabilities *workflowservice.GetSystemInfoResponse_Capabilities
// Flags that have been received from the server.
currentFlags map[sdkFlag]bool
// Flags that have been set this WFT that have not been sent to the server.
// Keep track of them separately so we know what to send to the server.
newFlags map[sdkFlag]bool
}

// newSDKFlags builds an sdkFlags that holds the given capabilities and starts with empty,
// non-nil flag sets.
func newSDKFlags(capabilities *workflowservice.GetSystemInfoResponse_Capabilities) *sdkFlags {
	flags := new(sdkFlags)
	flags.capabilities = capabilities
	flags.currentFlags = map[sdkFlag]bool{}
	flags.newFlags = map[sdkFlag]bool{}
	return flags
}

// tryUse returns true if this flag may currently be used. If record is true, always returns
// true and records the flag as being used.
func (sf *sdkFlags) tryUse(flag sdkFlag, record bool) bool {
if !sf.capabilities.GetSdkMetadata() {
return false
}

if record && !sf.currentFlags[flag] {
// Only set new flags
sf.newFlags[flag] = true
Quinn-With-Two-Ns marked this conversation as resolved.
Show resolved Hide resolved
return true
} else {
return sf.currentFlags[flag]
}
}

// markSDKFlagsSent promotes every flag in newFlags into currentFlags and replaces newFlags
// with a fresh empty set.
func (sf *sdkFlags) markSDKFlagsSent() {
	pending := sf.newFlags
	sf.newFlags = make(map[sdkFlag]bool)
	for sent := range pending {
		sf.currentFlags[sent] = true
	}
}

// gatherNewSDKFlags returns the keys of newFlags, i.e. the flags set since the last call to
// markSDKFlagsSent. The order of the result follows map iteration and is not defined.
func (sf *sdkFlags) gatherNewSDKFlags() []sdkFlag {
	gathered := make([]sdkFlag, len(sf.newFlags))
	next := 0
	for flag := range sf.newFlags {
		gathered[next] = flag
		next++
	}
	return gathered
}
Loading