Skip to content

Commit

Permalink
Workflow Management - Initial Methods (dapr#1003)
Browse files Browse the repository at this point in the history
Initial work for workflows DotNET SDK

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>
  • Loading branch information
RyanLettieri committed Feb 9, 2023
1 parent 6e77f12 commit 1bec37d
Show file tree
Hide file tree
Showing 12 changed files with 383 additions and 12 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/itests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ jobs:
install-version: '7.0.x'
env:
NUPKG_OUTDIR: bin/Release/nugets
GOVER: 1.17
GOVER: 1.19
GOOS: linux
GOARCH: amd64
GOPROXY: https://proxy.golang.org
DAPR_CLI_VER: 1.8.0
DAPR_RUNTIME_VER: 1.8.0
DAPR_CLI_VER: 1.9.1
DAPR_RUNTIME_VER: 1.9.5
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/3dacfb672d55f1436c249057aaebbe597e1066f3/install/install.sh
DAPR_CLI_REF: ''
DAPR_REF: ''
DAPR_REF: '82e097134cdc7494c5c3be71bf80f7904d6db9ea'
steps:
- name: Set up Dapr CLI
run: wget -q ${{ env.DAPR_INSTALL_URL }} -O - | /bin/bash -s ${{ env.DAPR_CLI_VER }}
Expand Down
47 changes: 47 additions & 0 deletions src/Dapr.Client/DaprClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -975,6 +975,53 @@ public abstract Task<UnlockResponse> Unlock(
string lockOwner,
CancellationToken cancellationToken = default);

/// <summary>
/// Attempt to start the given workflow with response indicating success.
/// </summary>
/// <param name="instanceId">Identifier of the specific run.</param>
/// <param name="workflowComponent">The component to interface with.</param>
/// <param name="workflowName">Name of the workflow to run.</param>
/// <param name="workflowOptions">The list of options that are potentially needed to start a workflow.</param>
/// <param name="input">The input input for the given workflow.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task"/> containing a <see cref="WorkflowReference"/></returns>
[Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public abstract Task<WorkflowReference> StartWorkflowAsync(
string instanceId,
string workflowComponent,
string workflowName,
Object input,
IReadOnlyDictionary<string, string> workflowOptions = default,
CancellationToken cancellationToken = default);

/// <summary>
/// Attempt to get information about the given workflow.
/// </summary>
/// <param name="instanceId">The unique ID of the target workflow instance.</param>
/// <param name="workflowComponent">The component to interface with.</param>
/// <param name="workflowName">Name of the workflow to run.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task"/> containing a <see cref="GetWorkflowResponse"/></returns>
[Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public abstract Task<GetWorkflowResponse> GetWorkflowAsync(
string instanceId,
string workflowComponent,
string workflowName,
CancellationToken cancellationToken = default);

/// <summary>
/// Attempt to get terminate the given workflow.
/// </summary>
/// <param name="instanceId">The unique ID of the target workflow instance.</param>
/// <param name="workflowComponent">The component to interface with.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" /> that will complete when the terminate operation has been scheduled. If the wrapped value is true the operation suceeded.</returns>
[Obsolete("This API is currently not stable as it is in the Alpha stage. This attribute will be removed once it is stable.")]
public abstract Task TerminateWorkflowAsync(
string instanceId,
string workflowComponent,
CancellationToken cancellationToken = default);

/// <inheritdoc />
public void Dispose()
{
Expand Down
114 changes: 114 additions & 0 deletions src/Dapr.Client/DaprClientGrpc.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1462,6 +1462,120 @@ public async override Task<UnlockResponse> Unlock(

#endregion


#region Workflow API
/// <inheritdoc/>
[Obsolete]
public async override Task<WorkflowReference> StartWorkflowAsync(
string instanceId,
string workflowComponent,
string workflowName,
Object input,
IReadOnlyDictionary<string, string> workflowOptions = default,
CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(instanceId, nameof(instanceId));
ArgumentVerifier.ThrowIfNullOrEmpty(workflowComponent, nameof(workflowComponent));
ArgumentVerifier.ThrowIfNullOrEmpty(workflowName, nameof(workflowName));
ArgumentVerifier.ThrowIfNull(input, nameof(input));

// Serialize json data. Converts input object to bytes and then bytestring inside the request.
byte[] jsonUtf8Bytes = JsonSerializer.SerializeToUtf8Bytes(input);

var request = new Autogenerated.StartWorkflowRequest()
{
InstanceId = instanceId,
WorkflowComponent = workflowComponent,
WorkflowName = workflowName,
Input = ByteString.CopyFrom(jsonUtf8Bytes),
};

if (workflowOptions?.Count > 0)
{
foreach (var item in workflowOptions)
{
request.Options[item.Key] = item.Value;
}
}

try
{
var options = CreateCallOptions(headers: null, cancellationToken);
var response = await client.StartWorkflowAlpha1Async(request, options);
return new WorkflowReference(response.InstanceId);

}
catch (RpcException ex)
{
throw new DaprException("Start Workflow operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}
}

/// <inheritdoc/>
[Obsolete]
public async override Task<GetWorkflowResponse> GetWorkflowAsync(
string instanceId,
string workflowComponent,
string workflowName,
CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(instanceId, nameof(instanceId));
ArgumentVerifier.ThrowIfNullOrEmpty(workflowComponent, nameof(workflowComponent));

var request = new Autogenerated.GetWorkflowRequest()
{
InstanceId = instanceId,
WorkflowComponent = workflowComponent,
WorkflowType = workflowName //TODO: Change 'WorkflowType' to 'WorkflowName' once changes go through dapr/dapr
};

try
{
var options = CreateCallOptions(headers: null, cancellationToken);
var response = await client.GetWorkflowAlpha1Async(request, options);
var dateTimeValue = new DateTime(response.StartTime, DateTimeKind.Utc);
return new GetWorkflowResponse(response.InstanceId, dateTimeValue, response.Metadata);
}
catch (RpcException ex)
{
throw new DaprException("Get workflow operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}

}


/// <inheritdoc/>
[Obsolete]
public async override Task TerminateWorkflowAsync(
string instanceId,
string workflowComponent,
CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(instanceId, nameof(instanceId));
ArgumentVerifier.ThrowIfNullOrEmpty(workflowComponent, nameof(workflowComponent));

var request = new Autogenerated.TerminateWorkflowRequest()
{
InstanceId = instanceId,
WorkflowComponent = workflowComponent
};

var options = CreateCallOptions(headers: null, cancellationToken);

try
{
await client.TerminateWorkflowAlpha1Async(request, options);
}
catch (RpcException ex)
{
throw new DaprException("Terminate workflow operation failed: the Dapr endpoint indicated a failure. See InnerException for details.", ex);
}

}

#endregion


#region Dapr Sidecar Methods

/// <inheritdoc/>
Expand Down
26 changes: 26 additions & 0 deletions src/Dapr.Client/GetWorkflowResponse.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System;
using System.Collections.Generic;

namespace Dapr.Client
{
/// <summary>
/// Initializes a new <see cref="GetWorkflowResponse" />.
/// </summary>
/// <param name="instanceId">The instance ID assocated with this response.</param>
/// <param name="startTime">The time at which the workflow started executing.</param>
/// <param name="metadata">The response metadata.</param>
public record GetWorkflowResponse(string instanceId, DateTime startTime, IReadOnlyDictionary<string, string> metadata);
}
71 changes: 66 additions & 5 deletions src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package dapr.proto.runtime.v1;

import "google/protobuf/any.proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
import "dapr/proto/common/v1/common.proto";

option csharp_namespace = "Dapr.Client.Autogen.Grpc.v1";
Expand Down Expand Up @@ -110,6 +111,15 @@ service Dapr {
// Sets value in extended metadata of the sidecar
rpc SetMetadata (SetMetadataRequest) returns (google.protobuf.Empty) {}

// Start Workflow
rpc StartWorkflowAlpha1 (StartWorkflowRequest) returns (WorkflowReference) {}

// Get Workflow details
rpc GetWorkflowAlpha1 (GetWorkflowRequest) returns (GetWorkflowResponse) {}

// Terminate Workflow
rpc TerminateWorkflowAlpha1 (TerminateWorkflowRequest) returns (TerminateWorkflowResponse) {}

// Shutdown the sidecar
rpc Shutdown (google.protobuf.Empty) returns (google.protobuf.Empty) {}
}
Expand Down Expand Up @@ -345,10 +355,10 @@ message InvokeBindingRequest {
bytes data = 2;

// The metadata passing to output binding components
//
//
// Common metadata property:
// - ttlInSeconds : the time to live in seconds for the message.
// If set in the binding definition will cause all messages to
// - ttlInSeconds : the time to live in seconds for the message.
// If set in the binding definition will cause all messages to
// have a default time to live. The message ttl overrides any value
// in the binding definition.
map<string, string> metadata = 3;
Expand Down Expand Up @@ -411,7 +421,7 @@ message TransactionalStateOperation {
// The type of operation to be executed
string operationType = 1;

// State values to be operated on
// State values to be operated on
common.v1.StateItem request = 2;
}

Expand Down Expand Up @@ -504,6 +514,7 @@ message InvokeActorRequest {
string actor_id = 2;
string method = 3;
bytes data = 4;
map<string, string> metadata = 5;
}

// InvokeActorResponse is the method that returns an actor invocation response.
Expand All @@ -517,6 +528,7 @@ message GetMetadataResponse {
repeated ActiveActorsCount active_actors_count = 2;
repeated RegisteredComponents registered_components = 3;
map<string, string> extended_metadata = 4;
repeated PubsubSubscription subscriptions = 5;
}

message ActiveActorsCount {
Expand All @@ -531,6 +543,23 @@ message RegisteredComponents {
repeated string capabilities = 4;
}

message PubsubSubscription {
string pubsub_name = 1;
string topic = 2;
map<string,string> metadata = 3;
PubsubSubscriptionRules rules = 4;
string dead_letter_topic = 5;
}

message PubsubSubscriptionRules {
repeated PubsubSubscriptionRule rules = 1;
}

message PubsubSubscriptionRule {
string match = 1;
string path = 2;
}

message SetMetadataRequest {
string key = 1;
string value = 2;
Expand Down Expand Up @@ -644,4 +673,36 @@ message UnlockResponse {
}

Status status = 1;
}
}

message WorkflowReference {
string instance_id = 1;
}

message GetWorkflowRequest {
string instance_id = 1;
string workflow_type = 2;
string workflow_component = 3;
}

message GetWorkflowResponse {
string instance_id = 1;
int64 start_time = 2;
map<string, string> metadata = 3;
}

message StartWorkflowRequest {
string instance_id = 1;
string workflow_component = 2;
string workflow_name = 3;
map<string, string> options = 4;
bytes input = 5;
}

message TerminateWorkflowRequest {
string instance_id = 1;
string workflow_component = 2;
}

message TerminateWorkflowResponse {
}
40 changes: 40 additions & 0 deletions src/Dapr.Client/WorkflowReference.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------

using System;
using System.Collections.Generic;

namespace Dapr.Client
{
/// <summary>
/// Represents the response from invoking a workflow.
/// </summary>
public sealed class WorkflowReference
{
/// <summary>
/// Initializes a new <see cref="WorkflowReference" />.`
/// </summary>
/// <param name="instanceId">The instance ID assocated with this response.</param>
public WorkflowReference(string instanceId)
{
ArgumentVerifier.ThrowIfNull(instanceId, nameof(instanceId));
this.InstanceId = instanceId;
}

/// <summary>
/// The instance ID assocated with this workflow.
/// </summary>
public string InstanceId { set; get; }

}
}
2 changes: 1 addition & 1 deletion src/Dapr.Workflow/Dapr.Workflow.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

<!-- NuGet configuration -->
<PropertyGroup>
<TargetFramework>net6</TargetFramework>
<TargetFrameworks>netcoreapp3.1;net6;net7</TargetFrameworks>
<Nullable>enable</Nullable>
<PackageId>Dapr.Workflow</PackageId>
<Title>Dapr Workflow Authoring SDK</Title>
Expand Down
Loading

0 comments on commit 1bec37d

Please sign in to comment.