Skip to content

Commit

Permalink
Add ability to bind to SBReceivedMessage (#1313)
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLove-msft authored and liliankasem committed May 22, 2023
1 parent 32005f0 commit 0d49e19
Show file tree
Hide file tree
Showing 14 changed files with 387 additions and 6 deletions.
2 changes: 1 addition & 1 deletion extensions/Worker.Extensions.ServiceBus/release_notes.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@

### Microsoft.Azure.Functions.Worker.Extensions.ServiceBus <version>

- <entry>
- Support binding to ServiceBusReceivedMessage (#1313)
11 changes: 11 additions & 0 deletions extensions/Worker.Extensions.ServiceBus/src/Constants.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.


namespace Microsoft.Azure.Functions.Worker.Extensions.ServiceBus
{
public static class Constants
{
internal const string BinaryContentType = "application/octet-stream";
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using System.Runtime.CompilerServices;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.10.0")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.WorkerExtension.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")]

[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.7.0")]
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Azure.Functions.Worker.Core;
using Microsoft.Extensions.Azure;

[assembly: WorkerExtensionStartup(typeof(ServiceBusExtensionStartup))]

namespace Microsoft.Azure.Functions.Worker
{
public class ServiceBusExtensionStartup : WorkerExtensionStartup
{
public override void Configure(IFunctionsWorkerApplicationBuilder applicationBuilder)
{
if (applicationBuilder == null)
{
throw new ArgumentNullException(nameof(applicationBuilder));
}

applicationBuilder.Services.AddAzureClientsCore(); // Adds AzureComponentFactory
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Linq;
using System.Threading.Tasks;
using Azure.Core.Amqp;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Core;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using Microsoft.Azure.Functions.Worker.Extensions.ServiceBus;

namespace Microsoft.Azure.Functions.Worker
{

[SupportsDeferredBinding]
[SupportedConverterType(typeof(ServiceBusReceivedMessage))]
[SupportedConverterType(typeof(ServiceBusReceivedMessage[]))]
internal class ServiceBusReceivedMessageConverter : IInputConverter
{
public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
{
ConversionResult result = context?.Source switch
{
ModelBindingData binding => ConversionResult.Success(ConvertToServiceBusReceivedMessage(binding)),
// Only array collections are currently supported, which matches the behavior of the in-proc extension.
CollectionModelBindingData collection => ConversionResult.Success(collection.ModelBindingDataArray
.Select(ConvertToServiceBusReceivedMessage).ToArray()),
_ => ConversionResult.Unhandled()
};
return new ValueTask<ConversionResult>(result);
}

private ServiceBusReceivedMessage ConvertToServiceBusReceivedMessage(ModelBindingData binding)
{
// The lock token is a 16 byte GUID
const int lockTokenLength = 16;

if (binding.ContentType != Constants.BinaryContentType)
{
throw new InvalidOperationException(
$"Unexpected content-type. Only '{Constants.BinaryContentType}' is supported.");
}

ReadOnlyMemory<byte> bytes = binding.Content.ToMemory();
ReadOnlyMemory<byte> lockTokenBytes = bytes.Slice(0, lockTokenLength);
ReadOnlyMemory<byte> messageBytes = bytes.Slice(lockTokenLength, bytes.Length - lockTokenLength);
return ServiceBusReceivedMessage.FromAmqpMessage(AmqpAnnotatedMessage.FromBytes(BinaryData.FromBytes(messageBytes)),
BinaryData.FromBytes(lockTokenBytes));
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using System.Collections.Generic;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

namespace Microsoft.Azure.Functions.Worker
{
[AllowConverterFallback(true)]
[InputConverter(typeof(ServiceBusReceivedMessageConverter))]
public sealed class ServiceBusTriggerAttribute : TriggerBindingAttribute, ISupportCardinality
{
private bool _isBatched = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,27 @@
<Description>Azure Service Bus extensions for .NET isolated functions</Description>

<!--Version information-->
<VersionPrefix>5.7.0</VersionPrefix>
<VersionPrefix>5.10.0</VersionPrefix>
<VersionSuffix>-preview1</VersionSuffix>

<!--Temporarily opting out of documentation. Pending documentation-->
<GenerateDocumentationFile>false</GenerateDocumentationFile>
</PropertyGroup>

<Import Project="..\..\..\build\Extensions.props" />


<ItemGroup>
<PackageReference Include="Azure.Messaging.ServiceBus" Version="7.14.0" />
<PackageReference Include="Microsoft.Extensions.Azure" Version="1.6.3" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\Worker.Extensions.Abstractions\src\Worker.Extensions.Abstractions.csproj" />
<ProjectReference Include="..\..\..\src\DotNetWorker.Core\DotNetWorker.Core.csproj" />
</ItemGroup>

<ItemGroup>
<SharedReference Include="..\..\Worker.Extensions.Shared/Worker.Extensions.Shared.csproj" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace SampleApp
{
/// <summary>
/// Samples demonstrating binding to the <see cref="ServiceBusReceivedMessage"/> type.
/// </summary>
public class ServiceBusReceivedMessageBindingSamples
{
private readonly ILogger<ServiceBusReceivedMessageBindingSamples> _logger;

public ServiceBusReceivedMessageBindingSamples(ILogger<ServiceBusReceivedMessageBindingSamples> logger)
{
_logger = logger;
}

/// <summary>
/// This function demonstrates binding to a single <see cref="ServiceBusReceivedMessage"/>.
/// </summary>
[Function(nameof(ServiceBusReceivedMessageFunction))]
public void ServiceBusReceivedMessageFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")] ServiceBusReceivedMessage message)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
}

/// <summary>
/// This function demonstrates binding to an array of <see cref="ServiceBusReceivedMessage"/>.
/// Note that when doing so, you must also set the <see cref="ServiceBusTriggerAttribute.IsBatched"/> property
/// to <value>true</value>.
/// </summary>
[Function(nameof(ServiceBusReceivedMessageBatchFunction))]
public void ServiceBusReceivedMessageBatchFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", IsBatched = true)] ServiceBusReceivedMessage[] messages)
{
foreach (ServiceBusReceivedMessage message in messages)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);
}
}

/// <summary>
/// This functions demonstrates that it is possible to bind to both the ServiceBusReceivedMessage and any of the supported binding contract
/// properties at the same time. If attempting this, the ServiceBusReceivedMessage must be the first parameter. There is not
/// much benefit to doing this as all of the binding contract properties are available as properties on the ServiceBusReceivedMessage.
/// </summary>
[Function(nameof(ServiceBusReceivedMessageWithStringProperties))]
public void ServiceBusReceivedMessageWithStringProperties(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection")]
ServiceBusReceivedMessage message, string messageId, int deliveryCount)
{
// The MessageId property and the messageId parameter are the same.
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message ID: {id}", messageId);

// Similarly the DeliveryCount property and the deliveryCount parameter are the same.
_logger.LogInformation("Delivery Count: {count}", message.DeliveryCount);
_logger.LogInformation("Delivery Count: {count}", deliveryCount);
}
}
}
1 change: 1 addition & 0 deletions samples/WorkerBindingSamples/WorkerBindingSamples.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\extensions\Worker.Extensions.Abstractions\src\Worker.Extensions.Abstractions.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.ServiceBus\src\Worker.Extensions.ServiceBus.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.Storage\src\Worker.Extensions.Storage.csproj" />
<ProjectReference Include="..\..\src\DotNetWorker.ApplicationInsights\DotNetWorker.ApplicationInsights.csproj" />
<ProjectReference Include="..\..\src\DotNetWorker\DotNetWorker.csproj" />
Expand Down
1 change: 1 addition & 0 deletions test/E2ETests/E2EApps/E2EApp/E2EApp.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\..\extensions\Worker.Extensions.ServiceBus\src\Worker.Extensions.ServiceBus.csproj" />
<ProjectReference Include="..\..\..\..\extensions\Worker.Extensions.Storage\src\Worker.Extensions.Storage.csproj" />
<ProjectReference Include="..\..\..\..\extensions\Worker.Extensions.Abstractions\src\Worker.Extensions.Abstractions.csproj" />
<ProjectReference Include="..\..\..\..\extensions\Worker.Extensions.CosmosDB\src\Worker.Extensions.CosmosDB.csproj" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Linq;
using System.Reflection;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Azure.Storage.Blobs;
using Microsoft.Azure.Functions.Tests;
using Microsoft.Azure.Functions.Worker;
Expand Down Expand Up @@ -848,6 +849,58 @@ public void FunctionWithRetryPolicyWithInvalidIntervals()
Assert.Throws<ArgumentOutOfRangeException>(() => new ExponentialBackoffRetryAttribute(5, "something_bad", "00:01:00"));
}

[Fact]
public void ServiceBus_SDKTypeBindings()
{
var generator = new FunctionMetadataGenerator();
var module = ModuleDefinition.ReadModule(_thisAssembly.Location);
var typeDef = TestUtility.GetTypeDefinition(typeof(SDKTypeBindings_ServiceBus));
var functions = generator.GenerateFunctionMetadata(typeDef);
var extensions = generator.Extensions;

Assert.Equal(2, functions.Count());

AssertDictionary(extensions, new Dictionary<string, string>
{
{ "Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.10.0" },
});

var serviceBusTriggerFunction = functions.Single(p => p.Name == nameof(SDKTypeBindings_ServiceBus.ServiceBusTriggerFunction));

ValidateFunction(serviceBusTriggerFunction, nameof(SDKTypeBindings_ServiceBus.ServiceBusTriggerFunction), GetEntryPoint(nameof(SDKTypeBindings_ServiceBus), nameof(SDKTypeBindings_ServiceBus.ServiceBusTriggerFunction)),
ValidateServiceBusTrigger);

var serviceBusBatchTriggerFunction = functions.Single(p => p.Name == nameof(SDKTypeBindings_ServiceBus.ServiceBusBatchTriggerFunction));

ValidateFunction(serviceBusBatchTriggerFunction, nameof(SDKTypeBindings_ServiceBus.ServiceBusBatchTriggerFunction), GetEntryPoint(nameof(SDKTypeBindings_ServiceBus), nameof(SDKTypeBindings_ServiceBus.ServiceBusBatchTriggerFunction)),
ValidateServiceBusBatchTrigger);

void ValidateServiceBusTrigger(ExpandoObject b)
{
AssertExpandoObject(b, new Dictionary<string, object>
{
{ "Name", "message" },
{ "Type", "serviceBusTrigger" },
{ "Direction", "In" },
{ "queueName", "queue" },
{ "Properties", new Dictionary<String, Object>( ) { { "SupportsDeferredBinding" , "True"} } }
});
}

void ValidateServiceBusBatchTrigger(ExpandoObject b)
{
AssertExpandoObject(b, new Dictionary<string, object>
{
{ "Name", "messages" },
{ "Type", "serviceBusTrigger" },
{ "Direction", "In" },
{ "queueName", "queue" },
{ "Cardinality", "Many" },
{ "Properties", new Dictionary<String, Object>( ) { { "SupportsDeferredBinding" , "True"} } }
});
}
}

private class EventHubNotBatched
{
[Function("EventHubTrigger")]
Expand Down Expand Up @@ -1034,6 +1087,23 @@ public object BlobStringToBlobPocoArray(
}
}

private class SDKTypeBindings_ServiceBus
{
[Function(nameof(ServiceBusTriggerFunction))]
public static void ServiceBusTriggerFunction(
[ServiceBusTrigger("queue")] ServiceBusReceivedMessage message)
{
throw new NotImplementedException();
}

[Function(nameof(ServiceBusBatchTriggerFunction))]
public static void ServiceBusBatchTriggerFunction(
[ServiceBusTrigger("queue", IsBatched = true)] ServiceBusReceivedMessage[] messages)
{
throw new NotImplementedException();
}
}

private class ExternalType_Return
{
public const string FunctionName = "BasicHttpWithExternalTypeReturn";
Expand Down
1 change: 1 addition & 0 deletions test/FunctionMetadataGeneratorTests/SdkTests.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<ProjectReference Include="..\..\extensions\Worker.Extensions.Storage\src\Worker.Extensions.Storage.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.Timer\src\Worker.Extensions.Timer.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.EventHubs\src\Worker.Extensions.EventHubs.csproj" />
<ProjectReference Include="..\..\extensions\Worker.Extensions.ServiceBus\src\Worker.Extensions.ServiceBus.csproj" />
<ProjectReference Include="..\..\sdk\FunctionMetadataLoaderExtension\FunctionMetadataLoaderExtension.csproj" />
<ProjectReference Include="..\..\sdk\Sdk\Sdk.csproj" />
<ProjectReference Include="..\..\src\DotNetWorker\DotNetWorker.csproj" />
Expand Down
Loading

0 comments on commit 0d49e19

Please sign in to comment.