diff --git a/e2e/LongHaul/device/IotHub.cs b/e2e/LongHaul/device/IotHub.cs
index 5b8342a348..5b2c189a15 100644
--- a/e2e/LongHaul/device/IotHub.cs
+++ b/e2e/LongHaul/device/IotHub.cs
@@ -33,6 +33,7 @@ internal sealed class IotHub : IAsyncDisposable
private ConnectionStatusChangeReason _disconnectedReason;
private RecommendedAction _disconnectedRecommendedAction;
private volatile IotHubDeviceClient _deviceClient;
+ private CancellationToken _ct;
private static readonly TimeSpan s_messageLoopSleepTime = TimeSpan.FromSeconds(3);
private static readonly TimeSpan s_deviceTwinUpdateInterval = TimeSpan.FromSeconds(10);
@@ -66,8 +67,13 @@ public IotHub(Logger logger, Parameters parameters)
///
/// Initializes the connection to IoT Hub.
///
- public async Task InitializeAsync()
+ public async Task InitializeAsync(CancellationToken? ct = null)
{
+ if (ct != null)
+ {
+ _ct = ct.Value;
+ }
+
await _lifetimeControl.WaitAsync().ConfigureAwait(false);
try
@@ -112,7 +118,7 @@ public async Task SendTelemetryMessagesAsync(Logger logger, CancellationToken ct
var pendingMessages = new List(maxBulkMessages);
logger.LoggerContext.Add(OperationName, LoggingConstants.TelemetryMessage);
var sw = new Stopwatch();
-
+
while (!ct.IsCancellationRequested)
{
logger.Metric(MessageBacklog, _messagesToSend.Count);
@@ -176,38 +182,6 @@ public async Task SendTelemetryMessagesAsync(Logger logger, CancellationToken ct
}
}
- public async Task ReportReadOnlyPropertiesAsync(Logger logger, CancellationToken ct)
- {
- logger.LoggerContext.Add(OperationName, ReportTwinProperties);
- var sw = new Stopwatch();
-
- while (!ct.IsCancellationRequested)
- {
- try
- {
- var reported = new ReportedProperties
- {
- { "TotalTelemetryMessagesSent", _totalTelemetryMessagesSent },
- };
-
- logger.Trace($"Updating reported properties.", TraceSeverity.Information);
- sw.Restart();
- await _deviceClient.UpdateReportedPropertiesAsync(reported, ct).ConfigureAwait(false);
- sw.Stop();
-
- ++_totalTwinUpdatesReported;
- logger.Metric(TotalTwinUpdatesReported, _totalTwinUpdatesReported);
- logger.Metric(ReportedTwinUpdateOperationSeconds, sw.Elapsed.TotalSeconds);
- }
- catch (Exception ex)
- {
- logger.Trace($"Exception when reporting properties when connected is {IsConnected}\n{ex}");
- }
-
- await Task.Delay(s_deviceTwinUpdateInterval, ct).ConfigureAwait(false);
- }
- }
-
public void AddTelemetry(
TelemetryBase telemetryObject,
IDictionary extraProperties = null)
@@ -244,7 +218,26 @@ public void AddTelemetry(
_messagesToSend.Enqueue(iotMessage);
}
- public async Task SetPropertiesAsync(string keyName, object properties, Logger logger, CancellationToken ct)
+ public async Task ReportReadOnlyPropertiesAsync(Logger logger, CancellationToken ct)
+ {
+ logger.LoggerContext.Add(OperationName, ReportTwinProperties);
+ var sw = new Stopwatch();
+
+ while (!ct.IsCancellationRequested)
+ {
+ sw.Restart();
+ await SetPropertiesAsync("totalTelemetryMessagesSent", _totalTelemetryMessagesSent, logger, ct).ConfigureAwait(false);
+ sw.Stop();
+
+ ++_totalTwinUpdatesReported;
+ logger.Metric(TotalTwinUpdatesReported, _totalTwinUpdatesReported);
+ logger.Metric(ReportedTwinUpdateOperationSeconds, sw.Elapsed.TotalSeconds);
+
+ await Task.Delay(s_deviceTwinUpdateInterval, ct).ConfigureAwait(false);
+ }
+ }
+
+ public async Task SetPropertiesAsync(string keyName, T properties, Logger logger, CancellationToken ct)
{
Debug.Assert(_deviceClient != null);
Debug.Assert(properties != null);
@@ -257,11 +250,11 @@ public async Task SetPropertiesAsync(string keyName, object properties, Logger l
{
try
{
- await _deviceClient
+ long version = await _deviceClient
.UpdateReportedPropertiesAsync(reportedProperties, ct)
.ConfigureAwait(false);
- logger.Trace($"Set the reported property with name [{keyName}] in device twin.", TraceSeverity.Information);
+ logger.Trace($"Set the reported property with key {keyName} and value {properties} in device twin; observed version {version}.", TraceSeverity.Information);
break;
}
catch (Exception ex)
@@ -270,6 +263,28 @@ await _deviceClient
await Task.Delay(s_retryInterval, ct).ConfigureAwait(false);
}
}
+
+ while (!ct.IsCancellationRequested)
+ {
+ try
+ {
+ TwinProperties twin = await _deviceClient.GetTwinPropertiesAsync(ct).ConfigureAwait(false);
+ if (!twin.Reported.TryGetValue(keyName, out T actualValue))
+ {
+ logger.Trace($"Couldn't find the reported property {keyName} in the device twin.", TraceSeverity.Warning);
+ }
+ else if (!actualValue.Equals(properties))
+ {
+ logger.Trace($"Couldn't validate value for {keyName} was set to {properties}, found {actualValue}.");
+ }
+ break;
+ }
+ catch (Exception ex)
+ {
+ logger.Trace($"Exception getting twin\n{ex}", TraceSeverity.Warning);
+ await Task.Delay(s_retryInterval, ct).ConfigureAwait(false);
+ }
+ }
}
public async Task UploadFilesAsync(Logger logger, CancellationToken ct)
@@ -410,7 +425,19 @@ private async void ConnectionStatusChangesHandlerAsync(ConnectionStatusInfo conn
{
case RecommendedAction.OpenConnection:
_logger.Trace($"Following recommended action of reinitializing the client.", TraceSeverity.Information);
- await InitializeAsync().ConfigureAwait(false);
+ while (!_ct.IsCancellationRequested)
+ {
+ try
+ {
+ await InitializeAsync().ConfigureAwait(false);
+ break;
+ }
+ catch (Exception ex)
+ {
+ _logger.Trace($"Exception re-initializing client\n{ex}", TraceSeverity.Warning);
+ await Task.Delay(s_retryInterval, _ct).ConfigureAwait(false);
+ }
+ }
break;
case RecommendedAction.PerformNormally:
@@ -479,7 +506,8 @@ private async Task DesiredPropertyUpdateCallbackAsync(DesiredProperties properti
if (reported.Any())
{
- await _deviceClient.UpdateReportedPropertiesAsync(reported).ConfigureAwait(false);
+ long version = await _deviceClient.UpdateReportedPropertiesAsync(reported).ConfigureAwait(false);
+ _logger.Trace($"Updated {reported.Count()} properties and observed new version {version}.", TraceSeverity.Information);
_totalDesiredPropertiesHandled += reported.Count();
_logger.Metric(TotalDesiredPropertiesHandled, _totalDesiredPropertiesHandled);
diff --git a/e2e/LongHaul/device/Program.cs b/e2e/LongHaul/device/Program.cs
index f2ae71a039..c69f955d13 100644
--- a/e2e/LongHaul/device/Program.cs
+++ b/e2e/LongHaul/device/Program.cs
@@ -82,7 +82,7 @@ private static async Task Main(string[] args)
{
try
{
- await iotHub.InitializeAsync().ConfigureAwait(false);
+ await iotHub.InitializeAsync(cts.Token).ConfigureAwait(false);
break;
}
catch (Exception ex)
@@ -110,6 +110,7 @@ await Task
.ConfigureAwait(false);
}
catch (OperationCanceledException) { } // user signalled an exit
+ catch (AggregateException ex) when (ex.InnerException is OperationCanceledException) { } // user signaled an exit
catch (Exception ex)
{
s_logger.Trace($"Device app failed with exception {ex}", TraceSeverity.Error);
diff --git a/e2e/LongHaul/device/SystemHealthTelemetry.cs b/e2e/LongHaul/device/SystemHealthTelemetry.cs
index 75899747fe..f81836aaf1 100644
--- a/e2e/LongHaul/device/SystemHealthTelemetry.cs
+++ b/e2e/LongHaul/device/SystemHealthTelemetry.cs
@@ -7,6 +7,7 @@
using System.Linq;
using System.Net.NetworkInformation;
using System.Text.Json.Serialization;
+using Newtonsoft.Json;
namespace Microsoft.Azure.Devices.LongHaul.Device
{
@@ -23,15 +24,19 @@ public SystemHealthTelemetry(int port)
TcpPortFilter.Add(port);
}
+ [JsonProperty("processCpuUsagePercent")]
[JsonPropertyName("processCpuUsagePercent")]
public double ProcessCpuUsagePercent { get; set; } = UpdateCpuUsage();
+ [JsonProperty("totalAssignedMemoryBytes")]
[JsonPropertyName("totalAssignedMemoryBytes")]
public long TotalAssignedMemoryBytes { get; set; } = s_currentProcess.WorkingSet64;
+ [JsonProperty("totalGCBytes")]
[JsonPropertyName("totalGCBytes")]
public long TotalGCBytes { get; set; } = GC.GetTotalMemory(false);
+ [JsonProperty("activeTcpConnections")]
[JsonPropertyName("activeTcpConnections")]
public long ActiveTcpConnections { get; set; } = UpdateTcpConnections();
diff --git a/e2e/LongHaul/device/SystemProperties.cs b/e2e/LongHaul/device/SystemProperties.cs
index 9f45adc04f..7316f8c6a2 100644
--- a/e2e/LongHaul/device/SystemProperties.cs
+++ b/e2e/LongHaul/device/SystemProperties.cs
@@ -3,18 +3,35 @@
using System.Runtime.InteropServices;
using System.Text.Json.Serialization;
+using Newtonsoft.Json;
namespace Microsoft.Azure.Devices.LongHaul.Device
{
internal class SystemProperties
{
+ [JsonProperty("systemArchitecture")]
[JsonPropertyName("systemArchitecture")]
public string SystemArchitecture { get; set; } = RuntimeInformation.OSArchitecture.ToString();
+ [JsonProperty("osVersion")]
[JsonPropertyName("osVersion")]
public string OsVersion { get; set; } = RuntimeInformation.OSDescription;
+ [JsonProperty("frameworkDescription")]
[JsonPropertyName("frameworkDescription")]
public string FrameworkDescription { get; set; } = RuntimeInformation.FrameworkDescription;
+
+ public override bool Equals(object obj)
+ {
+ return obj is SystemProperties other
+ && SystemArchitecture == other.SystemArchitecture
+ && OsVersion == other.OsVersion
+ && FrameworkDescription == other.FrameworkDescription;
+ }
+
+ public override int GetHashCode()
+ {
+ return SystemArchitecture.GetHashCode() ^ OsVersion.GetHashCode() ^ FrameworkDescription.GetHashCode();
+ }
}
}
diff --git a/e2e/LongHaul/device/TelemetryBase.cs b/e2e/LongHaul/device/TelemetryBase.cs
index 6ee94d98de..53b61ffe4b 100644
--- a/e2e/LongHaul/device/TelemetryBase.cs
+++ b/e2e/LongHaul/device/TelemetryBase.cs
@@ -3,6 +3,7 @@
using System;
using System.Text.Json.Serialization;
+using Newtonsoft.Json;
namespace Microsoft.Azure.Devices.LongHaul.Device
{
@@ -11,6 +12,7 @@ internal abstract class TelemetryBase
///
/// The date/time the event occurred, in UTC.
///
+ [JsonProperty("eventDateTimeUtc")]
[JsonPropertyName("eventDateTimeUtc")]
public DateTime? EventDateTimeUtc { get; set; } = DateTime.UtcNow;
}
diff --git a/e2e/LongHaul/module/IotHub.cs b/e2e/LongHaul/module/IotHub.cs
index ba2fd7f596..b46b198668 100644
--- a/e2e/LongHaul/module/IotHub.cs
+++ b/e2e/LongHaul/module/IotHub.cs
@@ -14,6 +14,7 @@ internal sealed class IotHub : IAsyncDisposable
private readonly string _moduleConnectionString;
private readonly IotHubClientOptions _clientOptions;
private readonly Logger _logger;
+ private CancellationToken _ct;
private SemaphoreSlim _lifetimeControl = new(1, 1);
@@ -56,8 +57,12 @@ public IotHub(Logger logger, Parameters parameters)
///
/// Initializes the connection to IoT Hub.
///
- public async Task InitializeAsync()
+ public async Task InitializeAsync(CancellationToken? ct = null)
{
+ if (ct != null)
+ {
+ _ct = ct.Value;
+ }
await _lifetimeControl.WaitAsync().ConfigureAwait(false);
var sw = new Stopwatch();
@@ -175,25 +180,13 @@ public async Task ReportReadOnlyPropertiesAsync(Logger logger, CancellationToken
var sw = new Stopwatch();
while (!ct.IsCancellationRequested)
{
- try
- {
- var reported = new ReportedProperties
- {
- { "TotalTelemetryMessagesSent", _totalTelemetryMessagesToModuleSent },
- };
-
- sw.Restart();
- await _moduleClient.UpdateReportedPropertiesAsync(reported, ct).ConfigureAwait(false);
- sw.Stop();
+ sw.Restart();
+ await SetPropertiesAsync("totalTelemetryMessagesSent", _totalTelemetryMessagesToModuleSent, logger, ct).ConfigureAwait(false);
+ sw.Stop();
- ++_totalTwinUpdatesToModuleReported;
- logger.Metric(TotalTwinUpdatesToModuleReported, _totalTwinUpdatesToModuleReported);
- logger.Metric(ReportedTwinUpdateToModuleOperationSeconds, sw.Elapsed.TotalSeconds);
- }
- catch (Exception ex)
- {
- logger.Trace($"Exception when reporting properties when connected is {IsConnected}\n{ex}");
- }
+ ++_totalTwinUpdatesToModuleReported;
+ logger.Metric(TotalTwinUpdatesToModuleReported, _totalTwinUpdatesToModuleReported);
+ logger.Metric(ReportedTwinUpdateToModuleOperationSeconds, sw.Elapsed.TotalSeconds);
await Task.Delay(s_deviceTwinUpdateInterval, ct).ConfigureAwait(false);
}
@@ -249,11 +242,11 @@ public async Task SetPropertiesAsync(string keyName, object properties, Logger l
{
try
{
- await _moduleClient
+ long version = await _moduleClient
.UpdateReportedPropertiesAsync(reportedProperties, ct)
- .ConfigureAwait(false);
+ .ConfigureAwait(false);
- logger.Trace($"Set the reported property with name [{keyName}] in device twin.", TraceSeverity.Information);
+ logger.Trace($"Set the reported property with key {keyName} and value {properties} in device twin; observed version {version}.", TraceSeverity.Information);
break;
}
catch (Exception ex)
@@ -333,7 +326,19 @@ private async void ConnectionStatusChangesHandlerAsync(ConnectionStatusInfo conn
{
case RecommendedAction.OpenConnection:
_logger.Trace($"Following recommended action of reinitializing the client.", TraceSeverity.Information);
- await InitializeAsync().ConfigureAwait(false);
+ while (!_ct.IsCancellationRequested)
+ {
+ try
+ {
+ await InitializeAsync().ConfigureAwait(false);
+ break;
+ }
+ catch (Exception ex)
+ {
+ _logger.Trace($"Exception re-initializing client\n{ex}", TraceSeverity.Warning);
+ await Task.Delay(s_retryInterval, _ct).ConfigureAwait(false);
+ }
+ }
break;
case RecommendedAction.PerformNormally:
@@ -403,7 +408,8 @@ private async Task DesiredPropertyUpdateCallbackAsync(DesiredProperties properti
if (reported.Any())
{
- await _moduleClient.UpdateReportedPropertiesAsync(reported).ConfigureAwait(false);
+ long version = await _moduleClient.UpdateReportedPropertiesAsync(reported).ConfigureAwait(false);
+ _logger.Trace($"Updated {reported.Count()} properties and observed new version {version}.", TraceSeverity.Information);
_totalDesiredPropertiesToModuleHandled += reported.Count();
_logger.Metric(TotalDesiredPropertiesToModuleHandled, _totalDesiredPropertiesToModuleHandled);
diff --git a/e2e/LongHaul/module/Program.cs b/e2e/LongHaul/module/Program.cs
index b3ce7974ef..938605cb8a 100644
--- a/e2e/LongHaul/module/Program.cs
+++ b/e2e/LongHaul/module/Program.cs
@@ -69,7 +69,7 @@ private static async Task Main(string[] args)
{
try
{
- await iotHub.InitializeAsync().ConfigureAwait(false);
+ await iotHub.InitializeAsync(cts.Token).ConfigureAwait(false);
break;
}
catch (Exception ex)
@@ -96,6 +96,7 @@ await Task
.ConfigureAwait(false);
}
catch (TaskCanceledException) { } // user signalled an exit
+ catch (AggregateException ex) when (ex.InnerException is OperationCanceledException) { } // user signaled an exit
catch (Exception ex)
{
s_logger.Trace($"Device app failed with exception {ex}", TraceSeverity.Error);
diff --git a/e2e/LongHaul/service/IotHub.cs b/e2e/LongHaul/service/IotHub.cs
index 82578421fd..f3a23320a2 100644
--- a/e2e/LongHaul/service/IotHub.cs
+++ b/e2e/LongHaul/service/IotHub.cs
@@ -68,125 +68,129 @@ public async Task MonitorConnectedDevicesAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
- AsyncPageable allDevices = s_serviceClient.Query.Create(
- "SELECT deviceId, connectionState FROM devices where is_defined(properties.reported.runId)",
- ct);
-
- AsyncPageable allModules = s_serviceClient.Query.Create(
- "SELECT deviceId, moduleId, connectionState FROM devices.modules where is_defined(properties.reported.runId)",
- ct);
-
- await foreach (ClientTwin device in allDevices)
+ try
{
- string deviceId = device.DeviceId;
+ AsyncPageable allDeviceTwins = s_serviceClient.Query.Create(
+ "SELECT deviceId, connectionState FROM devices where is_defined(properties.reported.runId)",
+ ct);
- if (s_onlineDeviceOperations.ContainsKey(deviceId)
- && device.ConnectionState is ClientConnectionState.Disconnected)
- {
- CancellationTokenSource source = s_onlineDeviceOperations[deviceId].Item2;
- // Signal cancellation to all tasks on the particular device.
- source.Cancel();
- // Dispose the cancellation token source.
- source.Dispose();
- // Remove the correlated device operations and cancellation token source of the particular device from the dictionary.
- s_onlineDeviceOperations.TryRemove(deviceId, out _);
- }
- else if (!s_onlineDeviceOperations.ContainsKey(deviceId)
- && device.ConnectionState is ClientConnectionState.Connected)
+ await foreach (ClientTwin deviceTwin in allDeviceTwins)
{
- // For each online device, initiate a new cancellation token source.
- // Once the device goes offline, cancel all operations on this device.
- var source = new CancellationTokenSource();
- CancellationToken token = source.Token;
+ string deviceId = deviceTwin.DeviceId;
- async Task Operations()
+ if (s_onlineDeviceOperations.ContainsKey(deviceId)
+ && deviceTwin.ConnectionState is ClientConnectionState.Disconnected)
{
- var deviceOperations = new DeviceOperations(s_serviceClient, deviceId, _logger.Clone());
- _logger.Trace($"Creating {nameof(DeviceOperations)} on the device [{deviceId}]", TraceSeverity.Verbose);
-
- Logger loggerPerDevice = _logger.Clone();
- loggerPerDevice.LoggerContext.Add(DeviceId, deviceId);
+ CancellationTokenSource source = s_onlineDeviceOperations[deviceId].Item2;
+ // Signal cancellation to all tasks on the particular device.
+ source.Cancel();
+ // Dispose the cancellation token source.
+ source.Dispose();
+ // Remove the correlated device operations and cancellation token source of the particular device from the dictionary.
+ s_onlineDeviceOperations.TryRemove(deviceId, out _);
+ }
+ else if (!s_onlineDeviceOperations.ContainsKey(deviceId)
+ && deviceTwin.ConnectionState is ClientConnectionState.Connected)
+ {
+ // For each online device, initiate a new cancellation token source.
+ // Once the device goes offline, cancel all operations on this device.
+ var source = new CancellationTokenSource();
+ CancellationToken token = source.Token;
- try
+ async Task OperateWithDeviceAsync()
{
- await Task
- .WhenAll(
- deviceOperations.InvokeDirectMethodAsync(loggerPerDevice.Clone(), token),
- deviceOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), loggerPerDevice.Clone(), token),
- deviceOperations.SendC2dMessagesAsync(loggerPerDevice.Clone(), token))
- .ConfigureAwait(false);
+ Logger deviceLogger = _logger.Clone();
+ deviceLogger.LoggerContext.Add(DeviceId, deviceId);
+ var deviceOperations = new DeviceOperations(s_serviceClient, deviceId, deviceLogger);
+ _logger.Trace($"Creating {nameof(DeviceOperations)} on the device [{deviceId}]", TraceSeverity.Verbose);
+
+ try
+ {
+ await Task
+ .WhenAll(
+ deviceOperations.InvokeDirectMethodAsync(deviceLogger.Clone(), token),
+ deviceOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), deviceLogger.Clone(), token),
+ deviceOperations.SendC2dMessagesAsync(deviceLogger.Clone(), token))
+ .ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ _logger.Trace($"Operations on [{deviceId}] have been canceled as the device goes offline.", TraceSeverity.Information);
+ }
+ catch (Exception ex)
+ {
+ _logger.Trace($"Service app failed with exception {ex}", TraceSeverity.Error);
+ }
}
- catch (OperationCanceledException)
- {
- _logger.Trace($"Operations on [{deviceId}] have been canceled as the device goes offline.", TraceSeverity.Information);
- }
- catch (Exception ex)
- {
- _logger.Trace($"Service app failed with exception {ex}", TraceSeverity.Error);
- }
- }
- // Passing in "Operations()" as Task so we don't need to manually call "Invoke()" on it.
- var operationsTuple = new Tuple(Operations(), source);
- s_onlineDeviceOperations.TryAdd(deviceId, operationsTuple);
+ // Passing in "Operations()" as Task so we don't need to manually call "Invoke()" on it.
+ var operationsTuple = new Tuple(OperateWithDeviceAsync(), source);
+ s_onlineDeviceOperations.TryAdd(deviceId, operationsTuple);
+ }
}
- }
- await foreach (ClientTwin module in allModules)
- {
- string moduleId = module.DeviceId + "_" + module.ModuleId;
- if (s_onlineModuleOperations.ContainsKey(moduleId)
- && module.ConnectionState is ClientConnectionState.Disconnected)
- {
- CancellationTokenSource source = s_onlineModuleOperations[moduleId].Item2;
- // Signal cancellation to all tasks on the particular module.
- source.Cancel();
- // Dispose the cancellation token source.
- source.Dispose();
- // Remove the correlated module operations and cancellation token source of the particular module from the dictionary.
- s_onlineModuleOperations.TryRemove(moduleId, out _);
- }
- else if (!s_onlineModuleOperations.ContainsKey(moduleId)
- && module.ConnectionState is ClientConnectionState.Connected)
- {
- // For each online module, initiate a new cancellation token source.
- // Once the module goes offline, cancel all operations on this module.
- var source = new CancellationTokenSource();
- CancellationToken token = source.Token;
+ AsyncPageable allModuleTwins = s_serviceClient.Query.Create(
+ "SELECT deviceId, moduleId, connectionState FROM devices.modules where is_defined(properties.reported.runId)",
+ ct);
- async Task Operations()
+ await foreach (ClientTwin moduleTwin in allModuleTwins)
+ {
+ string moduleId = $"{moduleTwin.DeviceId}/{moduleTwin.ModuleId}";
+ if (s_onlineModuleOperations.ContainsKey(moduleId)
+ && moduleTwin.ConnectionState is ClientConnectionState.Disconnected)
{
- var moduleOperations = new ModuleOperations(s_serviceClient, module.DeviceId, module.ModuleId, _logger.Clone());
- _logger.Trace($"Creating {nameof(ModuleOperations)} on the device: [{module.DeviceId}], module: [{module.ModuleId}]", TraceSeverity.Verbose);
+ CancellationTokenSource source = s_onlineModuleOperations[moduleId].Item2;
+ // Signal cancellation to all tasks on the particular module.
+ source.Cancel();
+ // Dispose the cancellation token source.
+ source.Dispose();
+ // Remove the correlated module operations and cancellation token source of the particular module from the dictionary.
+ s_onlineModuleOperations.TryRemove(moduleId, out _);
+ }
+ else if (!s_onlineModuleOperations.ContainsKey(moduleId)
+ && moduleTwin.ConnectionState is ClientConnectionState.Connected)
+ {
+ // For each online module, initiate a new cancellation token source.
+ // Once the module goes offline, cancel all operations on this module.
+ var source = new CancellationTokenSource();
+ CancellationToken token = source.Token;
- try
- {
- await Task
- .WhenAll(
- moduleOperations.InvokeDirectMethodAsync(_logger.Clone(), token),
- moduleOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), _logger.Clone(), token))
- .ConfigureAwait(false);
- }
- catch (OperationCanceledException)
+ async Task OperateWithModuleAsync()
{
- _logger.Trace($"Operations on device: [{module.DeviceId}], module: [{module.ModuleId}] have been canceled as the device goes offline.", TraceSeverity.Information);
+ var moduleOperations = new ModuleOperations(s_serviceClient, moduleTwin.DeviceId, moduleTwin.ModuleId, _logger.Clone());
+ _logger.Trace($"Creating {nameof(ModuleOperations)} on the device: [{moduleTwin.DeviceId}], module: [{moduleTwin.ModuleId}]", TraceSeverity.Verbose);
+
+ try
+ {
+ await Task
+ .WhenAll(
+ moduleOperations.InvokeDirectMethodAsync(_logger.Clone(), token),
+ moduleOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), _logger.Clone(), token))
+ .ConfigureAwait(false);
+ }
+ catch (OperationCanceledException)
+ {
+ _logger.Trace($"Operations on device: [{moduleTwin.DeviceId}], module: [{moduleTwin.ModuleId}] have been canceled as the device goes offline.", TraceSeverity.Information);
+ }
+ catch (Exception ex)
+ {
+ _logger.Trace($"Service app failed with exception {ex}", TraceSeverity.Error);
+ }
}
- catch (Exception ex)
- {
- _logger.Trace($"Service app failed with exception {ex}", TraceSeverity.Error);
- }
- }
- // Passing in "Operations()" as Task so we don't need to manually call "Invoke()" on it.
- var operationsTuple = new Tuple(Operations(), source);
- s_onlineModuleOperations.TryAdd(moduleId, operationsTuple);
+ // Passing in "Operations()" as Task so we don't need to manually call "Invoke()" on it.
+ var operationsTuple = new Tuple(OperateWithModuleAsync(), source);
+ s_onlineModuleOperations.TryAdd(moduleId, operationsTuple);
+ }
}
- }
- _logger.Trace($"Total number of connected devices: {s_onlineDeviceOperations.Count}", TraceSeverity.Information);
- _logger.Trace($"Total number of connected modules: {s_onlineModuleOperations.Count}", TraceSeverity.Information);
- _logger.Metric(TotalOnlineDevicesCount, s_onlineDeviceOperations.Count);
- _logger.Metric(TotalOnlineModulesCount, s_onlineModuleOperations.Count);
+ _logger.Metric(TotalOnlineDevicesCount, s_onlineDeviceOperations.Count);
+ _logger.Metric(TotalOnlineModulesCount, s_onlineModuleOperations.Count);
+ }
+ catch (Exception ex)
+ {
+ _logger.Trace($"Exception querying devices and modules\n{ex}", TraceSeverity.Warning);
+ }
await Task.Delay(s_deviceCountMonitorInterval, ct).ConfigureAwait(false);
}
diff --git a/iothub/device/src/Pipeline/AmqpTransportHandler.cs b/iothub/device/src/Pipeline/AmqpTransportHandler.cs
index dc6b8ffcb6..6428364535 100644
--- a/iothub/device/src/Pipeline/AmqpTransportHandler.cs
+++ b/iothub/device/src/Pipeline/AmqpTransportHandler.cs
@@ -22,8 +22,7 @@ internal class AmqpTransportHandler : TransportHandlerBase
protected AmqpUnit _amqpUnit;
private readonly Action _onDesiredStatePatchListener;
private readonly object _lock = new();
- private readonly ConcurrentDictionary> _twinResponseCompletions = new();
- private readonly ConcurrentDictionary _twinResponseTimeouts = new();
+ private readonly ConcurrentDictionary _pendingTwinOperations = new();
// Timer to check if any expired messages exist. The timer is executed after each hour of execution.
private readonly Timer _twinTimeoutTimer;
@@ -32,7 +31,6 @@ internal class AmqpTransportHandler : TransportHandlerBase
internal IotHubConnectionCredentials _connectionCredentials;
- private static readonly TimeSpan s_twinResponseTimeout = TimeSpan.FromMinutes(60);
private bool _closed;
static AmqpTransportHandler()
@@ -81,6 +79,9 @@ internal AmqpTransportHandler(PipelineContext context, IDelegatingHandler nextHa
private void OnDisconnected()
{
+ if (Logging.IsEnabled)
+ Logging.Info(this, $"AMQP connection was lost", nameof(OnDisconnected));
+
if (!_closed)
{
lock (_lock)
@@ -88,6 +89,10 @@ private void OnDisconnected()
if (!_closed)
{
OnTransportDisconnected();
+
+ // During a disconnection, any pending twin updates won't be received, so we'll preemptively
+ // cancel these operations so the client can retry once reconnected.
+ RemoveOldOperations(TimeSpan.Zero);
}
}
}
@@ -138,8 +143,8 @@ public override async Task OpenAsync(CancellationToken cancellationToken)
{
await _amqpUnit.OpenAsync(cancellationToken).ConfigureAwait(false);
- // The timer would invoke callback after every hour.
- _twinTimeoutTimer.Change(s_twinResponseTimeout, s_twinResponseTimeout);
+ // The timer would invoke callback in the specified time and that duration thereafter.
+ _twinTimeoutTimer.Change(TwinResponseTimeout, TwinResponseTimeout);
}
finally
{
@@ -442,8 +447,7 @@ private async Task RoundTripTwinMessageAsync(
{
cancellationToken.ThrowIfCancellationRequested();
var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- _twinResponseCompletions[correlationId] = taskCompletionSource;
- _twinResponseTimeouts[correlationId] = DateTimeOffset.UtcNow;
+ _pendingTwinOperations[correlationId] = new PendingAmqpTwinOperation(taskCompletionSource);
await _amqpUnit.SendTwinMessageAsync(amqpTwinMessageType, correlationId, reportedProperties, cancellationToken).ConfigureAwait(false);
@@ -460,8 +464,7 @@ private async Task RoundTripTwinMessageAsync(
}
finally
{
- _twinResponseCompletions.TryRemove(correlationId, out _);
- _twinResponseTimeouts.TryRemove(correlationId, out _);
+ _pendingTwinOperations.TryRemove(correlationId, out _);
if (Logging.IsEnabled)
Logging.Exit(this, cancellationToken, nameof(RoundTripTwinMessageAsync));
@@ -492,43 +495,60 @@ private void TwinMessageListener(AmqpMessage responseFromService, string correla
Logging.Info(this, $"Received a response for operation with correlation Id {correlationId}.", nameof(TwinMessageListener));
// For Get and Patch, complete the task.
- if (_twinResponseCompletions.TryRemove(correlationId, out TaskCompletionSource task))
+ if (_pendingTwinOperations.TryRemove(correlationId, out PendingAmqpTwinOperation pendingOperation))
{
if (ex == default)
{
- task.TrySetResult(responseFromService);
+ pendingOperation.CompletionTask.TrySetResult(responseFromService);
}
else
{
- task.TrySetException(ex);
+ pendingOperation.CompletionTask.TrySetException(ex);
}
}
else
{
// This can happen if we received a message from service with correlation Id that was not set by SDK or does not exist in dictionary.
if (Logging.IsEnabled)
- Logging.Info("Could not remove correlation Id to complete the task awaiter for a twin operation.", nameof(TwinMessageListener));
+ Logging.Info(this, "Could not remove correlation Id to complete the task awaiter for a twin operation.", nameof(TwinMessageListener));
}
}
else if (correlationId.StartsWith(AmqpTwinMessageType.Put.ToString(), StringComparison.OrdinalIgnoreCase))
{
// This is an acknowledgement received from service for subscribing to desired property updates
if (Logging.IsEnabled)
- Logging.Info("Subscribed for twin desired property updates successfully", nameof(TwinMessageListener));
+ Logging.Info(this, "Subscribed for twin desired property updates successfully", nameof(TwinMessageListener));
}
}
}
- private void RemoveOldOperations(object _)
+ private void RemoveOldOperations(object state)
{
- _ = _twinResponseTimeouts
- .Where(x => DateTimeOffset.UtcNow - x.Value > s_twinResponseTimeout)
+ if (state is not TimeSpan maxAge)
+ {
+ maxAge = TwinResponseTimeout;
+ }
+
+ if (Logging.IsEnabled)
+ Logging.Info(this, $"Removing operations older than {maxAge}", nameof(RemoveOldOperations));
+
+ int canceledOperations = _pendingTwinOperations
+ .Where(x => DateTimeOffset.UtcNow - x.Value.RequestSentOnUtc > maxAge)
.Select(x =>
{
- _twinResponseCompletions.TryRemove(x.Key, out TaskCompletionSource _);
- _twinResponseTimeouts.TryRemove(x.Key, out DateTimeOffset _);
+ if (_pendingTwinOperations.TryRemove(x.Key, out PendingAmqpTwinOperation pendingOperation))
+ {
+ if (Logging.IsEnabled)
+ Logging.Error(this, $"Removing twin response for {x.Key}", nameof(RemoveOldOperations));
+
+ pendingOperation.CompletionTask.TrySetException(new IotHubClientException("Did not receive twin response from service.", IotHubClientErrorCode.NetworkErrors));
+ }
return true;
- });
+ })
+ .Count();
+
+ if (Logging.IsEnabled)
+ Logging.Error(this, $"Removed {canceledOperations} twin responses", nameof(RemoveOldOperations));
}
protected private override void Dispose(bool disposing)
diff --git a/iothub/device/src/Pipeline/MqttTransportHandler.cs b/iothub/device/src/Pipeline/MqttTransportHandler.cs
index f0d300aae9..f7a5e81f3d 100644
--- a/iothub/device/src/Pipeline/MqttTransportHandler.cs
+++ b/iothub/device/src/Pipeline/MqttTransportHandler.cs
@@ -97,17 +97,12 @@ internal sealed class MqttTransportHandler : TransportHandlerBase, IDisposable
private readonly Action _onDesiredStatePatchListener;
private readonly Func> _messageReceivedListener;
- private readonly ConcurrentDictionary> _getTwinResponseCompletions = new();
- private readonly ConcurrentDictionary> _reportedPropertyUpdateResponseCompletions = new();
-
- private readonly ConcurrentDictionary _twinResponseTimeouts = new();
-
- private bool _isSubscribedToTwinResponses;
+ private readonly ConcurrentDictionary _pendingTwinOperations = new();
// Timer to check if any expired messages exist. The timer is executed after each hour of execution.
private readonly Timer _twinTimeoutTimer;
- private static TimeSpan s_twinResponseTimeout = TimeSpan.FromMinutes(60);
+ private bool _isSubscribedToTwinResponses;
private readonly string _deviceId;
private readonly string _moduleId;
@@ -315,8 +310,8 @@ public override async Task OpenAsync(CancellationToken cancellationToken)
_mqttClient.DisconnectedAsync += HandleDisconnectionAsync;
_mqttClient.ApplicationMessageReceivedAsync += HandleReceivedMessageAsync;
- // The timer would invoke callback after every hour.
- _twinTimeoutTimer.Change(s_twinResponseTimeout, s_twinResponseTimeout);
+ // The timer would invoke callback in the specified time and that duration thereafter.
+ _twinTimeoutTimer.Change(TwinResponseTimeout, TwinResponseTimeout);
}
catch (MqttConnectingFailedException ex)
{
@@ -707,8 +702,7 @@ public override async Task GetTwinAsync(CancellationToken cancel
// Note the request as "in progress" before actually sending it so that no matter how quickly the service
// responds, this layer can correlate the request.
var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- _getTwinResponseCompletions[requestId] = taskCompletionSource;
- _twinResponseTimeouts[requestId] = DateTimeOffset.UtcNow;
+ _pendingTwinOperations[requestId] = new PendingMqttTwinOperation(taskCompletionSource);
MqttClientPublishResult result = await _mqttClient.PublishAsync(mqttMessage, cancellationToken).ConfigureAwait(false);
@@ -720,13 +714,13 @@ public override async Task GetTwinAsync(CancellationToken cancel
}
if (Logging.IsEnabled)
- Logging.Info($"Sent get twin request. Waiting on service response with request id {requestId}");
+ Logging.Info(this, $"Sent get twin request. Waiting on service response with request id {requestId}", nameof(GetTwinAsync));
// Wait until IoT hub sends a message to this client with the response to this patch twin request.
GetTwinResponse getTwinResponse = await taskCompletionSource.WaitAsync(cancellationToken).ConfigureAwait(false);
if (Logging.IsEnabled)
- Logging.Info(this, $"Received get twin response for request id {requestId} with status {getTwinResponse.Status}.");
+ Logging.Info(this, $"Received get twin response for request id {requestId} with status {getTwinResponse.Status}.", nameof(GetTwinAsync));
if (getTwinResponse.Status != 200)
{
@@ -776,8 +770,7 @@ public override async Task GetTwinAsync(CancellationToken cancel
finally
{
// No matter what, remove the requestId from this dictionary since no thread will be waiting for it anymore
- _getTwinResponseCompletions.TryRemove(requestId, out _);
- _twinResponseTimeouts.TryRemove(requestId, out _);
+ _pendingTwinOperations.TryRemove(requestId, out _);
}
}
finally
@@ -816,8 +809,7 @@ public override async Task UpdateReportedPropertiesAsync(ReportedPropertie
// Note the request as "in progress" before actually sending it so that no matter how quickly the service
// responds, this layer can correlate the request.
var taskCompletionSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- _reportedPropertyUpdateResponseCompletions[requestId] = taskCompletionSource;
- _twinResponseTimeouts[requestId] = DateTimeOffset.UtcNow;
+ _pendingTwinOperations[requestId] = new PendingMqttTwinOperation(taskCompletionSource);
MqttClientPublishResult result = await _mqttClient.PublishAsync(mqttMessage, cancellationToken).ConfigureAwait(false);
@@ -829,13 +821,13 @@ public override async Task UpdateReportedPropertiesAsync(ReportedPropertie
}
if (Logging.IsEnabled)
- Logging.Info(this, $"Sent twin patch request with request id {requestId}. Now waiting for the service response.");
+ Logging.Info(this, $"Sent twin patch request with request id {requestId}. Now waiting for the service response.", nameof(UpdateReportedPropertiesAsync));
// Wait until IoT hub sends a message to this client with the response to this patch twin request.
PatchTwinResponse patchTwinResponse = await taskCompletionSource.WaitAsync(cancellationToken).ConfigureAwait(false);
if (Logging.IsEnabled)
- Logging.Info(this, $"Received twin patch response for request id {requestId} with status {patchTwinResponse.Status}.");
+ Logging.Info(this, $"Received twin patch response for request id {requestId} with status {patchTwinResponse.Status}.", nameof(UpdateReportedPropertiesAsync));
if (patchTwinResponse.Status != 204)
{
@@ -885,8 +877,7 @@ public override async Task UpdateReportedPropertiesAsync(ReportedPropertie
finally
{
// No matter what, remove the requestId from this dictionary since no thread will be waiting for it anymore
- _reportedPropertyUpdateResponseCompletions.TryRemove(requestId, out TaskCompletionSource _);
- _twinResponseTimeouts.TryRemove(requestId, out DateTimeOffset _);
+ _pendingTwinOperations.TryRemove(requestId, out _);
}
}
finally
@@ -1049,11 +1040,15 @@ private async Task UnsubscribeAsync(string topic, CancellationToken cancellation
private Task HandleDisconnectionAsync(MqttClientDisconnectedEventArgs disconnectedEventArgs)
{
if (Logging.IsEnabled)
- Logging.Info($"MQTT connection was lost {disconnectedEventArgs.Exception}");
+ Logging.Info(this, $"MQTT connection was lost {disconnectedEventArgs.Exception}", nameof(HandleDisconnectionAsync));
if (disconnectedEventArgs.ClientWasConnected)
{
OnTransportDisconnected();
+
+ // During a disconnection, any pending twin updates won't be received, so we'll preemptively
+ // cancel these operations so the client can retry once reconnected.
+ RemoveOldOperations(TimeSpan.Zero);
}
return Task.CompletedTask;
@@ -1181,120 +1176,100 @@ private void HandleTwinResponse(MqttApplicationMessageReceivedEventArgs received
{
byte[] payloadBytes = receivedEventArgs.ApplicationMessage.Payload ?? Array.Empty();
- if (_getTwinResponseCompletions.TryRemove(receivedRequestId, out TaskCompletionSource getTwinCompletion))
+ if (_pendingTwinOperations.TryRemove(receivedRequestId, out PendingMqttTwinOperation twinOperation))
{
if (Logging.IsEnabled)
- Logging.Info(this, $"Received response to get twin request with request id {receivedRequestId}.");
+ Logging.Info(this, $"Received response to patch twin request with request id {receivedRequestId}.", nameof(HandleTwinResponse));
- if (status != 200)
+ IotHubClientErrorResponseMessage ParseError(byte[] payloadBytes)
{
- IotHubClientErrorResponseMessage errorResponse = null;
-
// This will only ever contain an error message which is encoded based on service contract (UTF-8).
- if (payloadBytes.Length > 0)
+ if (payloadBytes.Length == 0)
{
- string errorResponseString = Encoding.UTF8.GetString(payloadBytes);
- try
- {
- errorResponse = DefaultPayloadConvention.Instance.GetObject(errorResponseString);
- }
- catch (JsonException ex)
- {
- if (Logging.IsEnabled)
- Logging.Error(this, $"Failed to parse twin patch error response JSON. Message body: '{errorResponseString}'. Exception: {ex}. ");
-
- errorResponse = new IotHubClientErrorResponseMessage
- {
- Message = errorResponseString,
- };
- }
+ return null;
}
- // This received message is in response to an update reported properties request.
- var getTwinResponse = new GetTwinResponse
- {
- Status = status,
- ErrorResponseMessage = errorResponse,
- };
-
- getTwinCompletion.TrySetResult(getTwinResponse);
- }
- else
- {
+ string errorResponseString = Encoding.UTF8.GetString(payloadBytes);
try
{
- // Use the encoder that has been agreed to between the client and service to decode the byte[] response
- // The response is deserialized into an SDK-defined type based on service-defined NewtonSoft.Json-based json property name.
- // For this reason, we use NewtonSoft Json serializer for this deserialization.
- TwinDocument clientTwinProperties = DefaultPayloadConvention.Instance.GetObject(payloadBytes);
-
- var twinDesiredProperties = new DesiredProperties(clientTwinProperties.Desired)
- {
- PayloadConvention = _payloadConvention,
- };
-
- var twinReportedProperties = new ReportedProperties(clientTwinProperties.Reported, true)
- {
- PayloadConvention = _payloadConvention,
- };
-
- var getTwinResponse = new GetTwinResponse
- {
- Status = status,
- Twin = new TwinProperties(twinDesiredProperties, twinReportedProperties),
- };
-
- getTwinCompletion.TrySetResult(getTwinResponse);
+ return DefaultPayloadConvention.Instance.GetObject(errorResponseString);
}
- catch (JsonReaderException ex)
+ catch (JsonException ex)
{
if (Logging.IsEnabled)
- Logging.Error(this, $"Failed to parse Twin JSON. Message body: '{Encoding.UTF8.GetString(payloadBytes)}'. Exception: {ex}.");
+ Logging.Error(this, $"Failed to parse twin patch error response JSON. Message body: '{errorResponseString}'. Exception: {ex}. ", nameof(HandleTwinResponse));
- getTwinCompletion.TrySetException(ex);
+ return new IotHubClientErrorResponseMessage
+ {
+ Message = errorResponseString,
+ };
}
}
- }
- else if (_reportedPropertyUpdateResponseCompletions.TryRemove(receivedRequestId, out TaskCompletionSource patchTwinCompletion))
- {
- if (Logging.IsEnabled)
- Logging.Info(this, $"Received response to patch twin request with request id {receivedRequestId}.");
- IotHubClientErrorResponseMessage errorResponse = null;
+ if (twinOperation.TwinPatchTask != null)
+ {
+ IotHubClientErrorResponseMessage error = status == 204
+ ? null
+ : ParseError(payloadBytes);
+
+ // This received message is in response to an update reported properties request.
+ var patchTwinResponse = new PatchTwinResponse
+ {
+ Status = status,
+ Version = version,
+ ErrorResponseMessage = error,
+ };
- // This will only ever contain an error message which is encoded based on service contract (UTF-8).
- if (payloadBytes.Length > 0)
+ twinOperation.TwinPatchTask.TrySetResult(patchTwinResponse);
+ }
+ else // should be a "get twin" operation
{
- string errorResponseString = Encoding.UTF8.GetString(payloadBytes);
- try
+ var getTwinResponse = new GetTwinResponse
+ {
+ Status = status,
+ };
+
+ if (status != 200)
{
- errorResponse = DefaultPayloadConvention.Instance.GetObject(errorResponseString);
+ getTwinResponse.ErrorResponseMessage = ParseError(payloadBytes);
+ twinOperation.TwinResponseTask.TrySetResult(getTwinResponse);
}
- catch (JsonException ex)
+ else
{
- if (Logging.IsEnabled)
- Logging.Error(this, $"Failed to parse twin patch error response JSON. Message body: '{errorResponseString}'. Exception: {ex}. ");
-
- errorResponse = new IotHubClientErrorResponseMessage
+ try
{
- Message = errorResponseString,
- };
+ // Use the encoder that has been agreed to between the client and service to decode the byte[] response
+ // The response is deserialized into an SDK-defined type based on service-defined NewtonSoft.Json-based json property name.
+ // For this reason, we use Newtonsoft.Json serializer for this deserialization.
+ TwinDocument clientTwinProperties = status == 200
+ ? DefaultPayloadConvention.Instance.GetObject(payloadBytes)
+ : null;
+
+ getTwinResponse.Twin = new TwinProperties(
+ new DesiredProperties(clientTwinProperties.Desired)
+ {
+ PayloadConvention = _payloadConvention,
+ },
+ new ReportedProperties(clientTwinProperties.Reported, true)
+ {
+ PayloadConvention = _payloadConvention,
+ });
+
+ twinOperation.TwinResponseTask.TrySetResult(getTwinResponse);
+ }
+ catch (JsonReaderException ex)
+ {
+ if (Logging.IsEnabled)
+ Logging.Error(this, $"Failed to parse Twin JSON. Message body: '{Encoding.UTF8.GetString(payloadBytes)}'. Exception: {ex}.", nameof(HandleTwinResponse));
+
+ twinOperation.TwinResponseTask.TrySetException(ex);
+ }
}
}
-
- // This received message is in response to an update reported properties request.
- var patchTwinResponse = new PatchTwinResponse
- {
- Status = status,
- Version = version,
- ErrorResponseMessage = errorResponse,
- };
-
- patchTwinCompletion.TrySetResult(patchTwinResponse);
}
else if (Logging.IsEnabled)
{
- Logging.Info(this, $"Received response to an unknown twin request with request id {receivedRequestId}. Discarding it.");
+ Logging.Info(this, $"Received response to an unknown twin request with request id {receivedRequestId}. Discarding it.", nameof(HandleTwinResponse));
}
}
}
@@ -1355,17 +1330,35 @@ private bool ParseResponseTopic(string topicName, out string rid, out int status
return true;
}
- private void RemoveOldOperations(object _)
+ private void RemoveOldOperations(object state)
{
- _ = _twinResponseTimeouts
- .Where(x => DateTimeOffset.UtcNow - x.Value > s_twinResponseTimeout)
+ if (state is not TimeSpan maxAge)
+ {
+ maxAge = TwinResponseTimeout;
+ }
+
+ if (Logging.IsEnabled)
+ Logging.Info(this, $"Removing operations older than {maxAge}", nameof(RemoveOldOperations));
+
+ const string exceptionMessage = "Did not receive twin response from service.";
+ int canceledOperations = _pendingTwinOperations
+ .Where(x => DateTimeOffset.UtcNow - x.Value.RequestSentOnUtc > maxAge)
.Select(x =>
{
- _getTwinResponseCompletions.TryRemove(x.Key, out TaskCompletionSource _);
- _reportedPropertyUpdateResponseCompletions.TryRemove(x.Key, out TaskCompletionSource _);
- _twinResponseTimeouts.TryRemove(x.Key, out DateTimeOffset _);
+ if (_pendingTwinOperations.TryRemove(x.Key, out PendingMqttTwinOperation pendingOperation))
+ {
+ if (Logging.IsEnabled)
+ Logging.Info(this, $"Removing twin response for {x.Key}", nameof(RemoveOldOperations));
+
+ pendingOperation.TwinResponseTask?.TrySetException(new IotHubClientException(exceptionMessage, IotHubClientErrorCode.NetworkErrors));
+ pendingOperation.TwinPatchTask?.TrySetException(new IotHubClientException(exceptionMessage, IotHubClientErrorCode.NetworkErrors));
+ }
return true;
- });
+ })
+ .Count();
+
+ if (Logging.IsEnabled)
+ Logging.Error(this, $"Removed {canceledOperations} twin responses", nameof(RemoveOldOperations));
}
private static void PopulateMessagePropertiesFromMqttMessage(IncomingMessage message, MqttApplicationMessage mqttMessage)
diff --git a/iothub/device/src/Pipeline/TransportHandlerBase.cs b/iothub/device/src/Pipeline/TransportHandlerBase.cs
index 7af8358f76..4736c7a576 100644
--- a/iothub/device/src/Pipeline/TransportHandlerBase.cs
+++ b/iothub/device/src/Pipeline/TransportHandlerBase.cs
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+using System;
using System.Threading.Tasks;
namespace Microsoft.Azure.Devices.Client.Transport
@@ -17,6 +18,15 @@ protected TransportHandlerBase(PipelineContext context, IDelegatingHandler nextH
{
}
+ ///
+ /// Twin operations are initiated one one link/topic and the response is received on another, where the
+ /// waiting client call is completed via a TaskCompletionSource linked to the request Id.
+ /// For various reasons, the device may never observe the response from the service and the client call will
+ /// sit indefinitely, however unlikely. In order to prevent an ever increasing dictionary, we'll occasionally
+ /// review these pending operations, and cancel/remove them from the dictionary.
+ ///
+ protected private static TimeSpan TwinResponseTimeout { get; } = TimeSpan.FromHours(1);
+
public override Task WaitForTransportClosedAsync()
{
_transportShouldRetry = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
diff --git a/iothub/device/src/Serialization/DefaultPayloadConvention.cs b/iothub/device/src/Serialization/DefaultPayloadConvention.cs
index 47fe77eaca..d31232e918 100644
--- a/iothub/device/src/Serialization/DefaultPayloadConvention.cs
+++ b/iothub/device/src/Serialization/DefaultPayloadConvention.cs
@@ -35,7 +35,7 @@ private DefaultPayloadConvention()
JsonConvert.DefaultSettings = () => s_settings;
}
- internal static Encoding Encoding { get; } = Encoding.UTF8;
+ internal static Encoding Encoding => Encoding.UTF8;
///
/// A static instance of this class.
diff --git a/iothub/device/src/Serialization/PayloadConvention.cs b/iothub/device/src/Serialization/PayloadConvention.cs
index 92089fcc3a..4ef5efc656 100644
--- a/iothub/device/src/Serialization/PayloadConvention.cs
+++ b/iothub/device/src/Serialization/PayloadConvention.cs
@@ -1,8 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
-using System.IO;
-
namespace Microsoft.Azure.Devices.Client
{
///
diff --git a/iothub/device/src/Serialization/SystemTestJsonPayloadConvention.cs b/iothub/device/src/Serialization/SystemTestJsonPayloadConvention.cs
index 0063c819bd..bc131cfd1a 100644
--- a/iothub/device/src/Serialization/SystemTestJsonPayloadConvention.cs
+++ b/iothub/device/src/Serialization/SystemTestJsonPayloadConvention.cs
@@ -14,20 +14,20 @@ namespace Microsoft.Azure.Devices.Client
///
public class SystemTextJsonPayloadConvention : PayloadConvention
{
- internal static readonly Encoding s_encoding = Encoding.UTF8;
-
private SystemTextJsonPayloadConvention() { }
///
/// A static instance of this class.
///
- public static readonly SystemTextJsonPayloadConvention Instance = new();
+ public static SystemTextJsonPayloadConvention Instance { get; } = new();
///
public override string ContentType => "application/json";
///
- public override string ContentEncoding => s_encoding.WebName;
+ public override string ContentEncoding => Encoding.WebName;
+
+ internal static Encoding Encoding => Encoding.UTF8;
///
public override byte[] GetObjectBytes(object objectToSendWithConvention)
diff --git a/iothub/device/src/Transport/Amqp/PendingAmqpTwinOperation.cs b/iothub/device/src/Transport/Amqp/PendingAmqpTwinOperation.cs
new file mode 100644
index 0000000000..5ade17d511
--- /dev/null
+++ b/iothub/device/src/Transport/Amqp/PendingAmqpTwinOperation.cs
@@ -0,0 +1,33 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+using System;
+using System.Threading.Tasks;
+using Microsoft.Azure.Amqp;
+
+namespace Microsoft.Azure.Devices.Client.Transport.Amqp
+{
+ ///
+ /// A class for holding the task completion source of a pending operation and the date/time when that operation
+ /// was initiated. Because PubSub behavior has us send a request on one sending link and receive a response later on another
+ /// receiving link, we need a way to identify the request, when it was sent, and the task completion source to complete the
+ /// waiting user's task.
+ ///
+ internal class PendingAmqpTwinOperation
+ {
+ public PendingAmqpTwinOperation(TaskCompletionSource completionTask)
+ {
+ CompletionTask = completionTask;
+ }
+
+ ///
+ /// The pending task to be signaled when complete.
+ ///
+ public TaskCompletionSource CompletionTask { get; }
+
+ ///
+ /// When the request was sent so we know when to time out older operations
+ ///
+ public DateTimeOffset RequestSentOnUtc { get; set; } = DateTimeOffset.UtcNow;
+ }
+}
diff --git a/iothub/device/src/Transport/Mqtt/PendingMqttTwinOperation.cs b/iothub/device/src/Transport/Mqtt/PendingMqttTwinOperation.cs
new file mode 100644
index 0000000000..e9ffcdd19a
--- /dev/null
+++ b/iothub/device/src/Transport/Mqtt/PendingMqttTwinOperation.cs
@@ -0,0 +1,54 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+using System;
+using System.Threading.Tasks;
+
+namespace Microsoft.Azure.Devices.Client.Transport.Mqtt
+{
+ ///
+ /// A class for holding the task completion source of a pending operation and the date/time when that operation
+ /// was initiated. Because PubSub behavior has us send a request on one topic and receive a response later on another
+ /// topic, we need a way to identify the request, when it was sent, and the task completion source to complete the
+ /// waiting user's task.
+ ///
+ internal class PendingMqttTwinOperation
+ {
+ ///
+ /// Constructor for get twin operations.
+ ///
+ public PendingMqttTwinOperation(TaskCompletionSource twinResponseTask)
+ {
+ TwinResponseTask = twinResponseTask;
+ }
+
+ ///
+ /// Constructor for patch twin operations.
+ ///
+ public PendingMqttTwinOperation(TaskCompletionSource twinPatchTask)
+ {
+ TwinPatchTask = twinPatchTask;
+ }
+
+ ///
+ /// The pending task for getting a twin to be signaled when complete.
+ ///
+ ///
+ /// Will be null if this if this class is not being used for get twin.
+ ///
+ public TaskCompletionSource TwinResponseTask { get; }
+
+ ///
+ /// The pending task for patching a twin to be signaled when complete.
+ ///
+ ///
+ /// Will be null if this if this class is not being used for patch twin.
+ ///
+ public TaskCompletionSource TwinPatchTask { get; }
+
+ ///
+ /// When the request was sent so we know when to time out older operations
+ ///
+ public DateTimeOffset RequestSentOnUtc { get; set; } = DateTimeOffset.UtcNow;
+ }
+}
diff --git a/iothub/device/tests/DefaultPayloadConventionTests.cs b/iothub/device/tests/DefaultPayloadConventionTests.cs
index 482704099e..e3c11b4f4b 100644
--- a/iothub/device/tests/DefaultPayloadConventionTests.cs
+++ b/iothub/device/tests/DefaultPayloadConventionTests.cs
@@ -1,6 +1,7 @@
// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+using System;
using FluentAssertions;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using Newtonsoft.Json;
@@ -15,6 +16,7 @@ namespace Microsoft.Azure.Devices.Client.Tests
[TestCategory("Unit")]
public class DefaultPayloadConventionTests
{
+ private static readonly DefaultPayloadConvention s_cut = DefaultPayloadConvention.Instance;
private static readonly string s_dateTimeString = "2023-01-31T10:37:08.4599400";
private static readonly string s_serializedPayloadString = "{\"time\":\"2023-01-31T10:37:08.4599400\"}";
@@ -38,12 +40,97 @@ public void DefaultPayloadConvention_DateTime_DeserializesProperly()
string jsonStr = $@"{{""time"":""{s_dateTimeString}""}}";
// act
- JObject payload = DefaultPayloadConvention.Instance.GetObject(jsonStr);
+ JObject payload = s_cut.GetObject(jsonStr);
//assert
payload.ToString(Formatting.None).Should().Be(s_serializedPayloadString);
}
+ [TestMethod]
+ public void DefaultPayloadConvention_RoundtripsInt()
+ {
+ // arrange
+ const int expected = 1;
+
+ // act
+ s_cut.GetObject(s_cut.GetObjectBytes(expected))
+ .Should()
+ .Be(expected);
+ }
+
+ [TestMethod]
+ public void DefaultPayloadConvention_RoundtripsBool()
+ {
+ // arrange
+ bool expected = true;
+
+ // act
+ s_cut.GetObject(s_cut.GetObjectBytes(expected))
+ .Should()
+ .Be(expected);
+ }
+
+ [TestMethod]
+ public void DefaultPayloadConvention_RoundtripsDateTimeOffset()
+ {
+ // arrange
+ DateTimeOffset expected = DateTimeOffset.UtcNow;
+
+ // act
+ s_cut.GetObject(s_cut.GetObjectBytes(expected))
+ .Should()
+ .Be(expected);
+ }
+
+ [TestMethod]
+ public void DefaultPayloadConvention_RoundtripsDateTime()
+ {
+ // arrange
+ DateTime expected = DateTime.UtcNow;
+
+ // act
+ s_cut.GetObject(s_cut.GetObjectBytes(expected))
+ .Should()
+ .Be(expected);
+ }
+
+ [TestMethod]
+ public void DefaultPayloadConvention_RoundtripsString()
+ {
+ // arrange
+ const string expected = nameof(DefaultPayloadConvention_RoundtripsString);
+
+ // act
+ s_cut.GetObject(s_cut.GetObjectBytes(expected))
+ .Should()
+ .Be(expected);
+ }
+
+ [TestMethod]
+ public void DefaultPayloadConvention_RoundtripsCustomType()
+ {
+ // arrange
+ var expected = new CustomType
+ {
+ IntProp = 1,
+ StringProp = Guid.NewGuid().ToString(),
+ };
+
+ // act
+ s_cut.GetObject(s_cut.GetObjectBytes(expected))
+ .Should()
+ .BeEquivalentTo(expected);
+ }
+
+ private class CustomType
+ {
+ [JsonProperty("intProp")]
+ public int IntProp { get; set; }
+
+ [JsonProperty("stringProp")]
+ public string StringProp { get; set; }
+ }
+
private class TestDateTime
{
[JsonProperty("time")]
diff --git a/iothub/device/tests/SystemTextJsonPayloadConventionTests.cs b/iothub/device/tests/SystemTextJsonPayloadConventionTests.cs
new file mode 100644
index 0000000000..4f6f4f73d3
--- /dev/null
+++ b/iothub/device/tests/SystemTextJsonPayloadConventionTests.cs
@@ -0,0 +1,102 @@
+// Copyright (c) Microsoft. All rights reserved.
+// Licensed under the MIT license. See LICENSE file in the project root for full license information.
+
+using System;
+using System.Text.Json.Serialization;
+using FluentAssertions;
+using Microsoft.VisualStudio.TestTools.UnitTesting;
+
+namespace Microsoft.Azure.Devices.Client.Tests
+{
+ [TestClass]
+ [TestCategory("Unit")]
+ public class SystemTextJsonPayloadConventionTests
+ {
+ private static readonly SystemTextJsonPayloadConvention s_cut = SystemTextJsonPayloadConvention.Instance;
+
+ [TestMethod]
+ public void SystemTextJsonPayloadConvention_RoundtripsInt()
+ {
+ // arrange
+ const int expected = 1;
+
+ // act
+ s_cut.GetObject(s_cut.GetObjectBytes(expected))
+ .Should()
+ .Be(expected);
+ }
+
+ [TestMethod]
+ public void SystemTextJsonPayloadConvention_RoundtripsBool()
+ {
+ // arrange
+ bool expected = true;
+
+ // act
+ s_cut.GetObject(s_cut.GetObjectBytes(expected))
+ .Should()
+ .Be(expected);
+ }
+
+ [TestMethod]
+ public void SystemTextJsonPayloadConvention_RoundtripsDateTimeOffset()
+ {
+ // arrange
+ DateTimeOffset expected = DateTimeOffset.UtcNow;
+
+ // act
+ s_cut.GetObject(s_cut.GetObjectBytes(expected))
+ .Should()
+ .Be(expected);
+ }
+
+ [TestMethod]
+ public void SystemTextJsonPayloadConvention_RoundtripsDateTime()
+ {
+ // arrange
+ DateTime expected = DateTime.UtcNow;
+
+ // act
+ s_cut.GetObject(s_cut.GetObjectBytes(expected))
+ .Should()
+ .Be(expected);
+ }
+
+ [TestMethod]
+ public void SystemTextJsonPayloadConvention_RoundtripsString()
+ {
+ // arrange
+ const string expected = nameof(SystemTextJsonPayloadConvention_RoundtripsString);
+
+ // act
+ s_cut.GetObject(s_cut.GetObjectBytes(expected))
+ .Should()
+ .Be(expected);
+ }
+
+ [TestMethod]
+ public void SystemTextJsonPayloadConvention_RoundtripsCustomType()
+ {
+ // arrange
+ var expected = new CustomType
+ {
+ IntProp = 1,
+ StringProp = Guid.NewGuid().ToString(),
+ };
+
+ // act
+ s_cut.GetObject(s_cut.GetObjectBytes(expected))
+ .Should()
+ .BeEquivalentTo(expected);
+ }
+
+ private class CustomType
+ {
+ [JsonPropertyName("intProp")]
+ public int IntProp { get; set; }
+
+ [JsonPropertyName("stringProp")]
+ public string StringProp{ get; set; }
+ }
+ }
+}