Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ServiceBusSessionMessageActions management APIs: worker changes #2404

Merged
merged 12 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
using System.Runtime.CompilerServices;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.15.1")]
[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.16.0")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.Worker.Extensions.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")]
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";

// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";
Expand All @@ -19,6 +20,21 @@ service Settlement {

// Defers a message
rpc Defer (DeferRequest) returns (google.protobuf.Empty) {}

// Renew message lock
rpc RenewMessageLock (RenewMessageLockRequest) returns (google.protobuf.Empty) {}

// Get session state
rpc GetSessionState (GetSessionStateRequest) returns (GetSessionStateResponse) {}

// Set session state
rpc SetSessionState (SetSessionStateRequest) returns (google.protobuf.Empty) {}

// Release session
rpc ReleaseSession (ReleaseSessionRequest) returns (google.protobuf.Empty) {}

// Renew session lock
rpc RenewSessionLock (RenewSessionLockRequest) returns (RenewSessionLockResponse) {}
}

// The complete message request containing the locktoken.
Expand All @@ -44,4 +60,40 @@ message DeadletterRequest {
message DeferRequest {
string locktoken = 1;
bytes propertiesToModify = 2;
}
}

// The renew message lock request containing the locktoken.
message RenewMessageLockRequest {
string locktoken = 1;
}

// The get message request.
message GetSessionStateRequest {
string sessionId = 1;
}

// The set message request.
message SetSessionStateRequest {
string sessionId = 1;
bytes sessionState = 2;
}

// Get response containing the session state.
message GetSessionStateResponse {
bytes sessionState = 1;
}

// Release session.
message ReleaseSessionRequest {
string sessionId = 1;
}

// Renew session lock.
message RenewSessionLockRequest {
string sessionId = 1;
}

