Skip to content

Commit

Permalink
Deprecate & disable DisableStickyExecution flag (#318)
Browse files Browse the repository at this point in the history
* Deprecated. Will be removed in a later release.
  • Loading branch information
Sushisource authored Dec 30, 2020
1 parent 56a1a18 commit 0e6bf42
Show file tree
Hide file tree
Showing 17 changed files with 114 additions and 165 deletions.
8 changes: 4 additions & 4 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@ steps:
run: unit-test
config: docker/buildkite/docker-compose.yml

- label: ":golang: integration-test-sticky-off"
- label: ":golang: integration-test-zero-cache"
agents:
queue: "default"
docker: "*"
command: "make integration-test-sticky-off"
command: "make integration-test-zero-cache"
artifact_paths:
- ".build/coverage/*.out"
retry:
Expand All @@ -36,11 +36,11 @@ steps:
run: integ-test
config: docker/buildkite/docker-compose.yml

- label: ":golang: integration-test-sticky-on"
- label: ":golang: integration-test-normal-cache"
agents:
queue: "default"
docker: "*"
command: "make integration-test-sticky-on"
command: "make integration-test-normal-cache"
artifact_paths:
- ".build/coverage/*.out"
retry:
Expand Down
4 changes: 2 additions & 2 deletions .buildkite/scripts/gocov.sh
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ go get github.com/mattn/goveralls
# download cover files from all the tests
mkdir -p .build/coverage
buildkite-agent artifact download ".build/coverage/unit_test_cover.out" . --step ":golang: unit-test" --build "$BUILDKITE_BUILD_ID"
buildkite-agent artifact download ".build/coverage/integ_test_sticky_off_cover.out" . --step ":golang: integration-test-sticky-off" --build "$BUILDKITE_BUILD_ID"
buildkite-agent artifact download ".build/coverage/integ_test_sticky_on_cover.out" . --step ":golang: integration-test-sticky-on" --build "$BUILDKITE_BUILD_ID"
buildkite-agent artifact download ".build/coverage/integ_test_zero_cache_cover.out" . --step ":golang: integration-test-zero-cache" --build "$BUILDKITE_BUILD_ID"
buildkite-agent artifact download ".build/coverage/integ_test_normal_cache_cover.out" . --step ":golang: integration-test-normal-cache" --build "$BUILDKITE_BUILD_ID"

echo "download complete"

Expand Down
20 changes: 10 additions & 10 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ TEST_ARG ?= -race -v -timeout $(TEST_TIMEOUT)
INTEG_TEST_ROOT := ./test
COVER_ROOT := $(BUILD)/coverage
UT_COVER_FILE := $(COVER_ROOT)/unit_test_cover.out
INTEG_STICKY_OFF_COVER_FILE := $(COVER_ROOT)/integ_test_sticky_off_cover.out
INTEG_STICKY_ON_COVER_FILE := $(COVER_ROOT)/integ_test_sticky_on_cover.out
INTEG_ZERO_CACHE_COVER_FILE := $(COVER_ROOT)/integ_test_zero_cache_cover.out
INTEG_NORMAL_CACHE_COVER_FILE := $(COVER_ROOT)/integ_test_normal_cache_cover.out

# Automatically gather all srcs
ALL_SRC := $(shell find . -name "*.go")
Expand Down Expand Up @@ -45,25 +45,25 @@ unit-test: $(BUILD)/dummy
cat $(COVER_ROOT)/"$$dir"/cover.out | grep -v "mode: atomic" >> $(UT_COVER_FILE); \
done;

integration-test-sticky-off: $(BUILD)/dummy
integration-test-zero-cache: $(BUILD)/dummy
@mkdir -p $(COVER_ROOT)
@for dir in $(INTEG_TEST_DIRS); do \
STICKY_OFF=true go test $(TEST_ARG) "$$dir" -coverprofile=$(INTEG_STICKY_OFF_COVER_FILE) -coverpkg=./... || exit 1; \
WORKFLOW_CACHE_SIZE=0 go test $(TEST_ARG) "$$dir" -coverprofile=$(INTEG_ZERO_CACHE_COVER_FILE) -coverpkg=./... || exit 1; \
done;

integration-test-sticky-on: $(BUILD)/dummy
integration-test-normal-cache: $(BUILD)/dummy
@mkdir -p $(COVER_ROOT)
@for dir in $(INTEG_TEST_DIRS); do \
STICKY_OFF=false go test $(TEST_ARG) "$$dir" -coverprofile=$(INTEG_STICKY_ON_COVER_FILE) -coverpkg=./... || exit 1; \
go test $(TEST_ARG) "$$dir" -coverprofile=$(INTEG_NORMAL_CACHE_COVER_FILE) -coverpkg=./... || exit 1; \
done;

test: unit-test integration-test-sticky-off integration-test-sticky-on
test: unit-test integration-test-zero-cache integration-test-normal-cache

$(COVER_ROOT)/cover.out: $(UT_COVER_FILE) $(INTEG_STICKY_OFF_COVER_FILE) $(INTEG_STICKY_ON_COVER_FILE)
$(COVER_ROOT)/cover.out: $(UT_COVER_FILE) $(INTEG_ZERO_CACHE_COVER_FILE) $(INTEG_NORMAL_CACHE_COVER_FILE)
@echo "mode: atomic" > $(COVER_ROOT)/cover.out
cat $(UT_COVER_FILE) | grep -v "^mode: \w\+" | grep -v ".gen" >> $(COVER_ROOT)/cover.out
cat $(INTEG_STICKY_OFF_COVER_FILE) | grep -v "^mode: \w\+" | grep -v ".gen" >> $(COVER_ROOT)/cover.out
cat $(INTEG_STICKY_ON_COVER_FILE) | grep -v "^mode: \w\+" | grep -v ".gen" >> $(COVER_ROOT)/cover.out
cat $(INTEG_ZERO_CACHE_COVER_FILE) | grep -v "^mode: \w\+" | grep -v ".gen" >> $(COVER_ROOT)/cover.out
cat $(INTEG_NORMAL_CACHE_COVER_FILE) | grep -v "^mode: \w\+" | grep -v ".gen" >> $(COVER_ROOT)/cover.out

cover: $(COVER_ROOT)/cover.out
go tool cover -html=$(COVER_ROOT)/cover.out;
Expand Down
12 changes: 6 additions & 6 deletions docker/buildkite/docker-compose-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ services:
aliases:
- temporal

integ-test-sticky-off:
integ-test-zero-cache:
build:
context: ../../
dockerfile: ./docker/buildkite/Dockerfile
Expand All @@ -36,7 +36,7 @@ services:
- -e
- -c
- |
make integ_test_sticky_off
make integ_test_zero_cache
environment:
- "SERVICE_ADDR=temporal:7233"
- "GO111MODULE=on"
Expand All @@ -47,9 +47,9 @@ services:
networks:
services-network:
aliases:
- integ-test-sticky-off
- integ-test-zero-cache

integ-test-sticky-on:
integ-test-normal-cache:
build:
context: ../../
dockerfile: ./docker/buildkite/Dockerfile
Expand All @@ -58,7 +58,7 @@ services:
- -e
- -c
- |
make integ_test_sticky_on
make integ_test_normal_cache
environment:
- "SERVICE_ADDR=temporal:7233"
- "GO111MODULE=on"
Expand All @@ -69,7 +69,7 @@ services:
networks:
services-network:
aliases:
- integ-test-sticky-on
- integ-test-normal-cache

unit-test:
build:
Expand Down
3 changes: 1 addition & 2 deletions evictiontest/workflow_cache_eviction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() {
}
// pick 5 as cache size because it's not too big and not too small.
cacheSize := 5
internal.SetStickyWorkflowCacheSize(cacheSize)
// once for workflow worker because we disable activity worker
s.service.EXPECT().DescribeNamespace(gomock.Any(), gomock.Any()).Return(nil, nil).Times(1)
// feed our worker exactly *cacheSize* "legit" workflow tasks
Expand All @@ -158,8 +159,6 @@ func (s *CacheEvictionSuite) TestResetStickyOnEviction() {
s.service.EXPECT().ResetStickyTaskQueue(gomock.Any(), gomock.Any()).DoAndReturn(mockResetStickyTaskQueue).Times(1)

client := internal.NewServiceClient(s.service, nil, internal.ClientOptions{})

internal.SetStickyWorkflowCacheSize(cacheSize)
workflowWorker := internal.NewAggregatedWorker(client, "taskqueue", worker.Options{})
// this is an arbitrary workflow we use for this test
// NOTE: a simple helloworld that doesn't execute an activity
Expand Down
57 changes: 28 additions & 29 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,19 @@ type (

// workflowTaskHandlerImpl is the implementation of WorkflowTaskHandler
workflowTaskHandlerImpl struct {
namespace string
metricsScope tally.Scope
ppMgr pressurePointMgr
logger log.Logger
identity string
enableLoggingInReplay bool
disableStickyExecution bool
registry *registry
laTunnel *localActivityTunnel
workflowPanicPolicy WorkflowPanicPolicy
dataConverter converter.DataConverter
contextPropagators []ContextPropagator
tracer opentracing.Tracer
cache *WorkerCache
namespace string
metricsScope tally.Scope
ppMgr pressurePointMgr
logger log.Logger
identity string
enableLoggingInReplay bool
registry *registry
laTunnel *localActivityTunnel
workflowPanicPolicy WorkflowPanicPolicy
dataConverter converter.DataConverter
contextPropagators []ContextPropagator
tracer opentracing.Tracer
cache *WorkerCache
}

activityProvider func(name string) activity
Expand Down Expand Up @@ -386,19 +385,18 @@ func isPreloadMarkerEvent(event *historypb.HistoryEvent) bool {
func newWorkflowTaskHandler(params workerExecutionParameters, ppMgr pressurePointMgr, registry *registry) WorkflowTaskHandler {
ensureRequiredParams(&params)
return &workflowTaskHandlerImpl{
namespace: params.Namespace,
logger: params.Logger,
ppMgr: ppMgr,
metricsScope: params.MetricsScope,
identity: params.Identity,
enableLoggingInReplay: params.EnableLoggingInReplay,
disableStickyExecution: params.DisableStickyExecution,
registry: registry,
workflowPanicPolicy: params.WorkflowPanicPolicy,
dataConverter: params.DataConverter,
contextPropagators: params.ContextPropagators,
tracer: params.Tracer,
cache: params.cache,
namespace: params.Namespace,
logger: params.Logger,
ppMgr: ppMgr,
metricsScope: params.MetricsScope,
identity: params.Identity,
enableLoggingInReplay: params.EnableLoggingInReplay,
registry: registry,
workflowPanicPolicy: params.WorkflowPanicPolicy,
dataConverter: params.DataConverter,
contextPropagators: params.ContextPropagators,
tracer: params.Tracer,
cache: params.cache,
}
}

Expand All @@ -421,7 +419,8 @@ func (w *workflowExecutionContextImpl) Lock() {
}

func (w *workflowExecutionContextImpl) Unlock(err error) {
if err != nil || w.err != nil || w.isWorkflowCompleted || (w.wth.disableStickyExecution && !w.hasPendingLocalActivityWork()) {
if err != nil || w.err != nil || w.isWorkflowCompleted ||
(w.wth.cache.MaxWorkflowCacheSize() <= 0 && !w.hasPendingLocalActivityWork()) {
// TODO: in the closed case, this assumes the close command always succeeds. A server-side change is needed to
// return an error that indicates the close failure case. This should be a rare case. For now, always remove the
// cache, and if the close command failed, the next command will have to rebuild the state.
Expand Down Expand Up @@ -634,7 +633,7 @@ func (wth *workflowTaskHandlerImpl) getOrCreateWorkflowContext(
return
}

if !wth.disableStickyExecution && task.Query == nil {
if wth.cache.MaxWorkflowCacheSize() > 0 && task.Query == nil {
workflowContext, _ = wth.cache.putWorkflowContext(runID, workflowContext)
}
workflowContext.Lock()
Expand Down
24 changes: 10 additions & 14 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"errors"
"fmt"
"strconv"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -752,18 +753,20 @@ func (t *TaskHandlersTestSuite) TestWithTruncatedHistory() {
}
}

func (t *TaskHandlersTestSuite) TestSideEffectDefer_Sticky() {
t.testSideEffectDeferHelper(false)
func (t *TaskHandlersTestSuite) TestSideEffectDefer() {
t.testSideEffectDeferHelper(1)
}

func (t *TaskHandlersTestSuite) TestSideEffectDefer_NonSticky() {
t.testSideEffectDeferHelper(true)
func (t *TaskHandlersTestSuite) TestSideEffectDefer_NoCache() {
t.testSideEffectDeferHelper(0)
}

func (t *TaskHandlersTestSuite) testSideEffectDeferHelper(disableSticky bool) {
func (t *TaskHandlersTestSuite) testSideEffectDeferHelper(cacheSize int) {
value := "should not be modified"
expectedValue := value
doneCh := make(chan struct{})
var myWorkerCachePtr = &sharedWorkerCache{}
var myWorkerCacheLock sync.Mutex

workflowFunc := func(ctx Context) error {
defer func() {
Expand All @@ -776,7 +779,7 @@ func (t *TaskHandlersTestSuite) testSideEffectDeferHelper(disableSticky bool) {
_ = Sleep(ctx, 1*time.Second)
return nil
}
workflowName := fmt.Sprintf("SideEffectDeferWorkflow-Sticky=%v", disableSticky)
workflowName := fmt.Sprintf("SideEffectDeferWorkflow-CacheSize=%d", cacheSize)
t.registry.RegisterWorkflowWithOptions(
workflowFunc,
RegisterWorkflowOptions{Name: workflowName},
Expand All @@ -790,20 +793,13 @@ func (t *TaskHandlersTestSuite) testSideEffectDeferHelper(disableSticky bool) {
}

params := t.getTestWorkerExecutionParams()
params.DisableStickyExecution = disableSticky
params.cache = newWorkerCache(myWorkerCachePtr, &myWorkerCacheLock, cacheSize)

taskHandler := newWorkflowTaskHandler(params, nil, t.registry)
task := createWorkflowTask(testEvents, 0, workflowName)
_, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
t.Nil(err)

if !params.DisableStickyExecution {
// TODO: This ain't true any more. Clean up.
// 1. We can't set cache size in the test to 1, otherwise other tests will break.
// 2. We need to make sure cache is empty when the test is completed,
// So manually trigger a delete.
params.cache.getWorkflowCache().Delete(task.WorkflowExecution.GetRunId())
}
// Make sure the workflow coroutine has exited.
<-doneCh
// The side effect op should not be executed.
Expand Down
12 changes: 6 additions & 6 deletions internal/internal_task_pollers.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,13 @@ type (
dataConverter converter.DataConverter

stickyUUID string
disableStickyExecution bool
StickyScheduleToStartTimeout time.Duration

pendingRegularPollCount int
pendingStickyPollCount int
stickyBacklog int64
requestLock sync.Mutex
stickyCacheSize int
}

// activityTaskPoller implements polling/processing a workflow task
Expand Down Expand Up @@ -232,8 +232,8 @@ func newWorkflowTaskPoller(taskHandler WorkflowTaskHandler, service workflowserv
logger: params.Logger,
dataConverter: params.DataConverter,
stickyUUID: uuid.New(),
disableStickyExecution: params.DisableStickyExecution,
StickyScheduleToStartTimeout: params.StickyScheduleToStartTimeout,
stickyCacheSize: params.cache.MaxWorkflowCacheSize(),
}
}

Expand Down Expand Up @@ -388,7 +388,7 @@ func (wtp *workflowTaskPoller) RespondTaskCompleted(completedRequest interface{}
}
}
case *workflowservice.RespondWorkflowTaskCompletedRequest:
if request.StickyAttributes == nil && !wtp.disableStickyExecution {
if request.StickyAttributes == nil && wtp.stickyCacheSize > 0 {
request.StickyAttributes = &taskqueuepb.StickyExecutionAttributes{
WorkerTaskQueue: &taskqueuepb.TaskQueue{
Name: getWorkerTaskQueue(wtp.stickyUUID),
Expand Down Expand Up @@ -603,7 +603,7 @@ WaitResult:
}

func (wtp *workflowTaskPoller) release(kind enumspb.TaskQueueKind) {
if wtp.disableStickyExecution {
if wtp.stickyCacheSize <= 0 {
return
}

Expand All @@ -617,7 +617,7 @@ func (wtp *workflowTaskPoller) release(kind enumspb.TaskQueueKind) {
}

func (wtp *workflowTaskPoller) updateBacklog(taskQueueKind enumspb.TaskQueueKind, backlogCountHint int64) {
if taskQueueKind == enumspb.TASK_QUEUE_KIND_NORMAL || wtp.disableStickyExecution {
if taskQueueKind == enumspb.TASK_QUEUE_KIND_NORMAL || wtp.stickyCacheSize <= 0 {
// we only care about sticky backlog for now.
return
}
Expand All @@ -636,7 +636,7 @@ func (wtp *workflowTaskPoller) updateBacklog(taskQueueKind enumspb.TaskQueueKind
func (wtp *workflowTaskPoller) getNextPollRequest() (request *workflowservice.PollWorkflowTaskQueueRequest) {
taskQueueName := wtp.taskQueueName
taskQueueKind := enumspb.TASK_QUEUE_KIND_NORMAL
if !wtp.disableStickyExecution {
if wtp.stickyCacheSize > 0 {
wtp.requestLock.Lock()
if wtp.stickyBacklog > 0 || wtp.pendingStickyPollCount <= wtp.pendingRegularPollCount {
wtp.pendingStickyPollCount++
Expand Down
4 changes: 0 additions & 4 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,6 @@ type (
// Context cancel function to cancel user context
UserContextCancel context.CancelFunc

// Disable sticky execution
DisableStickyExecution bool

StickyScheduleToStartTimeout time.Duration

// WorkflowPanicPolicy is used for configuring how client's workflow task handler deals with workflow
Expand Down Expand Up @@ -1275,7 +1272,6 @@ func NewAggregatedWorker(client *WorkflowClient, taskQueue string, options Worke
EnableLoggingInReplay: options.EnableLoggingInReplay,
UserContext: backgroundActivityContext,
UserContextCancel: backgroundActivityContextCancel,
DisableStickyExecution: options.DisableStickyExecution,
StickyScheduleToStartTimeout: options.StickyScheduleToStartTimeout,
TaskQueueActivitiesPerSecond: options.TaskQueueActivitiesPerSecond,
WorkflowPanicPolicy: options.WorkflowPanicPolicy,
Expand Down
Loading

0 comments on commit 0e6bf42

Please sign in to comment.