Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Schedule fixes #959

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions internal/internal_schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ func convertFromPBScheduleListEntry(schedule *schedulepb.ScheduleListEntry) *Sch

func convertToPBScheduleAction(ctx context.Context, client *WorkflowClient, scheduleAction ScheduleAction) (*schedulepb.ScheduleAction, error) {
switch action := scheduleAction.(type) {
case ScheduleWorkflowAction:
case *ScheduleWorkflowAction:
// Set header before interceptor run
dataConverter := WithContext(ctx, client.dataConverter)

Expand Down Expand Up @@ -636,7 +636,7 @@ func convertFromPBScheduleAction(action *schedulepb.ScheduleAction) (ScheduleAct
searchAttributes[key] = element
}

return ScheduleWorkflowAction{
return &ScheduleWorkflowAction{
ID: workflow.GetWorkflowId(),
Workflow: workflow.WorkflowType.GetName(),
Args: args,
Expand Down Expand Up @@ -761,7 +761,6 @@ func encodeScheduleWorklowArgs(dc converter.DataConverter, args []interface{}) (
}
payloads[i] = payload
}

}
return &commonpb.Payloads{
Payloads: payloads,
Expand Down
36 changes: 28 additions & 8 deletions internal/internal_schedule_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import (
)

const (
scheduleID = "some random schedule ID"
scheduleID = "some random schedule ID"
)

// schedule client test suite
Expand Down Expand Up @@ -66,7 +66,6 @@ func (s *scheduleClientTestSuite) TearDownTest() {
s.mockCtrl.Finish() // assert mock’s expectations
}


func (s *scheduleClientTestSuite) TestCreateScheduleClient() {
wf := func(ctx Context) string {
panic("this is just a stub")
Expand All @@ -76,8 +75,8 @@ func (s *scheduleClientTestSuite) TestCreateScheduleClient() {
Spec: ScheduleSpec{
CronExpressions: []string{"*"},
},
Action: ScheduleWorkflowAction{
Workflow: wf,
Action: &ScheduleWorkflowAction{
Workflow: wf,
ID: workflowID,
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
Expand All @@ -92,6 +91,27 @@ func (s *scheduleClientTestSuite) TestCreateScheduleClient() {
s.Equal(scheduleHandle.GetID(), scheduleID)
}

func (s *scheduleClientTestSuite) TestCreateScheduleNoID() {
wf := func(ctx Context) string {
panic("this is just a stub")
}
options := ScheduleOptions{
Spec: ScheduleSpec{
CronExpressions: []string{"*"},
},
Action: &ScheduleWorkflowAction{
Workflow: wf,
ID: workflowID,
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
WorkflowTaskTimeout: timeoutInSeconds,
},
}

_, err := s.client.ScheduleClient().Create(context.Background(), options)
s.NotNil(err)
}

func (s *scheduleClientTestSuite) TestCreateScheduleWithMemoAndSearchAttr() {
memo := map[string]interface{}{
"testMemo": "memo value",
Expand All @@ -109,8 +129,8 @@ func (s *scheduleClientTestSuite) TestCreateScheduleWithMemoAndSearchAttr() {
Spec: ScheduleSpec{
CronExpressions: []string{"*"},
},
Action: ScheduleWorkflowAction{
Workflow: wf,
Action: &ScheduleWorkflowAction{
Workflow: wf,
ID: "wid",
TaskQueue: taskqueue,
WorkflowExecutionTimeout: timeoutInSeconds,
Expand Down Expand Up @@ -138,7 +158,7 @@ func (s *scheduleClientTestSuite) TestCreateScheduleWithMemoAndSearchAttr() {

func getListSchedulesRequest() *workflowservice.ListSchedulesRequest {
request := &workflowservice.ListSchedulesRequest{
Namespace: DefaultNamespace,
Namespace: DefaultNamespace,
}

return request
Expand Down Expand Up @@ -220,4 +240,4 @@ func (s *scheduleClientTestSuite) TestIteratorError() {
event, err = iter.Next()
s.Nil(event)
s.NotNil(err)
}
}
31 changes: 16 additions & 15 deletions internal/schedule_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ type (
EndAt time.Time

// Jitter - All times will be incremented by a random value from 0 to this amount of jitter, capped
// by the time until the next schedule.
// by the time until the next schedule.
// Optional: Defaulted to 0
Jitter time.Duration

Expand All @@ -220,6 +220,9 @@ type (
// ScheduleWorkflowAction implements ScheduleAction to launch a workflow.
ScheduleWorkflowAction struct {
// ID - The business identifier of the workflow execution.
// The workflow ID of the started workflow may not match this exactly,
// it may have a timestamp appended for uniqueness.
// Optional: defaulted to a uuid.
ID string

// Workflow - What workflow to run.
Expand Down Expand Up @@ -267,7 +270,6 @@ type (
// ScheduleOptions configure the parameters for creating a schedule.
ScheduleOptions struct {
// ID - The business identifier of the schedule.
// Optional: defaulted to a uuid.
ID string

// Schedule - Describes when Actions should be taken.
Expand All @@ -285,17 +287,17 @@ type (

// CatchupWindow - The Temporal Server might be down or unavailable at the time when a Schedule should take an Action.
// When the Server comes back up, CatchupWindow controls which missed Actions should be taken at that point. The default is one
// minute, which means that the Schedule attempts to take any Actions that wouldn't be more than one minute late. It
// takes those Actions according to the Overlap. An outage that lasts longer than the Catchup
// Window could lead to missed Actions.
// minute, which means that the Schedule attempts to take any Actions that wouldn't be more than one minute late. It
// takes those Actions according to the Overlap. An outage that lasts longer than the Catchup
// Window could lead to missed Actions.
// Optional: defaulted to 1 minute
CatchupWindow time.Duration

// PauseOnFailure - When an Action times out or reaches the end of its Retry Policy the Schedule will pause.
//
// With SCHEDULE_OVERLAP_POLICY_ALLOW_ALL, this pause might not apply to the next Action, because the next Action
// might have already started previous to the failed one finishing. Pausing applies only to Actions that are scheduled
// to start after the failed one finishes.
// might have already started previous to the failed one finishing. Pausing applies only to Actions that are scheduled
// to start after the failed one finishes.
// Optional: defaulted to false
PauseOnFailure bool

Expand Down Expand Up @@ -451,35 +453,35 @@ type (
}

// ScheduleUpdateOptions configure the parameters for updating a schedule.
ScheduleUpdateOptions struct{
ScheduleUpdateOptions struct {
// DoUpdate - Takes a description of the schedule and returns the new desired schedule.
// If update returns ErrSkipScheduleUpdate response and no update will occur.
// Any other error will be passed through.
DoUpdate func(ScheduleUpdateInput) (*ScheduleUpdate, error)
}

// ScheduleTriggerOptions configure the parameters for triggering a schedule.
ScheduleTriggerOptions struct{
ScheduleTriggerOptions struct {
// Overlap - If specified, policy to override the schedules default overlap policy.
Overlap enumspb.ScheduleOverlapPolicy
}

// SchedulePauseOptions configure the parameters for pausing a schedule.
SchedulePauseOptions struct{
SchedulePauseOptions struct {
// Note - Informative human-readable message with contextual notes.
// Optional: defaulted to 'Paused via Go SDK'
Note string
}

// ScheduleUnpauseOptions configure the parameters for unpausing a schedule.
ScheduleUnpauseOptions struct{
ScheduleUnpauseOptions struct {
// Note - Informative human-readable message with contextual notes.
// Optional: defaulted to 'Unpaused via Go SDK'
Note string
}

// ScheduleBackfillOptions configure the parameters for backfilling a schedule.
ScheduleBackfillOptions struct{
ScheduleBackfillOptions struct {
// Backfill - Time periods to backfill the schedule.
Backfill []ScheduleBackfill
}
Expand All @@ -489,7 +491,7 @@ type (
// GetID returns the schedule ID asssociated with this handle.
GetID() string

// Delete the Schedule
// Delete the Schedule
Delete(ctx context.Context) error

// Backfill the schedule by going though the specified time periods and taking Actions as if that time passed by right now, all at once.
Expand Down Expand Up @@ -597,8 +599,7 @@ type (
// methods like ScheduleHandle.Describe() will return an error.
GetHandle(ctx context.Context, scheduleID string) ScheduleHandle
}

)

func (ScheduleWorkflowAction) isScheduleAction() {
func (*ScheduleWorkflowAction) isScheduleAction() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is a backwards incompatible change. I think it's ok because we're "experimental", just worth noting.

}
Loading