Skip to content

Commit

Permalink
merge
Browse files Browse the repository at this point in the history
  • Loading branch information
David R. Williamson committed Apr 20, 2023
1 parent 9b86af4 commit 83b56f6
Showing 1 changed file with 72 additions and 67 deletions.
139 changes: 72 additions & 67 deletions e2e/LongHaul/service/IotHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,20 @@ public async Task MonitorConnectedDevicesAsync(CancellationToken ct)
{
try
{
AsyncPageable<ClientTwin> allDevices = s_serviceClient.Query.Create<ClientTwin>(
AsyncPageable<ClientTwin> allDeviceTwins = s_serviceClient.Query.Create<ClientTwin>(
"SELECT deviceId, connectionState, lastActivityTime FROM devices where is_defined(properties.reported.runId)",
ct);

AsyncPageable<ClientTwin> allModules = s_serviceClient.Query.Create<ClientTwin>(
AsyncPageable<ClientTwin> allModuleTwins = s_serviceClient.Query.Create<ClientTwin>(
"SELECT deviceId, moduleId, connectionState FROM devices.modules where is_defined(properties.reported.runId)",
ct);

await foreach (ClientTwin device in allDevices)
{
string deviceId = device.DeviceId;
await foreach (ClientTwin deviceTwin in allDeviceTwins)
{
string deviceId = deviceTwin.DeviceId;

if (s_onlineDeviceOperations.ContainsKey(deviceId)
&& device.ConnectionState is ClientConnectionState.Disconnected)
&& deviceTwin.ConnectionState is ClientConnectionState.Disconnected)
{
CancellationTokenSource source = s_onlineDeviceOperations[deviceId].Item2;
// Signal cancellation to all tasks on the particular device.
Expand All @@ -94,28 +94,28 @@ public async Task MonitorConnectedDevicesAsync(CancellationToken ct)
s_onlineDeviceOperations.TryRemove(deviceId, out _);
}
else if (!s_onlineDeviceOperations.ContainsKey(deviceId)
&& device.ConnectionState is ClientConnectionState.Connected)
&& deviceTwin.ConnectionState is ClientConnectionState.Connected)
{
// For each online device, initiate a new cancellation token source.
// Once the device goes offline, cancel all operations on this device.
var source = new CancellationTokenSource();
CancellationToken token = source.Token;

async Task Operations()
{
Logger loggerPerDevice = _logger.Clone();
loggerPerDevice.LoggerContext.Add(DeviceId, deviceId);
var deviceOperations = new DeviceOperations(s_serviceClient, deviceId, loggerPerDevice);
_logger.Trace($"Creating {nameof(DeviceOperations)} on the device [{deviceId}]", TraceSeverity.Verbose);
async Task OperateWithDeviceAsync()
{
Logger deviceLogger = _logger.Clone();
deviceLogger.LoggerContext.Add(DeviceId, deviceId);
var deviceOperations = new DeviceOperations(s_serviceClient, deviceId, deviceLogger);
_logger.Trace($"Creating {nameof(DeviceOperations)} on the device [{deviceId}]", TraceSeverity.Verbose);


try
{
await Task
.WhenAll(
deviceOperations.InvokeDirectMethodAsync(loggerPerDevice.Clone(), token),
deviceOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), loggerPerDevice.Clone(), token),
deviceOperations.SendC2dMessagesAsync(loggerPerDevice.Clone(), token))
deviceOperations.InvokeDirectMethodAsync(deviceLogger.Clone(), token),
deviceOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), deviceLogger.Clone(), token),
deviceOperations.SendC2dMessagesAsync(deviceLogger.Clone(), token))
.ConfigureAwait(false);
}
catch (OperationCanceledException)
Expand All @@ -128,67 +128,72 @@ await Task
}
}

// Passing in "Operations()" as Task so we don't need to manually call "Invoke()" on it.
var operationsTuple = new Tuple<Task, CancellationTokenSource>(Operations(), source);
s_onlineDeviceOperations.TryAdd(deviceId, operationsTuple);
// Passing in "Operations()" as Task so we don't need to manually call "Invoke()" on it.
var operationsTuple = new Tuple<Task, CancellationTokenSource>(OperateWithDeviceAsync(), source);
s_onlineDeviceOperations.TryAdd(deviceId, operationsTuple);
}
}
}