// Renew session lock.
message RenewSessionLockResponse {
google.protobuf.Timestamp lockedUntil = 1;
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
using System;
using System;
using System.Collections;
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
using System.Collections.Generic;
using System.Globalization;
Expand Down Expand Up @@ -148,6 +148,24 @@ public virtual async Task DeferMessageAsync(
await _settlement.DeferAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.RenewMessageLockAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task RenewMessageLockAsync(
ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}

var request = new RenewMessageLockRequest()
{
Locktoken = message.LockToken,
};

await _settlement.RenewMessageLockAsync(request, cancellationToken: cancellationToken);
}

internal static ByteString ConvertToByteString(IDictionary<string, object> propertiesToModify)
{
var map = new AmqpMap();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
using System;
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Google.Protobuf;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.ServiceBus.Grpc;

namespace Microsoft.Azure.Functions.Worker
{
[InputConverter(typeof(ServiceBusSessionMessageActionsConverter))]
public class ServiceBusSessionMessageActions
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly Settlement.SettlementClient _settlement;

aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
private readonly string _sessionId;

internal ServiceBusSessionMessageActions(Settlement.SettlementClient settlement, string sessionId, DateTimeOffset sessionLockedUntil)
{
_settlement = settlement ?? throw new ArgumentNullException(nameof(settlement));
_sessionId = sessionId ?? throw new ArgumentNullException(nameof(sessionId));
SessionLockedUntil = sessionLockedUntil;
}

/// <summary>
/// Initializes a new instance of the <see cref="ServiceBusMessageActions"/> class for mocking use in testing.
/// </summary>
/// <remarks>
/// This constructor exists only to support mocking. When used, class state is not fully initialized, and
/// will not function correctly; virtual members are meant to be mocked.
///</remarks>
protected ServiceBusSessionMessageActions()
{
_settlement = null!; // not expected to be used during mocking.
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}

public virtual DateTimeOffset SessionLockedUntil { get; private set; }
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task<BinaryData> GetSessionStateAsync(
CancellationToken cancellationToken = default)
{
var request = new GetSessionStateRequest()
{
SessionId = _sessionId,
};

GetSessionStateResponse result = await _settlement.GetSessionStateAsync(request, cancellationToken: cancellationToken);
byte[] byteArray = result.SessionState.ToByteArray();
BinaryData binaryData = BinaryData.FromBytes(byteArray);
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
return await Task.FromResult(binaryData);
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task SetSessionStateAsync(
BinaryData sessionState,
CancellationToken cancellationToken = default)
{
var request = new SetSessionStateRequest()
{
SessionId = _sessionId,
SessionState = ByteString.CopyFrom(sessionState.ToArray()),
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
};

await _settlement.SetSessionStateAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task ReleaseSession(
CancellationToken cancellationToken = default)
{
var request = new ReleaseSessionRequest()
{
SessionId = _sessionId,
};

await _settlement.ReleaseSessionAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task RenewSessionLockAsync(
CancellationToken cancellationToken = default)
{
var request = new RenewSessionLockRequest()
{
SessionId = _sessionId,
};

var result = await _settlement.RenewSessionLockAsync(request, cancellationToken: cancellationToken);
SessionLockedUntil = result.LockedUntil.ToDateTimeOffset();

aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using Microsoft.Azure.ServiceBus.Grpc;
using System.Text.Json;

namespace Microsoft.Azure.Functions.Worker
{
/// <summary>
/// Converter to bind to <see cref="ServiceBusSessionMessageActions" /> or <see cref="ServiceBusSessionMessageActions[]" /> type parameters.
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
[SupportsDeferredBinding]
[SupportedTargetType(typeof(ServiceBusSessionMessageActions))]
[SupportedTargetType(typeof(ServiceBusSessionMessageActions[]))]
internal class ServiceBusSessionMessageActionsConverter : IInputConverter
{
private readonly Settlement.SettlementClient _settlement;

public ServiceBusSessionMessageActionsConverter(Settlement.SettlementClient settlement)
{
_settlement = settlement;
}

public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
{
try
{
var foundSessionId = context.FunctionContext.BindingContext.BindingData.TryGetValue("SessionId", out object? sessionId);
if (!foundSessionId)
{
throw new InvalidOperationException("Expecting SessionId within binding data and value was not present.");
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}

// Get the sessionLockedUntil property from the SessionActions binding data
var foundSessionActions = context.FunctionContext.BindingContext.BindingData.TryGetValue("SessionActions", out object? sessionActions);
if (!foundSessionActions)
{
throw new InvalidOperationException("Expecting SessionActions within binding data and value was not present.");
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}

JsonDocument jsonDocument = JsonDocument.Parse(sessionActions!.ToString());
var foundSessionLockedUntil = jsonDocument.RootElement.TryGetProperty("SessionLockedUntil", out JsonElement sessionLockedUntil);
if (!foundSessionLockedUntil)
{
throw new InvalidOperationException("Expecting SessionLockedUntil within binding data of session actions and value was not present.");
}

var sessionActionResult = new ServiceBusSessionMessageActions(_settlement, sessionId!.ToString(), sessionLockedUntil.GetDateTimeOffset());
var result = ConversionResult.Success(sessionActionResult);
return new ValueTask<ConversionResult>(result);
}
catch (Exception exception)
{
return new ValueTask<ConversionResult>(ConversionResult.Failed(exception));
}
}
}
}
5 changes: 4 additions & 1 deletion samples/Extensions/Extensions.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.1.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Kafka" Version="3.10.1" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.RabbitMQ" Version="2.0.3" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.ServiceBus" Version="5.17.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.SignalRService" Version="1.13.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Storage.Blobs" Version="6.3.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Storage.Queues" Version="5.3.0" />
Expand All @@ -23,6 +22,10 @@
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.17.0" />
<PackageReference Include="Newtonsoft.Json" Version="13.0.3" />
</ItemGroup>
<ItemGroup>
<!-- Sample uses project reference, but actual apps should use 'PackageReference' -->
<ProjectReference Include="../../extensions/Worker.Extensions.ServiceBus/src/Worker.Extensions.ServiceBus.csproj" />
liliankasem marked this conversation as resolved.
Show resolved Hide resolved
</ItemGroup>
<ItemGroup>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
Expand All @@ -21,6 +22,7 @@ public ServiceBusReceivedMessageFunctions(ILogger<ServiceBusReceivedMessageFunct
{
_logger = logger;
}

//</docsnippet_servicebusmessage_createlogger>
/// <summary>
/// This function demonstrates binding to a single <see cref="ServiceBusReceivedMessage"/>.
Expand All @@ -38,6 +40,7 @@ public string ServiceBusReceivedMessageFunction(
var outputMessage = $"Output message created at {DateTime.Now}";
return outputMessage;
}

//</docsnippet_servicebus_readmessage>
/// <summary>
/// This function demonstrates binding to an array of <see cref="ServiceBusReceivedMessage"/>.
Expand Down Expand Up @@ -75,7 +78,11 @@ public void ServiceBusReceivedMessageWithStringProperties(
_logger.LogInformation("Delivery Count: {count}", message.DeliveryCount);
_logger.LogInformation("Delivery Count: {count}", deliveryCount);
}


//<docsnippet_servicebus_message_actions>
/// This function demonstrates binding to <see cref="ServiceBusMessageActions"/>.
/// </summary>
[Function(nameof(ServiceBusMessageActionsFunction))]
public async Task ServiceBusMessageActionsFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", AutoCompleteMessages = false)]
Expand All @@ -86,9 +93,39 @@ public async Task ServiceBusMessageActionsFunction(
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);

// Complete the message
await messageActions.CompleteMessageAsync(message);
await messageActions.RenewMessageLockAsync(message);
}

//</docsnippet_servicebus_message_actions>
/// This function demonstrates binding to <see cref="ServiceBusSessionMessageActions"/>.
//<docsnippet_servicebus_session_message_actions>
[Function(nameof(ServiceBusSessionMessageActionsFunction))]
public async Task ServiceBusSessionMessageActionsFunction(
[ServiceBusTrigger("queue", Connection = "ServiceBusConnection", AutoCompleteMessages = false, IsSessionsEnabled = true)]
ServiceBusReceivedMessage message,
ServiceBusSessionMessageActions messageActions)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);

// Set the session state
await messageActions.SetSessionStateAsync(new BinaryData(Encoding.UTF8.GetBytes("your_session_state")));

// Get the session state
var sessionState = await messageActions.GetSessionStateAsync();

_logger.LogInformation("Session state: {}", sessionState.ToString());
_logger.LogInformation("Session locked until before renewal: {value}", messageActions.SessionLockedUntil);

// Renew session lock
await messageActions.RenewSessionLockAsync();

_logger.LogInformation("Session locked until after renewal: {value}", messageActions.SessionLockedUntil);

// Release session
await messageActions.ReleaseSession();
}
//<docsnippet_servicebus_session_message_actions>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,7 @@ public void ServiceBus_SDKTypeBindings()

AssertDictionary(extensions, new Dictionary<string, string>
{
{ "Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.15.1" },
{ "Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.16.0" },
});

var serviceBusTriggerFunction = functions.Single(p => p.Name == nameof(SDKTypeBindings_ServiceBus.ServiceBusTriggerFunction));
Expand Down
Loading
Loading