Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove RX as the primary pipe for gRPC communications #8281

Merged
merged 25 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
da1932e
remove RX as the primary pipe for gRPC communications
mgravell Apr 4, 2022
ea4aee8
fixup test failures
Apr 5, 2022
3739612
tweak the worker channel process flow to ensure it is testable; fixup…
mgravell Apr 6, 2022
6f1ea2e
Merge branch 'dev' into mgravell/remove-rx
mgravell May 5, 2022
e9a2c4d
Merge branch 'dev' into mgravell/remove-rx
mgravell May 5, 2022
0daaa10
merge removal of PublishFunctionLoadResponsesEvent work
mgravell May 5, 2022
60e01c8
Update src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
mgravell Jun 20, 2022
a83275f
Update src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
mgravell Jun 20, 2022
29540a2
nits and renames from PR review feedback
mgravell Jun 20, 2022
85a4265
Merge branch 'mgravell/remove-rx' of https://github.com/Azure/azure-f…
mgravell Jun 20, 2022
990ed5e
Merge branch 'dev' into mgravell/remove-rx
mgravell Jun 21, 2022
22cab89
Merge branch 'dev' into mgravell/remove-rx
mgravell Jul 5, 2022
255c7bb
fix tests on script-invocation context settings - was mock; now we si…
mgravell Jul 5, 2022
0a693c1
Merge branch 'dev' into mgravell/remove-rx
NickCraver Jul 27, 2022
a4fde1c
Fix latest dev tests
NickCraver Jul 27, 2022
2842743
Fix new capabilities test to use channel properly
NickCraver Jul 27, 2022
4a1b0c6
Tidy up/PR comments
NickCraver Jul 27, 2022
914b7d6
Fix WorkerStatus_NewWorkerAdded by properly simulating latency in the…
NickCraver Jul 27, 2022
39385ad
Merge remote-tracking branch 'origin/dev' into mgravell/remove-rx
NickCraver Aug 30, 2022
4764d1c
Address PR comments
NickCraver Aug 30, 2022
f0233b0
Test fix for SendInvocationRequest_SignalCancellation_WithCapability_…
NickCraver Aug 30, 2022
8f6ca1a
Fix PR comments - ensure _outbound and _inbound are never null
NickCraver Aug 30, 2022
6b06dc9
Remove cruft
NickCraver Aug 30, 2022
015d385
Merge remote-tracking branch 'origin/dev' into mgravell/remove-rx
NickCraver Sep 2, 2022
6f9f11e
Capture ObjectDisposedException in disposal race (occasional issue in…
NickCraver Sep 2, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 278 additions & 62 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannelFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Reactive.Linq;
using Microsoft.Azure.WebJobs.Script.Diagnostics;
using Microsoft.Azure.WebJobs.Script.Eventing;
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
using Microsoft.Azure.WebJobs.Script.Workers;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
using Microsoft.Azure.WebJobs.Script.Workers.SharedMemoryDataTransfer;
Expand Down Expand Up @@ -47,6 +48,7 @@ public IRpcWorkerChannel Create(string scriptRootPath, string runtime, IMetricsL
throw new InvalidOperationException($"WorkerCofig for runtime: {runtime} not found");
}
string workerId = Guid.NewGuid().ToString();
_eventManager.AddGrpcChannels(workerId); // prepare the inbound/outbound dedicated channels
liliankasem marked this conversation as resolved.
Show resolved Hide resolved
ILogger workerLogger = _loggerFactory.CreateLogger($"Worker.LanguageWorkerChannel.{runtime}.{workerId}");
IWorkerProcess rpcWorkerProcess = _rpcWorkerProcessFactory.Create(workerId, runtime, scriptRootPath, languageWorkerConfig);
return new GrpcWorkerChannel(
Expand Down
67 changes: 67 additions & 0 deletions src/WebJobs.Script.Grpc/Eventing/GrpcEventExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Channels;
using Microsoft.Azure.WebJobs.Script.Eventing;

namespace Microsoft.Azure.WebJobs.Script.Grpc.Eventing;

/// <summary>
/// Extension methods that store, look up and tear down the per-worker inbound/outbound
/// gRPC channels using the worker-state store exposed by <see cref="IScriptEventManager"/>.
/// </summary>
internal static class GrpcEventExtensions
{
    // flow here is:
    // 1) external request is proxied to the GrpcWorkerChannel via one of the many Send* APIs, which writes
    //    to outbound-writer; this means we can have concurrent writes to outbound
    // 2) if an out-of-process function is connected, a FunctionRpcService-EventStream will consume
    //    from outbound-reader (we'll allow for the multi-stream possibility, hence concurrent), and push it via gRPC
    // 3) when the out-of-process function provides a response to FunctionRpcService-EventStream, it is written to
    //    inbound-writer (note we will allow for multi-stream possibility)
    // 4) the GrpcWorkerChannel has a single dedicated consumer of inbound-reader, which it then marries to
    //    in-flight operations

    /// <summary>Options used for every inbound (worker -> host) channel.</summary>
    internal static readonly UnboundedChannelOptions InboundOptions = new UnboundedChannelOptions
    {
        SingleReader = true, // see 4
        SingleWriter = false, // see 3
        AllowSynchronousContinuations = false,
    };

    /// <summary>Options used for every outbound (host -> worker) channel.</summary>
    internal static readonly UnboundedChannelOptions OutboundOptions = new UnboundedChannelOptions
    {
        SingleReader = false, // see 2
        SingleWriter = false, // see 1
        AllowSynchronousContinuations = false,
    };

    /// <summary>
    /// Creates the inbound and outbound channels for <paramref name="workerId"/> and registers
    /// both with <paramref name="manager"/>.
    /// </summary>
    /// <param name="manager">The event manager whose worker-state store holds the channels.</param>
    /// <param name="workerId">The worker the channels belong to.</param>
    /// <exception cref="ArgumentException">
    /// Either channel could not be added for <paramref name="workerId"/> (reported as a duplicate worker id).
    /// </exception>
    public static void AddGrpcChannels(this IScriptEventManager manager, string workerId)
    {
        var inbound = Channel.CreateUnbounded<InboundGrpcEvent>(InboundOptions);
        if (manager.TryAddWorkerState(workerId, inbound))
        {
            var outbound = Channel.CreateUnbounded<OutboundGrpcEvent>(OutboundOptions);
            if (manager.TryAddWorkerState(workerId, outbound))
            {
                return; // successfully added both
            }

            // we added the inbound but not the outbound; revert so no half-registered worker is left behind
            manager.TryRemoveWorkerState(workerId, out inbound);
        }

        // this is not anticipated, so don't panic about the allocs above
        throw new ArgumentException("Duplicate worker id: " + workerId, nameof(workerId));
    }

    /// <summary>
    /// Looks up both channels registered for <paramref name="workerId"/>.
    /// </summary>
    /// <returns><c>true</c> only when both the inbound and the outbound channel were found.</returns>
    /// <remarks>
    /// The non-short-circuiting <c>&amp;</c> is deliberate: both lookups must run so that
    /// both <c>out</c> parameters are always assigned.
    /// </remarks>
    public static bool TryGetGrpcChannels(this IScriptEventManager manager, string workerId, out Channel<InboundGrpcEvent> inbound, out Channel<OutboundGrpcEvent> outbound)
        => manager.TryGetWorkerState(workerId, out inbound) & manager.TryGetWorkerState(workerId, out outbound);

    /// <summary>
    /// Removes the channels registered for <paramref name="workerId"/> (if any) and completes
    /// their writers so that readers observe end-of-stream.
    /// </summary>
    /// <param name="manager">The event manager whose worker-state store holds the channels.</param>
    /// <param name="workerId">The worker whose channels should be torn down.</param>
    public static void RemoveGrpcChannels(this IScriptEventManager manager, string workerId)
    {
        // Use TryRemove (not TryGet): the entries must actually leave the manager's store,
        // otherwise the channels of every worker ever created stay referenced.
        if (manager.TryRemoveWorkerState<Channel<InboundGrpcEvent>>(workerId, out var inbound))
        {
            inbound.Writer.TryComplete();
        }

        if (manager.TryRemoveWorkerState<Channel<OutboundGrpcEvent>>(workerId, out var outbound))
        {
            outbound.Writer.TryComplete();
        }
    }
}

This file was deleted.

113 changes: 64 additions & 49 deletions src/WebJobs.Script.Grpc/Server/FunctionRpcService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,9 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Grpc.Core;
using Microsoft.Azure.WebJobs.Script.Eventing;
Expand All @@ -21,7 +20,6 @@ namespace Microsoft.Azure.WebJobs.Script.Grpc
// TODO: move to WebJobs.Script.Grpc package and provide event stream abstraction
internal class FunctionRpcService : FunctionRpc.FunctionRpcBase
{
private readonly SemaphoreSlim _writeLock = new SemaphoreSlim(1, 1);
private readonly IScriptEventManager _eventManager;
private readonly ILogger _logger;

Expand All @@ -33,64 +31,48 @@ public FunctionRpcService(IScriptEventManager eventManager, ILogger<FunctionRpcS

public override async Task EventStream(IAsyncStreamReader<StreamingMessage> requestStream, IServerStreamWriter<StreamingMessage> responseStream, ServerCallContext context)
{
var cancelSource = new TaskCompletionSource<bool>();
IDictionary<string, IDisposable> outboundEventSubscriptions = new Dictionary<string, IDisposable>();

var cancelSource = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
var cts = CancellationTokenSource.CreateLinkedTokenSource(context.CancellationToken);
CancellationTokenRegistration ctr = cts.Token.Register(static state => ((TaskCompletionSource<bool>)state).TrySetResult(false), cancelSource);
try
{
context.CancellationToken.Register(() => cancelSource.TrySetResult(false));
Func<Task<bool>> messageAvailable = async () =>
static Task<Task<bool>> MoveNextAsync(IAsyncStreamReader<StreamingMessage> requestStream, TaskCompletionSource<bool> cancelSource)
{
// GRPC does not accept cancellation tokens for individual reads, hence wrapper
var requestTask = requestStream.MoveNext(CancellationToken.None);
var completed = await Task.WhenAny(cancelSource.Task, requestTask);
return completed.Result;
};
return Task.WhenAny(cancelSource.Task, requestTask);
}

if (await messageAvailable())
if (await await MoveNextAsync(requestStream, cancelSource))
liliankasem marked this conversation as resolved.
Show resolved Hide resolved
{
string workerId = requestStream.Current.StartStream.WorkerId;
_logger.LogDebug("Established RPC channel. WorkerId: {workerId}", workerId);
outboundEventSubscriptions.Add(workerId, _eventManager.OfType<OutboundGrpcEvent>()
.Where(evt => evt.WorkerId == workerId)
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(async evt =>
var currentMessage = requestStream.Current;
// expect first operation (and only the first; we don't support re-registration) to be StartStream
if (currentMessage.ContentCase == MsgType.StartStream)
{
var workerId = currentMessage.StartStream?.WorkerId;
if (!string.IsNullOrEmpty(workerId) && _eventManager.TryGetGrpcChannels(workerId, out var inbound, out var outbound))
{
try
{
if (evt.MessageType == MsgType.InvocationRequest)
{
_logger.LogTrace("Writing invocation request invocationId: {invocationId} to workerId: {workerId}", evt.Message.InvocationRequest.InvocationId, workerId);
}
// send work
_ = PushFromOutboundToGrpc(workerId, responseStream, outbound.Reader, cts.Token);

try
// this loop is "pull from gRPC and push to inbound"
do
{
currentMessage = requestStream.Current;
if (currentMessage.ContentCase == MsgType.InvocationResponse && !string.IsNullOrEmpty(currentMessage.InvocationResponse?.InvocationId))
{
// WriteAsync only allows one pending write at a time, so we
// serialize access to the stream for each subscription
await _writeLock.WaitAsync();
await responseStream.WriteAsync(evt.Message);
_logger.LogTrace("Received invocation response for invocationId: {invocationId} from workerId: {workerId}", currentMessage.InvocationResponse.InvocationId, workerId);
}
finally
var newInbound = new InboundGrpcEvent(workerId, currentMessage);
if (!inbound.Writer.TryWrite(newInbound))
{
_writeLock.Release();
await inbound.Writer.WriteAsync(newInbound);
}
currentMessage = null; // allow old messages to be collected while we wait
}
catch (Exception subscribeEventEx)
{
_logger.LogError(subscribeEventEx, "Error writing message type {messageType} to workerId: {workerId}", evt.MessageType, workerId);
}
}));

do
{
var currentMessage = requestStream.Current;
if (currentMessage.InvocationResponse != null && !string.IsNullOrEmpty(currentMessage.InvocationResponse.InvocationId))
{
_logger.LogTrace("Received invocation response for invocationId: {invocationId} from workerId: {workerId}", currentMessage.InvocationResponse.InvocationId, workerId);
while (await await MoveNextAsync(requestStream, cancelSource));
}
_eventManager.Publish(new InboundGrpcEvent(workerId, currentMessage));
}
while (await messageAvailable());
}
}
catch (Exception rpcException)
Expand All @@ -101,14 +83,47 @@ public override async Task EventStream(IAsyncStreamReader<StreamingMessage> requ
}
finally
{
foreach (var sub in outboundEventSubscriptions)
{
sub.Value?.Dispose();
}
cts.Cancel();
ctr.Dispose();

// ensure cancellationSource task completes
cancelSource.TrySetResult(false);
}
}

/// <summary>
/// Drains <paramref name="source"/> and writes each outbound event's message to
/// <paramref name="responseStream"/> until the reader completes or
/// <paramref name="cancellationToken"/> is cancelled.
/// </summary>
/// <param name="workerId">Worker id, used only for logging.</param>
/// <param name="responseStream">The gRPC stream the messages are written to.</param>
/// <param name="source">Reader side of the worker's outbound channel.</param>
/// <param name="cancellationToken">Stops the wait for further outbound events.</param>
/// <remarks>
/// Exceptions are logged rather than propagated, so the returned task does not fault
/// for failures raised inside the loop.
/// </remarks>
private async Task PushFromOutboundToGrpc(string workerId, IServerStreamWriter<StreamingMessage> responseStream, ChannelReader<OutboundGrpcEvent> source, CancellationToken cancellationToken)
{
    try
    {
        _logger.LogDebug("Established RPC channel. WorkerId: {workerId}", workerId);

        // yield straight away so the caller is freed up and the pump continues asynchronously
        await Task.Yield();

        while (await source.WaitToReadAsync(cancellationToken))
        {
            // take everything that is currently buffered before waiting again
            while (source.TryRead(out var outboundEvent))
            {
                if (outboundEvent.MessageType == MsgType.InvocationRequest)
                {
                    _logger.LogTrace("Writing invocation request invocationId: {invocationId} to workerId: {workerId}", outboundEvent.Message.InvocationRequest.InvocationId, workerId);
                }

                try
                {
                    await responseStream.WriteAsync(outboundEvent.Message);
                }
                catch (Exception writeFailure)
                {
                    // a failed write is logged and the pump moves on to the next event
                    _logger.LogError(writeFailure, "Error writing message type {messageType} to workerId: {workerId}", outboundEvent.MessageType, workerId);
                }
            }
        }
    }
    catch (OperationCanceledException canceled) when (canceled.CancellationToken == cancellationToken)
    {
        // expected: this is the normal exit path when our own token is cancelled
    }
    catch (Exception failure)
    {
        _logger.LogError(failure, "Error pushing from outbound to gRPC");
    }
}
}
}
1 change: 1 addition & 0 deletions src/WebJobs.Script.Grpc/WebJobs.Script.Grpc.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
<PackageReference Include="StyleCop.Analyzers" Version="1.1.0-beta004">
<PrivateAssets>all</PrivateAssets>
</PackageReference>
<PackageReference Include="System.Threading.Channels" Version="6.0.0" />
</ItemGroup>

<ItemGroup>
Expand Down
6 changes: 6 additions & 0 deletions src/WebJobs.Script/Eventing/IScriptEventManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,11 @@ namespace Microsoft.Azure.WebJobs.Script.Eventing
/// <summary>
/// An observable source of <see cref="ScriptEvent"/> instances that also exposes a
/// per-worker state store addressed by worker id and state type.
/// </summary>
public interface IScriptEventManager : IObservable<ScriptEvent>
{
/// <summary>Publishes <paramref name="scriptEvent"/> through this manager.</summary>
/// <param name="scriptEvent">The event to publish.</param>
void Publish(ScriptEvent scriptEvent);

/// <summary>
/// Attempts to store <paramref name="state"/> for <paramref name="workerId"/>.
/// In ScriptEventManager the entry is keyed by the worker id together with typeof(T).
/// </summary>
/// <returns><c>true</c> if the state was added; otherwise <c>false</c>.</returns>
bool TryAddWorkerState<T>(string workerId, T state);

/// <summary>
/// Attempts to read the state of type <typeparamref name="T"/> stored for <paramref name="workerId"/>.
/// </summary>
/// <returns><c>true</c> if a matching state was found and assigned to <paramref name="state"/>; otherwise <c>false</c>.</returns>
bool TryGetWorkerState<T>(string workerId, out T state);

/// <summary>
/// Attempts to remove the state of type <typeparamref name="T"/> stored for <paramref name="workerId"/>,
/// handing the removed value back through <paramref name="state"/>.
/// </summary>
/// <returns><c>true</c> if a matching state was removed; otherwise <c>false</c>.</returns>
bool TryRemoveWorkerState<T>(string workerId, out T state);
}
}
35 changes: 34 additions & 1 deletion src/WebJobs.Script/Eventing/ScriptEventManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,16 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections.Concurrent;
using System.Reactive.Subjects;

namespace Microsoft.Azure.WebJobs.Script.Eventing
{
public sealed class ScriptEventManager : IScriptEventManager, IDisposable
public class ScriptEventManager : IScriptEventManager, IDisposable
{
private readonly Subject<ScriptEvent> _subject = new Subject<ScriptEvent>();
private readonly ConcurrentDictionary<(string, Type), object> _workerState = new ();

private bool _disposed = false;

public void Publish(ScriptEvent scriptEvent)
Expand Down Expand Up @@ -47,5 +50,35 @@ private void Dispose(bool disposing)
}

public void Dispose() => Dispose(true);

// Stores the state under the composite key (workerId, typeof(T)); returns false when
// that key is already present.
bool IScriptEventManager.TryAddWorkerState<T>(string workerId, T state)
    => _workerState.TryAdd((workerId, typeof(T)), state);

// Looks up the entry stored under (workerId, typeof(T)). Succeeds only when an entry
// exists and its value is a T; otherwise state is set to default and false is returned.
bool IScriptEventManager.TryGetWorkerState<T>(string workerId, out T state)
{
    var found = _workerState.TryGetValue((workerId, typeof(T)), out var boxed) && boxed is T;
    state = found ? (T)boxed : default;
    return found;
}

// Removes the entry stored under (workerId, typeof(T)). Returns true and hands back the
// removed value when an entry existed and its value is a T; otherwise state is set to
// default and false is returned.
bool IScriptEventManager.TryRemoveWorkerState<T>(string workerId, out T state)
{
    var removed = _workerState.TryRemove((workerId, typeof(T)), out var boxed) && boxed is T;
    state = removed ? (T)boxed : default;
    return removed;
}
}
}
1 change: 1 addition & 0 deletions src/WebJobs.Script/Workers/Rpc/RpcWorkerConstants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public static class RpcWorkerConstants

// Host Capabilities
public const string V2Compatable = "V2Compatable";
public const string MultiStream = nameof(MultiStream);

// dotnet executable file path components
public const string DotNetExecutableName = "dotnet";
Expand Down
Loading