Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass byte[] through as payload for C2D for binary payloads #3229

Merged
merged 4 commits into from
Mar 31, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 18 additions & 24 deletions e2e/Tests/helpers/TaskCompletionSourceHelper.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,31 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Azure.Devices.E2ETests.helpers
namespace Microsoft.Azure.Devices.E2ETests
{
public class TaskCompletionSourceHelper
/// <summary>
/// Modern .NET supports waiting on the TaskCompletionSource with a cancellation token, but older ones
/// do not. We can bind that task with a call to Task.Delay to get the same effect, though.
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
/// </summary>
internal static class TaskCompletionSourceHelper
{
/// <summary>
/// Gets the result of the provided task completion source or throws OperationCancelledException if the provided
/// cancellation token is cancelled beforehand.
/// </summary>
/// <typeparam name="T">The type of the result of the task completion source.</typeparam>
/// <param name="taskCompletionSource">The task completion source to asynchronously wait for the result of.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The result of the provided task completion source if it completes before the provided cancellation token is cancelled.</returns>
/// <exception cref="OperationCanceledException">If the cancellation token is cancelled before the provided task completion source finishes.</exception>
public static async Task<T> GetTaskCompletionSourceResultAsync<T>(TaskCompletionSource<T> taskCompletionSource, CancellationToken cancellationToken)
internal static async Task<T> WaitAsync<T>(this TaskCompletionSource<T> taskCompletionSource, CancellationToken ct)
{
// Note that Task.Delay(-1, cancellationToken) effectively waits until the cancellation token is cancelled. The -1 value
// just means that the task is allowed to run indefinitely.
Task finishedTask = await Task.WhenAny(taskCompletionSource.Task, Task.Delay(-1, cancellationToken)).ConfigureAwait(false);
#if NET5_0_OR_GREATER
return await taskCompletionSource.Task.WaitAsync(ct).ConfigureAwait(false);
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
#else
Task finishedTask = await Task
.WhenAny(
taskCompletionSource.Task,
Task.Delay(-1, ct))
.ConfigureAwait(false);

// If the finished task is not the cancellation token
if (finishedTask is Task<T>)
{
return await ((Task<T>)finishedTask).ConfigureAwait(false);
}

// Otherwise throw operation cancelled exception since the cancellation token was cancelled before the task finished.
throw new OperationCanceledException();
ct.ThrowIfCancellationRequested();
return await taskCompletionSource.Task.ConfigureAwait(false);
#endif
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler t

// D2C Operation
VerboseTestLogger.WriteLine($"{nameof(CombinedClientOperationsPoolAmqpTests)}: Operation 1: Send D2C for device={testDevice.Id}");
TelemetryMessage message = TelemetryE2ETests.ComposeD2cTestMessage(out string _, out string _);
TelemetryMessage message = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string _, out string _);
Task sendD2cMessage = testDevice.DeviceClient.SendTelemetryAsync(message);
clientOperations.Add(sendD2cMessage);

Expand All @@ -105,7 +105,7 @@ async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler t
OutgoingMessage msg = msgSent.Item1;
string payload = msgSent.Item2;

Task verifyDeviceClientReceivesMessage = MessageReceiveE2ETests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msg, payload);
Task verifyDeviceClientReceivesMessage = IncomingMessageCallbackE2eTests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msg, payload);
clientOperations.Add(verifyDeviceClientReceivesMessage);

