Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow a message to be sent to multiple outputs #3203

Merged
merged 2 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions e2e/test/helpers/TestDevice.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ public enum ConnectionStringAuthScope
public class TestDevice : IDisposable
{
private const int MaxRetryCount = 5;
private static readonly HashSet<Type> s_throttlingExceptions = new HashSet<Type> { typeof(ThrottlingException), };
private static readonly HashSet<Type> s_getRetryableExceptions = new HashSet<Type>(s_throttlingExceptions) { typeof(DeviceNotFoundException) };
private static readonly SemaphoreSlim s_semaphore = new SemaphoreSlim(1, 1);
private static readonly HashSet<Type> s_throttlingExceptions = new() { typeof(ThrottlingException), };
private static readonly HashSet<Type> s_getRetryableExceptions = new(s_throttlingExceptions) { typeof(DeviceNotFoundException) };
private static readonly SemaphoreSlim s_semaphore = new(1, 1);

private static readonly IRetryPolicy s_exponentialBackoffRetryStrategy = new ExponentialBackoff(
retryCount: MaxRetryCount,
Expand Down Expand Up @@ -148,7 +148,7 @@ public string ConnectionString
/// <summary>
/// Used in conjunction with DeviceClient.Create()
/// </summary>
public string IotHubHostName => GetHostName(TestConfiguration.IotHub.ConnectionString);
public static string IotHubHostName { get; } = GetHostName(TestConfiguration.IotHub.ConnectionString);

/// <summary>
/// Device Id
Expand All @@ -173,7 +173,7 @@ public DeviceClient CreateDeviceClient(Client.TransportType transport, ClientOpt
}
else
{
deviceClient = DeviceClient.Create(IotHubHostName, AuthenticationMethod, transport, options);
deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, AuthenticationMethod, transport, options);
VerboseTestLogger.WriteLine($"{nameof(CreateDeviceClient)}: Created {nameof(DeviceClient)} {Device.Id} from IAuthenticationMethod");
}

Expand Down
12 changes: 6 additions & 6 deletions e2e/test/iothub/AuthenticationWithTokenRefreshDisposalTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private async Task AuthenticationMethodDisposesTokenRefresher(Client.TransportTy
var authenticationMethod = new DeviceAuthenticationSasToken(testDevice.ConnectionString, disposeWithClient: true);

// Create an instance of the device client, send a test message and then close and dispose it.
var deviceClient = DeviceClient.Create(testDevice.IotHubHostName, authenticationMethod, transport);
var deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, authenticationMethod, transport);
using var message1 = new Client.Message();
await deviceClient.SendEventAsync(message1).ConfigureAwait(false);
await deviceClient.CloseAsync();
Expand All @@ -127,7 +127,7 @@ private async Task AuthenticationMethodDisposesTokenRefresher(Client.TransportTy
// Perform the same steps again, reusing the previously created authentication method instance.
// Since the default behavior is to dispose AuthenticationWithTokenRefresh authentication method on DeviceClient disposal,
// this should now throw an ObjectDisposedException.
var deviceClient2 = DeviceClient.Create(testDevice.IotHubHostName, authenticationMethod, transport);
var deviceClient2 = DeviceClient.Create(TestDevice.IotHubHostName, authenticationMethod, transport);
using var message2 = new Client.Message();

Func<Task> act = async () => await deviceClient2.SendEventAsync(message2).ConfigureAwait(false);
Expand All @@ -143,7 +143,7 @@ private async Task ReuseAuthenticationMethod_SingleDevice(Client.TransportType t
var authenticationMethod = new DeviceAuthenticationSasToken(testDevice.ConnectionString, disposeWithClient: false);

// Create an instance of the device client, send a test message and then close and dispose it.
var deviceClient = DeviceClient.Create(testDevice.IotHubHostName, authenticationMethod, transport);
var deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, authenticationMethod, transport);
using var message1 = new Client.Message();
await deviceClient.SendEventAsync(message1).ConfigureAwait(false);
await deviceClient.CloseAsync();
Expand All @@ -152,7 +152,7 @@ private async Task ReuseAuthenticationMethod_SingleDevice(Client.TransportType t

// Perform the same steps again, reusing the previously created authentication method instance to ensure
// that the SDK did not dispose the user supplied authentication method instance.
var deviceClient2 = DeviceClient.Create(testDevice.IotHubHostName, authenticationMethod, transport);
var deviceClient2 = DeviceClient.Create(TestDevice.IotHubHostName, authenticationMethod, transport);
using var message2 = new Client.Message();
await deviceClient2.SendEventAsync(message2).ConfigureAwait(false);
await deviceClient2.CloseAsync();
Expand Down Expand Up @@ -194,7 +194,7 @@ private async Task ReuseAuthenticationMethod_MuxedDevices(Client.TransportType t
for (int i = 0; i < devicesCount; i++)
{
#pragma warning disable CA2000 // Dispose objects before losing scope - the client instance is disposed during the course of the test.
var deviceClient = DeviceClient.Create(testDevices[i].IotHubHostName, authenticationMethods[i], new ITransportSettings[] { amqpTransportSettings });
var deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, authenticationMethods[i], new ITransportSettings[] { amqpTransportSettings });
#pragma warning restore CA2000 // Dispose objects before losing scope

var amqpConnectionStatusChange = new AmqpConnectionStatusChange(testDevices[i].Id);
Expand Down Expand Up @@ -259,7 +259,7 @@ private async Task ReuseAuthenticationMethod_MuxedDevices(Client.TransportType t
{
#pragma warning disable CA2000 // Dispose objects before losing scope - the client instance is disposed at the end of the test.
var deviceClient = DeviceClient.Create(
testDevices[i].IotHubHostName,
TestDevice.IotHubHostName,
authenticationMethods[i],
new ITransportSettings[]
{
Expand Down
2 changes: 1 addition & 1 deletion e2e/test/iothub/DeviceTokenRefreshE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ private async Task DeviceClient_TokenIsRefreshed_Internal(Client.TransportType t
buffer,
transport);

using var deviceClient = DeviceClient.Create(testDevice.IotHubHostName, refresher, transport);
using var deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, refresher, transport);
VerboseTestLogger.WriteLine($"Created {nameof(DeviceClient)}");

if (transport == Client.TransportType.Mqtt)
Expand Down
6 changes: 3 additions & 3 deletions e2e/test/iothub/FileUploadE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private async Task UploadFileGranularAsync(Stream source, string filename, Http1
cert = s_selfSignedCertificate;
x509Auth = new DeviceAuthenticationWithX509Certificate(testDevice.Id, cert);

deviceClient = DeviceClient.Create(testDevice.IotHubHostName, x509Auth, Client.TransportType.Http1);
deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, x509Auth, Client.TransportType.Http1);
}
else
{
Expand Down Expand Up @@ -193,7 +193,7 @@ private async Task UploadFileAsync(Client.TransportType transport, string filena
cert = s_selfSignedCertificate;
x509Auth = new DeviceAuthenticationWithX509Certificate(testDevice.Id, cert);

deviceClient = DeviceClient.Create(testDevice.IotHubHostName, x509Auth, transport);
deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, x509Auth, transport);
}
else
{
Expand Down Expand Up @@ -231,7 +231,7 @@ private async Task GetSasUriAsync(Client.TransportType transport, string blobNam
cert = s_selfSignedCertificate;
x509Auth = new DeviceAuthenticationWithX509Certificate(testDevice.Id, cert);

deviceClient = DeviceClient.Create(testDevice.IotHubHostName, x509Auth, transport);
deviceClient = DeviceClient.Create(TestDevice.IotHubHostName, x509Auth, transport);
}
else
{
Expand Down
36 changes: 32 additions & 4 deletions e2e/test/iothub/messaging/MessageSendE2ETests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Net;
using System.Text;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Client.Exceptions;
using Microsoft.Azure.Devices.Client.Transport.Mqtt;
Expand Down Expand Up @@ -102,7 +103,7 @@ public async Task Message_DeviceSendSingleMessage_AmqpWs_WithHeartbeats()
[TestCategory("LongRunning")]
public async Task Message_DeviceSendSingleMessage_Http_WithProxy()
{
var httpTransportSettings = new Client.Http1TransportSettings
var httpTransportSettings = new Http1TransportSettings
{
Proxy = new WebProxy(s_proxyServerAddress)
};
Expand Down Expand Up @@ -173,7 +174,7 @@ public async Task Message_ModuleSendSingleMessage_AmqpWs_WithProxy()
[TestCategory("Proxy")]
public async Task Message_ModuleSendSingleMessage_MqttWs_WithProxy()
{
var mqttTransportSettings = new Client.Transport.Mqtt.MqttTransportSettings(Client.TransportType.Mqtt_WebSocket_Only)
var mqttTransportSettings = new MqttTransportSettings(Client.TransportType.Mqtt_WebSocket_Only)
{
Proxy = new WebProxy(s_proxyServerAddress)
};
Expand All @@ -182,6 +183,33 @@ public async Task Message_ModuleSendSingleMessage_MqttWs_WithProxy()
await SendSingleMessageModule(transportSettings).ConfigureAwait(false);
}

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
public async Task Message_ModuleSendsMessageToRouteTwice()
{
// arrange

TestModule testModule = await TestModule.GetTestModuleAsync(nameof(Message_ModuleSendsMessageToRouteTwice), "module").ConfigureAwait(false);

try
{
using var moduleClient = ModuleClient.CreateFromConnectionString(testModule.ConnectionString);
using var message = new Client.Message(new byte[10]);
await moduleClient.SendEventAsync("output1", message).ConfigureAwait(false);

// act
Func<Task> secondSend = async () => await moduleClient.SendEventAsync("output2", message).ConfigureAwait(false);

// assert
await secondSend.Should().NotThrowAsync();
}
finally
{
using RegistryManager rm = RegistryManager.CreateFromConnectionString(TestConfiguration.IotHub.ConnectionString);
await rm.RemoveDeviceAsync(testModule.DeviceId).ConfigureAwait(false);
}
}

[TestMethod]
[Timeout(TestTimeoutMilliseconds)]
public async Task X509_DeviceSendSingleMessage_Amqp()
Expand Down Expand Up @@ -424,7 +452,7 @@ private async Task SendSingleMessageModule(ITransportSettings[] transportSetting
using var moduleClient = ModuleClient.CreateFromConnectionString(testModule.ConnectionString, transportSettings);

await moduleClient.OpenAsync().ConfigureAwait(false);
await SendSingleMessageModuleAsync(moduleClient).ConfigureAwait(false);
await MessageSendE2ETests.SendSingleMessageModuleAsync(moduleClient).ConfigureAwait(false);
await moduleClient.CloseAsync().ConfigureAwait(false);
}

Expand Down Expand Up @@ -462,7 +490,7 @@ public static async Task SendBatchMessagesAsync(DeviceClient deviceClient)
}
}

private async Task SendSingleMessageModuleAsync(ModuleClient moduleClient)
private static async Task SendSingleMessageModuleAsync(ModuleClient moduleClient)
{
using Client.Message testMessage = ComposeD2cTestMessage(out string _, out string _);

Expand Down
13 changes: 3 additions & 10 deletions iothub/device/src/InternalClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -707,10 +707,7 @@ public Task SendEventBatchAsync(IEnumerable<Message> messages, CancellationToken
{
foreach (Message message in messages)
{
if (message.MessageId == null)
{
message.MessageId = Guid.NewGuid().ToString();
}
message.MessageId ??= Guid.NewGuid().ToString();
}
}

Expand Down Expand Up @@ -1502,7 +1499,7 @@ public Task SendEventAsync(string outputName, Message message, CancellationToken
throw new ArgumentNullException(nameof(message));
}

message.SystemProperties.Add(MessageSystemPropertyNames.OutputName, outputName);
message.SystemProperties[MessageSystemPropertyNames.OutputName] = outputName;
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved

return InnerHandler.SendEventAsync(message, cancellationToken);
}
Expand Down Expand Up @@ -1635,11 +1632,7 @@ public async Task SetInputMessageHandlerAsync(
// When using a device module we need to enable the 'deviceBound' message link
await EnableEventReceiveAsync(isAnEdgeModule, cancellationToken).ConfigureAwait(false);

if (_receiveEventEndpoints == null)
{
_receiveEventEndpoints = new Dictionary<string, Tuple<MessageHandler, object>>();
}

_receiveEventEndpoints ??= new Dictionary<string, Tuple<MessageHandler, object>>();
_receiveEventEndpoints[inputName] = new Tuple<MessageHandler, object>(messageHandler, userContext);
}
else
Expand Down
1 change: 0 additions & 1 deletion iothub/device/src/Message.cs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,6 @@ public string ComponentName

bool IReadOnlyIndicator.IsReadOnly => Interlocked.Read(ref _sizeInBytesCalled) == 1;


/// <summary>
/// The body stream of the current event data instance
/// </summary>
Expand Down
2 changes: 0 additions & 2 deletions iothub/device/tests/ModuleClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public void ModuleClient_CreateFromConnectionStringWithClientOptions_DoesNotThro
}

[TestMethod]
// Tests_SRS_DEVICECLIENT_33_003: [** It shall EnableEventReceiveAsync when called for the first time. **]**
public async Task ModuleClient_SetReceiveCallbackAsync_SetCallback_Mqtt()
{
var transportType = TransportType.Mqtt_Tcp_Only;
Expand All @@ -103,7 +102,6 @@ public async Task ModuleClient_SetReceiveCallbackAsync_SetCallback_Mqtt()
}

[TestMethod]
// Tests_SRS_DEVICECLIENT_33_004: [** It shall call DisableEventReceiveAsync when the last delegate has been removed. **]**
public async Task ModuleClient_SetReceiveCallbackAsync_RemoveCallback_Mqtt()
{
var transportType = TransportType.Mqtt_Tcp_Only;
Expand Down