Skip to content

Commit

Permalink
Fix MQTT twin response
Browse files Browse the repository at this point in the history
  • Loading branch information
David R. Williamson committed Apr 20, 2023
1 parent d418081 commit f45d3fc
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 78 deletions.
24 changes: 23 additions & 1 deletion e2e/LongHaul/device/IotHub.cs
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ public async Task ReportReadOnlyPropertiesAsync(Logger logger, CancellationToken
}
}

public async Task SetPropertiesAsync(string keyName, object properties, Logger logger, CancellationToken ct)
public async Task SetPropertiesAsync<T>(string keyName, T properties, Logger logger, CancellationToken ct)
{
Debug.Assert(_deviceClient != null);
Debug.Assert(properties != null);
Expand All @@ -263,6 +263,28 @@ public async Task SetPropertiesAsync(string keyName, object properties, Logger l
await Task.Delay(s_retryInterval, ct).ConfigureAwait(false);
}
}

while (!ct.IsCancellationRequested)
{
try
{
TwinProperties twin = await _deviceClient.GetTwinPropertiesAsync(ct).ConfigureAwait(false);
if (!twin.Reported.TryGetValue<T>(keyName, out T actualValue))
{
logger.Trace($"Couldn't find the reported property {keyName} in the device twin.", TraceSeverity.Warning);
}
else if (!actualValue.Equals(properties))
{
logger.Trace($"Couldn't validate value for {keyName} was set to {properties}, found {actualValue}.");
}
break;
}
catch (Exception ex)
{
logger.Trace($"Exception getting twin\n{ex}", TraceSeverity.Warning);
await Task.Delay(s_retryInterval, ct).ConfigureAwait(false);
}
}
}

public async Task UploadFilesAsync(Logger logger, CancellationToken ct)
Expand Down
13 changes: 13 additions & 0 deletions e2e/LongHaul/device/SystemProperties.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,18 @@ internal class SystemProperties

[JsonPropertyName("frameworkDescription")]
public string FrameworkDescription { get; set; } = RuntimeInformation.FrameworkDescription;

public override bool Equals(object obj)
{
return obj is SystemProperties other
&& SystemArchitecture == other.SystemArchitecture
&& OsVersion == other.OsVersion
&& FrameworkDescription == other.FrameworkDescription;
}

