Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[pack] Enable ordering of RpcLog and InvocationResponse messages #9657

Merged
merged 5 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 105 additions & 14 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Yarp.ReverseProxy.Forwarder;

using static Microsoft.Azure.WebJobs.Script.Grpc.Messages.RpcLog.Types;
using FunctionMetadata = Microsoft.Azure.WebJobs.Script.Description.FunctionMetadata;
using MsgType = Microsoft.Azure.WebJobs.Script.Grpc.Messages.StreamingMessage.ContentOneofCase;
Expand All @@ -51,6 +50,7 @@ internal partial class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
private readonly List<TimeSpan> _workerStatusLatencyHistory = new List<TimeSpan>();
private readonly IOptions<WorkerConcurrencyOptions> _workerConcurrencyOptions;
private readonly WaitCallback _processInbound;
private readonly IInvocationMessageDispatcherFactory _messageDispatcherFactory;
private readonly object _syncLock = new object();
private readonly object _metadataLock = new object();
private readonly Dictionary<MsgType, Queue<PendingItem>> _pendingActions = new();
Expand All @@ -65,7 +65,7 @@ internal partial class GrpcWorkerChannel : IRpcWorkerChannel, IDisposable
private RpcWorkerChannelState _state;
private IDictionary<string, Exception> _functionLoadErrors = new Dictionary<string, Exception>();
private IDictionary<string, Exception> _metadataRequestErrors = new Dictionary<string, Exception>();
private ConcurrentDictionary<string, ScriptInvocationContext> _executingInvocations = new ConcurrentDictionary<string, ScriptInvocationContext>();
private ConcurrentDictionary<string, ExecutingInvocation> _executingInvocations = new();
private IDictionary<string, BufferBlock<ScriptInvocationContext>> _functionInputBuffers = new ConcurrentDictionary<string, BufferBlock<ScriptInvocationContext>>();
private ConcurrentDictionary<string, TaskCompletionSource<bool>> _workerStatusRequests = new ConcurrentDictionary<string, TaskCompletionSource<bool>>();
private List<IDisposable> _inputLinks = new List<IDisposable>();
Expand Down Expand Up @@ -137,6 +137,9 @@ internal GrpcWorkerChannel(
_startLatencyMetric = metricsLogger?.LatencyEvent(string.Format(MetricEventNames.WorkerInitializeLatency, workerConfig.Description.Language, attemptCount));

_state = RpcWorkerChannelState.Default;

// Temporary switch to allow fully testing new algorithm in production
_messageDispatcherFactory = GetProcessorFactory();
}

private bool IsHttpProxyingWorker => _httpProxyEndpoint is not null;
Expand All @@ -149,6 +152,23 @@ internal GrpcWorkerChannel(

public RpcWorkerConfig WorkerConfig => _workerConfig;

// Temporary toggle between the "old" ThreadPool-only processor and the "new"
// Channel processor, which exists so that invocation messages are handled in order.
// The ordered implementation is chosen when either the hosting configuration option
// or the corresponding feature flag is turned on.
private IInvocationMessageDispatcherFactory GetProcessorFactory()
{
    bool useOrderedDispatcher = _hostingConfigOptions.Value.EnableOrderedInvocationMessages
        || FeatureFlags.IsEnabled(ScriptConstants.FeatureFlagEnableOrderedInvocationmessages, _environment);

    if (!useOrderedDispatcher)
    {
        _workerChannelLogger.LogDebug($"Using {nameof(ThreadPoolInvocationProcessorFactory)}.");
        return new ThreadPoolInvocationProcessorFactory(_processInbound);
    }

    _workerChannelLogger.LogDebug($"Using {nameof(OrderedInvocationMessageDispatcherFactory)}.");
    return new OrderedInvocationMessageDispatcherFactory(ProcessItem, _workerChannelLogger);
}

private void ProcessItem(InboundGrpcEvent msg)
{
// note this method is a thread-pool (QueueUserWorkItem) entry-point
Expand Down Expand Up @@ -251,7 +271,8 @@ private async Task ProcessInbound()
{
Logger.ChannelReceivedMessage(_workerChannelLogger, msg.WorkerId, msg.MessageType);
}
ThreadPool.QueueUserWorkItem(_processInbound, msg);

DispatchMessage(msg);
}
}
}
Expand All @@ -266,6 +287,40 @@ private async Task ProcessInbound()
}
}

