Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add schedule API #943

Merged
merged 8 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ default: check test
# general build-product folder, cleaned as part of `make clean`
BUILD := .build

TEST_TIMEOUT := 3m
TEST_TIMEOUT := 5m
TEST_ARG ?= -race -v -timeout $(TEST_TIMEOUT)

INTEG_TEST_ROOT := ./test
Expand Down
100 changes: 100 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,102 @@ type (
// CheckHealthResponse is a response for Client.CheckHealth.
CheckHealthResponse = internal.CheckHealthResponse

// ScheduleRange represents a set of integer values.
// NOTE: Experimental
ScheduleRange = internal.ScheduleRange

// ScheduleCalendarSpec is an event specification relative to the calendar.
// NOTE: Experimental
ScheduleCalendarSpec = internal.ScheduleCalendarSpec

// ScheduleIntervalSpec describes periods a schedules action should occur.
// NOTE: Experimental
ScheduleIntervalSpec = internal.ScheduleIntervalSpec

// ScheduleSpec describes when a schedules action should occur.
// NOTE: Experimental
ScheduleSpec = internal.ScheduleSpec

// ScheduleBackfill desribes a time periods and policy and takes Actions as if that time passed by right now, all at once.
// NOTE: Experimental
ScheduleBackfill = internal.ScheduleBackfill

// ScheduleAction is the interface for all actions a schedule can take.
// NOTE: Experimental
ScheduleAction = internal.ScheduleAction

// ScheduleWorkflowAction is the implementation of ScheduleAction to start a workflow.
// NOTE: Experimental
ScheduleWorkflowAction = internal.ScheduleWorkflowAction

// ScheduleOptions configuration parameters for creating a schedule.
// NOTE: Experimental
ScheduleOptions = internal.ScheduleOptions

// ScheduleClient is the interface with the server to create and get handles to schedules.
// NOTE: Experimental
ScheduleClient = internal.ScheduleClient

// ScheduleListOptions are configuration parameters for listing schedules.
// NOTE: Experimental
ScheduleListOptions = internal.ScheduleListOptions

// ScheduleListIterator is a iterator which can return created schedules.
// NOTE: Experimental
ScheduleListIterator = internal.ScheduleListIterator

// ScheduleListEntry is a result from ScheduleListEntry.
// NOTE: Experimental
ScheduleListEntry = internal.ScheduleListEntry

// ScheduleUpdateOptions are configuration parameters for updating a schedule.
// NOTE: Experimental
ScheduleUpdateOptions = internal.ScheduleUpdateOptions

// ScheduleHandle represents a created schedule.
// NOTE: Experimental
ScheduleHandle = internal.ScheduleHandle

// ScheduleActionResult describes when a schedule action took place.
// NOTE: Experimental
ScheduleActionResult = internal.ScheduleActionResult

// ScheduleWorkflowExecution contains details on a workflows execution stared by a schedule.
// NOTE: Experimental
ScheduleWorkflowExecution = internal.ScheduleWorkflowExecution

// ScheduleDescription describes the current Schedule details from ScheduleHandle.Describe.
// NOTE: Experimental
ScheduleDescription = internal.ScheduleDescription

// Schedule describes a created schedule.
// NOTE: Experimental
Schedule = internal.Schedule

// ScheduleUpdate describes the desired new schedule from ScheduleHandle.Update.
// NOTE: Experimental
ScheduleUpdate = internal.ScheduleUpdate

// ScheduleUpdateInput describes the current state of the schedule to be updated.
// NOTE: Experimental
ScheduleUpdateInput = internal.ScheduleUpdateInput

// ScheduleTriggerOptions configure the parameters for triggering a schedule.
// NOTE: Experimental
ScheduleTriggerOptions = internal.ScheduleTriggerOptions

// SchedulePauseOptions configure the parameters for pausing a schedule.
// NOTE: Experimental
SchedulePauseOptions = internal.SchedulePauseOptions

// ScheduleUnpauseOptions configure the parameters for unpausing a schedule.
// NOTE: Experimental
ScheduleUnpauseOptions = internal.ScheduleUnpauseOptions

// ScheduleBackfillOptions configure the parameters for backfilling a schedule.
// NOTE: Experimental
ScheduleBackfillOptions = internal.ScheduleBackfillOptions

// Client is the client for starting and getting information about a workflow executions as well as
// completing activities asynchronously.
Client interface {
Expand Down Expand Up @@ -389,6 +485,10 @@ type (
// OperatorService creates a new operator service client with the same gRPC connection as this client.
OperatorService() operatorservice.OperatorServiceClient

// Schedule creates a new shedule client with the same gRPC connection as this client.
// NOTE: Experimental
ScheduleClient() ScheduleClient

// Close client and clean up underlying resources.
//
// If this client was created via NewClientFromExisting or this client has
Expand Down
4 changes: 4 additions & 0 deletions interceptor/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ type ClientTerminateWorkflowInput = internal.ClientTerminateWorkflowInput
// ClientOutboundInterceptor.QueryWorkflow.
type ClientQueryWorkflowInput = internal.ClientQueryWorkflowInput

// ScheduleClientCreateInput is input for
// ScheduleClientInterceptor.CreateSchedule.
type ScheduleClientCreateInput = internal.ScheduleClientCreateInput

// Header provides Temporal header information from the context for reading or
// writing during specific interceptor calls.
//
Expand Down
19 changes: 19 additions & 0 deletions interceptor/tracing_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,25 @@ type tracingClientOutboundInterceptor struct {
root *tracingInterceptor
}

func (t *tracingClientOutboundInterceptor) CreateSchedule(ctx context.Context, in *ScheduleClientCreateInput) (client.ScheduleHandle, error) {
// Start span and write to header
span, ctx, err := t.root.startSpanFromContext(ctx, &TracerStartSpanOptions{
Operation: "CreateSchedule",
Name: in.Options.ID,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

schedules don't really have a concept of a name so I used the ID.

ToHeader: true,
Time: time.Now(),
})
if err != nil {
return nil, err
}
var finishOpts TracerFinishSpanOptions
defer span.Finish(&finishOpts)

run, err := t.Next.CreateSchedule(ctx, in)
finishOpts.Error = err
return run, err
}

func (t *tracingClientOutboundInterceptor) ExecuteWorkflow(
ctx context.Context,
in *ClientExecuteWorkflowInput,
Expand Down
5 changes: 4 additions & 1 deletion internal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,9 @@ type (
// OperatorService creates a new operator service client with the same gRPC connection as this client.
OperatorService() operatorservice.OperatorServiceClient

// Schedule creates a new shedule client with the same gRPC connection as this client.
Quinn-With-Two-Ns marked this conversation as resolved.
Show resolved Hide resolved
ScheduleClient() ScheduleClient

// Close client and clean up underlying resources.
Close()
}
Expand Down Expand Up @@ -395,7 +398,7 @@ type (
DataConverter converter.DataConverter

// Optional: Sets FailureConverter to customize serialization/deserialization of errors.
// default: temporal.DefaultFailureConverter, does not encode any fields of the error. Use temporal.NewDefaultFailureConverter
// default: temporal.DefaultFailureConverter, does not encode any fields of the error. Use temporal.NewDefaultFailureConverter
// options to configure or create a custom converter.
FailureConverter converter.FailureConverter

Expand Down
6 changes: 6 additions & 0 deletions internal/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,12 @@ var (
// which indicate the activity is not done yet. Then, when the waited human action happened, it needs to trigger something
// that could report the activity completed event to temporal server via Client.CompleteActivity() API.
ErrActivityResultPending = errors.New("not error: do not autocomplete, using Client.CompleteActivity() to complete")

// ErrScheduleAlreadyRunning is returned if there's already a running (not deleted) Schedule with the same ID
ErrScheduleAlreadyRunning = errors.New("schedule with this ID is already registered")

// ErrSkipScheduleUpdate is used by a user if they want to skip updating a schedule.
ErrSkipScheduleUpdate = errors.New("skip schedule update")
)

// NewApplicationError create new instance of *ApplicationError with message, type, and optional details.
Expand Down
9 changes: 9 additions & 0 deletions internal/interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ type ClientOutboundInterceptor interface {
// interceptor.Header will return a non-nil map for this context.
ExecuteWorkflow(context.Context, *ClientExecuteWorkflowInput) (WorkflowRun, error)

// CreateSchedule - Intercept a service call to CreateSchedule
CreateSchedule(ctx context.Context, options *ScheduleClientCreateInput) (ScheduleHandle, error)

// SignalWorkflow intercepts client.Client.SignalWorkflow.
// interceptor.Header will return a non-nil map for this context.
SignalWorkflow(context.Context, *ClientSignalWorkflowInput) error
Expand All @@ -299,6 +302,12 @@ type ClientOutboundInterceptor interface {
mustEmbedClientOutboundInterceptorBase()
}

// ScheduleClientCreateInput is the input to
// ClientOutboundInterceptor.CreateSchedule.
type ScheduleClientCreateInput struct {
Options *ScheduleOptions
}

// ClientExecuteWorkflowInput is the input to
// ClientOutboundInterceptor.ExecuteWorkflow.
type ClientExecuteWorkflowInput struct {
Expand Down
5 changes: 5 additions & 0 deletions internal/interceptor_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,4 +429,9 @@ func (c *ClientOutboundInterceptorBase) QueryWorkflow(
return c.Next.QueryWorkflow(ctx, in)
}

// ExecuteWorkflow implements ClientOutboundInterceptor.CreateSchedule.
func (c *ClientOutboundInterceptorBase) CreateSchedule(ctx context.Context, in *ScheduleClientCreateInput) (ScheduleHandle, error) {
return c.Next.CreateSchedule(ctx, in)
}

func (*ClientOutboundInterceptorBase) mustEmbedClientOutboundInterceptorBase() {}
Loading