Skip to content

Commit

Permalink
Defect/sporadic exception on disconnect (#45)
Browse files Browse the repository at this point in the history
* Fix sporadic exception in disconnect scenarios

* Fix possible missing of disconnect status message
  • Loading branch information
ThomasSeidenbecher authored Sep 22, 2022
1 parent 6850bbd commit a0dedb1
Showing 1 changed file with 25 additions and 26 deletions.
51 changes: 25 additions & 26 deletions src/Client/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ public void Connect( ClientCredentials credentials = null )
0,
null,
DataSetType.Event,
true,
true );
}

Expand All @@ -254,11 +255,23 @@ public void Connect( ClientCredentials credentials = null )

public void Disconnect()
{
Disconnect( true );
Disconnect( Settings.Client.SendStatusMessages, true );
}

private void Disconnect( bool withCallback = false )
private void Disconnect( bool withStatusMessage, bool withCallback )
{
if ( m_MqttClient != null && m_MqttClient.IsConnected && withStatusMessage )
{
Publish(
Encoding.UTF8.GetBytes( StatusMessageOffline ),
Settings.Client.StatusMessageTopic,
0,
null,
DataSetType.Event,
true,
false );
}

if ( m_Decoder != null )
{
m_Decoder.MessageDecoded -= DecoderOnMessageDecoded;
Expand All @@ -272,20 +285,6 @@ private void Disconnect( bool withCallback = false )

if ( m_MqttClient != null )
{
if ( m_MqttClient.IsConnected )
{
if ( Settings.Client.SendStatusMessages )
{
Publish(
Encoding.UTF8.GetBytes( StatusMessageOffline ),
Settings.Client.StatusMessageTopic,
0,
null,
DataSetType.Event,
true );
}
}

try
{
//m_MqttClient.UnsubscribeAsync(Settings.Client.ClientCertP12).Wait();
Expand Down Expand Up @@ -374,7 +373,7 @@ private Task MqttClientOnDisconnected( MqttClientDisconnectedEventArgs e )
}

ClientDisconnected?.Invoke( this, msg );
Disconnect( false );
Disconnect( false, false );
return Task.CompletedTask;
}

Expand Down Expand Up @@ -485,7 +484,7 @@ public void SendKeepAlive( ProcessDataSet dataSet, string topicPrefix )
else
{
byte[] keepAliveChunk = dataSet.GetEncodedKeepAliveMessage( m_SequenceNumber++ );
Publish( keepAliveChunk, topicPrefix, dataSet.GetWriterId(), "Meas", dataSet.GetDataSetType(), false );
Publish( keepAliveChunk, topicPrefix, dataSet.GetWriterId(), "Meas", dataSet.GetDataSetType(), false, true );
}
}
}
Expand Down Expand Up @@ -516,7 +515,7 @@ public bool SendDataSet( ProcessDataSet dataSet, string topicPrefix, bool delta
if ( sendDelta )
{
byte[] deltaChunk = dataSet.GetEncodedDeltaFrame( m_SequenceNumber++ );
Publish( deltaChunk, topicPrefix, dataSet.GetWriterId(), "Meas", dataSet.GetDataSetType(), false );
Publish( deltaChunk, topicPrefix, dataSet.GetWriterId(), "Meas", dataSet.GetDataSetType(), false, true );
}
else //meta + key
{
Expand All @@ -526,14 +525,14 @@ public bool SendDataSet( ProcessDataSet dataSet, string topicPrefix, bool delta
foreach ( byte[] chunk in metaChunks )
{
bool retain = ( metaChunks.Count == 1 ) && ( !Options.SendMetaMessageWithoutRetain );
Publish( chunk, topicPrefix, dataSet.GetWriterId(), "Meta", dataSet.GetDataSetType(), retain );
Publish( chunk, topicPrefix, dataSet.GetWriterId(), "Meta", dataSet.GetDataSetType(), retain, true );
UpdateLastKeyAndMetaSentTime( dataSet.GetWriterId() );
}
}
List<byte[]> keyChunks = dataSet.GetChunkedKeyFrame( ChunkSize, m_SequenceNumber++ );
foreach ( byte[] chunk in keyChunks )
{
Publish( chunk, topicPrefix, dataSet.GetWriterId(), "Meas", dataSet.GetDataSetType(), false );
Publish( chunk, topicPrefix, dataSet.GetWriterId(), "Meas", dataSet.GetDataSetType(), false, true );
}
}

Expand All @@ -555,7 +554,7 @@ public bool SendRawData( byte[] payload, string topic,bool retain )
bool sent = false;
try
{
Publish( payload, topic, 0, null, DataSetType.Event, retain );
Publish( payload, topic, 0, null, DataSetType.Event, retain, true );
sent = true; //sent is only true if all chunks are sent without exception
}
catch ( DataNotSentException ) { }
Expand Down Expand Up @@ -676,8 +675,8 @@ public bool SendFile( OPCUAFile file, string topicPrefix, ushort writerId )
}
try
{
Publish( metaFrameBytes, topicPrefix, writerId, "Meta", DataSetType.TimeSeriesEventFile, false );
Publish( keyFrameBytes, topicPrefix, writerId, "File", DataSetType.TimeSeriesEventFile, false );
Publish( metaFrameBytes, topicPrefix, writerId, "Meta", DataSetType.TimeSeriesEventFile, false, true);
Publish( keyFrameBytes, topicPrefix, writerId, "File", DataSetType.TimeSeriesEventFile, false, true );
fileSent = true;
}
catch ( DataNotSentException ) { }
Expand Down Expand Up @@ -875,7 +874,7 @@ DataSetType dataSetType
return newTopicName;
}

private void Publish( byte[] payload, string topicPrefix, ushort dataSetWriterId, string messageType, DataSetType dataSetType, bool retain )
private void Publish( byte[] payload, string topicPrefix, ushort dataSetWriterId, string messageType, DataSetType dataSetType, bool retain , bool throwException)
{
string topic = CreateTopicName( topicPrefix, ClientId, dataSetWriterId, messageType, dataSetType );

Expand Down Expand Up @@ -913,7 +912,7 @@ private void Publish( byte[] payload, string topicPrefix, ushort dataSetWriterId
//but due to the AggregatedExceptions it's very difficult to find out what really went wrong...)
}
}
if (!dataSent)
if (!dataSent && throwException)
{
throw new DataNotSentException();
}
Expand Down

0 comments on commit a0dedb1

Please sign in to comment.