// Routes an inbound message either to the dispatcher owned by its executing invocation
// or to the thread pool. Only user/custom-metric RpcLog messages and InvocationResponse
// messages are candidates for the per-invocation dispatcher; everything else, and any
// message whose invocation id is not currently tracked, is queued to the thread pool.
private void DispatchMessage(InboundGrpcEvent msg)
{
    var messageType = msg.MessageType;

    if (messageType == MsgType.RpcLog
        && (msg.Message.RpcLog.LogCategory == RpcLogCategory.User || msg.Message.RpcLog.LogCategory == RpcLogCategory.CustomMetric))
    {
        if (_executingInvocations.TryGetValue(msg.Message.RpcLog.InvocationId, out var logOwner))
        {
            logOwner.Dispatcher.DispatchRpcLog(msg);
            return;
        }

        // Otherwise: we received a log outside of an invocation; fall through to the thread pool.
    }
    else if (messageType == MsgType.InvocationResponse)
    {
        if (_executingInvocations.TryGetValue(msg.Message.InvocationResponse.InvocationId, out var responseOwner))
        {
            responseOwner.Dispatcher.DispatchInvocationResponse(msg);
            return;
        }

        // Otherwise: this should never happen, but if it does, fall through to the thread pool.
    }

    // All other messages (and the unmatched cases above) go to the thread pool.
    ThreadPool.QueueUserWorkItem(_processInbound, msg);
}

public bool IsChannelReadyForInvocations()
{
return !_disposing && !_disposed && _state.HasFlag(RpcWorkerChannelState.InvocationBuffersInitialized | RpcWorkerChannelState.Initialized);
Expand Down Expand Up @@ -750,14 +805,14 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context)
{
_workerChannelLogger.LogDebug("Function {functionName} failed to load", context.FunctionMetadata.Name);
context.ResultSource.TrySetException(_functionLoadErrors[context.FunctionMetadata.GetFunctionId()]);
_executingInvocations.TryRemove(invocationId, out ScriptInvocationContext _);
RemoveExecutingInvocation(invocationId);
return;
}
else if (_metadataRequestErrors.ContainsKey(context.FunctionMetadata.GetFunctionId()))
{
_workerChannelLogger.LogDebug("Worker failed to load metadata for {functionName}", context.FunctionMetadata.Name);
context.ResultSource.TrySetException(_metadataRequestErrors[context.FunctionMetadata.GetFunctionId()]);
_executingInvocations.TryRemove(invocationId, out ScriptInvocationContext _);
RemoveExecutingInvocation(invocationId);
return;
}

Expand All @@ -771,7 +826,7 @@ internal async Task SendInvocationRequest(ScriptInvocationContext context)

var invocationRequest = await context.ToRpcInvocationRequest(_workerChannelLogger, _workerCapabilities, _isSharedMemoryDataTransferEnabled, _sharedMemoryManager);
AddAdditionalTraceContext(invocationRequest.TraceContext.Attributes, context);
_executingInvocations.TryAdd(invocationRequest.InvocationId, context);
_executingInvocations.TryAdd(invocationRequest.InvocationId, new(context, _messageDispatcherFactory.Create(invocationRequest.InvocationId)));
kshyju marked this conversation as resolved.
Show resolved Hide resolved
_metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvoked, Id), functionName: Sanitizer.Sanitize(context.FunctionMetadata.Name));

