Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
David R. Williamson committed Apr 20, 2023
1 parent 9b86af4 commit 4c92bdd
Showing 1 changed file with 55 additions and 57 deletions.
112 changes: 55 additions & 57 deletions e2e/LongHaul/service/IotHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,77 +68,75 @@ public async Task MonitorConnectedDevicesAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
try
{
AsyncPageable<ClientTwin> allDevices = s_serviceClient.Query.Create<ClientTwin>(
"SELECT deviceId, connectionState, lastActivityTime FROM devices where is_defined(properties.reported.runId)",
ct);
AsyncPageable<ClientTwin> allDeviceTwins = s_serviceClient.Query.Create<ClientTwin>(
"SELECT deviceId, connectionState, lastActivityTime FROM devices where is_defined(properties.reported.runId)",
ct);

AsyncPageable<ClientTwin> allModules = s_serviceClient.Query.Create<ClientTwin>(
"SELECT deviceId, moduleId, connectionState FROM devices.modules where is_defined(properties.reported.runId)",
ct);
AsyncPageable<ClientTwin> allModuleTwins = s_serviceClient.Query.Create<ClientTwin>(
"SELECT deviceId, moduleId, connectionState FROM devices.modules where is_defined(properties.reported.runId)",
ct);

await foreach (ClientTwin device in allDevices)
await foreach (ClientTwin deviceTwin in allDeviceTwins)
{
string deviceId = device.DeviceId;
string deviceId = deviceTwin.DeviceId;

if (s_onlineDeviceOperations.ContainsKey(deviceId)
&& device.ConnectionState is ClientConnectionState.Disconnected)
{
CancellationTokenSource source = s_onlineDeviceOperations[deviceId].Item2;
// Signal cancellation to all tasks on the particular device.
source.Cancel();
// Dispose the cancellation token source.
source.Dispose();
// Remove the correlated device operations and cancellation token source of the particular device from the dictionary.
s_onlineDeviceOperations.TryRemove(deviceId, out _);
}
else if (!s_onlineDeviceOperations.ContainsKey(deviceId)
&& device.ConnectionState is ClientConnectionState.Connected)
{
// For each online device, initiate a new cancellation token source.
// Once the device goes offline, cancel all operations on this device.
var source = new CancellationTokenSource();
CancellationToken token = source.Token;
if (s_onlineDeviceOperations.ContainsKey(deviceId)
&& deviceTwin.ConnectionState is ClientConnectionState.Disconnected)
{
CancellationTokenSource source = s_onlineDeviceOperations[deviceId].Item2;
// Signal cancellation to all tasks on the particular device.
source.Cancel();
// Dispose the cancellation token source.
source.Dispose();
// Remove the correlated device operations and cancellation token source of the particular device from the dictionary.
s_onlineDeviceOperations.TryRemove(deviceId, out _);
}
else if (!s_onlineDeviceOperations.ContainsKey(deviceId)
&& deviceTwin.ConnectionState is ClientConnectionState.Connected)
{
// For each online device, initiate a new cancellation token source.
// Once the device goes offline, cancel all operations on this device.
var source = new CancellationTokenSource();
CancellationToken token = source.Token;

async Task Operations()
async Task OperateWithDeviceAsync()
{
Logger loggerPerDevice = _logger.Clone();
loggerPerDevice.LoggerContext.Add(DeviceId, deviceId);
var deviceOperations = new DeviceOperations(s_serviceClient, deviceId, loggerPerDevice);
Logger deviceLogger = _logger.Clone();
deviceLogger.LoggerContext.Add(DeviceId, deviceId);
var deviceOperations = new DeviceOperations(s_serviceClient, deviceId, deviceLogger);
_logger.Trace($"Creating {nameof(DeviceOperations)} on the device [{deviceId}]", TraceSeverity.Verbose);


try
{
await Task
.WhenAll(
deviceOperations.InvokeDirectMethodAsync(loggerPerDevice.Clone(), token),
deviceOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), loggerPerDevice.Clone(), token),
deviceOperations.SendC2dMessagesAsync(loggerPerDevice.Clone(), token))
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
_logger.Trace($"Operations on [{deviceId}] have been canceled as the device goes offline.", TraceSeverity.Information);
}
catch (Exception ex)
{
_logger.Trace($"Service app failed with exception {ex}", TraceSeverity.Error);
}
try
{
await Task
.WhenAll(
deviceOperations.InvokeDirectMethodAsync(deviceLogger.Clone(), token),
deviceOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), deviceLogger.Clone(), token),
deviceOperations.SendC2dMessagesAsync(deviceLogger.Clone(), token))
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
_logger.Trace($"Operations on [{deviceId}] have been canceled as the device goes offline.", TraceSeverity.Information);
}
catch (Exception ex)
{
_logger.Trace($"Service app failed with exception {ex}", TraceSeverity.Error);
}
}

// Passing in "Operations()" as Task so we don't need to manually call "Invoke()" on it.
var operationsTuple = new Tuple<Task, CancellationTokenSource>(Operations(), source);
var operationsTuple = new Tuple<Task, CancellationTokenSource>(OperateWithDeviceAsync(), source);
s_onlineDeviceOperations.TryAdd(deviceId, operationsTuple);
}
}

await foreach (ClientTwin module in allModules)
await foreach (ClientTwin moduleTwin in allModuleTwins)
{
string moduleId = module.DeviceId + "_" + module.ModuleId;
string moduleId = $"{moduleTwin.DeviceId}/{moduleTwin.ModuleId}";
if (s_onlineModuleOperations.ContainsKey(moduleId)
&& module.ConnectionState is ClientConnectionState.Disconnected)
&& moduleTwin.ConnectionState is ClientConnectionState.Disconnected)
{
CancellationTokenSource source = s_onlineModuleOperations[moduleId].Item2;
// Signal cancellation to all tasks on the particular module.
Expand All @@ -149,7 +147,7 @@ await Task
s_onlineModuleOperations.TryRemove(moduleId, out _);
}
else if (!s_onlineModuleOperations.ContainsKey(moduleId)
&& module.ConnectionState is ClientConnectionState.Connected)
&& moduleTwin.ConnectionState is ClientConnectionState.Connected)
{
// For each online module, initiate a new cancellation token source.
// Once the module goes offline, cancel all operations on this module.
Expand All @@ -158,8 +156,8 @@ await Task

async Task Operations()
{
var moduleOperations = new ModuleOperations(s_serviceClient, module.DeviceId, module.ModuleId, _logger.Clone());
_logger.Trace($"Creating {nameof(ModuleOperations)} on the device: [{module.DeviceId}], module: [{module.ModuleId}]", TraceSeverity.Verbose);
var moduleOperations = new ModuleOperations(s_serviceClient, moduleTwin.DeviceId, moduleTwin.ModuleId, _logger.Clone());
_logger.Trace($"Creating {nameof(ModuleOperations)} on the device: [{moduleTwin.DeviceId}], module: [{moduleTwin.ModuleId}]", TraceSeverity.Verbose);

try
{
Expand All @@ -171,7 +169,7 @@ await Task
}
catch (OperationCanceledException)
{
_logger.Trace($"Operations on device: [{module.DeviceId}], module: [{module.ModuleId}] have been canceled as the device goes offline.", TraceSeverity.Information);
_logger.Trace($"Operations on device: [{moduleTwin.DeviceId}], module: [{moduleTwin.ModuleId}] have been canceled as the device goes offline.", TraceSeverity.Information);
}
catch (Exception ex)
{
Expand Down

0 comments on commit 4c92bdd

Please sign in to comment.