Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Capture the property bag from metadata request and pass it to Function load request #8895

Merged
merged 6 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/WebJobs.Script.Grpc/Channel/GrpcWorkerChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,13 @@ internal FunctionLoadRequest GetFunctionLoadRequest(FunctionMetadata metadata, M

request.Metadata.Bindings.Add(binding.Name, bindingInfo);
}

foreach (var property in metadata.Properties)
{
// worker properties are expected to be string values
request.Metadata.Properties.Add(property.Key, property.Value?.ToString());
}

return request;
}

Expand Down Expand Up @@ -721,6 +728,14 @@ internal void ProcessFunctionMetadataResponses(FunctionMetadataResponse function

functionMetadata.SetFunctionId(metadata.FunctionId);

foreach (var property in metadata.Properties)
{
if (!functionMetadata.Properties.TryAdd(property.Key, property.Value?.ToString()))
{
_workerChannelLogger?.LogDebug("{metadataPropertyKey} is already a part of metadata properties for {functionId}", property.Key, metadata.FunctionId);
}
}

var bindings = new List<string>();
foreach (string binding in metadata.RawBindings)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ message RpcFunctionMetadata {
// Properties for function metadata
// They're usually specific to a worker and largely passed along to the controller API for use
// outside the host
map<string,string> Properties = 16;
map<string,string> properties = 16;
soninaren marked this conversation as resolved.
Show resolved Hide resolved
}

// Host tells worker it is ready to receive metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
using Microsoft.Azure.WebJobs.Script.Grpc;
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
using Microsoft.Azure.WebJobs.Script.Tests.Description.DotNet;
using Microsoft.Azure.WebJobs.Script.Workers;
using Microsoft.Azure.WebJobs.Script.Workers.FunctionDataCache;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
Expand Down Expand Up @@ -605,6 +606,75 @@ public async Task SendLoadRequests_PublishesOutboundEvents_OrdersDisabled()
Assert.Equal(3, functionLoadLogs.Count());
}

[Fact]
public async Task GetFunctionMetadata_IncludesMetadataProperties()
{
await CreateDefaultWorkerChannel();

var functionMetadata = GetTestFunctionsList("python", true);
var functionId = "id123";
_testFunctionRpcService.OnMessage(StreamingMessage.ContentOneofCase.FunctionsMetadataRequest,
_ => _testFunctionRpcService.PublishWorkerMetadataResponse(_workerId, functionId, functionMetadata, successful: true, useDefaultMetadataIndexing: false));

var functions = await _workerChannel.GetFunctionMetadata();

Assert.Equal(functions[0].Metadata.Properties.Count, 4);
Assert.Equal(functions[0].Metadata.Properties["worker.functionId"], "fn1");
}

[Fact]
public async Task SendLoadRequests_IncludesMetadataProperties()
{
await CreateDefaultWorkerChannel();

var functionMetadata = GetTestFunctionsList("python", true);
var functionId = "id123";
_testFunctionRpcService.OnMessage(StreamingMessage.ContentOneofCase.FunctionsMetadataRequest,
_ => _testFunctionRpcService.PublishWorkerMetadataResponse(_workerId, functionId, functionMetadata, successful: true, useDefaultMetadataIndexing: false));

var functions = await _workerChannel.GetFunctionMetadata();

functionMetadata = functions.Select(f => f.Metadata);
_workerChannel.SetupFunctionInvocationBuffers(functionMetadata);
_workerChannel.SendFunctionLoadRequests(null, null);

_testFunctionRpcService.OnMessage(StreamingMessage.ContentOneofCase.FunctionLoadRequest,
(m) =>
{
Assert.Contains("\"worker.functionId\": \"fn1\"", m.Message.ToString());
});

await Task.Delay(500);
}

[Fact]
public async Task GetFunctionLoadRequest_IncludesAvoidsDuplicateProperties()
{
await CreateDefaultWorkerChannel();
var functionMetadata = GetTestFunctionsList("python");
var functionId = "TestFunctionId1";
_testFunctionRpcService.OnMessage(StreamingMessage.ContentOneofCase.FunctionsMetadataRequest,
_ => _testFunctionRpcService.PublishWorkerMetadataResponse(_workerId, functionId, functionMetadata, true));
var functions = _workerChannel.GetFunctionMetadata();

await Task.Delay(500);
var traces = _logger.GetLogMessages();
ShowOutput(traces);
Assert.True(traces.Any(m => string.Equals(m.FormattedMessage, $"FunctionId is already a part of metadata properties for TestFunctionId1")));
}

[Fact]
public async Task GetFunctionLoadRequest_IncludesWorkerProperties()
{
await CreateDefaultWorkerChannel();

var functionMetadata = GetTestFunctionsList("python", true);
_workerChannel.SetupFunctionInvocationBuffers(functionMetadata);
var loadRequest = _workerChannel.GetFunctionLoadRequest(functionMetadata.ElementAt(0), null);

Assert.Equal(loadRequest.Metadata.Properties["worker.functionId"], "fn1");
}

[Fact]
public async Task SendLoadRequests_DoesNotTimeout_FunctionTimeoutNotSet()
{
Expand Down Expand Up @@ -1136,7 +1206,7 @@ public async Task SendInvocationRequest_ValidateTraceContext_SessionId()
}
}

private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime, bool addWorkerProperties = false)
{
var metadata1 = new FunctionMetadata()
{
Expand All @@ -1147,6 +1217,12 @@ private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
metadata1.SetFunctionId("TestFunctionId1");
metadata1.Properties.Add(LogConstants.CategoryNameKey, "testcat1");
metadata1.Properties.Add(ScriptConstants.LogPropertyHostInstanceIdKey, "testhostId1");

if (addWorkerProperties)
{
metadata1.Properties.Add("worker.functionId", "fn1");
}

var metadata2 = new FunctionMetadata()
{
Language = runtime,
Expand All @@ -1156,6 +1232,12 @@ private IEnumerable<FunctionMetadata> GetTestFunctionsList(string runtime)
metadata2.SetFunctionId("TestFunctionId2");
metadata2.Properties.Add(LogConstants.CategoryNameKey, "testcat2");
metadata2.Properties.Add(ScriptConstants.LogPropertyHostInstanceIdKey, "testhostId2");

if (addWorkerProperties)
{
metadata2.Properties.Add("WORKER.functionId", "fn2");
}

return new List<FunctionMetadata>()
{
metadata1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
using Microsoft.Azure.WebJobs.Script.Eventing;
using Microsoft.Azure.WebJobs.Script.Grpc.Eventing;
using Microsoft.Azure.WebJobs.Script.Grpc.Messages;
using Microsoft.Azure.WebJobs.Script.Workers.Rpc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;

namespace Microsoft.Azure.WebJobs.Script.Tests.Workers.Rpc
{
Expand Down Expand Up @@ -324,6 +326,11 @@ public void PublishWorkerMetadataResponse(string workerId, string functionId, IE
FunctionId = functionId
};

foreach (var property in response.Properties)
{
indexingResponse.Properties.Add(property.Key, property.Value.ToString());
}

overallResponse.FunctionMetadataResults.Add(indexingResponse);
}
}
Expand Down