public override int GetHashCode()
{
return SystemArchitecture.GetHashCode() ^ OsVersion.GetHashCode() ^ FrameworkDescription.GetHashCode();
}
}
}
133 changes: 57 additions & 76 deletions iothub/device/src/Pipeline/MqttTransportHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1176,115 +1176,96 @@ private void HandleTwinResponse(MqttApplicationMessageReceivedEventArgs received
{
byte[] payloadBytes = receivedEventArgs.ApplicationMessage.Payload ?? Array.Empty<byte>();

if (_pendingTwinOperations.TryRemove(receivedRequestId, out PendingMqttTwinOperation patchTwinOperation))
if (_pendingTwinOperations.TryRemove(receivedRequestId, out PendingMqttTwinOperation twinOperation))
{
if (Logging.IsEnabled)
Logging.Info(this, $"Received response to patch twin request with request id {receivedRequestId}.", nameof(HandleTwinResponse));

if (status != 200)
IotHubClientErrorResponseMessage ParseError(byte[] payloadBytes)
{
IotHubClientErrorResponseMessage errorResponse = null;

// This will only ever contain an error message which is encoded based on service contract (UTF-8).
if (payloadBytes.Length > 0)
if (payloadBytes.Length == 0)
{
string errorResponseString = Encoding.UTF8.GetString(payloadBytes);
try
{
errorResponse = DefaultPayloadConvention.Instance.GetObject<IotHubClientErrorResponseMessage>(errorResponseString);
}
catch (JsonException ex)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Failed to parse twin patch error response JSON. Message body: '{errorResponseString}'. Exception: {ex}. ", nameof(HandleTwinResponse));
return null;
}

errorResponse = new IotHubClientErrorResponseMessage
{
Message = errorResponseString,
};
}
string errorResponseString = Encoding.UTF8.GetString(payloadBytes);
try
{
return DefaultPayloadConvention.Instance.GetObject<IotHubClientErrorResponseMessage>(errorResponseString);
}
catch (JsonException ex)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Failed to parse twin patch error response JSON. Message body: '{errorResponseString}'. Exception: {ex}. ", nameof(HandleTwinResponse));

return new IotHubClientErrorResponseMessage
{
Message = errorResponseString,
};
}
}

if (twinOperation.TwinPatchTask != null)
{
IotHubClientErrorResponseMessage error = status == 204
? null
: ParseError(payloadBytes);

// This received message is in response to an update reported properties request.
var patchTwinResponse = new PatchTwinResponse
{
Status = status,
Version = version,
ErrorResponseMessage = errorResponse,
ErrorResponseMessage = error,
};

patchTwinOperation.TwinPatchTask.TrySetResult(patchTwinResponse);
twinOperation.TwinPatchTask.TrySetResult(patchTwinResponse);
}
else
else // should be a "get twin" operation
{
try
var getTwinResponse = new GetTwinResponse
{
// Use the encoder that has been agreed to between the client and service to decode the byte[] response
// The response is deserialized into an SDK-defined type based on service-defined NewtonSoft.Json-based json property name.
// For this reason, we use NewtonSoft Json serializer for this deserialization.
TwinDocument clientTwinProperties = DefaultPayloadConvention.Instance.GetObject<TwinDocument>(payloadBytes);
Status = status,
};

var getTwinResponse = new GetTwinResponse
if (status != 200)
{
getTwinResponse.ErrorResponseMessage = ParseError(payloadBytes);
twinOperation.TwinResponseTask.TrySetResult(getTwinResponse);
}
else
{
try
{
Status = status,
Twin = new TwinProperties(
// Use the encoder that has been agreed to between the client and service to decode the byte[] response
// The response is deserialized into an SDK-defined type based on service-defined NewtonSoft.Json-based json property name.
// For this reason, we use Newtonsoft.Json serializer for this deserialization.
TwinDocument clientTwinProperties = status == 200
? DefaultPayloadConvention.Instance.GetObject<TwinDocument>(payloadBytes)
: null;

getTwinResponse.Twin = new TwinProperties(
new DesiredProperties(clientTwinProperties.Desired)
{
PayloadConvention = _payloadConvention,
},
new ReportedProperties(clientTwinProperties.Reported, true)
{
PayloadConvention = _payloadConvention,
}),
};

patchTwinOperation.TwinResponseTask.TrySetResult(getTwinResponse);
}
catch (JsonReaderException ex)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Failed to parse Twin JSON. Message body: '{Encoding.UTF8.GetString(payloadBytes)}'. Exception: {ex}.", nameof(HandleTwinResponse));

patchTwinOperation.TwinResponseTask.TrySetException(ex);
}
}
}
else if (_pendingTwinOperations.TryRemove(receivedRequestId, out PendingMqttTwinOperation pendingPatchOperation))
{
if (Logging.IsEnabled)
Logging.Info(this, $"Received response to patch twin request with request id {receivedRequestId}.", nameof(HandleTwinResponse));

IotHubClientErrorResponseMessage errorResponse = null;

// This will only ever contain an error message which is encoded based on service contract (UTF-8).
if (payloadBytes.Length > 0)
{
string errorResponseString = Encoding.UTF8.GetString(payloadBytes);
try
{
errorResponse = DefaultPayloadConvention.Instance.GetObject<IotHubClientErrorResponseMessage>(errorResponseString);
}
catch (JsonException ex)
{
if (Logging.IsEnabled)
Logging.Error(this, $"Failed to parse twin patch error response JSON. Message body: '{errorResponseString}'. Exception: {ex}. ", nameof(HandleTwinResponse));
});

errorResponse = new IotHubClientErrorResponseMessage
twinOperation.TwinResponseTask.TrySetResult(getTwinResponse);
}
catch (JsonReaderException ex)
{
Message = errorResponseString,
};
if (Logging.IsEnabled)
Logging.Error(this, $"Failed to parse Twin JSON. Message body: '{Encoding.UTF8.GetString(payloadBytes)}'. Exception: {ex}.", nameof(HandleTwinResponse));

twinOperation.TwinResponseTask.TrySetException(ex);
}
}
}

// This received message is in response to an update reported properties request.
var patchTwinResponse = new PatchTwinResponse
{
Status = status,
Version = version,
ErrorResponseMessage = errorResponse,
};

pendingPatchOperation.TwinPatchTask.TrySetResult(patchTwinResponse);
}
else if (Logging.IsEnabled)
{
Expand Down
9 changes: 8 additions & 1 deletion iothub/device/src/Pipeline/TransportHandlerBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,14 @@ protected TransportHandlerBase(PipelineContext context, IDelegatingHandler nextH
{
}

protected static TimeSpan TwinResponseTimeout { get; } = TimeSpan.FromHours(1);
/// <summary>
/// Twin operations are initiated one one link/topic and the response is received on another, where the
/// waiting client call is completed via a TaskCompletionSource linked to the request Id.
/// For various reasons, the device may never observe the response from the service and the client call will
/// sit indefinitely, however unlikely. In order to prevent an ever increasing dictionary, we'll occasionally
/// review these pending operations, and cancel/remove them from the dictionary.
/// </summary>
protected private static TimeSpan TwinResponseTimeout { get; } = TimeSpan.FromHours(1);

public override Task WaitForTransportClosedAsync()
{
Expand Down

0 comments on commit f45d3fc

Please sign in to comment.