Skip to content

Commit

Permalink
Pass byte[] through as payload for C2D for binary payloads (#3229)
Browse files Browse the repository at this point in the history
* Pass byte[] through as payload for C2D for binary payloads

* Add a test and rename other test classes to match newest names

* Share task completion helper more broadly

* Fix .NET version requirement
  • Loading branch information
David R. Williamson authored Mar 31, 2023
1 parent 7a2e0c1 commit 3e65aed
Show file tree
Hide file tree
Showing 21 changed files with 198 additions and 186 deletions.
42 changes: 18 additions & 24 deletions e2e/Tests/helpers/TaskCompletionSourceHelper.cs
Original file line number Diff line number Diff line change
@@ -1,37 +1,31 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Microsoft.Azure.Devices.E2ETests.helpers
namespace Microsoft.Azure.Devices.E2ETests
{
public class TaskCompletionSourceHelper
/// <summary>
/// Modern .NET supports waiting on the TaskCompletionSource with a cancellation token, but older ones
/// do not. We can bind that task with a call to Task.Delay to get the same effect, though.
/// </summary>
internal static class TaskCompletionSourceHelper
{
/// <summary>
/// Gets the result of the provided task completion source or throws OperationCancelledException if the provided
/// cancellation token is cancelled beforehand.
/// </summary>
/// <typeparam name="T">The type of the result of the task completion source.</typeparam>
/// <param name="taskCompletionSource">The task completion source to asynchronously wait for the result of.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>The result of the provided task completion source if it completes before the provided cancellation token is cancelled.</returns>
/// <exception cref="OperationCanceledException">If the cancellation token is cancelled before the provided task completion source finishes.</exception>
public static async Task<T> GetTaskCompletionSourceResultAsync<T>(TaskCompletionSource<T> taskCompletionSource, CancellationToken cancellationToken)
internal static async Task<T> WaitAsync<T>(this TaskCompletionSource<T> taskCompletionSource, CancellationToken ct)
{
// Note that Task.Delay(-1, cancellationToken) effectively waits until the cancellation token is cancelled. The -1 value
// just means that the task is allowed to run indefinitely.
Task finishedTask = await Task.WhenAny(taskCompletionSource.Task, Task.Delay(-1, cancellationToken)).ConfigureAwait(false);
#if NET6_0_OR_GREATER
return await taskCompletionSource.Task.WaitAsync(ct).ConfigureAwait(false);
#else
await Task
.WhenAny(
taskCompletionSource.Task,
Task.Delay(-1, ct))
.ConfigureAwait(false);

// If the finished task is not the cancellation token
if (finishedTask is Task<T>)
{
return await ((Task<T>)finishedTask).ConfigureAwait(false);
}

// Otherwise throw operation cancelled exception since the cancellation token was cancelled before the task finished.
throw new OperationCanceledException();
ct.ThrowIfCancellationRequested();
return await taskCompletionSource.Task.ConfigureAwait(false);
#endif
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler t

// D2C Operation
VerboseTestLogger.WriteLine($"{nameof(CombinedClientOperationsPoolAmqpTests)}: Operation 1: Send D2C for device={testDevice.Id}");
TelemetryMessage message = TelemetryE2ETests.ComposeD2cTestMessage(out string _, out string _);
TelemetryMessage message = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string _, out string _);
Task sendD2cMessage = testDevice.DeviceClient.SendTelemetryAsync(message);
clientOperations.Add(sendD2cMessage);

Expand All @@ -105,7 +105,7 @@ async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler t
OutgoingMessage msg = msgSent.Item1;
string payload = msgSent.Item2;

Task verifyDeviceClientReceivesMessage = MessageReceiveE2ETests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msg, payload);
Task verifyDeviceClientReceivesMessage = IncomingMessageCallbackE2eTests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msg, payload);
clientOperations.Add(verifyDeviceClientReceivesMessage);

// Invoke direct methods
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ private static async Task SendMessageTestAsync(IotHubClientTransportSettings tra
await using TestDevice testDevice = await TestDevice.GetTestDeviceAsync(s_devicePrefix, TestDeviceType.X509).ConfigureAwait(false);
IotHubDeviceClient deviceClient = testDevice.CreateDeviceClient(new IotHubClientOptions(transportSetting));
await testDevice.OpenWithRetryAsync().ConfigureAwait(false);
TelemetryMessage message = TelemetryE2ETests.ComposeD2cTestMessage(out string _, out string _);
TelemetryMessage message = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string _, out string _);
await deviceClient.SendTelemetryAsync(message).ConfigureAwait(false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.Azure.Devices.E2ETests.Methods
[TestClass]
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
public class MethodE2ECustomPayloadTests : E2EMsTestBase
public class DirectMethodE2eCustomPayloadTests : E2EMsTestBase
{
private static readonly DirectMethodRequestPayload _customTypeRequest = new() { DesiredState = "on" };
private static readonly DirectMethodResponsePayload _customTypeResponse = new() { CurrentState = "off" };
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ async Task TestInitAsync(IotHubDeviceClient deviceClient, TestDevice testDevice,

async Task TestOperationAsync(IotHubDeviceClient deviceClient, TestDevice testDevice, TestDeviceCallbackHandler _)
{
TelemetryMessage testMessage = TelemetryE2ETests.ComposeD2cTestMessage(out string payload, out string p1Value);
TelemetryMessage testMessage = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string payload, out string p1Value);

VerboseTestLogger.WriteLine($"{nameof(FaultInjectionPoolAmqpTests)}.{testDevice.Id}: payload='{payload}' p1Value='{p1Value}'");
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestClass]
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
public class MessageReceiveE2EPoolAmqpTests : E2EMsTestBase
public class IncomingMessageCallbackE2ePoolAmqpTests : E2EMsTestBase
{
private readonly string DevicePrefix = $"{nameof(MessageReceiveE2EPoolAmqpTests)}_";
private readonly string DevicePrefix = $"{nameof(IncomingMessageCallbackE2ePoolAmqpTests)}_";

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
Expand Down Expand Up @@ -132,11 +132,11 @@ async Task InitOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _

async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _)
{
VerboseTestLogger.WriteLine($"{nameof(MessageReceiveE2EPoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
VerboseTestLogger.WriteLine($"{nameof(IncomingMessageCallbackE2ePoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
await testDevice.OpenWithRetryAsync().ConfigureAwait(false);

Tuple<OutgoingMessage, string> msgSent = messagesSent[testDevice.Id];
await MessageReceiveE2ETests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msgSent.Item1, msgSent.Item2).ConfigureAwait(false);
await IncomingMessageCallbackE2eTests.VerifyReceivedC2dMessageAsync(testDevice.DeviceClient, testDevice.Id, msgSent.Item1, msgSent.Item2).ConfigureAwait(false);
}

await PoolingOverAmqp
Expand Down Expand Up @@ -176,7 +176,7 @@ async Task InitOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler t

async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler testDeviceCallbackHandler)
{
VerboseTestLogger.WriteLine($"{nameof(MessageReceiveE2EPoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");
VerboseTestLogger.WriteLine($"{nameof(IncomingMessageCallbackE2ePoolAmqpTests)}: Preparing to receive message for device {testDevice.Id}");

using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
await testDeviceCallbackHandler.WaitForReceiveMessageCallbackAsync(cts.Token).ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.E2ETests.helpers;
using Microsoft.Azure.Devices.E2ETests.Helpers;
using Microsoft.Azure.Devices.E2ETests.Helpers.Templates;
using Microsoft.VisualStudio.TestTools.UnitTesting;
Expand All @@ -20,9 +20,9 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
[TestCategory("LongRunning")]
public class MessageReceiveE2ETests : E2EMsTestBase
public class IncomingMessageCallbackE2eTests : E2EMsTestBase
{
private static readonly string s_devicePrefix = $"{nameof(MessageReceiveE2ETests)}_";
private static readonly string s_devicePrefix = $"{nameof(IncomingMessageCallbackE2eTests)}_";

private static readonly TimeSpan s_oneSecond = TimeSpan.FromSeconds(1);
private static readonly TimeSpan s_fiveSeconds = TimeSpan.FromSeconds(5);
Expand Down Expand Up @@ -88,16 +88,14 @@ public static async Task VerifyReceivedC2dMessageAsync(IotHubDeviceClient dc, st

using var cts = new CancellationTokenSource(s_oneMinute);
var c2dMessageReceived = new TaskCompletionSource<IncomingMessage>(TaskCreationOptions.RunContinuationsAsynchronously);
Func<IncomingMessage, Task<MessageAcknowledgement>> OnC2DMessageReceived = (message) =>
Task<MessageAcknowledgement> OnC2DMessageReceived(IncomingMessage message)
{
c2dMessageReceived.TrySetResult(message);
return Task.FromResult(MessageAcknowledgement.Complete);
};
}
await dc.SetIncomingMessageCallbackAsync(OnC2DMessageReceived).ConfigureAwait(false);

IncomingMessage receivedMessage = await TaskCompletionSourceHelper
.GetTaskCompletionSourceResultAsync(c2dMessageReceived, cts.Token)
.ConfigureAwait(false);
IncomingMessage receivedMessage = await c2dMessageReceived.WaitAsync(cts.Token).ConfigureAwait(false);

receivedMessage.MessageId.Should().Be(message.MessageId, "Received message Id is not what was sent by service");
receivedMessage.UserId.Should().Be(message.UserId, "Received user Id is not what was sent by service");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestClass]
[TestCategory("FaultInjection")]
[TestCategory("IoTHub-Client")]
public partial class MessageReceiveFaultInjectionTests : E2EMsTestBase
public partial class IncomingMessageCallbackFaultInjectionTests : E2EMsTestBase
{
private readonly string DevicePrefix = $"{nameof(MessageReceiveFaultInjectionTests)}_";
private readonly string DevicePrefix = $"{nameof(IncomingMessageCallbackFaultInjectionTests)}_";

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
Expand Down
2 changes: 1 addition & 1 deletion e2e/Tests/iothub/device/MessageSendFaultInjectionTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ async Task InitAsync(IotHubDeviceClient deviceClient, TestDevice testDevice)

async Task TestOperationAsync(IotHubDeviceClient deviceClient, TestDevice testDevice)
{
TelemetryMessage testMessage = TelemetryE2ETests.ComposeD2cTestMessage(out string _, out string _);
TelemetryMessage testMessage = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string _, out string _);
using var cts = new CancellationTokenSource(operationTimeout);
await deviceClient.SendTelemetryAsync(testMessage, cts.Token).ConfigureAwait(false);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestClass]
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
public partial class TelemetryE2ETests : E2EMsTestBase
public partial class TelemetryMessageE2eTests : E2EMsTestBase
{
private const int MessageBatchCount = 5;

Expand All @@ -28,7 +28,7 @@ public partial class TelemetryE2ETests : E2EMsTestBase
// the message size is less than 1 MB.
private const int OverlyExceedAllowedMessageSizeInBytes = 3000 * 1024;

private readonly string _idPrefix = $"{nameof(TelemetryE2ETests)}_";
private readonly string _idPrefix = $"{nameof(TelemetryMessageE2eTests)}_";
private static readonly string s_proxyServerAddress = TestConfiguration.IotHub.ProxyServerAddress;

[TestMethod]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ namespace Microsoft.Azure.Devices.E2ETests.Messaging
[TestClass]
[TestCategory("E2E")]
[TestCategory("IoTHub-Client")]
public class MessageSendE2EPoolAmqpTests : E2EMsTestBase
public class TelemetryMessageSendE2ePoolAmqpTests : E2EMsTestBase
{
private readonly string _devicePrefix = $"{nameof(MessageSendE2EPoolAmqpTests)}_";
private readonly string _devicePrefix = $"{nameof(TelemetryMessageSendE2ePoolAmqpTests)}_";

[TestMethod]
[Timeout(LongRunningTestTimeoutMilliseconds)]
Expand Down Expand Up @@ -53,8 +53,8 @@ async Task InitAsync(TestDevice testDevice, TestDeviceCallbackHandler c)

async Task TestOperationAsync(TestDevice testDevice, TestDeviceCallbackHandler _)
{
TelemetryMessage testMessage = TelemetryE2ETests.ComposeD2cTestMessage(out string payload, out string p1Value);
VerboseTestLogger.WriteLine($"{nameof(MessageSendE2EPoolAmqpTests)}.{testDevice.Id}: messageId='{testMessage.MessageId}' payload='{payload}' p1Value='{p1Value}'");
TelemetryMessage testMessage = TelemetryMessageE2eTests.ComposeD2cTestMessage(out string payload, out string p1Value);
VerboseTestLogger.WriteLine($"{nameof(TelemetryMessageSendE2ePoolAmqpTests)}.{testDevice.Id}: messageId='{testMessage.MessageId}' payload='{payload}' p1Value='{p1Value}'");
await testDevice.DeviceClient.SendTelemetryAsync(testMessage).ConfigureAwait(false);
}

Expand Down
21 changes: 10 additions & 11 deletions e2e/Tests/iothub/service/FileUploadNotificationE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ public class FileUploadNotificationE2ETest : E2EMsTestBase
[DataRow(IotHubTransportProtocol.WebSocket, 1, true)]
public async Task FileUploadNotification_FileUploadNotificationProcessor_ReceivesNotifications(IotHubTransportProtocol protocol, int filesToUpload, bool shouldReconnect)
{
// arrange

var options = new IotHubServiceClientOptions
{
Protocol = protocol
};

using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(TestTimeoutMilliseconds));
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
using var serviceClient = new IotHubServiceClient(TestConfiguration.IotHub.ConnectionString, options);
using StorageContainer storage = await StorageContainer.GetInstanceAsync("fileupload", false).ConfigureAwait(false);
using var fileNotification = new SemaphoreSlim(1, 1);
Expand Down Expand Up @@ -94,22 +96,23 @@ async Task<AcknowledgementType> OnFileUploadNotificationReceived(FileUploadNotif
await serviceClient.FileUploadNotifications.OpenAsync(cts.Token).ConfigureAwait(false);
}

// act
for (int i = 0; i < filesToUpload; ++i)
{
string fileName = $"TestPayload-{Guid.NewGuid()}.txt";
files.Add(fileName, false);
await UploadFile(fileName, cts.Token).ConfigureAwait(false);
}

await Task
.WhenAny(
allFilesFound.Task,
Task.Delay(-1, cts.Token))
.ConfigureAwait(false);
VerboseTestLogger.WriteLine($"Waiting on file upload notification...");
await allFilesFound.WaitAsync(cts.Token).ConfigureAwait(false);

// assert
allFilesFound.Task.IsCompleted.Should().BeTrue();
}
finally
{
VerboseTestLogger.WriteLine($"Cleanup: closing client...");
await serviceClient.FileUploadNotifications.CloseAsync().ConfigureAwait(false);
}
}
Expand Down Expand Up @@ -149,11 +152,7 @@ public async Task FileUploadNotification_ErrorProcessor_ReceivesNotifications(Io
// file upload notification without closing and re-opening as long as there is more
// than one file upload notification to consume.
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(TestTimeoutMilliseconds));
await Task
.WhenAny(
errorProcessorNotified.Task,
Task.Delay(-1, cts.Token))
.ConfigureAwait(false);
await errorProcessorNotified.WaitAsync(cts.Token).ConfigureAwait(false);
errorProcessorNotified.Task.IsCompleted.Should().BeTrue();
}
finally
Expand Down
16 changes: 4 additions & 12 deletions e2e/Tests/iothub/service/MessageFeedbackReceiverE2ETest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -77,22 +77,14 @@ Task<MessageAcknowledgement> OnC2DMessageReceived(IncomingMessage message)
await serviceClient.Messages.SendAsync(testDevice.Device.Id, message).ConfigureAwait(false);

// Wait for the device to receive the message.
await Task
.WhenAny(
Task.Delay(TimeSpan.FromSeconds(20)),
c2dMessageReceived.Task)
.ConfigureAwait(false);
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(20));
await c2dMessageReceived.WaitAsync(cts.Token).ConfigureAwait(false);

c2dMessageReceived.Task.IsCompleted.Should().BeTrue("Timed out waiting for C2D message to be received by device");

// Wait for the service to receive the feedback message.
await Task
.WhenAny(
// Wait for up to 200 seconds for the feedback message as the service may not send messages
// until they can batch others, even up to a minute later.
Task.Delay(TimeSpan.FromSeconds(200)),
feedbackMessageReceived.Task)
.ConfigureAwait(false);
using var cts2 = new CancellationTokenSource(TimeSpan.FromSeconds(200));
await feedbackMessageReceived.WaitAsync(cts2.Token).ConfigureAwait(false);

feedbackMessageReceived.Task.IsCompleted.Should().BeTrue("service client never received c2d feedback message even though the device received the message");
}
Expand Down
Loading

0 comments on commit 3e65aed

Please sign in to comment.