// Invoke direct methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private static async Task SendMessageTestAsync(IotHubClientTransportSettings tra
await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(s_devicePrefix, TestDeviceType.X509).ConfigureAwait(false);
IotHubDeviceClient deviceClient = testDevice.CreateDeviceClient(new IotHubClientOptions(transportSetting));
await testDevice.OpenWithRetryAsync().ConfigureAwait(false);
TelemetryMessage message = TelemetryE2ETests.ComposeD2cTestMessage(out string _, out string _);
TelemetryMessage message = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string _, out string _);
await deviceClient.SendTelemetryAsync(message).ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.Azure.Devices.E2ETests.Methods
[TestClass]
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
public class MethodE2ECustomPayloadTests : E2EMsTestBase
public class DirectMethodE2eCustomPayloadTests : E2EMsTestBase
{
private static readonly DirectMethodRequestPayload _customTypeRequest = new() { DesiredState = "on" };
private static readonly DirectMethodResponsePayload _customTypeResponse = new() { CurrentState = "off" };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ async Task TestInitAsync(IotHubDeviceClient deviceClient, TestDevice testDevice,

async Task TestOperationAsync(IotHubDeviceClient deviceClient, TestDevice testDevice, TestDeviceCallbackHandler _)
{
TelemetryMessage testMessage = TelemetryE2ETests.ComposeD2cTestMessage(out string payload, out string p1Value);
TelemetryMessage testMessage = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string payload, out string p1Value);

VerboseTestLogger.WriteLine($"{nameof(FaultInjectionPoolAmqpTests)}.{testDevice.Id}: payload='{payload}' p1Value='{p1Value}'");
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestClass]
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
public class MessageReceiveE2EPoolAmqpTests : E2EMsTestBase
public class IncomingMessageCallbackE2ePoolAmqpTests : E2EMsTestBase
{
private readonly string DevicePrefix = $"{nameof(MessageReceiveE2EPoolAmqpTests)}_";
private readonly string DevicePrefix = $"{nameof(IncomingMessageCallbackE2ePoolAmqpTests)}_";

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
Expand Down Expand Up @@ -132,11 +132,11 @@ async Task InitOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _

async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _)
{
VerboseTestLogger.WriteLine($"{nameof(MessageReceiveE2EPoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
VerboseTestLogger.WriteLine($"{nameof(IncomingMessageCallbackE2ePoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
await testDevice.OpenWithRetryAsync().ConfigureAwait(false);

Tuple<OutgoingMessage, string> msgSent = messagesSent[testDevice.Id];
await MessageReceiveE2ETests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msgSent.Item1, msgSent.Item2).ConfigureAwait(false);
await IncomingMessageCallbackE2eTests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msgSent.Item1, msgSent.Item2).ConfigureAwait(false);
}

await PoolingOverAmqp
Expand Down Expand Up @@ -176,7 +176,7 @@ async Task InitOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler t

async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler testDeviceCallbackHandler)
{
VerboseTestLogger.WriteLine($"{nameof(MessageReceiveE2EPoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
VerboseTestLogger.WriteLine($"{nameof(IncomingMessageCallbackE2ePoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
await testDeviceCallbackHandler.WaitForReceiveMessageCallbackAsync(cts.Token).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.E2ETests.helpers;
using Microsoft.Azure.Devices.E2ETests.Helpers;
using Microsoft.Azure.Devices.E2ETests.Helpers.Templates;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand All @@ -20,9 +20,9 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
[TestCategory("LongRunning")]
public class MessageReceiveE2ETests : E2EMsTestBase
public class IncomingMessageCallbackE2eTests : E2EMsTestBase
{
private static readonly string s_devicePrefix = $"{nameof(MessageReceiveE2ETests)}_";
private static readonly string s_devicePrefix = $"{nameof(IncomingMessageCallbackE2eTests)}_";

private static readonly TimeSpan s_oneSecond = TimeSpan.FromSeconds(1);
private static readonly TimeSpan s_fiveSeconds = TimeSpan.FromSeconds(5);
Expand Down Expand Up @@ -88,16 +88,14 @@ public static async Task VerifyReceivedC2dMessageAsync(IotHubDeviceClient dc, st

using var cts = new CancellationTokenSource(s_oneMinute);
var c2dMessageReceived = new TaskCompletionSource<IncomingMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
Func<IncomingMessage, Task<MessageAcknowledgement>> OnC2DMessageReceived = (message) =>
Task<MessageAcknowledgement> OnC2DMessageReceived(IncomingMessage message)
{
c2dMessageReceived.TrySetResult(message);
return Task.FromResult(MessageAcknowledgement.Complete);
};
}
await dc.SetIncomingMessageCallbackAsync(OnC2DMessageReceived).ConfigureAwait(false);

IncomingMessage receivedMessage = await TaskCompletionSourceHelper
.GetTaskCompletionSourceResultAsync(c2dMessageReceived, cts.Token)
.ConfigureAwait(false);
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
IncomingMessage receivedMessage = await c2dMessageReceived.WaitAsync(cts.Token).ConfigureAwait(false);

receivedMessage.MessageId.Should().Be(message.MessageId, "Received message Id is not what was sent by service");
receivedMessage.UserId.Should().Be(message.UserId, "Received user Id is not what was sent by service");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestClass]
[TestCategory("FaultInjection")]
[TestCategory("IoTHub-Client")]
public partial class MessageReceiveFaultInjectionTests : E2EMsTestBase
public partial class IncomingMessageCallbackFaultInjectionTests : E2EMsTestBase
{
private readonly string DevicePrefix = $"{nameof(MessageReceiveFaultInjectionTests)}_";
private readonly string DevicePrefix = $"{nameof(IncomingMessageCallbackFaultInjectionTests)}_";

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ async Task InitAsync(IotHubDeviceClient deviceClient, TestDevice testDevice)

async Task TestOperationAsync(IotHubDeviceClient deviceClient, TestDevice testDevice)
{
TelemetryMessage testMessage = TelemetryE2ETests.ComposeD2cTestMessage(out string _, out string _);
TelemetryMessage testMessage = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string _, out string _);
using var cts = new CancellationTokenSource(operationTimeout);
await deviceClient.SendTelemetryAsync(testMessage, cts.Token).ConfigureAwait(false);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestClass]
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
public partial class TelemetryE2ETests : E2EMsTestBase
public partial class TelemetryMessageE2eTests : E2EMsTestBase
{
private const int MessageBatchCount = 5;

Expand All @@ -28,7 +28,7 @@ public partial class TelemetryE2ETests : E2EMsTestBase
// the message size is less than 1 MB.
private const int OverlyExceedAllowedMessageSizeInBytes = 3000 * 1024;

private readonly string _idPrefix = $"{nameof(TelemetryE2ETests)}_";
private readonly string _idPrefix = $"{nameof(TelemetryMessageE2eTests)}_";
private static readonly string s_proxyServerAddress = TestConfiguration.IotHub.ProxyServerAddress;

[TestMethod]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestClass]
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
public class MessageSendE2EPoolAmqpTests : E2EMsTestBase
public class TelemetryMessageSendE2ePoolAmqpTests : E2EMsTestBase
{
private readonly string _devicePrefix = $"{nameof(MessageSendE2EPoolAmqpTests)}_";
private readonly string _devicePrefix = $"{nameof(TelemetryMessageSendE2ePoolAmqpTests)}_";

[TestMethod]
[Timeout(LongRunningTestTimeoutMilliseconds)]
Expand Down Expand Up @@ -53,8 +53,8 @@ async Task InitAsync(TestDevice testDevice, TestDeviceCallbackHandler c)

async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _)
{
TelemetryMessage testMessage = TelemetryE2ETests.ComposeD2cTestMessage(out string payload, out string p1Value);
VerboseTestLogger.WriteLine($"{nameof(MessageSendE2EPoolAmqpTests)}.{testDevice.Id}: messageId='{testMessage.MessageId}' payload='{payload}' p1Value='{p1Value}'");
TelemetryMessage testMessage = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string payload, out string p1Value);
VerboseTestLogger.WriteLine($"{nameof(TelemetryMessageSendE2ePoolAmqpTests)}.{testDevice.Id}: messageId='{testMessage.MessageId}' payload='{payload}' p1Value='{p1Value}'");
await testDevice.DeviceClient.SendTelemetryAsync(testMessage).ConfigureAwait(false);
}

Expand Down
Loading