From 066e6f4e87f5a619f4022b34a44d81346601016f Mon Sep 17 00:00:00 2001 From: "David R. Williamson" Date: Mon, 27 Mar 2023 17:28:51 -0700 Subject: [PATCH 1/2] Allow a message to be sent to multiple outputs --- iothub/device/src/InternalClient.cs | 13 +++---------- iothub/device/src/Message.cs | 1 - 2 files changed, 3 insertions(+), 11 deletions(-) diff --git a/iothub/device/src/InternalClient.cs b/iothub/device/src/InternalClient.cs index 7c6c10709b..278be06b9b 100644 --- a/iothub/device/src/InternalClient.cs +++ b/iothub/device/src/InternalClient.cs @@ -707,10 +707,7 @@ public Task SendEventBatchAsync(IEnumerable messages, CancellationToken { foreach (Message message in messages) { - if (message.MessageId == null) - { - message.MessageId = Guid.NewGuid().ToString(); - } + message.MessageId ??= Guid.NewGuid().ToString(); } } @@ -1502,7 +1499,7 @@ public Task SendEventAsync(string outputName, Message message, CancellationToken throw new ArgumentNullException(nameof(message)); } - message.SystemProperties.Add(MessageSystemPropertyNames.OutputName, outputName); + message.SystemProperties[MessageSystemPropertyNames.OutputName] = outputName; return InnerHandler.SendEventAsync(message, cancellationToken); } @@ -1635,11 +1632,7 @@ public async Task SetInputMessageHandlerAsync( // When using a device module we need to enable the 'deviceBound' message link await EnableEventReceiveAsync(isAnEdgeModule, cancellationToken).ConfigureAwait(false); - if (_receiveEventEndpoints == null) - { - _receiveEventEndpoints = new Dictionary>(); - } - + _receiveEventEndpoints ??= new Dictionary>(); _receiveEventEndpoints[inputName] = new Tuple(messageHandler, userContext); } else diff --git a/iothub/device/src/Message.cs b/iothub/device/src/Message.cs index 67963f0dff..7cd93fdd14 100644 --- a/iothub/device/src/Message.cs +++ b/iothub/device/src/Message.cs @@ -258,7 +258,6 @@ public string ComponentName bool IReadOnlyIndicator.IsReadOnly => Interlocked.Read(ref _sizeInBytesCalled) == 1; - /// /// The body stream of the current event data instance /// From 55287f57ecdcaad63577d3afd76b0a34b5ff0c98 Mon Sep 17 00:00:00 2001 From: "David R. Williamson" Date: Mon, 27 Mar 2023 17:46:19 -0700 Subject: [PATCH 2/2] Add an E2E test --- e2e/test/helpers/TestDevice.cs | 10 +++--- ...enticationWithTokenRefreshDisposalTests.cs | 12 +++---- e2e/test/iothub/DeviceTokenRefreshE2ETests.cs | 2 +- e2e/test/iothub/FileUploadE2ETests.cs | 6 ++-- .../iothub/messaging/MessageSendE2ETests.cs | 36 ++++++++++++++++--- iothub/device/tests/ModuleClientTests.cs | 2 -- 6 files changed, 47 insertions(+), 21 deletions(-) diff --git a/e2e/test/helpers/TestDevice.cs b/e2e/test/helpers/TestDevice.cs index d72bc16195..724174cae7 100644 --- a/e2e/test/helpers/TestDevice.cs +++ b/e2e/test/helpers/TestDevice.cs @@ -27,9 +27,9 @@ public enum ConnectionStringAuthScope public class TestDevice : IDisposable { private const int MaxRetryCount = 5; - private static readonly HashSet s_throttlingExceptions = new HashSet { typeof(ThrottlingException), }; - private static readonly HashSet s_getRetryableExceptions = new HashSet(s_throttlingExceptions) { typeof(DeviceNotFoundException) }; - private static readonly SemaphoreSlim s_semaphore = new SemaphoreSlim(1, 1); + private static readonly HashSet s_throttlingExceptions = new() { typeof(ThrottlingException), }; + private static readonly HashSet s_getRetryableExceptions = new(s_throttlingExceptions) { typeof(DeviceNotFoundException) }; + private static readonly SemaphoreSlim s_semaphore = new(1, 1); private static readonly IRetryPolicy s_exponentialBackoffRetryStrategy = new ExponentialBackoff( retryCount: MaxRetryCount, @@ -148,7 +148,7 @@ public string ConnectionString /// /// Used in conjunction with DeviceClient.Create() /// - public string IotHubHostName => GetHostName(TestConfiguration.IotHub.ConnectionString); + public static string IotHubHostName { get; } = GetHostName(TestConfiguration.IotHub.ConnectionString); /// /// Device Id @@ -173,7 +173,7 @@ public DeviceClient CreateDeviceClient(Client.TransportType transport, ClientOpt } else { - deviceClient = DeviceClient.Create(IotHubHostName, AuthenticationMethod, transport, options); + deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, AuthenticationMethod, transport, options); VerboseTestLogger.WriteLine($"{nameof(CreateDeviceClient)}: Created {nameof(DeviceClient)} {Device.Id} from IAuthenticationMethod"); } diff --git a/e2e/test/iothub/AuthenticationWithTokenRefreshDisposalTests.cs b/e2e/test/iothub/AuthenticationWithTokenRefreshDisposalTests.cs index 504b31987d..4ac04ada2f 100644 --- a/e2e/test/iothub/AuthenticationWithTokenRefreshDisposalTests.cs +++ b/e2e/test/iothub/AuthenticationWithTokenRefreshDisposalTests.cs @@ -117,7 +117,7 @@ private async Task AuthenticationMethodDisposesTokenRefresher(Client.TransportTy var authenticationMethod = new DeviceAuthenticationSasToken(testDevice.ConnectionString, disposeWithClient: true); // Create an instance of the device client, send a test message and then close and dispose it. - var deviceClient = DeviceClient.Create(testDevice.IotHubHostName, authenticationMethod, transport); + var deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, authenticationMethod, transport); using var message1 = new Client.Message(); await deviceClient.SendEventAsync(message1).ConfigureAwait(false); await deviceClient.CloseAsync(); @@ -127,7 +127,7 @@ private async Task AuthenticationMethodDisposesTokenRefresher(Client.TransportTy // Perform the same steps again, reusing the previously created authentication method instance. // Since the default behavior is to dispose AuthenticationWithTokenRefresh authentication method on DeviceClient disposal, // this should now throw an ObjectDisposedException. - var deviceClient2 = DeviceClient.Create(testDevice.IotHubHostName, authenticationMethod, transport); + var deviceClient2 = DeviceClient.Create(TestDevice.IotHubHostName, authenticationMethod, transport); using var message2 = new Client.Message(); Func act = async () => await deviceClient2.SendEventAsync(message2).ConfigureAwait(false); @@ -143,7 +143,7 @@ private async Task ReuseAuthenticationMethod_SingleDevice(Client.TransportType t var authenticationMethod = new DeviceAuthenticationSasToken(testDevice.ConnectionString, disposeWithClient: false); // Create an instance of the device client, send a test message and then close and dispose it. - var deviceClient = DeviceClient.Create(testDevice.IotHubHostName, authenticationMethod, transport); + var deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, authenticationMethod, transport); using var message1 = new Client.Message(); await deviceClient.SendEventAsync(message1).ConfigureAwait(false); await deviceClient.CloseAsync(); @@ -152,7 +152,7 @@ private async Task ReuseAuthenticationMethod_SingleDevice(Client.TransportType t // Perform the same steps again, reusing the previously created authentication method instance to ensure // that the SDK did not dispose the user supplied authentication method instance. - var deviceClient2 = DeviceClient.Create(testDevice.IotHubHostName, authenticationMethod, transport); + var deviceClient2 = DeviceClient.Create(TestDevice.IotHubHostName, authenticationMethod, transport); using var message2 = new Client.Message(); await deviceClient2.SendEventAsync(message2).ConfigureAwait(false); await deviceClient2.CloseAsync(); @@ -194,7 +194,7 @@ private async Task ReuseAuthenticationMethod_MuxedDevices(Client.TransportType t for (int i = 0; i < devicesCount; i++) { #pragma warning disable CA2000 // Dispose objects before losing scope - the client instance is disposed during the course of the test. - var deviceClient = DeviceClient.Create(testDevices[i].IotHubHostName, authenticationMethods[i], new ITransportSettings[] { amqpTransportSettings }); + var deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, authenticationMethods[i], new ITransportSettings[] { amqpTransportSettings }); #pragma warning restore CA2000 // Dispose objects before losing scope var amqpConnectionStatusChange = new AmqpConnectionStatusChange(testDevices[i].Id); @@ -259,7 +259,7 @@ private async Task ReuseAuthenticationMethod_MuxedDevices(Client.TransportType t { #pragma warning disable CA2000 // Dispose objects before losing scope - the client instance is disposed at the end of the test. var deviceClient = DeviceClient.Create( - testDevices[i].IotHubHostName, + TestDevice.IotHubHostName, authenticationMethods[i], new ITransportSettings[] { diff --git a/e2e/test/iothub/DeviceTokenRefreshE2ETests.cs b/e2e/test/iothub/DeviceTokenRefreshE2ETests.cs index 9aad0222db..4bdd6cc671 100644 --- a/e2e/test/iothub/DeviceTokenRefreshE2ETests.cs +++ b/e2e/test/iothub/DeviceTokenRefreshE2ETests.cs @@ -189,7 +189,7 @@ private async Task DeviceClient_TokenIsRefreshed_Internal(Client.TransportType t buffer, transport); - using var deviceClient = DeviceClient.Create(testDevice.IotHubHostName, refresher, transport); + using var deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, refresher, transport); VerboseTestLogger.WriteLine($"Created {nameof(DeviceClient)}"); if (transport == Client.TransportType.Mqtt) diff --git a/e2e/test/iothub/FileUploadE2ETests.cs b/e2e/test/iothub/FileUploadE2ETests.cs index c908a0d53e..f522c26f11 100644 --- a/e2e/test/iothub/FileUploadE2ETests.cs +++ b/e2e/test/iothub/FileUploadE2ETests.cs @@ -146,7 +146,7 @@ private async Task UploadFileGranularAsync(Stream source, string filename, Http1 cert = s_selfSignedCertificate; x509Auth = new DeviceAuthenticationWithX509Certificate(testDevice.Id, cert); - deviceClient = DeviceClient.Create(testDevice.IotHubHostName, x509Auth, Client.TransportType.Http1); + deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, x509Auth, Client.TransportType.Http1); } else { @@ -193,7 +193,7 @@ private async Task UploadFileAsync(Client.TransportType transport, string filena cert = s_selfSignedCertificate; x509Auth = new DeviceAuthenticationWithX509Certificate(testDevice.Id, cert); - deviceClient = DeviceClient.Create(testDevice.IotHubHostName, x509Auth, transport); + deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, x509Auth, transport); } else { @@ -231,7 +231,7 @@ private async Task GetSasUriAsync(Client.TransportType transport, string blobNam cert = s_selfSignedCertificate; x509Auth = new DeviceAuthenticationWithX509Certificate(testDevice.Id, cert); - deviceClient = DeviceClient.Create(testDevice.IotHubHostName, x509Auth, transport); + deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, x509Auth, transport); } else { diff --git a/e2e/test/iothub/messaging/MessageSendE2ETests.cs b/e2e/test/iothub/messaging/MessageSendE2ETests.cs index ad81591540..bf4aea40f7 100644 --- a/e2e/test/iothub/messaging/MessageSendE2ETests.cs +++ b/e2e/test/iothub/messaging/MessageSendE2ETests.cs @@ -7,6 +7,7 @@ using System.Net; using System.Text; using System.Threading.Tasks; +using FluentAssertions; using Microsoft.Azure.Devices.Client; using Microsoft.Azure.Devices.Client.Exceptions; using Microsoft.Azure.Devices.Client.Transport.Mqtt; @@ -102,7 +103,7 @@ public async Task Message_DeviceSendSingleMessage_AmqpWs_WithHeartbeats() [TestCategory("LongRunning")] public async Task Message_DeviceSendSingleMessage_Http_WithProxy() { - var httpTransportSettings = new Client.Http1TransportSettings + var httpTransportSettings = new Http1TransportSettings { Proxy = new WebProxy(s_proxyServerAddress) }; @@ -173,7 +174,7 @@ public async Task Message_ModuleSendSingleMessage_AmqpWs_WithProxy() [TestCategory("Proxy")] public async Task Message_ModuleSendSingleMessage_MqttWs_WithProxy() { - var mqttTransportSettings = new Client.Transport.Mqtt.MqttTransportSettings(Client.TransportType.Mqtt_WebSocket_Only) + var mqttTransportSettings = new MqttTransportSettings(Client.TransportType.Mqtt_WebSocket_Only) { Proxy = new WebProxy(s_proxyServerAddress) }; @@ -182,6 +183,33 @@ public async Task Message_ModuleSendSingleMessage_MqttWs_WithProxy() await SendSingleMessageModule(transportSettings).ConfigureAwait(false); } + [TestMethod] + [Timeout(TestTimeoutMilliseconds)] + public async Task Message_ModuleSendsMessageToRouteTwice() + { + // arrange + + TestModule testModule = await TestModule.GetTestModuleAsync(nameof(Message_ModuleSendsMessageToRouteTwice), "module").ConfigureAwait(false); + + try + { + using var moduleClient = ModuleClient.CreateFromConnectionString(testModule.ConnectionString); + using var message = new Client.Message(new byte[10]); + await moduleClient.SendEventAsync("output1", message).ConfigureAwait(false); + + // act + Func secondSend = async () => await moduleClient.SendEventAsync("output2", message).ConfigureAwait(false); + + // assert + await secondSend.Should().NotThrowAsync(); + } + finally + { + using RegistryManager rm = RegistryManager.CreateFromConnectionString(TestConfiguration.IotHub.ConnectionString); + await rm.RemoveDeviceAsync(testModule.DeviceId).ConfigureAwait(false); + } + } + [TestMethod] [Timeout(TestTimeoutMilliseconds)] public async Task X509_DeviceSendSingleMessage_Amqp() @@ -424,7 +452,7 @@ private async Task SendSingleMessageModule(ITransportSettings[] transportSetting using var moduleClient = ModuleClient.CreateFromConnectionString(testModule.ConnectionString, transportSettings); await moduleClient.OpenAsync().ConfigureAwait(false); - await SendSingleMessageModuleAsync(moduleClient).ConfigureAwait(false); + await MessageSendE2ETests.SendSingleMessageModuleAsync(moduleClient).ConfigureAwait(false); await moduleClient.CloseAsync().ConfigureAwait(false); } @@ -462,7 +490,7 @@ public static async Task SendBatchMessagesAsync(DeviceClient deviceClient) } } - private async Task SendSingleMessageModuleAsync(ModuleClient moduleClient) + private static async Task SendSingleMessageModuleAsync(ModuleClient moduleClient) { using Client.Message testMessage = ComposeD2cTestMessage(out string _, out string _); diff --git a/iothub/device/tests/ModuleClientTests.cs b/iothub/device/tests/ModuleClientTests.cs index 8861f9a5ff..cdffa26901 100644 --- a/iothub/device/tests/ModuleClientTests.cs +++ b/iothub/device/tests/ModuleClientTests.cs @@ -86,7 +86,6 @@ public void ModuleClient_CreateFromConnectionStringWithClientOptions_DoesNotThro } [TestMethod] - // Tests_SRS_DEVICECLIENT_33_003: [** It shall EnableEventReceiveAsync when called for the first time. **]** public async Task ModuleClient_SetReceiveCallbackAsync_SetCallback_Mqtt() { var transportType = TransportType.Mqtt_Tcp_Only; @@ -103,7 +102,6 @@ public async Task ModuleClient_SetReceiveCallbackAsync_SetCallback_Mqtt() } [TestMethod] - // Tests_SRS_DEVICECLIENT_33_004: [** It shall call DisableEventReceiveAsync when the last delegate has been removed. **]** public async Task ModuleClient_SetReceiveCallbackAsync_RemoveCallback_Mqtt() { var transportType = TransportType.Mqtt_Tcp_Only;