From 1bec37d79599af5cfe2202da36d0720ecf3ab09d Mon Sep 17 00:00:00 2001 From: Ryan Lettieri <67934986+RyanLettieri@users.noreply.github.com> Date: Mon, 30 Jan 2023 17:12:00 -0700 Subject: [PATCH] Workflow Management - Initial Methods (#1003) Initial work for workflows DotNET SDK Signed-off-by: Ryan Lettieri --- .github/workflows/itests.yml | 8 +- src/Dapr.Client/DaprClient.cs | 47 ++++++++ src/Dapr.Client/DaprClientGrpc.cs | 114 ++++++++++++++++++ src/Dapr.Client/GetWorkflowResponse.cs | 26 ++++ .../Protos/dapr/proto/dapr/v1/dapr.proto | 71 ++++++++++- src/Dapr.Client/WorkflowReference.cs | 40 ++++++ src/Dapr.Workflow/Dapr.Workflow.csproj | 2 +- test/Dapr.Client.Test/Dapr.Client.Test.csproj | 2 +- .../Dapr.E2E.Test.App.csproj | 1 + test/Dapr.E2E.Test.App/Startup.cs | 20 +++ test/Dapr.E2E.Test/Dapr.E2E.Test.csproj | 3 +- test/Dapr.E2E.Test/Workflows/WorkflowTest.cs | 61 ++++++++++ 12 files changed, 383 insertions(+), 12 deletions(-) create mode 100644 src/Dapr.Client/GetWorkflowResponse.cs create mode 100644 src/Dapr.Client/WorkflowReference.cs create mode 100644 test/Dapr.E2E.Test/Workflows/WorkflowTest.cs diff --git a/.github/workflows/itests.yml b/.github/workflows/itests.yml index b82583bee..b241562bf 100644 --- a/.github/workflows/itests.yml +++ b/.github/workflows/itests.yml @@ -41,15 +41,15 @@ jobs: install-version: '7.0.x' env: NUPKG_OUTDIR: bin/Release/nugets - GOVER: 1.17 + GOVER: 1.19 GOOS: linux GOARCH: amd64 GOPROXY: https://proxy.golang.org - DAPR_CLI_VER: 1.8.0 - DAPR_RUNTIME_VER: 1.8.0 + DAPR_CLI_VER: 1.9.1 + DAPR_RUNTIME_VER: 1.9.5 DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh DAPR_CLI_REF: '' - DAPR_REF: '' + DAPR_REF: '82e097134cdc7494c5c3be71bf80f7904d6db9ea' steps: - name: Set up Dapr CLI run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }} diff --git a/src/Dapr.Client/DaprClient.cs b/src/Dapr.Client/DaprClient.cs index ef90d5f58..29f0d9b3f 100644 --- a/src/Dapr.Client/DaprClient.cs +++ b/src/Dapr.Client/DaprClient.cs @@ -975,6 +975,53 @@ public abstract Task Unlock( string lockOwner, CancellationToken cancellationToken = default); + /// + /// Attempt to start the given workflow with response indicating success. + /// + /// Identifier of the specific run. + /// The component to interface with. + /// Name of the workflow to run. + /// The list of options that are potentially needed to start a workflow. + /// The input input for the given workflow. + /// A that can be used to cancel the operation. + /// A containing a + [Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] + public abstract Task StartWorkflowAsync( + string instanceId, + string workflowComponent, + string workflowName, + Object input, + IReadOnlyDictionary workflowOptions = default, + CancellationToken cancellationToken = default); + + /// + /// Attempt to get information about the given workflow. + /// + /// The unique ID of the target workflow instance. + /// The component to interface with. + /// Name of the workflow to run. + /// A that can be used to cancel the operation. + /// A containing a + [Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] + public abstract Task GetWorkflowAsync( + string instanceId, + string workflowComponent, + string workflowName, + CancellationToken cancellationToken = default); + + /// + /// Attempt to get terminate the given workflow. + /// + /// The unique ID of the target workflow instance. + /// The component to interface with. + /// A that can be used to cancel the operation. + /// A that will complete when the terminate operation has been scheduled. If the wrapped value is true the operation suceeded. + [Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")] + public abstract Task TerminateWorkflowAsync( + string instanceId, + string workflowComponent, + CancellationToken cancellationToken = default); + /// public void Dispose() { diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index 28df698d0..b5b0774a8 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -1462,6 +1462,120 @@ public async override Task Unlock( #endregion + + #region Workflow API + /// + [Obsolete] + public async override Task StartWorkflowAsync( + string instanceId, + string workflowComponent, + string workflowName, + Object input, + IReadOnlyDictionary workflowOptions = default, + CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNullOrEmpty(instanceId, nameof(instanceId)); + ArgumentVerifier.ThrowIfNullOrEmpty(workflowComponent, nameof(workflowComponent)); + ArgumentVerifier.ThrowIfNullOrEmpty(workflowName, nameof(workflowName)); + ArgumentVerifier.ThrowIfNull(input, nameof(input)); + + // Serialize json data. Converts input object to bytes and then bytestring inside the request. + byte[] jsonUtf8Bytes = JsonSerializer.SerializeToUtf8Bytes(input); + + var request = new Autogenerated.StartWorkflowRequest() + { + InstanceId = instanceId, + WorkflowComponent = workflowComponent, + WorkflowName = workflowName, + Input = ByteString.CopyFrom(jsonUtf8Bytes), + }; + + if (workflowOptions?.Count > 0) + { + foreach (var item in workflowOptions) + { + request.Options[item.Key] = item.Value; + } + } + + try + { + var options = CreateCallOptions(headers: null, cancellationToken); + var response = await client.StartWorkflowAlpha1Async(request, options); + return new WorkflowReference(response.InstanceId); + + } + catch (RpcException ex) + { + throw new DaprException("Start Workflow operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + } + + /// + [Obsolete] + public async override Task GetWorkflowAsync( + string instanceId, + string workflowComponent, + string workflowName, + CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNullOrEmpty(instanceId, nameof(instanceId)); + ArgumentVerifier.ThrowIfNullOrEmpty(workflowComponent, nameof(workflowComponent)); + + var request = new Autogenerated.GetWorkflowRequest() + { + InstanceId = instanceId, + WorkflowComponent = workflowComponent, + WorkflowType = workflowName //TODO: Change 'WorkflowType' to 'WorkflowName' once changes go through dapr/dapr + }; + + try + { + var options = CreateCallOptions(headers: null, cancellationToken); + var response = await client.GetWorkflowAlpha1Async(request, options); + var dateTimeValue = new DateTime(response.StartTime, DateTimeKind.Utc); + return new GetWorkflowResponse(response.InstanceId, dateTimeValue, response.Metadata); + } + catch (RpcException ex) + { + throw new DaprException("Get workflow operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + + } + + + /// + [Obsolete] + public async override Task TerminateWorkflowAsync( + string instanceId, + string workflowComponent, + CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNullOrEmpty(instanceId, nameof(instanceId)); + ArgumentVerifier.ThrowIfNullOrEmpty(workflowComponent, nameof(workflowComponent)); + + var request = new Autogenerated.TerminateWorkflowRequest() + { + InstanceId = instanceId, + WorkflowComponent = workflowComponent + }; + + var options = CreateCallOptions(headers: null, cancellationToken); + + try + { + await client.TerminateWorkflowAlpha1Async(request, options); + } + catch (RpcException ex) + { + throw new DaprException("Terminate workflow operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex); + } + + } + + #endregion + + #region Dapr Sidecar Methods /// diff --git a/src/Dapr.Client/GetWorkflowResponse.cs b/src/Dapr.Client/GetWorkflowResponse.cs new file mode 100644 index 000000000..408a52592 --- /dev/null +++ b/src/Dapr.Client/GetWorkflowResponse.cs @@ -0,0 +1,26 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Collections.Generic; + +namespace Dapr.Client +{ + /// + /// Initializes a new . + /// + /// The instance ID assocated with this response. + /// The time at which the workflow started executing. + /// The response metadata. + public record GetWorkflowResponse(string instanceId, DateTime startTime, IReadOnlyDictionary metadata); +} diff --git a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto index b5bd00db0..52cbfe6f8 100644 --- a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto +++ b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto @@ -17,6 +17,7 @@ package dapr.proto.runtime.v1; import "google/protobuf/any.proto"; import "google/protobuf/empty.proto"; +import "google/protobuf/timestamp.proto"; import "dapr/proto/common/v1/common.proto"; option csharp_namespace = "Dapr.Client.Autogen.Grpc.v1"; @@ -110,6 +111,15 @@ service Dapr { // Sets value in extended metadata of the sidecar rpc SetMetadata (SetMetadataRequest) returns (google.protobuf.Empty) {} + // Start Workflow + rpc StartWorkflowAlpha1 (StartWorkflowRequest) returns (WorkflowReference) {} + + // Get Workflow details + rpc GetWorkflowAlpha1 (GetWorkflowRequest) returns (GetWorkflowResponse) {} + + // Terminate Workflow + rpc TerminateWorkflowAlpha1 (TerminateWorkflowRequest) returns (TerminateWorkflowResponse) {} + // Shutdown the sidecar rpc Shutdown (google.protobuf.Empty) returns (google.protobuf.Empty) {} } @@ -345,10 +355,10 @@ message InvokeBindingRequest { bytes data = 2; // The metadata passing to output binding components - // + // // Common metadata property: - // - ttlInSeconds : the time to live in seconds for the message. - // If set in the binding definition will cause all messages to + // - ttlInSeconds : the time to live in seconds for the message. + // If set in the binding definition will cause all messages to // have a default time to live. The message ttl overrides any value // in the binding definition. map metadata = 3; @@ -411,7 +421,7 @@ message TransactionalStateOperation { // The type of operation to be executed string operationType = 1; - // State values to be operated on + // State values to be operated on common.v1.StateItem request = 2; } @@ -504,6 +514,7 @@ message InvokeActorRequest { string actor_id = 2; string method = 3; bytes data = 4; + map metadata = 5; } // InvokeActorResponse is the method that returns an actor invocation response. @@ -517,6 +528,7 @@ message GetMetadataResponse { repeated ActiveActorsCount active_actors_count = 2; repeated RegisteredComponents registered_components = 3; map extended_metadata = 4; + repeated PubsubSubscription subscriptions = 5; } message ActiveActorsCount { @@ -531,6 +543,23 @@ message RegisteredComponents { repeated string capabilities = 4; } +message PubsubSubscription { + string pubsub_name = 1; + string topic = 2; + map metadata = 3; + PubsubSubscriptionRules rules = 4; + string dead_letter_topic = 5; +} + +message PubsubSubscriptionRules { + repeated PubsubSubscriptionRule rules = 1; +} + +message PubsubSubscriptionRule { + string match = 1; + string path = 2; +} + message SetMetadataRequest { string key = 1; string value = 2; @@ -644,4 +673,36 @@ message UnlockResponse { } Status status = 1; -} \ No newline at end of file +} + +message WorkflowReference { + string instance_id = 1; +} + +message GetWorkflowRequest { + string instance_id = 1; + string workflow_type = 2; + string workflow_component = 3; +} + +message GetWorkflowResponse { + string instance_id = 1; + int64 start_time = 2; + map metadata = 3; +} + +message StartWorkflowRequest { + string instance_id = 1; + string workflow_component = 2; + string workflow_name = 3; + map options = 4; + bytes input = 5; +} + +message TerminateWorkflowRequest { + string instance_id = 1; + string workflow_component = 2; +} + +message TerminateWorkflowResponse { +} diff --git a/src/Dapr.Client/WorkflowReference.cs b/src/Dapr.Client/WorkflowReference.cs new file mode 100644 index 000000000..22489bb9d --- /dev/null +++ b/src/Dapr.Client/WorkflowReference.cs @@ -0,0 +1,40 @@ +// ------------------------------------------------------------------------ +// Copyright 2021 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Collections.Generic; + +namespace Dapr.Client +{ + /// + /// Represents the response from invoking a workflow. + /// + public sealed class WorkflowReference + { + /// + /// Initializes a new .` + /// + /// The instance ID assocated with this response. + public WorkflowReference(string instanceId) + { + ArgumentVerifier.ThrowIfNull(instanceId, nameof(instanceId)); + this.InstanceId = instanceId; + } + + /// + /// The instance ID assocated with this workflow. + /// + public string InstanceId { set; get; } + + } +} diff --git a/src/Dapr.Workflow/Dapr.Workflow.csproj b/src/Dapr.Workflow/Dapr.Workflow.csproj index 5c7bf8e98..48a57eb17 100644 --- a/src/Dapr.Workflow/Dapr.Workflow.csproj +++ b/src/Dapr.Workflow/Dapr.Workflow.csproj @@ -2,7 +2,7 @@ - net6 + netcoreapp3.1;net6;net7 enable Dapr.Workflow Dapr Workflow Authoring SDK diff --git a/test/Dapr.Client.Test/Dapr.Client.Test.csproj b/test/Dapr.Client.Test/Dapr.Client.Test.csproj index a4f91d7a1..c94f031e9 100644 --- a/test/Dapr.Client.Test/Dapr.Client.Test.csproj +++ b/test/Dapr.Client.Test/Dapr.Client.Test.csproj @@ -9,7 +9,7 @@ all - + diff --git a/test/Dapr.E2E.Test.App/Dapr.E2E.Test.App.csproj b/test/Dapr.E2E.Test.App/Dapr.E2E.Test.App.csproj index ca1004cba..7e114e8df 100644 --- a/test/Dapr.E2E.Test.App/Dapr.E2E.Test.App.csproj +++ b/test/Dapr.E2E.Test.App/Dapr.E2E.Test.App.csproj @@ -8,5 +8,6 @@ + diff --git a/test/Dapr.E2E.Test.App/Startup.cs b/test/Dapr.E2E.Test.App/Startup.cs index d6c82413a..14e1358d5 100644 --- a/test/Dapr.E2E.Test.App/Startup.cs +++ b/test/Dapr.E2E.Test.App/Startup.cs @@ -18,6 +18,7 @@ namespace Dapr.E2E.Test using Dapr.E2E.Test.Actors.Timers; using Dapr.E2E.Test.Actors.ExceptionTesting; using Dapr.E2E.Test.App.ErrorTesting; + using Dapr.Workflow; using Microsoft.AspNetCore.Authentication; using Microsoft.AspNetCore.Authorization; using Microsoft.AspNetCore.Builder; @@ -25,6 +26,7 @@ namespace Dapr.E2E.Test using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; + using System.Threading.Tasks; /// /// Startup class. @@ -54,6 +56,24 @@ public void ConfigureServices(IServiceCollection services) services.AddAuthentication().AddDapr(); services.AddAuthorization(o => o.AddDapr()); services.AddControllers().AddDapr(); + // Register a workflow and associated activity + services.AddDaprWorkflow(options => + { + // Example of registering a "PlaceOrder" workflow function + options.RegisterWorkflow("PlaceOrder", implementation: async (context, input) => + { + // In real life there are other steps related to placing an order, like reserving + // inventory and charging the customer credit card etc. But let's keep it simple ;) + return await context.CallActivityAsync("ShipProduct", "Coffee Beans"); + }); + + // Example of registering a "ShipProduct" workflow activity function + options.RegisterActivity("ShipProduct", implementation: (context, input) => + { + System.Threading.Thread.Sleep(10000); // sleep for 10s to allow the terminate command to come through + return Task.FromResult($"We are shipping {input} to the customer using our hoard of drones!"); + }); + }); services.AddActors(options => { options.Actors.RegisterActor(); diff --git a/test/Dapr.E2E.Test/Dapr.E2E.Test.csproj b/test/Dapr.E2E.Test/Dapr.E2E.Test.csproj index a330fbf30..3df269709 100644 --- a/test/Dapr.E2E.Test/Dapr.E2E.Test.csproj +++ b/test/Dapr.E2E.Test/Dapr.E2E.Test.csproj @@ -4,7 +4,7 @@ - + @@ -16,6 +16,7 @@ + diff --git a/test/Dapr.E2E.Test/Workflows/WorkflowTest.cs b/test/Dapr.E2E.Test/Workflows/WorkflowTest.cs new file mode 100644 index 000000000..b5421b999 --- /dev/null +++ b/test/Dapr.E2E.Test/Workflows/WorkflowTest.cs @@ -0,0 +1,61 @@ +// ------------------------------------------------------------------------ +// Copyright 2022 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.E2E.Test +{ + + using System.Threading; + using System.Threading.Tasks; + using Xunit; + using FluentAssertions; + using System; + using System.Collections.Generic; + using Google.Protobuf; + using Dapr.Client; + + [System.Obsolete] + public partial class E2ETests + { + [Fact] + public async Task TestWorkflows() + { + string instanceId = "TestWorkflowInstanceID"; + string workflowComponent = "dapr"; + string workflowName = "PlaceOrder"; + object input = ByteString.CopyFrom(0x01); + Dictionary workflowOptions = new Dictionary(); + workflowOptions.Add("task_queue", "testQueue"); + CancellationToken cts = new CancellationToken(); + + using var daprClient = new DaprClientBuilder().UseGrpcEndpoint(this.GrpcEndpoint).UseHttpEndpoint(this.HttpEndpoint).Build(); + var health = await daprClient.CheckHealthAsync(); + health.Should().Be(true, "DaprClient is not healthy"); + + // START WORKFLOW TEST + var startResponse = await daprClient.StartWorkflowAsync(instanceId, workflowComponent, workflowName, input, workflowOptions, cts); + startResponse.InstanceId.Should().Be("TestWorkflowInstanceID", $"Instance ID {startResponse.InstanceId} was not correct"); + + // GET INFO TEST + var getResponse = await daprClient.GetWorkflowAsync(instanceId, workflowComponent, workflowName); + getResponse.instanceId.Should().Be("TestWorkflowInstanceID"); + getResponse.metadata["dapr.workflow.runtime_status"].Should().Be("RUNNING", $"Instance ID {getResponse.metadata["dapr.workflow.runtime_status"]} was not correct"); + + // TERMINATE TEST: + await daprClient.TerminateWorkflowAsync(instanceId, workflowComponent); + getResponse = await daprClient.GetWorkflowAsync(instanceId, workflowComponent, workflowName); + getResponse.metadata["dapr.workflow.runtime_status"].Should().Be("TERMINATED", $"Instance ID {getResponse.metadata["dapr.workflow.runtime_status"]} was not correct"); + + } + + } +} \ No newline at end of file