await foreach (ClientTwin module in allModules)
{
string moduleId = module.DeviceId + "_" + module.ModuleId;
if (s_onlineModuleOperations.ContainsKey(moduleId)
&& module.ConnectionState is ClientConnectionState.Disconnected)
{
CancellationTokenSource source = s_onlineModuleOperations[moduleId].Item2;
// Signal cancellation to all tasks on the particular module.
source.Cancel();
// Dispose the cancellation token source.
source.Dispose();
// Remove the correlated module operations and cancellation token source of the particular module from the dictionary.
s_onlineModuleOperations.TryRemove(moduleId, out _);
}
else if (!s_onlineModuleOperations.ContainsKey(moduleId)
&& module.ConnectionState is ClientConnectionState.Connected)
await foreach (ClientTwin moduleTwin in allModuleTwins)
{
// For each online module, initiate a new cancellation token source.
// Once the module goes offline, cancel all operations on this module.
var source = new CancellationTokenSource();
CancellationToken token = source.Token;

async Task Operations()
string moduleId = $"{moduleTwin.DeviceId}/{moduleTwin.ModuleId}";
if (s_onlineModuleOperations.ContainsKey(moduleId)
&& moduleTwin.ConnectionState is ClientConnectionState.Disconnected)
{
var moduleOperations = new ModuleOperations(s_serviceClient, module.DeviceId, module.ModuleId, _logger.Clone());
_logger.Trace($"Creating {nameof(ModuleOperations)} on the device: [{module.DeviceId}], module: [{module.ModuleId}]", TraceSeverity.Verbose);
CancellationTokenSource source = s_onlineModuleOperations[moduleId].Item2;
// Signal cancellation to all tasks on the particular module.
source.Cancel();
// Dispose the cancellation token source.
source.Dispose();
// Remove the correlated module operations and cancellation token source of the particular module from the dictionary.
s_onlineModuleOperations.TryRemove(moduleId, out _);
}
else if (!s_onlineModuleOperations.ContainsKey(moduleId)
&& moduleTwin.ConnectionState is ClientConnectionState.Connected)
{
// For each online module, initiate a new cancellation token source.
// Once the module goes offline, cancel all operations on this module.
var source = new CancellationTokenSource();
CancellationToken token = source.Token;

try
{
await Task
.WhenAll(
moduleOperations.InvokeDirectMethodAsync(_logger.Clone(), token),
moduleOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), _logger.Clone(), token))
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
_logger.Trace($"Operations on device: [{module.DeviceId}], module: [{module.ModuleId}] have been canceled as the device goes offline.", TraceSeverity.Information);
}
catch (Exception ex)
async Task Operations()
{
_logger.Trace($"Service app failed with exception {ex}", TraceSeverity.Error);
var moduleOperations = new ModuleOperations(s_serviceClient, moduleTwin.DeviceId, moduleTwin.ModuleId, _logger.Clone());
_logger.Trace($"Creating {nameof(ModuleOperations)} on the device: [{moduleTwin.DeviceId}], module: [{moduleTwin.ModuleId}]", TraceSeverity.Verbose);

try
{
await Task
.WhenAll(
moduleOperations.InvokeDirectMethodAsync(_logger.Clone(), token),
moduleOperations.SetDesiredPropertiesAsync("guidValue", Guid.NewGuid().ToString(), _logger.Clone(), token))
.ConfigureAwait(false);
}
catch (OperationCanceledException)
{
_logger.Trace($"Operations on device: [{moduleTwin.DeviceId}], module: [{moduleTwin.ModuleId}] have been canceled as the device goes offline.", TraceSeverity.Information);
}
catch (Exception ex)
{
_logger.Trace($"Service app failed with exception {ex}", TraceSeverity.Error);
}
}
}

// Passing in "Operations()" as Task so we don't need to manually call "Invoke()" on it.
var operationsTuple = new Tuple<Task, CancellationTokenSource>(Operations(), source);
s_onlineModuleOperations.TryAdd(moduleId, operationsTuple);
// Passing in "Operations()" as Task so we don't need to manually call "Invoke()" on it.
var operationsTuple = new Tuple<Task, CancellationTokenSource>(Operations(), source);
s_onlineModuleOperations.TryAdd(moduleId, operationsTuple);
}
}
}

_logger.Trace($"Total number of connected devices: {s_onlineDeviceOperations.Count}", TraceSeverity.Information);
_logger.Trace($"Total number of connected modules: {s_onlineModuleOperations.Count}", TraceSeverity.Information);
_logger.Metric(TotalOnlineDevicesCount, s_onlineDeviceOperations.Count);
_logger.Metric(TotalOnlineModulesCount, s_onlineModuleOperations.Count);
_logger.Trace($"Total number of connected devices: {s_onlineDeviceOperations.Count}", TraceSeverity.Information);
_logger.Trace($"Total number of connected modules: {s_onlineModuleOperations.Count}", TraceSeverity.Information);
_logger.Metric(TotalOnlineDevicesCount, s_onlineDeviceOperations.Count);
_logger.Metric(TotalOnlineModulesCount, s_onlineModuleOperations.Count);
}
catch (Exception ex)
{
_logger.Trace($"Exception querying devices and modules\n{ex}", TraceSeverity.Warning);
}

await Task.Delay(s_deviceCountMonitorInterval, ct).ConfigureAwait(false);
}
Expand Down

0 comments on commit 83b56f6

Please sign in to comment.