Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ServiceBusSessionMessageActions management APIs: worker changes #2404

Merged
merged 12 commits into from
Jun 4, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
using System.Runtime.CompilerServices;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;

[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.15.1")]
[assembly: ExtensionInformation("Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.16.0")]
[assembly: InternalsVisibleTo("Microsoft.Azure.Functions.Worker.Extensions.Tests, PublicKey=00240000048000009400000006020000002400005253413100040000010001005148be37ac1d9f58bd40a2e472c9d380d635b6048278f7d47480b08c928858f0f7fe17a6e4ce98da0e7a7f0b8c308aecd9e9b02d7e9680a5b5b75ac7773cec096fbbc64aebd429e77cb5f89a569a79b28e9c76426783f624b6b70327eb37341eb498a2c3918af97c4860db6cdca4732787150841e395a29cfacb959c1fd971c1")]
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ syntax = "proto3";

import "google/protobuf/empty.proto";
import "google/protobuf/wrappers.proto";
import "google/protobuf/timestamp.proto";

// this namespace will be shared between isolated worker and WebJobs extension so make it somewhat generic
option csharp_namespace = "Microsoft.Azure.ServiceBus.Grpc";
Expand All @@ -19,6 +20,21 @@ service Settlement {

// Defers a message
rpc Defer (DeferRequest) returns (google.protobuf.Empty) {}

// Renew message lock
rpc RenewMessageLock (RenewMessageLockRequest) returns (google.protobuf.Empty) {}

// Get session state
rpc GetSessionState (GetSessionStateRequest) returns (GetSessionStateResponse) {}

// Set session state
rpc SetSessionState (SetSessionStateRequest) returns (google.protobuf.Empty) {}

// Release session
rpc ReleaseSession (ReleaseSessionRequest) returns (google.protobuf.Empty) {}

// Renew session lock
rpc RenewSessionLock (RenewSessionLockRequest) returns (RenewSessionLockResponse) {}
}

// The complete message request containing the locktoken.
Expand All @@ -44,4 +60,40 @@ message DeadletterRequest {
message DeferRequest {
string locktoken = 1;
bytes propertiesToModify = 2;
}
}

// The renew message lock request containing the locktoken.
message RenewMessageLockRequest {
string locktoken = 1;
}

// The get message request.
message GetSessionStateRequest {
string sessionId = 1;
}

// The set message request.
message SetSessionStateRequest {
string sessionId = 1;
bytes sessionState = 2;
}

// Get response containing the session state.
message GetSessionStateResponse {
bytes sessionState = 1;
}

// Release session.
message ReleaseSessionRequest {
string sessionId = 1;
}

// Renew session lock.
message RenewSessionLockRequest {
string sessionId = 1;
}

// Renew session lock.
message RenewSessionLockResponse {
google.protobuf.Timestamp lockedUntil = 1;
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Collections;
using System.Collections.Generic;
Expand Down Expand Up @@ -148,6 +151,24 @@ public virtual async Task DeferMessageAsync(
await _settlement.DeferAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.RenewMessageLockAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task RenewMessageLockAsync(
ServiceBusReceivedMessage message,
CancellationToken cancellationToken = default)
{
if (message == null)
{
throw new ArgumentNullException(nameof(message));
}

var request = new RenewMessageLockRequest()
{
Locktoken = message.LockToken,
};

await _settlement.RenewMessageLockAsync(request, cancellationToken: cancellationToken);
}

internal static ByteString ConvertToByteString(IDictionary<string, object> propertiesToModify)
{
var map = new AmqpMap();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Google.Protobuf;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.ServiceBus.Grpc;

namespace Microsoft.Azure.Functions.Worker
{
/// <summary>
/// Converter to bind to <see cref="ServiceBusSessionMessageActions" /> type parameters.
/// </summary>
[InputConverter(typeof(ServiceBusSessionMessageActionsConverter))]
public class ServiceBusSessionMessageActions
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
{
private readonly Settlement.SettlementClient _settlement;
private readonly string _sessionId;

internal ServiceBusSessionMessageActions(Settlement.SettlementClient settlement, string sessionId, DateTimeOffset sessionLockedUntil)
{
_settlement = settlement ?? throw new ArgumentNullException(nameof(settlement));
_sessionId = sessionId ?? throw new ArgumentNullException(nameof(sessionId));
SessionLockedUntil = sessionLockedUntil;
}

/// <summary>
/// Initializes a new instance of the <see cref="ServiceBusMessageActions"/> class for mocking use in testing.
/// </summary>
/// <remarks>
/// This constructor exists only to support mocking. When used, class state is not fully initialized, and
/// will not function correctly; virtual members are meant to be mocked.
///</remarks>
#pragma warning disable CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
protected ServiceBusSessionMessageActions()
#pragma warning restore CS8618 // Non-nullable field must contain a non-null value when exiting constructor. Consider declaring as nullable.
{
_settlement = null!; // not expected to be used during mocking.
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}

public virtual DateTimeOffset SessionLockedUntil { get; private set; }
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task<BinaryData> GetSessionStateAsync(
CancellationToken cancellationToken = default)
{
var request = new GetSessionStateRequest()
{
SessionId = _sessionId,
};

GetSessionStateResponse result = await _settlement.GetSessionStateAsync(request, cancellationToken: cancellationToken);
BinaryData binaryData = new BinaryData(result.SessionState.Memory);
return await Task.FromResult(binaryData);
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task SetSessionStateAsync(
BinaryData sessionState,
CancellationToken cancellationToken = default)
{
var request = new SetSessionStateRequest()
{
SessionId = _sessionId,
SessionState = ByteString.CopyFrom(sessionState.ToMemory().Span),
};

await _settlement.SetSessionStateAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task ReleaseSession(
CancellationToken cancellationToken = default)
{
var request = new ReleaseSessionRequest()
{
SessionId = _sessionId,
};

await _settlement.ReleaseSessionAsync(request, cancellationToken: cancellationToken);
}

///<inheritdoc cref="ServiceBusReceiver.CompleteMessageAsync(ServiceBusReceivedMessage, CancellationToken)"/>
public virtual async Task RenewSessionLockAsync(
CancellationToken cancellationToken = default)
{
var request = new RenewSessionLockRequest()
{
SessionId = _sessionId,
};

var result = await _settlement.RenewSessionLockAsync(request, cancellationToken: cancellationToken);
SessionLockedUntil = result.LockedUntil.ToDateTimeOffset();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using System;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker.Converters;
using Microsoft.Azure.Functions.Worker.Extensions.Abstractions;
using Microsoft.Azure.ServiceBus.Grpc;
using System.Text.Json;

namespace Microsoft.Azure.Functions.Worker
{
/// <summary>
/// Converter to bind to <see cref="ServiceBusSessionMessageActions" /> or <see cref="ServiceBusSessionMessageActions{}" /> type parameters.
/// </summary>
[SupportsDeferredBinding]
[SupportedTargetType(typeof(ServiceBusSessionMessageActions))]
[SupportedTargetType(typeof(ServiceBusSessionMessageActions[]))]
internal class ServiceBusSessionMessageActionsConverter : IInputConverter
{
private readonly Settlement.SettlementClient _settlement;

public ServiceBusSessionMessageActionsConverter(Settlement.SettlementClient settlement)
{
_settlement = settlement;
}

public ValueTask<ConversionResult> ConvertAsync(ConverterContext context)
{
try
{
var foundSessionId = context.FunctionContext.BindingContext.BindingData.TryGetValue("SessionId", out object? sessionId);
if (!foundSessionId)
{
throw new InvalidOperationException("Expecting SessionId within binding data and value was not present.");
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}

// Get the sessionLockedUntil property from the SessionActions binding data
var foundSessionActions = context.FunctionContext.BindingContext.BindingData.TryGetValue("SessionActions", out object? sessionActions);
if (!foundSessionActions)
{
throw new InvalidOperationException("Expecting SessionActions within binding data and value was not present.");
aishwaryabh marked this conversation as resolved.
Show resolved Hide resolved
}

JsonDocument jsonDocument = JsonDocument.Parse(sessionActions!.ToString());
var foundSessionLockedUntil = jsonDocument.RootElement.TryGetProperty("SessionLockedUntil", out JsonElement sessionLockedUntil);
if (!foundSessionLockedUntil)
{
throw new InvalidOperationException("Expecting SessionLockedUntil within binding data of session actions and value was not present.");
}

var sessionActionResult = new ServiceBusSessionMessageActions(_settlement, sessionId!.ToString(), sessionLockedUntil.GetDateTimeOffset());
var result = ConversionResult.Success(sessionActionResult);
return new ValueTask<ConversionResult>(result);
}
catch (Exception exception)
{
return new ValueTask<ConversionResult>(ConversionResult.Failed(exception));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1156,7 +1156,7 @@ public void ServiceBus_SDKTypeBindings()

AssertDictionary(extensions, new Dictionary<string, string>
{
{ "Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.15.1" },
{ "Microsoft.Azure.WebJobs.Extensions.ServiceBus", "5.16.0" },
});

var serviceBusTriggerFunction = functions.Single(p => p.Name == nameof(SDKTypeBindings_ServiceBus.ServiceBusTriggerFunction));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,23 @@ public async Task CanDeferMessage()
await messageActions.DeferMessageAsync(message, properties);
}

[Fact]
public async Task CanRenewMessageLock()
{
var message = ServiceBusModelFactory.ServiceBusReceivedMessage(lockTokenGuid: Guid.NewGuid());
var properties = new Dictionary<string, object>()
{
{ "int", 1 },
{ "string", "foo"},
{ "timespan", TimeSpan.FromSeconds(1) },
{ "datetime", DateTime.UtcNow },
{ "datetimeoffset", DateTimeOffset.UtcNow },
{ "guid", Guid.NewGuid() }
};
var messageActions = new ServiceBusMessageActions(new MockSettlementClient(message.LockToken, properties));
await messageActions.RenewMessageLockAsync(message);
}

[Fact]
public async Task PassingNullMessageThrows()
{
Expand All @@ -83,6 +100,7 @@ public async Task PassingNullMessageThrows()
await Assert.ThrowsAsync<ArgumentNullException>(async () => await messageActions.AbandonMessageAsync(null));
await Assert.ThrowsAsync<ArgumentNullException>(async () => await messageActions.DeadLetterMessageAsync(null));
await Assert.ThrowsAsync<ArgumentNullException>(async () => await messageActions.DeferMessageAsync(null));
await Assert.ThrowsAsync<ArgumentNullException>(async () => await messageActions.RenewMessageLockAsync(null));
}

private class MockSettlementClient : Settlement.SettlementClient
Expand Down Expand Up @@ -128,6 +146,12 @@ public override AsyncUnaryCall<Empty> DeferAsync(DeferRequest request, Metadata
Assert.Equal(_propertiesToModify, request.PropertiesToModify);
return new AsyncUnaryCall<Empty>(Task.FromResult(new Empty()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
}

public override AsyncUnaryCall<Empty> RenewMessageLockAsync(RenewMessageLockRequest request, CallOptions options)
{
Assert.Equal(_lockToken, request.Locktoken);
return new AsyncUnaryCall<Empty>(Task.FromResult(new Empty()), Task.FromResult(new Metadata()), () => Status.DefaultSuccess, () => new Metadata(), () => { });
}
}
}
}
}
Loading
Loading