diff --git a/e2e/LongHaul/service/IotHub.cs b/e2e/LongHaul/service/IotHub.cs index 8703167bc5..fe775c9946 100644 --- a/e2e/LongHaul/service/IotHub.cs +++ b/e2e/LongHaul/service/IotHub.cs @@ -68,77 +68,75 @@ public async Task MonitorConnectedDevicesAsync(CancellationToken ct) { while (!ct.IsCancellationRequested) { - try - { - AsyncPageable allDevices = s_serviceClient.Query.Create( - "SELECT deviceId, connectionState, lastActivityTime FROM devices where is_defined(properties.reported.runId)", - ct); + AsyncPageable allDeviceTwins = s_serviceClient.Query.Create( + "SELECT deviceId, connectionState, lastActivityTime FROM devices where is_defined(properties.reported.runId)", + ct); - AsyncPageable allModules = s_serviceClient.Query.Create( - "SELECT deviceId, moduleId, connectionState FROM devices.modules where is_defined(properties.reported.runId)", - ct); + AsyncPageable allModuleTwins = s_serviceClient.Query.Create( + "SELECT deviceId, moduleId, connectionState FROM devices.modules where is_defined(properties.reported.runId)", + ct); - await foreach (ClientTwin device in allDevices) + await foreach (ClientTwin deviceTwin in allDeviceTwins) { - string deviceId = device.DeviceId; + string deviceId = deviceTwin.DeviceId; - if (s_onlineDeviceOperations.ContainsKey(deviceId) - && device.ConnectionState is ClientConnectionState.Disconnected) - { - CancellationTokenSource source = s_onlineDeviceOperations[deviceId].Item2; - // Signal cancellation to all tasks on the particular device. - source.Cancel(); - // Dispose the cancellation token source. - source.Dispose(); - // Remove the correlated device operations and cancellation token source of the particular device from the dictionary. - s_onlineDeviceOperations.TryRemove(deviceId, out _); - } - else if (!s_onlineDeviceOperations.ContainsKey(deviceId) - && device.ConnectionState is ClientConnectionState.Connected) - { - // For each online device, initiate a new cancellation token source. - // Once the device goes offline, cancel all operations on this device. - var source = new CancellationTokenSource(); - CancellationToken token = source.Token; + if (s_onlineDeviceOperations.ContainsKey(deviceId) + && deviceTwin.ConnectionState is ClientConnectionState.Disconnected) + { + CancellationTokenSource source = s_onlineDeviceOperations[deviceId].Item2; + // Signal cancellation to all tasks on the particular device. + source.Cancel(); + // Dispose the cancellation token source. + source.Dispose(); + // Remove the correlated device operations and cancellation token source of the particular device from the dictionary. + s_onlineDeviceOperations.TryRemove(deviceId, out _); + } + else if (!s_onlineDeviceOperations.ContainsKey(deviceId) + && deviceTwin.ConnectionState is ClientConnectionState.Connected) + { + // For each online device, initiate a new cancellation token source. + // Once the device goes offline, cancel all operations on this device. + var source = new CancellationTokenSource(); + CancellationToken token = source.Token; - async Task Operations() + async Task OperateWithDeviceAsync() { - Logger loggerPerDevice = _logger.Clone(); - loggerPerDevice.LoggerContext.Add(DeviceId, deviceId); - var deviceOperations = new DeviceOperations(s_serviceClient, deviceId, loggerPerDevice); + Logger deviceLogger = _logger.Clone(); + deviceLogger.LoggerContext.Add(DeviceId, deviceId); + var deviceOperations = new DeviceOperations(s_serviceClient, deviceId, deviceLogger); _logger.Trace($"Creating {nameof(DeviceOperations)} on the device [{deviceId}]", TraceSeverity.Verbose); - try - { - await Task - .WhenAll( - deviceOperations.InvokeDirectMethodAsync(loggerPerDevice.Clone(), token), - deviceOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), loggerPerDevice.Clone(), token), - deviceOperations.SendC2dMessagesAsync(loggerPerDevice.Clone(), token)) - .ConfigureAwait(false); - } - catch (OperationCanceledException) - { - _logger.Trace($"Operations on [{deviceId}] have been canceled as the device goes offline.", TraceSeverity.Information); - } - catch (Exception ex) - { - _logger.Trace($"Service app failed with exception {ex}", TraceSeverity.Error); - } + try + { + await Task + .WhenAll( + deviceOperations.InvokeDirectMethodAsync(deviceLogger.Clone(), token), + deviceOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), deviceLogger.Clone(), token), + deviceOperations.SendC2dMessagesAsync(deviceLogger.Clone(), token)) + .ConfigureAwait(false); + } + catch (OperationCanceledException) + { + _logger.Trace($"Operations on [{deviceId}] have been canceled as the device goes offline.", TraceSeverity.Information); } + catch (Exception ex) + { + _logger.Trace($"Service app failed with exception {ex}", TraceSeverity.Error); + } + } // Passing in "Operations()" as Task so we don't need to manually call "Invoke()" on it. - var operationsTuple = new Tuple(Operations(), source); + var operationsTuple = new Tuple(OperateWithDeviceAsync(), source); s_onlineDeviceOperations.TryAdd(deviceId, operationsTuple); } } - await foreach (ClientTwin module in allModules) + await foreach (ClientTwin moduleTwin in allModuleTwins) { - string moduleId = module.DeviceId + "_" + module.ModuleId; + string moduleId = $"{moduleTwin.DeviceId}/{moduleTwin.ModuleId}"; if (s_onlineModuleOperations.ContainsKey(moduleId) - && module.ConnectionState is ClientConnectionState.Disconnected) + && moduleTwin.ConnectionState is ClientConnectionState.Disconnected) { CancellationTokenSource source = s_onlineModuleOperations[moduleId].Item2; // Signal cancellation to all tasks on the particular module. @@ -149,7 +147,7 @@ await Task s_onlineModuleOperations.TryRemove(moduleId, out _); } else if (!s_onlineModuleOperations.ContainsKey(moduleId) - && module.ConnectionState is ClientConnectionState.Connected) + && moduleTwin.ConnectionState is ClientConnectionState.Connected) { // For each online module, initiate a new cancellation token source. // Once the module goes offline, cancel all operations on this module. @@ -158,8 +156,8 @@ await Task async Task Operations() { - var moduleOperations = new ModuleOperations(s_serviceClient, module.DeviceId, module.ModuleId, _logger.Clone()); - _logger.Trace($"Creating {nameof(ModuleOperations)} on the device: [{module.DeviceId}], module: [{module.ModuleId}]", TraceSeverity.Verbose); + var moduleOperations = new ModuleOperations(s_serviceClient, moduleTwin.DeviceId, moduleTwin.ModuleId, _logger.Clone()); + _logger.Trace($"Creating {nameof(ModuleOperations)} on the device: [{moduleTwin.DeviceId}], module: [{moduleTwin.ModuleId}]", TraceSeverity.Verbose); try { @@ -171,7 +169,7 @@ await Task } catch (OperationCanceledException) { - _logger.Trace($"Operations on device: [{module.DeviceId}], module: [{module.ModuleId}] have been canceled as the device goes offline.", TraceSeverity.Information); + _logger.Trace($"Operations on device: [{moduleTwin.DeviceId}], module: [{moduleTwin.ModuleId}] have been canceled as the device goes offline.", TraceSeverity.Information); } catch (Exception ex) {