await SendStreamingMessageAsync(new StreamingMessage
Expand Down Expand Up @@ -980,8 +1035,9 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse)
// Check if the worker supports logging user-code-thrown exceptions to app insights
bool capabilityEnabled = !string.IsNullOrEmpty(_workerCapabilities.GetCapabilityState(RpcWorkerConstants.EnableUserCodeException));

if (_executingInvocations.TryRemove(invokeResponse.InvocationId, out ScriptInvocationContext context))
if (_executingInvocations.TryRemove(invokeResponse.InvocationId, out var invocation))
{
var context = invocation.Context;
if (invokeResponse.Result.IsInvocationSuccess(context.ResultSource, capabilityEnabled))
{
_metricsLogger.LogEvent(string.Format(MetricEventNames.WorkerInvokeSucceeded, Id));
Expand Down Expand Up @@ -1051,6 +1107,8 @@ internal async Task InvokeResponse(InvocationResponse invokeResponse)
SendCloseSharedMemoryResourcesForInvocationRequest(outputMaps);
}
}

invocation.Dispose();
}
else
{
Expand Down Expand Up @@ -1083,8 +1141,10 @@ internal void SendCloseSharedMemoryResourcesForInvocationRequest(IList<string> o
internal void Log(GrpcEvent msg)
{
var rpcLog = msg.Message.RpcLog;
if (_executingInvocations.TryGetValue(rpcLog.InvocationId, out ScriptInvocationContext context))
if (_executingInvocations.TryGetValue(rpcLog.InvocationId, out var invocation))
{
var context = invocation.Context;

// Restore the execution context from the original invocation. This allows AsyncLocal state to flow to loggers.
System.Threading.ExecutionContext.Run(context.AsyncExecutionContext, static (state) =>
{
Expand Down Expand Up @@ -1273,12 +1333,25 @@ internal void ReceiveWorkerStatusResponse(string requestId, WorkerStatusResponse
}
}

// Stops tracking the invocation with the given id and disposes its entry.
// Does nothing when no entry with that id is currently tracked.
private void RemoveExecutingInvocation(string invocationId)
{
    if (!_executingInvocations.TryRemove(invocationId, out var removed))
    {
        return;
    }

    removed.Dispose();
}

protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
foreach (var id in _executingInvocations.Keys)
{
RemoveExecutingInvocation(id);
}

_startLatencyMetric?.Dispose();
_workerInitTask?.TrySetCanceled();
_timer?.Dispose();
Expand Down Expand Up @@ -1366,9 +1439,9 @@ private void StopWorkerProcess()
public async Task DrainInvocationsAsync()
{
_workerChannelLogger.LogDebug("Count of in-buffer invocations waiting to be drained out: {invocationCount}", _executingInvocations.Count);
foreach (ScriptInvocationContext currContext in _executingInvocations.Values)
foreach (var invocation in _executingInvocations.Values)
{
await currContext.ResultSource.Task;
await invocation.Context.ResultSource.Task;
}
}

Expand All @@ -1384,12 +1457,12 @@ public bool TryFailExecutions(Exception workerException)
return false;
}

foreach (ScriptInvocationContext currContext in _executingInvocations?.Values)
foreach (var invocation in _executingInvocations?.Values)
{
string invocationId = currContext?.ExecutionContext?.InvocationId.ToString();
string invocationId = invocation.Context?.ExecutionContext?.InvocationId.ToString();
_workerChannelLogger.LogDebug("Worker '{workerId}' encountered a fatal error. Failing invocation: '{invocationId}'", _workerId, invocationId);
currContext?.ResultSource?.TrySetException(workerException);
_executingInvocations.TryRemove(invocationId, out ScriptInvocationContext _);
invocation.Context?.ResultSource?.TrySetException(workerException);
RemoveExecutingInvocation(invocationId);
}
return true;
}
Expand Down Expand Up @@ -1529,6 +1602,24 @@ private void AddAdditionalTraceContext(MapField<string, string> attributes, Scri
}
}

/// <summary>
/// Pairs the context of an in-flight invocation with the message dispatcher
/// created for that invocation.
/// </summary>
private sealed class ExecutingInvocation : IDisposable
{
    public ExecutingInvocation(ScriptInvocationContext context, IInvocationMessageDispatcher dispatcher)
    {
        Context = context;
        Dispatcher = dispatcher;
    }

    /// <summary>Gets the invocation context supplied at construction.</summary>
    public ScriptInvocationContext Context { get; }

    /// <summary>Gets the dispatcher supplied at construction.</summary>
    public IInvocationMessageDispatcher Dispatcher { get; }

    /// <summary>
    /// Disposes the dispatcher when it implements <see cref="IDisposable"/>; otherwise does nothing.
    /// </summary>
    public void Dispose()
    {
        if (Dispatcher is IDisposable disposableDispatcher)
        {
            disposableDispatcher.Dispose();
        }
    }
}

private sealed class PendingItem
{
private readonly Action<InboundGrpcEvent> _callback;
Expand Down
43 changes: 43 additions & 0 deletions src/WebJobs.Script.Grpc/Channel/IInvocationMessageDispatcher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;

namespace Microsoft.Azure.WebJobs.Script.Grpc;

/// <summary>
/// Interface for processing grpc messages that may come from the worker that are
/// related to an invocation (RpcLog and InvocationResponse). The contract here is as follows:
/// - The contract with a worker is that, during an invocation (i.e. an InvocationRequest has been sent), there are only
/// two kinds of messages that the worker can send us related to that invocation:
/// - one or many RpcLog messages
/// - one final InvocationResponse that effectively ends the invocation. Once the InvocationResponse is received, no
/// more RpcLog messages from this specific invocation will be processed.
/// - The GrpcChannel is looping to dequeue grpc messages from a worker. When it finds one that is either an RpcLog
/// or an InvocationResponse, it will call the matching method on this interface (i.e. RpcLog -> DispatchRpcLog)
/// from the same thread that is looping and dequeuing items from the grpc channel. The implementors of this interface
/// must quickly dispatch the message to a background Task or Thread for handling, so as to not block the
/// main loop from dequeuing more messages.
/// - Because the methods on this interface are all called from the same thread, they do not need to be
/// thread-safe. They can assume that they will not be called multiple times from different threads.
/// </summary>
internal interface IInvocationMessageDispatcher
{
/// <summary>
/// Inspects the incoming RpcLog and dispatches to a Thread or background Task as quickly as possible. This method is
/// called from a loop processing incoming grpc messages and any thread blocking will delay the processing of that loop.
/// It can be assumed that this method will never be called from multiple threads simultaneously and thus does not need
/// to be thread-safe.
/// </summary>
/// <param name="msg">The RpcLog message. Implementors can assume that this message is an RpcLog.</param>
void DispatchRpcLog(InboundGrpcEvent msg);

/// <summary>
/// Inspects an incoming InvocationResponse message and dispatches to a Thread or background Task as quickly as possible.
/// This method is called from a loop processing incoming grpc messages and any thread blocking will delay the processing
/// of that loop. It can be assumed that this method will never be called from multiple threads simultaneously and thus
/// does not need to be thread-safe.
/// </summary>
/// <param name="msg">The InvocationResponse message. Implementors can assume that this message is an InvocationResponse.</param>
void DispatchInvocationResponse(InboundGrpcEvent msg);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

namespace Microsoft.Azure.WebJobs.Script.Grpc;

/// <summary>
/// Factory for <see cref="IInvocationMessageDispatcher"/> instances.
/// </summary>
internal interface IInvocationMessageDispatcherFactory
{
/// <summary>
/// Creates a dispatcher for the invocation identified by <paramref name="invocationId"/>.
/// </summary>
/// <param name="invocationId">The id of the invocation the dispatcher is created for.</param>
/// <returns>An <see cref="IInvocationMessageDispatcher"/> for that invocation.</returns>
IInvocationMessageDispatcher Create(string invocationId);
}
Loading