Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture the property bag from metadata request and pass it to Function load request #8895

Merged
merged 6 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,10 @@ internal void SendFunctionLoadRequest(FunctionMetadata metadata, ManagedDependen

internal FunctionLoadRequest GetFunctionLoadRequest(FunctionMetadata metadata, ManagedDependencyOptions managedDependencyOptions)
{
var functionId = metadata.GetFunctionId();
soninaren marked this conversation as resolved.
Show resolved Hide resolved
FunctionLoadRequest request = new FunctionLoadRequest()
{
FunctionId = metadata.GetFunctionId(),
FunctionId = functionId,
soninaren marked this conversation as resolved.
Show resolved Hide resolved
Metadata = new RpcFunctionMetadata()
{
Name = metadata.Name,
Expand All @@ -553,6 +554,15 @@ internal FunctionLoadRequest GetFunctionLoadRequest(FunctionMetadata metadata, M

request.Metadata.Bindings.Add(binding.Name, bindingInfo);
}

foreach (var property in metadata.Properties)
{
// worker properties are expected to be string values
request.Metadata.Properties.Add(property.Key, property.Value.ToString());
soninaren marked this conversation as resolved.
Show resolved Hide resolved
}

_workerChannelLogger?.LogDebug($"Adding {request.Metadata.Properties.Count} worker properties for {functionId}");
soninaren marked this conversation as resolved.
Show resolved Hide resolved
soninaren marked this conversation as resolved.
Show resolved Hide resolved

return request;
}

Expand Down Expand Up @@ -721,6 +731,11 @@ internal void ProcessFunctionMetadataResponses(FunctionMetadataResponse function

functionMetadata.SetFunctionId(metadata.FunctionId);

foreach (var property in metadata.Properties)
{
functionMetadata.Properties.TryAdd(property.Key, property.Value.ToString());
}

var bindings = new List<string>();
foreach (string binding in metadata.RawBindings)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ message RpcFunctionMetadata {
// Properties for function metadata
// They're usually specific to a worker and largely passed along to the controller API for use
// outside the host
map<string,string> Properties = 16;
map<string,string> properties = 16;
soninaren marked this conversation as resolved.
Show resolved Hide resolved
}

// Host tells worker it is ready to receive metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,60 @@ public async Task SendLoadRequests_PublishesOutboundEvents_OrdersDisabled()
Assert.Equal(3, functionLoadLogs.Count());
}

[Fact]
public async Task GetFunctionMetadata_IncludesMetadataProperties()
{
await CreateDefaultWorkerChannel();

var functionMetadata = GetTestFunctionsList("python", true);
var functionId = "id123";
_testFunctionRpcService.OnMessage(StreamingMessage.ContentOneofCase.FunctionsMetadataRequest,
_ => _testFunctionRpcService.PublishWorkerMetadataResponse(_workerId, functionId, functionMetadata, successful: true, useDefaultMetadataIndexing: false));

var functions = await _workerChannel.GetFunctionMetadata();

Assert.Equal(functions[0].Metadata.Properties.Count, 4);
Assert.Equal(functions[0].Metadata.Properties["worker.functionId"], "fn1");
}

[Fact]
public async Task SendLoadRequests_IncludesMetadataProperties()
{
await CreateDefaultWorkerChannel();

var functionMetadata = GetTestFunctionsList("python", true);
var functionId = "id123";
_testFunctionRpcService.OnMessage(StreamingMessage.ContentOneofCase.FunctionsMetadataRequest,
_ => _testFunctionRpcService.PublishWorkerMetadataResponse(_workerId, functionId, functionMetadata, successful: true, useDefaultMetadataIndexing: false));

var functions = await _workerChannel.GetFunctionMetadata();

functionMetadata = functions.Select(f => f.Metadata);
_workerChannel.SetupFunctionInvocationBuffers(functionMetadata);
_workerChannel.SendFunctionLoadRequests(null, null);

await Task.Delay(500);
var traces = _logger.GetLogMessages();
ShowOutput(traces);

string expectedLogMessage = "Adding 4 worker properties";
var functionLoadLogs = traces.Where(m => m.FormattedMessage?.Contains(expectedLogMessage) ?? false);

Assert.Equal(2, functionLoadLogs.Count());
}

[Fact]
public async Task GetFunctionLoadRequest_IncludesWorkerProperties()
{
await CreateDefaultWorkerChannel();

var functionMetadata = GetTestFunctionsList("python", true);
_workerChannel.SetupFunctionInvocationBuffers(functionMetadata);
var loadRequest = _workerChannel.GetFunctionLoadRequest(functionMetadata.ElementAt(0), null);

Assert.Equal(loadRequest.Metadata.Properties["worker.functionId"], "fn1");
}

[Fact]
public async Task SendLoadRequests_DoesNotTimeout_FunctionTimeoutNotSet()
{
Expand Down Expand Up @@ -1136,7 +1190,7 @@ public async Task SendInvocationRequest_ValidateTraceContext_SessionId()
}
}

private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime, bool addWorkerProperties = false)
{
var metadata1 = new FunctionMetadata()
{
Expand All @@ -1147,6 +1201,12 @@ private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
metadata1.SetFunctionId("TestFunctionId1");
metadata1.Properties.Add(LogConstants.CategoryNameKey, "testcat1");
metadata1.Properties.Add(ScriptConstants.LogPropertyHostInstanceIdKey, "testhostId1");

if (addWorkerProperties)
{
metadata1.Properties.Add("worker.functionId", "fn1");
}

var metadata2 = new FunctionMetadata()
{
Language = runtime,
Expand All @@ -1156,6 +1216,12 @@ private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
metadata2.SetFunctionId("TestFunctionId2");
metadata2.Properties.Add(LogConstants.CategoryNameKey, "testcat2");
metadata2.Properties.Add(ScriptConstants.LogPropertyHostInstanceIdKey, "testhostId2");

if (addWorkerProperties)
{
metadata2.Properties.Add("WORKER.functionId", "fn2");
}

return new List<FunctionMetadata>()
{
metadata1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
using Microsoft.Azure.WebJobs.Script.Eventing;
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Script.Tests.Workers.Rpc
{
Expand Down Expand Up @@ -324,6 +326,11 @@ public void PublishWorkerMetadataResponse(string workerId, string functionId, IE
FunctionId = functionId
};

foreach (var property in response.Properties)
{
indexingResponse.Properties.Add(property.Key, property.Value.ToString());
}

overallResponse.FunctionMetadataResults.Add(indexingResponse);
}
}
Expand Down