Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel twin ops on disconnect #3287

Merged
merged 29 commits into from
Apr 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
5ebb0fe
Cancel pending twin operations on disconnect
Apr 19, 2023
d831d48
Add/update logging
Apr 19, 2023
9ea0ec6
Reduce concurrent dictionaries to one per protocol
Apr 19, 2023
8fce7e6
Fix twin patch response
Apr 19, 2023
19a8e0a
Unify twin update and logging in long haul
Apr 19, 2023
9dd4909
Improve error handling in long haul
Apr 19, 2023
2eee027
Fix old task cleanup
Apr 20, 2023
9b86af4
Add retry to long haul init
Apr 20, 2023
a6d0f39
merge
Apr 20, 2023
0a6014b
Rearrange methods
Apr 20, 2023
287a8fa
Fix method name
Apr 20, 2023
1593b03
PR feedback from Bryce
Apr 20, 2023
eb8b7b3
Cancel pending twin operations on disconnect
Apr 19, 2023
99b9b7d
Add/update logging
Apr 19, 2023
586e53d
Reduce concurrent dictionaries to one per protocol
Apr 19, 2023
b8d858f
Fix twin patch response
Apr 19, 2023
4f68177
Unify twin update and logging in long haul
Apr 19, 2023
348e82b
Improve error handling in long haul
Apr 19, 2023
ed93221
Fix old task cleanup
Apr 20, 2023
3b2654e
Add retry to long haul init
Apr 20, 2023
c74dc83
merge
Apr 20, 2023
365968c
Rearrange methods
Apr 20, 2023
0eabbb1
Fix method name
Apr 20, 2023
fcd79d3
PR feedback from Bryce
Apr 20, 2023
af27f20
PR feedback
Apr 20, 2023
0fb999d
adding cancellation token for InitializeAsync in module client
tmahmood-microsoft Apr 20, 2023
d418081
Merge branch 'drwill/cancel-twinops-on-disconnect' of https://github.…
tmahmood-microsoft Apr 20, 2023
f45d3fc
Fix MQTT twin response
Apr 20, 2023
04fa965
Add unit tests for payload conventions
Apr 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 67 additions & 39 deletions e2e/LongHaul/device/IotHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ internal sealed class IotHub : IAsyncDisposable
private ConnectionStatusChangeReason _disconnectedReason;
private RecommendedAction _disconnectedRecommendedAction;
private volatile IotHubDeviceClient _deviceClient;
private CancellationToken _ct;

