Skip to content

Commit

Permalink
Activity Registration Option to automatically record activity heartbe…
Browse files Browse the repository at this point in the history
…ats (#1053)

Co-authored-by: Liang Mei <meiliang86@gmail.com>
  • Loading branch information
willgorman and meiliang86 authored Feb 23, 2021
1 parent 42f6cac commit 5282e89
Show file tree
Hide file tree
Showing 11 changed files with 203 additions and 29 deletions.
20 changes: 16 additions & 4 deletions internal/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ type (
// This option has no effect when explicit Name is provided.
EnableShortName bool
DisableAlreadyRegisteredCheck bool
// Automatically send heartbeats for this activity at an interval that is less than the HeartbeatTimeout.
// This option has no effect if the activity is executed with a HeartbeatTimeout of 0.
// Default: false
EnableAutoHeartbeat bool
}

// ActivityOptions stores all activity-specific parameters that will be stored inside of a context.
Expand Down Expand Up @@ -252,7 +256,7 @@ func GetWorkerStopChannel(ctx context.Context) <-chan struct{} {
// TODO: we don't have a way to distinguish between the two cases when context is cancelled because
// context doesn't support overriding value of ctx.Error.
// TODO: Implement automatic heartbeating with cancellation through ctx.
// details - the details that you provided here can be seen in the worflow when it receives TimeoutError, you
// details - the details that you provided here can be seen in the workflow when it receives TimeoutError, you
// can check error TimeoutType()/Details().
func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
env := getActivityEnv(ctx)
Expand All @@ -269,7 +273,7 @@ func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
panic(err)
}
}
err = env.serviceInvoker.Heartbeat(data, false)
err = env.serviceInvoker.BatchHeartbeat(data)
if err != nil {
log := GetActivityLogger(ctx)
log.Debug("RecordActivityHeartbeat With Error:", zap.Error(err))
Expand All @@ -279,8 +283,16 @@ func RecordActivityHeartbeat(ctx context.Context, details ...interface{}) {
// ServiceInvoker abstracts calls to the Cadence service from an activity implementation.
// Implement to unit test activities.
type ServiceInvoker interface {
// Returns ActivityTaskCanceledError if activity is cancelled
Heartbeat(details []byte, skipBatching bool) error
// All the heartbeat methods will return ActivityTaskCanceledError if activity is cancelled.
// Heartbeat sends a record heartbeat request to Cadence server directly without buffering.
// It should only be used by the sessions framework.
Heartbeat(details []byte) error
// BatchHeartbeat sends heartbeat on the first attempt, and batches additional requests
// to send it later according to heartbeat timeout.
BatchHeartbeat(details []byte) error
// BackgroundHeartbeat should only be used by Cadence library internally to heartbeat automatically
// without detail.
BackgroundHeartbeat() error
Close(flushBufferedHeartbeat bool)
GetClient(domain string, options *ClientOptions) Client
}
Expand Down
1 change: 1 addition & 0 deletions internal/internal_activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type (
Execute(ctx context.Context, input []byte) ([]byte, error)
ActivityType() ActivityType
GetFunction() interface{}
GetOptions() RegisterActivityOptions
}

activityInfo struct {
Expand Down
92 changes: 76 additions & 16 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1622,28 +1622,64 @@ type cadenceInvoker struct {
cancelHandler func()
heartBeatTimeoutInSec int32 // The heart beat interval configured for this activity.
hbBatchEndTimer *time.Timer // Whether we started a batch of operations that need to be reported in the cycle. This gets started on a user call.
lastDetailsToReport *[]byte
detailsToReport *[]byte // Details to be reported in the next reporting interval.
lastDetailsReported *[]byte // Details that were reported in the last reporting interval.
closeCh chan struct{}
workerStopChannel <-chan struct{}
}

func (i *cadenceInvoker) Heartbeat(details []byte, skipBatching bool) error {
func (i *cadenceInvoker) Heartbeat(details []byte) error {
i.Lock()
defer i.Unlock()

if i.hbBatchEndTimer != nil && !skipBatching {
_, err := i.internalHeartBeat(details)
return err
}

func (i *cadenceInvoker) BackgroundHeartbeat() error {
i.Lock()
defer i.Unlock()

if i.hbBatchEndTimer != nil {
if i.detailsToReport == nil {
i.detailsToReport = i.lastDetailsReported
}

return nil
}

var details []byte
if i.detailsToReport != nil {
details = *i.detailsToReport
} else if i.lastDetailsReported != nil {
details = *i.lastDetailsReported
}

return i.heartbeatAndScheduleNextRun(details)
}

func (i *cadenceInvoker) BatchHeartbeat(details []byte) error {
i.Lock()
defer i.Unlock()

if i.hbBatchEndTimer != nil {
// If we have started batching window, keep track of last reported progress.
i.lastDetailsToReport = &details
i.detailsToReport = &details
return nil
}

return i.heartbeatAndScheduleNextRun(details)
}

func (i *cadenceInvoker) heartbeatAndScheduleNextRun(details []byte) error {
isActivityCancelled, err := i.internalHeartBeat(details)

// If the activity is cancelled, the activity can ignore the cancellation and do its work
// and complete. Our cancellation is co-operative, so we will try to heartbeat.
if (err == nil || isActivityCancelled) && !skipBatching {
if err == nil || isActivityCancelled {
// We have successfully sent heartbeat, start next batching window.
i.lastDetailsToReport = nil
i.lastDetailsReported = &details
i.detailsToReport = nil

// Create timer to fire before the threshold to report.
deadlineToTrigger := i.heartBeatTimeoutInSec
Expand Down Expand Up @@ -1671,18 +1707,14 @@ func (i *cadenceInvoker) Heartbeat(details []byte, skipBatching bool) error {
var detailsToReport *[]byte

i.Lock()
detailsToReport = i.lastDetailsToReport
detailsToReport = i.detailsToReport
i.hbBatchEndTimer.Stop()
i.hbBatchEndTimer = nil
i.Unlock()

if detailsToReport != nil {
// TODO: there is a potential race condition here as the lock is released here and
// locked again in the Hearbeat() method. This possible that a heartbeat call from
// user activity grabs the lock first and calls internalHeartBeat before this
// batching goroutine, which means some activity progress will be lost.
i.Heartbeat(*detailsToReport, false)
i.heartbeatAndScheduleNextRun(*detailsToReport)
}
i.Unlock()
}()
}

Expand Down Expand Up @@ -1724,9 +1756,10 @@ func (i *cadenceInvoker) Close(flushBufferedHeartbeat bool) {
close(i.closeCh)
if i.hbBatchEndTimer != nil {
i.hbBatchEndTimer.Stop()
if flushBufferedHeartbeat && i.lastDetailsToReport != nil {
i.internalHeartBeat(*i.lastDetailsToReport)
i.lastDetailsToReport = nil
if flushBufferedHeartbeat && i.detailsToReport != nil {
i.internalHeartBeat(*i.detailsToReport)
i.lastDetailsReported = i.detailsToReport
i.detailsToReport = nil
}
}
}
Expand Down Expand Up @@ -1819,6 +1852,33 @@ func (ath *activityTaskHandlerImpl) Execute(taskList string, t *s.PollForActivit

ctx, span := createOpenTracingActivitySpan(ctx, ath.tracer, time.Now(), activityType, t.WorkflowExecution.GetWorkflowId(), t.WorkflowExecution.GetRunId())
defer span.Finish()

if activityImplementation.GetOptions().EnableAutoHeartbeat && t.HeartbeatTimeoutSeconds != nil && *t.HeartbeatTimeoutSeconds > 0 {
go func() {
autoHbInterval := time.Duration(*t.HeartbeatTimeoutSeconds) * time.Second / 2
ticker := time.NewTicker(autoHbInterval)
defer ticker.Stop()
for {
select {
case <-ath.workerStopCh:
return
case <-ctx.Done():
return
case <-ticker.C:
hbErr := invoker.BackgroundHeartbeat()
if hbErr != nil && !IsCanceledError(hbErr) {
ath.logger.Error("Activity auto heartbeat error.",
zap.String(tagWorkflowID, t.WorkflowExecution.GetWorkflowId()),
zap.String(tagRunID, t.WorkflowExecution.GetRunId()),
zap.String(tagActivityType, activityType),
zap.Error(err),
)
}
}
}
}()
}

output, err := activityImplementation.Execute(ctx, t.Input)

dlCancelFunc()
Expand Down
70 changes: 66 additions & 4 deletions internal/internal_task_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -1183,9 +1184,66 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NoError() {
taskToken: nil,
}

heartbeatErr := cadenceInvoker.Heartbeat(nil, false)
heartbeatErr := cadenceInvoker.BatchHeartbeat(nil)
t.NoError(heartbeatErr)
}

func newHeartbeatRequestMatcher(details []byte) gomock.Matcher {
return &recordHeartbeatRequestMatcher{details: details}
}

type recordHeartbeatRequestMatcher struct {
details []byte
}

func (r *recordHeartbeatRequestMatcher) String() string {
panic("implement me")
}

t.Nil(heartbeatErr)
func (r *recordHeartbeatRequestMatcher) Matches(x interface{}) bool {
req, ok := x.(*s.RecordActivityTaskHeartbeatRequest)
if !ok {
return false
}

return reflect.DeepEqual(r.details, req.Details)
}

func (t *TaskHandlersTestSuite) TestHeartBeat_Interleaved() {
mockCtrl := gomock.NewController(t.T())
mockService := workflowservicetest.NewMockClient(mockCtrl)

cancelRequested := false
heartbeatResponse := s.RecordActivityTaskHeartbeatResponse{CancelRequested: &cancelRequested}
mockService.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), newHeartbeatRequestMatcher([]byte("1")), callOptions...).Return(&heartbeatResponse, nil).Times(3)
mockService.EXPECT().RecordActivityTaskHeartbeat(gomock.Any(), newHeartbeatRequestMatcher([]byte("2")), callOptions...).Return(&heartbeatResponse, nil).Times(3)

cadenceInvoker := &cadenceInvoker{
identity: "Test_Cadence_Invoker",
service: mockService,
taskToken: nil,
heartBeatTimeoutInSec: 3,
}

heartbeatErr := cadenceInvoker.BatchHeartbeat([]byte("1"))
t.NoError(heartbeatErr)
time.Sleep(1 * time.Second)

for i := 0; i < 4; i++ {
heartbeatErr = cadenceInvoker.BackgroundHeartbeat()
t.NoError(heartbeatErr)
time.Sleep(time.Second)
}

heartbeatErr = cadenceInvoker.BatchHeartbeat([]byte("2"))
t.NoError(heartbeatErr)
time.Sleep(1 * time.Second)

for i := 0; i < 4; i++ {
heartbeatErr = cadenceInvoker.BackgroundHeartbeat()
t.NoError(heartbeatErr)
time.Sleep(time.Second)
}
}

func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithError() {
Expand All @@ -1203,7 +1261,7 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithError() {
0,
make(chan struct{}))

heartbeatErr := cadenceInvoker.Heartbeat(nil, false)
heartbeatErr := cadenceInvoker.BatchHeartbeat(nil)
t.NotNil(heartbeatErr)
_, ok := (heartbeatErr).(*s.EntityNotExistsError)
t.True(ok, "heartbeatErr must be EntityNotExistsError.")
Expand All @@ -1227,7 +1285,7 @@ func (t *TaskHandlersTestSuite) TestHeartBeat_NilResponseWithDomainNotActiveErro
0,
make(chan struct{}))

heartbeatErr := cadenceInvoker.Heartbeat(nil, false)
heartbeatErr := cadenceInvoker.BatchHeartbeat(nil)
t.NotNil(heartbeatErr)
_, ok := (heartbeatErr).(*s.DomainNotActiveError)
t.True(ok, "heartbeatErr must be DomainNotActiveError.")
Expand Down Expand Up @@ -1259,6 +1317,10 @@ func (t *testActivityDeadline) GetFunction() interface{} {
return t.Execute
}

func (t *testActivityDeadline) GetOptions() RegisterActivityOptions {
return RegisterActivityOptions{}
}

type deadlineTest struct {
actWaitDuration time.Duration
ScheduleTS time.Time
Expand Down
9 changes: 7 additions & 2 deletions internal/internal_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,8 +693,9 @@ func (we *workflowExecutor) Execute(ctx Context, input []byte) ([]byte, error) {

// Wrapper to execute activity functions.
type activityExecutor struct {
name string
fn interface{}
name string
fn interface{}
options RegisterActivityOptions
}

func (ae *activityExecutor) ActivityType() ActivityType {
Expand All @@ -705,6 +706,10 @@ func (ae *activityExecutor) GetFunction() interface{} {
return ae.fn
}

func (ae *activityExecutor) GetOptions() RegisterActivityOptions {
return ae.options
}

func (ae *activityExecutor) Execute(ctx context.Context, input []byte) ([]byte, error) {
fnType := reflect.TypeOf(ae.fn)
var args []reflect.Value
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_worker_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,10 @@ func (ga greeterActivity) GetFunction() interface{} {
return ga.Execute
}

func (ga greeterActivity) GetOptions() RegisterActivityOptions {
return RegisterActivityOptions{}
}

// Greeter activity func
func greeterActivityFunc(ctx context.Context, input []byte) ([]byte, error) {
return []byte("Hello world"), nil
Expand Down
4 changes: 2 additions & 2 deletions internal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (r *registry) registerActivityFunction(af interface{}, options RegisterActi
return fmt.Errorf("activity type \"%v\" is already registered", registerName)
}
}
r.activityFuncMap[registerName] = &activityExecutor{registerName, af}
r.activityFuncMap[registerName] = &activityExecutor{registerName, af, options}
if len(alias) > 0 || options.EnableShortName {
r.activityAliasMap[fnName] = registerName
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func (r *registry) registerActivityStruct(aStruct interface{}, options RegisterA
return fmt.Errorf("activity type \"%v\" is already registered", registerName)
}
}
r.activityFuncMap[registerName] = &activityExecutor{registerName, methodValue.Interface()}
r.activityFuncMap[registerName] = &activityExecutor{registerName, methodValue.Interface(), options}
if len(structPrefix) > 0 || options.EnableShortName {
r.activityAliasMap[methodName] = registerName
}
Expand Down
2 changes: 1 addition & 1 deletion internal/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,7 +424,7 @@ func sessionCreationActivity(ctx context.Context, sessionID string) error {
// since the heartbeat interval is controlled by the session framework, we don't need to worry about
// calling heartbeat too frequently and causing trouble for the sever. (note the min heartbeat timeout
// is 1 sec.)
return activityEnv.serviceInvoker.Heartbeat([]byte{}, true)
return activityEnv.serviceInvoker.Heartbeat([]byte{})
}
isRetryable := func(_ error) bool {
// there will be two types of error here:
Expand Down
1 change: 1 addition & 0 deletions test/activity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,4 +156,5 @@ func (a *Activities) register(worker worker.Worker) {
// Check prefix
worker.RegisterActivityWithOptions(a.activities2, activity.RegisterOptions{Name: "Prefix_", DisableAlreadyRegisteredCheck: true})
worker.RegisterActivityWithOptions(a.InspectActivityInfo, activity.RegisterOptions{Name: "inspectActivityInfo"})
worker.RegisterActivityWithOptions(a.HeartbeatAndSleep, activity.RegisterOptions{Name: "HeartbeatAndSleep", EnableAutoHeartbeat: true})
}
7 changes: 7 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,13 @@ func (ts *IntegrationTestSuite) TestActivityRetryOnHBTimeout() {
ts.EqualValues(expected, ts.activities.invoked())
}

func (ts *IntegrationTestSuite) TestActivityAutoHeartbeat() {
var expected []string
err := ts.executeWorkflow("test-activity-auto-heartbeat", ts.workflows.ActivityAutoHeartbeat, &expected)
ts.NoError(err)
ts.EqualValues(expected, ts.activities.invoked())
}

func (ts *IntegrationTestSuite) TestContinueAsNew() {
var result int
err := ts.executeWorkflow("test-continueasnew", ts.workflows.ContinueAsNew, &result, 4, ts.taskListName)
Expand Down
Loading

0 comments on commit 5282e89

Please sign in to comment.