Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ServiceBusSessionMessageActions management APIs: worker changes #2404

Merged
merged 12 commits into from
Jun 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";

// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";
Expand All @@ -19,6 +20,18 @@ service Settlement {

// Defers a message
rpc Defer (DeferRequest) returns (google.protobuf.Empty) {}

// Get
rpc Get (GetRequest) returns (GetResponse) {}
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved

// Set
rpc Set (SetRequest) returns (google.protobuf.Empty) {}

// Release
rpc Release (ReleaseSession) returns (google.protobuf.Empty) {}

// Renew
rpc Renew (RenewSessionLock) returns (RenewSessionLockResponse) {}
}

// The complete message request containing the locktoken.
Expand All @@ -44,4 +57,35 @@ message DeadletterRequest {
message DeferRequest {
string locktoken = 1;
bytes propertiesToModify = 2;
}
}

// The get message request.
message GetRequest {
string sessionId = 1;
}

// The set message request.
message SetRequest {
string sessionId = 1;
bytes sessionState = 2;
}

// Get response containing the session state.
message GetResponse {
bytes sessionState = 1;
}

// Release session.
message ReleaseSession {
string sessionId = 1;
}

// Renew session lock.
message RenewSessionLock {
string sessionId = 1;
}

// Renew session lock.
message RenewSessionLockResponse {
google.protobuf.Timestamp lockedUntil = 1;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
using System;
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Google.Protobuf;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.ServiceBus.Grpc;

namespace Microsoft.Azure.Functions.Worker
{
[InputConverter(typeof(ServiceBusSessionMessageActionsConverter))]
public class ServiceBusSessionMessageActions
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly Settlement.SettlementClient _settlement;

aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
private readonly string _sessionId;

internal ServiceBusSessionMessageActions(Settlement.SettlementClient settlement, string sessionId, DateTimeOffset sessionLockedUntil)
{
_settlement = settlement;
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
_sessionId = sessionId ?? throw new ArgumentNullException(nameof(sessionId));
SessionLockedUntil = sessionLockedUntil;

aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}

/// <summary>
/// Initializes a new instance of the <see cref="ServiceBusMessageActions"/> class for mocking use in testing.
/// </summary>
/// <remarks>
/// This constructor exists only to support mocking. When used, class state is not fully initialized, and
/// will not function correctly; virtual members are meant to be mocked.
///</remarks>
protected ServiceBusSessionMessageActions()
{
_settlement = null!; // not expected to be used during mocking.
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}

public virtual DateTimeOffset SessionLockedUntil { get; private set; }
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task<BinaryData> GetSessionStateAsync(
CancellationToken cancellationToken = default)
{
var request = new GetRequest()
{
SessionId = _sessionId,
};

GetResponse result = await _settlement.GetAsync(request, cancellationToken: cancellationToken);
byte[] byteArray = result.SessionState.ToByteArray();
BinaryData binaryData = BinaryData.FromBytes(byteArray);
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
return await Task.FromResult(binaryData);
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task SetSessionStateAsync(
BinaryData sessionState,
CancellationToken cancellationToken = default)
{
var request = new SetRequest()
{
SessionId = _sessionId,
SessionState = ByteString.CopyFrom(sessionState.ToArray()),
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
};

await _settlement.SetAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task ReleaseSession(
CancellationToken cancellationToken = default)
{
var request = new ReleaseSession()
{
SessionId = _sessionId,
};

await _settlement.ReleaseAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task RenewSessionLockAsync(
CancellationToken cancellationToken = default)
{
var request = new RenewSessionLock()
{
SessionId = _sessionId,
};

var result = await _settlement.RenewAsync(request, cancellationToken: cancellationToken);
SessionLockedUntil = result.LockedUntil.ToDateTimeOffset();

aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using Microsoft.Azure.ServiceBus.Grpc;
using System.Text.Json;

namespace Microsoft.Azure.Functions.Worker
{
/// <summary>
/// Converter to bind to <see cref="ServiceBusSessionMessageActions" /> or <see cref="ServiceBusSessionMessageActions[]" /> type parameters.
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
[SupportsDeferredBinding]
[SupportedTargetType(typeof(ServiceBusSessionMessageActions))]
[SupportedTargetType(typeof(ServiceBusSessionMessageActions[]))]
internal class ServiceBusSessionMessageActionsConverter : IInputConverter
{
private readonly Settlement.SettlementClient _settlement;

public ServiceBusSessionMessageActionsConverter(Settlement.SettlementClient settlement)
{
_settlement = settlement;
}

public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
{
try
{
context.FunctionContext.BindingContext.BindingData.TryGetValue("SessionId", out object? sessionId);
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved

// Get the sessionLockedUntil property from the SessionActions binding data
context.FunctionContext.BindingContext.BindingData.TryGetValue("SessionActions", out object? sessionActions);
JsonDocument jsonDocument = JsonDocument.Parse(sessionActions.ToString());
jsonDocument.RootElement.TryGetProperty("SessionLockedUntil", out JsonElement sessionLockedUntil);

var sessionActionResult = new ServiceBusSessionMessageActions(_settlement, sessionId?.ToString(), sessionLockedUntil.GetDateTimeOffset());
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
var result = ConversionResult.Success(sessionActionResult);
return new ValueTask<ConversionResult>(result);
}
catch (Exception exception)
{
return new ValueTask<ConversionResult>(ConversionResult.Failed(exception));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Text;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
Expand All @@ -21,6 +22,7 @@
{
_logger = logger;
}

//</docsnippet_servicebusmessage_createlogger>
/// <summary>
/// This function demonstrates binding to a single <see cref="ServiceBusReceivedMessage"/>.
Expand All @@ -38,6 +40,7 @@
var outputMessage = $"Output message created at {DateTime.Now}";
return outputMessage;
}

//</docsnippet_servicebus_readmessage>
/// <summary>
/// This function demonstrates binding to an array of <see cref="ServiceBusReceivedMessage"/>.
Expand Down Expand Up @@ -75,6 +78,7 @@
_logger.LogInformation("Delivery Count: {count}", message.DeliveryCount);
_logger.LogInformation("Delivery Count: {count}", deliveryCount);
}

//<docsnippet_servicebus_message_actions>
[Function(nameof(ServiceBusMessageActionsFunction))]
public async Task ServiceBusMessageActionsFunction(
Expand All @@ -90,5 +94,35 @@
await messageActions.CompleteMessageAsync(message);
}
//</docsnippet_servicebus_message_actions>

//<docsnippet_servicebus_session_message_actions>
[Function(nameof(ServiceBusSessionMessageActionsFunction))]
public async Task ServiceBusSessionMessageActionsFunction(
[ServiceBusTrigger("bro", Connection = "ServiceBusConnection", AutoCompleteMessages = false, IsSessionsEnabled = true)]
ServiceBusReceivedMessage message,
ServiceBusSessionMessageActions messageActions)

Check failure on line 103 in samples/Extensions/ServiceBus/ServiceBusReceivedMessageFunctions.cs

View check run for this annotation

Azure Pipelines / dotnet-worker (E2ETests_Ubuntu)

samples/Extensions/ServiceBus/ServiceBusReceivedMessageFunctions.cs#L103

samples/Extensions/ServiceBus/ServiceBusReceivedMessageFunctions.cs(103,13): Error CS0246: The type or namespace name 'ServiceBusSessionMessageActions' could not be found (are you missing a using directive or an assembly reference?)

Check failure on line 103 in samples/Extensions/ServiceBus/ServiceBusReceivedMessageFunctions.cs

View check run for this annotation

Azure Pipelines / dotnet-worker

samples/Extensions/ServiceBus/ServiceBusReceivedMessageFunctions.cs#L103

samples/Extensions/ServiceBus/ServiceBusReceivedMessageFunctions.cs(103,13): Error CS0246: The type or namespace name 'ServiceBusSessionMessageActions' could not be found (are you missing a using directive or an assembly reference?)
{
_logger.LogInformation("Message ID: {id}", message.MessageId);
_logger.LogInformation("Message Body: {body}", message.Body);
_logger.LogInformation("Message Content-Type: {contentType}", message.ContentType);

// Set the session state
await messageActions.SetSessionStateAsync(new BinaryData(Encoding.UTF8.GetBytes("your_session_state")));

// Get the session state
var sessionState = await messageActions.GetSessionStateAsync();

_logger.LogInformation("Session state: {}", sessionState.ToString());
_logger.LogInformation("Session locked until before renewal: {value}", messageActions.SessionLockedUntil);

// Renew session lock
await messageActions.RenewSessionLockAsync();

_logger.LogInformation("Session locked until after renewal: {value}", messageActions.SessionLockedUntil);

// Release session
await messageActions.ReleaseSession();
}
//<docsnippet_servicebus_session_message_actions>
}
}
Loading