private static readonly TimeSpan s_messageLoopSleepTime = TimeSpan.FromSeconds(3);
private static readonly TimeSpan s_deviceTwinUpdateInterval = TimeSpan.FromSeconds(10);
Expand Down Expand Up @@ -66,8 +67,13 @@ public IotHub(Logger logger, Parameters parameters)
/// <summary>
/// Initializes the connection to IoT Hub.
/// </summary>
public async Task InitializeAsync()
public async Task InitializeAsync(CancellationToken? ct = null)
{
if (ct != null)
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
{
_ct = ct.Value;
}

await _lifetimeControl.WaitAsync().ConfigureAwait(false);

try
Expand Down Expand Up @@ -112,7 +118,7 @@ public async Task SendTelemetryMessagesAsync(Logger logger, CancellationToken ct
var pendingMessages = new List<TelemetryMessage>(maxBulkMessages);
logger.LoggerContext.Add(OperationName, LoggingConstants.TelemetryMessage);
var sw = new Stopwatch();

while (!ct.IsCancellationRequested)
{
logger.Metric(MessageBacklog, _messagesToSend.Count);
Expand Down Expand Up @@ -176,38 +182,6 @@ public async Task SendTelemetryMessagesAsync(Logger logger, CancellationToken ct
}
}

/// <summary>
/// Repeatedly reports the running count of sent telemetry messages as a twin reported property
/// until cancellation is requested, pausing for the twin update interval between attempts.
/// </summary>
/// <param name="logger">Logger that receives the trace messages and metrics for each attempt.</param>
/// <param name="ct">Token that ends the loop; it is also passed to the twin update and to the delay.</param>
public async Task ReportReadOnlyPropertiesAsync(Logger logger, CancellationToken ct)
{
    logger.LoggerContext.Add(OperationName, ReportTwinProperties);
    var updateTimer = new Stopwatch();

    for (;;)
    {
        if (ct.IsCancellationRequested)
        {
            return;
        }

        try
        {
            var payload = new ReportedProperties
            {
                { "TotalTelemetryMessagesSent", _totalTelemetryMessagesSent },
            };

            logger.Trace($"Updating reported properties.", TraceSeverity.Information);

            // Time only the service call itself.
            updateTimer.Restart();
            await _deviceClient.UpdateReportedPropertiesAsync(payload, ct).ConfigureAwait(false);
            updateTimer.Stop();

            // Reached only when the update did not throw.
            _totalTwinUpdatesReported++;
            logger.Metric(TotalTwinUpdatesReported, _totalTwinUpdatesReported);
            logger.Metric(ReportedTwinUpdateOperationSeconds, updateTimer.Elapsed.TotalSeconds);
        }
        catch (Exception ex)
        {
            // A failed attempt is logged and the loop carries on to the next interval.
            logger.Trace($"Exception when reporting properties when connected is {IsConnected}\n{ex}");
        }

        await Task.Delay(s_deviceTwinUpdateInterval, ct).ConfigureAwait(false);
    }
}

public void AddTelemetry(
TelemetryBase telemetryObject,
IDictionary<string, string> extraProperties = null)
Expand Down Expand Up @@ -244,7 +218,26 @@ public void AddTelemetry(
_messagesToSend.Enqueue(iotMessage);
}

public async Task SetPropertiesAsync(string keyName, object properties, Logger logger, CancellationToken ct)
public async Task ReportReadOnlyPropertiesAsync(Logger logger, CancellationToken ct)
{
logger.LoggerContext.Add(OperationName, ReportTwinProperties);
var sw = new Stopwatch();

while (!ct.IsCancellationRequested)
{
sw.Restart();
await SetPropertiesAsync("totalTelemetryMessagesSent", _totalTelemetryMessagesSent, logger, ct).ConfigureAwait(false);
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
sw.Stop();

++_totalTwinUpdatesReported;
logger.Metric(TotalTwinUpdatesReported, _totalTwinUpdatesReported);
logger.Metric(ReportedTwinUpdateOperationSeconds, sw.Elapsed.TotalSeconds);

await Task.Delay(s_deviceTwinUpdateInterval, ct).ConfigureAwait(false);
}
}

public async Task SetPropertiesAsync<T>(string keyName, T properties, Logger logger, CancellationToken ct)
{
Debug.Assert(_deviceClient != null);
Debug.Assert(properties != null);
Expand All @@ -257,11 +250,11 @@ public async Task SetPropertiesAsync(string keyName, object properties, Logger l
{
try
{
await _deviceClient
long version = await _deviceClient
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
.UpdateReportedPropertiesAsync(reportedProperties, ct)
.ConfigureAwait(false);

logger.Trace($"Set the reported property with name [{keyName}] in device twin.", TraceSeverity.Information);
logger.Trace($"Set the reported property with key {keyName} and value {properties} in device twin; observed version {version}.", TraceSeverity.Information);
break;
}
catch (Exception ex)
Expand All @@ -270,6 +263,28 @@ await _deviceClient
await Task.Delay(s_retryInterval, ct).ConfigureAwait(false);
}
}

while (!ct.IsCancellationRequested)
{
try
{
TwinProperties twin = await _deviceClient.GetTwinPropertiesAsync(ct).ConfigureAwait(false);
if (!twin.Reported.TryGetValue<T>(keyName, out T actualValue))
{
logger.Trace($"Couldn't find the reported property {keyName} in the device twin.", TraceSeverity.Warning);
}
else if (!actualValue.Equals(properties))
{
logger.Trace($"Couldn't validate value for {keyName} was set to {properties}, found {actualValue}.");
}
break;
}
catch (Exception ex)
{
logger.Trace($"Exception getting twin\n{ex}", TraceSeverity.Warning);
await Task.Delay(s_retryInterval, ct).ConfigureAwait(false);
}
}
}

public async Task UploadFilesAsync(Logger logger, CancellationToken ct)
Expand Down Expand Up @@ -410,7 +425,19 @@ private async void ConnectionStatusChangesHandlerAsync(ConnectionStatusInfo conn
{
case RecommendedAction.OpenConnection:
_logger.Trace($"Following recommended action of reinitializing the client.", TraceSeverity.Information);
await InitializeAsync().ConfigureAwait(false);
while (!_ct.IsCancellationRequested)
{
try
{
await InitializeAsync().ConfigureAwait(false);
drwill-ms marked this conversation as resolved.
Show resolved Hide resolved
break;
}
catch (Exception ex)
{
_logger.Trace($"Exception re-initializing client\n{ex}", TraceSeverity.Warning);
await Task.Delay(s_retryInterval, _ct).ConfigureAwait(false);
}
}
break;

case RecommendedAction.PerformNormally:
Expand Down Expand Up @@ -479,7 +506,8 @@ private async Task DesiredPropertyUpdateCallbackAsync(DesiredProperties properti

if (reported.Any())
{
await _deviceClient.UpdateReportedPropertiesAsync(reported).ConfigureAwait(false);
long version = await _deviceClient.UpdateReportedPropertiesAsync(reported).ConfigureAwait(false);
_logger.Trace($"Updated {reported.Count()} properties and observed new version {version}.", TraceSeverity.Information);

_totalDesiredPropertiesHandled += reported.Count();
_logger.Metric(TotalDesiredPropertiesHandled, _totalDesiredPropertiesHandled);
Expand Down
3 changes: 2 additions & 1 deletion e2e/LongHaul/device/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private static async Task Main(string[] args)
{
try
{
await iotHub.InitializeAsync().ConfigureAwait(false);
await iotHub.InitializeAsync(cts.Token).ConfigureAwait(false);
break;
}
catch (Exception ex)
Expand Down Expand Up @@ -110,6 +110,7 @@ await Task
.ConfigureAwait(false);
}
catch (OperationCanceledException) { } // user signalled an exit
catch (AggregateException ex) when (ex.InnerException is OperationCanceledException) { } // user signaled an exit
brycewang-microsoft marked this conversation as resolved.
Show resolved Hide resolved
catch (Exception ex)
{
s_logger.Trace($"Device app failed with exception {ex}", TraceSeverity.Error);
Expand Down
5 changes: 5 additions & 0 deletions e2e/LongHaul/device/SystemHealthTelemetry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq;
using System.Net.NetworkInformation;
using System.Text.Json.Serialization;
using Newtonsoft.Json;

namespace Microsoft.Azure.Devices.LongHaul.Device
{
Expand All @@ -23,15 +24,19 @@ public SystemHealthTelemetry(int port)
TcpPortFilter.Add(port);
}

[JsonProperty("processCpuUsagePercent")]
[JsonPropertyName("processCpuUsagePercent")]
public double ProcessCpuUsagePercent { get; set; } = UpdateCpuUsage();

[JsonProperty("totalAssignedMemoryBytes")]
[JsonPropertyName("totalAssignedMemoryBytes")]
public long TotalAssignedMemoryBytes { get; set; } = s_currentProcess.WorkingSet64;

[JsonProperty("totalGCBytes")]
[JsonPropertyName("totalGCBytes")]
public long TotalGCBytes { get; set; } = GC.GetTotalMemory(false);

[JsonProperty("activeTcpConnections")]
[JsonPropertyName("activeTcpConnections")]
public long ActiveTcpConnections { get; set; } = UpdateTcpConnections();

Expand Down
17 changes: 17 additions & 0 deletions e2e/LongHaul/device/SystemProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,35 @@

using System.Runtime.InteropServices;
using System.Text.Json.Serialization;
using Newtonsoft.Json;

namespace Microsoft.Azure.Devices.LongHaul.Device
{
/// <summary>
/// Describes the host the application runs on: OS architecture, OS description and .NET framework description.
/// Each property carries both a Newtonsoft.Json and a System.Text.Json name attribute with the same name.
/// Instances compare by value across all three properties.
/// </summary>
internal class SystemProperties
{
    /// <summary>
    /// The OS architecture; defaults to <see cref="RuntimeInformation.OSArchitecture"/> as a string.
    /// </summary>
    [JsonProperty("systemArchitecture")]
    [JsonPropertyName("systemArchitecture")]
    public string SystemArchitecture { get; set; } = RuntimeInformation.OSArchitecture.ToString();

    /// <summary>
    /// The OS description; defaults to <see cref="RuntimeInformation.OSDescription"/>.
    /// </summary>
    [JsonProperty("osVersion")]
    [JsonPropertyName("osVersion")]
    public string OsVersion { get; set; } = RuntimeInformation.OSDescription;

    /// <summary>
    /// The .NET framework description; defaults to <see cref="RuntimeInformation.FrameworkDescription"/>.
    /// </summary>
    [JsonProperty("frameworkDescription")]
    [JsonPropertyName("frameworkDescription")]
    public string FrameworkDescription { get; set; } = RuntimeInformation.FrameworkDescription;

    /// <summary>
    /// Determines whether <paramref name="obj"/> is a <see cref="SystemProperties"/> whose three
    /// properties all equal this instance's (string equality; null equals null).
    /// </summary>
    /// <param name="obj">The object to compare against.</param>
    /// <returns>True when all three properties match; otherwise false.</returns>
    public override bool Equals(object obj)
    {
        return obj is SystemProperties other
            && SystemArchitecture == other.SystemArchitecture
            && OsVersion == other.OsVersion
            && FrameworkDescription == other.FrameworkDescription;
    }

    /// <summary>
    /// Computes a hash code consistent with <see cref="Equals(object)"/>.
    /// </summary>
    /// <returns>A hash combining the three properties; a null property contributes zero.</returns>
    public override int GetHashCode()
    {
        // The properties have public setters, so any of them may be null. Equals tolerates
        // null, so hashing must not throw for the same instances.
        unchecked
        {
            int hash = 17;
            hash = (hash * 31) + (SystemArchitecture?.GetHashCode() ?? 0);
            hash = (hash * 31) + (OsVersion?.GetHashCode() ?? 0);
            hash = (hash * 31) + (FrameworkDescription?.GetHashCode() ?? 0);
            return hash;
        }
    }
}
}
2 changes: 2 additions & 0 deletions e2e/LongHaul/device/TelemetryBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System;
using System.Text.Json.Serialization;
using Newtonsoft.Json;

namespace Microsoft.Azure.Devices.LongHaul.Device
{
Expand All @@ -11,6 +12,7 @@ internal abstract class TelemetryBase
/// <summary>
/// The date/time the event occurred, in UTC.
/// </summary>
[JsonProperty("eventDateTimeUtc")]
[JsonPropertyName("eventDateTimeUtc")]
public DateTime? EventDateTimeUtc { get; set; } = DateTime.UtcNow;
}
Expand Down
54 changes: 30 additions & 24 deletions e2e/LongHaul/module/IotHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ internal sealed class IotHub : IAsyncDisposable
private readonly string _moduleConnectionString;
private readonly IotHubClientOptions _clientOptions;
private readonly Logger _logger;
private CancellationToken _ct;

private SemaphoreSlim _lifetimeControl = new(1, 1);

Expand Down Expand Up @@ -56,8 +57,12 @@ public IotHub(Logger logger, Parameters parameters)
/// <summary>
/// Initializes the connection to IoT Hub.
/// </summary>
public async Task InitializeAsync()
public async Task InitializeAsync(CancellationToken? ct = null)
{
if (ct != null)
{
_ct = ct.Value;
}
await _lifetimeControl.WaitAsync().ConfigureAwait(false);
var sw = new Stopwatch();

Expand Down Expand Up @@ -175,25 +180,13 @@ public async Task ReportReadOnlyPropertiesAsync(Logger logger, CancellationToken
var sw = new Stopwatch();
while (!ct.IsCancellationRequested)
{
try
{
var reported = new ReportedProperties
{
{ "TotalTelemetryMessagesSent", _totalTelemetryMessagesToModuleSent },
};

sw.Restart();
await _moduleClient.UpdateReportedPropertiesAsync(reported, ct).ConfigureAwait(false);
sw.Stop();
sw.Restart();
await SetPropertiesAsync("totalTelemetryMessagesSent", _totalTelemetryMessagesToModuleSent, logger, ct).ConfigureAwait(false);
sw.Stop();

++_totalTwinUpdatesToModuleReported;
logger.Metric(TotalTwinUpdatesToModuleReported, _totalTwinUpdatesToModuleReported);
logger.Metric(ReportedTwinUpdateToModuleOperationSeconds, sw.Elapsed.TotalSeconds);
}
catch (Exception ex)
{
logger.Trace($"Exception when reporting properties when connected is {IsConnected}\n{ex}");
}
++_totalTwinUpdatesToModuleReported;
logger.Metric(TotalTwinUpdatesToModuleReported, _totalTwinUpdatesToModuleReported);
logger.Metric(ReportedTwinUpdateToModuleOperationSeconds, sw.Elapsed.TotalSeconds);

await Task.Delay(s_deviceTwinUpdateInterval, ct).ConfigureAwait(false);
}
Expand Down Expand Up @@ -249,11 +242,11 @@ public async Task SetPropertiesAsync(string keyName, object properties, Logger l
{
try
{
await _moduleClient
long version = await _moduleClient
.UpdateReportedPropertiesAsync(reportedProperties, ct)
.ConfigureAwait(false);
.ConfigureAwait(false);

logger.Trace($"Set the reported property with name [{keyName}] in device twin.", TraceSeverity.Information);
logger.Trace($"Set the reported property with key {keyName} and value {properties} in device twin; observed version {version}.", TraceSeverity.Information);
break;
}
catch (Exception ex)
Expand Down Expand Up @@ -333,7 +326,19 @@ private async void ConnectionStatusChangesHandlerAsync(ConnectionStatusInfo conn
{
case RecommendedAction.OpenConnection:
_logger.Trace($"Following recommended action of reinitializing the client.", TraceSeverity.Information);
await InitializeAsync().ConfigureAwait(false);
while (!_ct.IsCancellationRequested)
{
try
{
await InitializeAsync().ConfigureAwait(false);
break;
}
catch (Exception ex)
{
_logger.Trace($"Exception re-initializing client\n{ex}", TraceSeverity.Warning);
await Task.Delay(s_retryInterval, _ct).ConfigureAwait(false);
}
}
break;

case RecommendedAction.PerformNormally:
Expand Down Expand Up @@ -403,7 +408,8 @@ private async Task DesiredPropertyUpdateCallbackAsync(DesiredProperties properti

if (reported.Any())
{
await _moduleClient.UpdateReportedPropertiesAsync(reported).ConfigureAwait(false);
long version = await _moduleClient.UpdateReportedPropertiesAsync(reported).ConfigureAwait(false);
_logger.Trace($"Updated {reported.Count()} properties and observed new version {version}.", TraceSeverity.Information);

_totalDesiredPropertiesToModuleHandled += reported.Count();
_logger.Metric(TotalDesiredPropertiesToModuleHandled, _totalDesiredPropertiesToModuleHandled);
Expand Down
3 changes: 2 additions & 1 deletion e2e/LongHaul/module/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ private static async Task Main(string[] args)
{
try
{
await iotHub.InitializeAsync().ConfigureAwait(false);
await iotHub.InitializeAsync(cts.Token).ConfigureAwait(false);
break;
}
catch (Exception ex)
Expand All @@ -96,6 +96,7 @@ await Task
.ConfigureAwait(false);
}
catch (TaskCanceledException) { } // user signalled an exit
catch (AggregateException ex) when (ex.InnerException is OperationCanceledException) { } // user signaled an exit
catch (Exception ex)
{
s_logger.Trace($"Device app failed with exception {ex}", TraceSeverity.Error);
Expand Down
Loading