[Bug Report] It is not possible to send an event in ReceiveMessageHandler before completing received message with MQTT #3101

Closed
bastyuchenko opened this issue Feb 8, 2023 · 16 comments
Labels
bug Something isn't working. fix-checked-in Fix checked into main or preview, but not yet released. IoTSDK Tracks all IoT SDK issues across the board

@bastyuchenko

Context

  • OS: Windows 11 Enterprise (22H2), also tested in Azure Kubernetes Service with Linux containers
  • Application's .NET target framework: .NET 7
  • SDK version used:
    • Microsoft.Azure.Devices.Client 1.41.3
    • Microsoft.Azure.Devices.Provisioning.Client 1.19.2
    • Microsoft.Azure.Devices.Provisioning.Transport.Amqp 1.16.4
    • Microsoft.Azure.Devices.Shared 1.30.2

Description of the issue

  1. Call SetReceiveMessageHandlerAsync and specify a handler (lines 30-31).
  2. Start receiving the message from Azure IoT Hub on thread 1 (line 32).
  3. Send a new event1 on thread 1 (line 38).
    It looks like an exception occurs at this point and the SDK tries to handle the same message again on a parallel thread.
  4. Start receiving the same message from Azure IoT Hub on thread 2 (line 32).
  5. Send a new event2 on thread 2 (line 38).
  6. event1 is sent (thread 1) and event2 is sent (thread 2) (lines 38-39).
  7. CompleteAsync is called for the first time on a new thread 3 (line 40).
  8. CompleteAsync is called for the second time on thread 1 (line 40).
  9. The following exception is also received twice (on thread 3 and on thread 1): "Lock token is stale or never existed. The message will be redelivered. Please discard this lock token and do not retry the operation."
    As a result, two duplicate events were sent to IoT Hub.

Based on my investigation:

  • TransportType.Mqtt + SetReceiveMessageHandlerAsync -> DOES NOT work
  • TransportType.Amqp + SetReceiveMessageHandlerAsync -> works
  • TransportType.Mqtt + ReceiveAsync (in loop) -> works
  • TransportType.Amqp + ReceiveAsync (in loop) -> works
  • TransportType.Mqtt + SetReceiveMessageHandlerAsync + CompleteAsync before SendEventAsync -> works

Code sample exhibiting the issue

01|  using Microsoft.Azure.Devices.Client;
02|  using Microsoft.Azure.Devices.Provisioning.Client;
03|  using Microsoft.Azure.Devices.Provisioning.Client.Transport;
04|  using Microsoft.Azure.Devices.Shared;
05|  using System.Security.Cryptography.X509Certificates;
06|  using System.Text;
07|
08|  namespace TestIssueForGit
09|  {
10|      internal class Program
11|      {
12|          static async Task Main(string[] args)
13|          {
14|              X509Certificate2 x509Certificate = Helper.LoadProvisioningCertificate();
15|              var security = new SecurityProviderX509Certificate(x509Certificate);
16|              var provClient = ProvisioningDeviceClient.Create(
17|                  "global.azure-devices-provisioning.net",
18|                  Helper.IdScope,
19|                  security,
20|                  new ProvisioningTransportHandlerAmqp());
21|  
22|              var result = await provClient.RegisterAsync();
23|  
24|              IAuthenticationMethod auth = new DeviceAuthenticationWithX509Certificate(
25|                  security.GetRegistrationID(),
26|                  x509Certificate);
27|  
28|              var deviceClient = DeviceClient.Create(result.AssignedHub, auth, TransportType.Mqtt);
29|  
30|              await deviceClient.SetReceiveMessageHandlerAsync(
31|                  async (Message messageC2D, object lbStatus) =>
32|                  {
33|                      try
34|                      {
35|  
36|                          var messageC2DText = Encoding.UTF8.GetString(messageC2D.GetBytes());
37|                          var messageD2C = new Message(Encoding.UTF8.GetBytes(messageC2DText + "_ToCloud"));
38|                          await deviceClient.SendEventAsync(messageD2C);
39|  
40|                          await deviceClient.CompleteAsync(messageC2D);
41|                      }
42|                      catch (Exception ex)
43|                      {
44|                          throw;
45|                      }
46|                  }, null);
47|  
48|              await Task.Delay(-1);
49|          }
50|      }
51|  }

Console log of the issue

I followed the SDK's instructions for capturing SDK logs.
The following logs were captured in the "Microsoft-Azure-Device-Client/ErrorMessage" section:

Process(23628) (23628)
ThreadID="18,816"
thisOrContextObject="ErrorDelegatingHandler#7256877"
memberName="ExecuteWithErrorHandlingAsync"
message="Exception caught: Microsoft.Azure.Devices.Client.Exceptions.IotHubException: Client must send PUBACK packets in the order in which the corresponding PUBLISH packets were received (QoS 1 messages) per [MQTT-4.6.0-2]. Expected lock token to end with: '2'; actual lock token: '0b5fba79-eed0-4c2a-ad17-6fa7d1164e3e0b5fba79-eed0-4c2a-ad17-6fa7d1164e3e2'.
  at Microsoft.Azure.Devices.Client.Transport.Mqtt.MqttTransportHandler.CompleteAsync(String lockToken, CancellationToken cancellationToken)
  at Microsoft.Azure.Devices.Client.Transport.Mqtt.MqttTransportHandler.CompleteIncomingMessageAsync(Message message)
  at Microsoft.Azure.Devices.Client.Transport.Mqtt.MqttTransportHandler.HandleIncomingMessagesAsync()
  at Microsoft.Azure.Devices.Client.Transport.Mqtt.MqttTransportHandler.EnsurePendingMessagesAreDeliveredAsync(CancellationToken cancellationToken)
  at Microsoft.Azure.Devices.Client.Transport.ErrorDelegatingHandler.<>c__DisplayClass27_0.<b__0>d.MoveNext()
  --- End of stack trace from previous location ---
  at Microsoft.Azure.Devices.Client.Transport.ErrorDelegatingHandler.ExecuteWithErrorHandlingAsync[T](Func`1 asyncOperation)"
ActivityID="0000000d-0001-0000-4c5c-0000ffdcd7b5"

Process(23628) (23628)
ThreadID="46,516"
thisOrContextObject="MqttIotHubAdapter#18940742"
memberName="ChannelInactive"
message="MQTT transport layer has been inactive, will shut down."
ActivityID="00000011-0001-0000-4c5c-0000ffdcd7b5"

Process(23628) (23628)
ThreadID="46,516"
thisOrContextObject="MqttIotHubAdapter#18940742"
memberName="WriteAsync"
message="Received a non-fatal exception while writing data to the MQTT transport layer; will shut down: System.Net.Sockets.SocketException (10054): An existing connection was forcibly closed by the remote host.
  at Microsoft.Azure.Devices.Client.Transport.Mqtt.MqttIotHubAdapter.SendMessageAsync(IChannelHandlerContext context, Message message)
  at Microsoft.Azure.Devices.Client.Transport.Mqtt.MqttIotHubAdapter.WriteAsync(IChannelHandlerContext context, Object data)"
ActivityID="0000009b-0001-0000-4c5c-0000ffdcd7b5"

Process(23628) (23628)
ThreadID="47,864"
thisOrContextObject="ErrorDelegatingHandler#7256877"
memberName="ExecuteWithErrorHandlingAsync"
message="Exception caught: System.Net.Sockets.SocketException (10054): An existing connection was forcibly closed by the remote host.
  at Microsoft.Azure.Devices.Client.Transport.Mqtt.MqttIotHubAdapter.SendMessageAsync(IChannelHandlerContext context, Message message)
  at Microsoft.Azure.Devices.Client.Transport.Mqtt.MqttIotHubAdapter.WriteAsync(IChannelHandlerContext context, Object data)
  at Microsoft.Azure.Devices.Client.Transport.Mqtt.MqttTransportHandler.SendEventAsync(Message message, CancellationToken cancellationToken)
  at Microsoft.Azure.Devices.Client.Transport.ErrorDelegatingHandler.<>c__DisplayClass27_0.<b__0>d.MoveNext()
  --- End of stack trace from previous location ---
  at Microsoft.Azure.Devices.Client.Transport.ErrorDelegatingHandler.ExecuteWithErrorHandlingAsync[T](Func`1 asyncOperation)"
ActivityID="0000008e-0001-0000-4c5c-0000ffdcd7b5"

@bastyuchenko bastyuchenko added the bug Something isn't working. label Feb 8, 2023
@github-actions github-actions bot added the IoTSDK Tracks all IoT SDK issues across the board label Feb 8, 2023
@tmahmood-microsoft tmahmood-microsoft self-assigned this Feb 15, 2023
@tmahmood-microsoft
Contributor

Hi @bastyuchenko, thanks for bringing this issue to my attention.
The issue has been fixed in the following PR: #3116

Please let me know if it still does not work for you.

@tmahmood-microsoft tmahmood-microsoft added the fix-checked-in Fix checked into main or preview, but not yet released. label Feb 16, 2023
@bastyuchenko
Author

Hi @tmahmood-microsoft, have you published this change as a new version of the package, or should I reinstall the Microsoft.Azure.Devices.Client 1.41.3 NuGet package in my solution? Or should I clone or fork the existing SDK and pack it myself?
How should I receive the fix?

@tmahmood-microsoft
Contributor

Hi @bastyuchenko, this change will be included in the next release of Microsoft.Azure.Devices.Client. For now, please clone the SDK repository and test whether the fix works for you.

@bastyuchenko
Author

Hi @tmahmood-microsoft,
Yes, it is now fixed and works when there is one message in my Azure IoT Hub.
BUT it does not work when there is more than one message.

Behavior when there are two messages in the queue:

  1. Run the console app with the code from the issue description.

  2. Handling message1 and sending a message (lines 32-38).

  3. Handling message2 and sending a message (lines 32-38).

  4. Completing message1 (line 40).

  5. Completing message2 (line 40).

  6. Exception caught the first time (line 43):
    Microsoft.Azure.Devices.Client.Exceptions.IotHubException: Client must send PUBACK packets in the order in which the corresponding PUBLISH packets were received (QoS 1 messages) per [MQTT-4.6.0-2]. Expected lock token to end with: '22'; actual lock token: '********---****-771b1d5e629323'.

  7. Exception caught the second time (line 43):
    Microsoft.Azure.Devices.Client.Exceptions.IotHubException: Client must send PUBACK packets in the order in which the corresponding PUBLISH packets were received (QoS 1 messages) per [MQTT-4.6.0-2]. Expected lock token to end with: '23'; actual lock token: '********---****-771b1d5e629322'.
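Both exceptions quote the MQTT rule [MQTT-4.6.0-2]: QoS 1 acknowledgements (PUBACKs) must be sent in the order in which the PUBLISH packets were received. The trailing number the SDK checks on the lock token ('22', '23') appears to track the MQTT packet identifier. The sketch below is a hypothetical, simplified model of that check, not the SDK's MqttTransportHandler; the class `InOrderAckTracker` and its methods are illustrative names only. It shows why completing message 23 while message 22 is still outstanding fails, while completing in arrival order succeeds.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified model of the ordering rule quoted in the exception:
// QoS 1 PUBACKs must follow the arrival order of PUBLISH packets [MQTT-4.6.0-2].
// This is NOT the SDK's implementation; it only mimics the check.
public sealed class InOrderAckTracker
{
    // Packet IDs received but not yet acknowledged, oldest first.
    private readonly Queue<int> _pending = new Queue<int>();
    private readonly object _gate = new object();

    // Called when a PUBLISH (cloud-to-device message) arrives.
    public void OnPublishReceived(int packetId)
    {
        lock (_gate) { _pending.Enqueue(packetId); }
    }

    // Models CompleteAsync: only the oldest outstanding message may be acked.
    public void Ack(int packetId)
    {
        lock (_gate)
        {
            if (_pending.Count == 0 || _pending.Peek() != packetId)
            {
                string expected = _pending.Count == 0 ? "<none>" : _pending.Peek().ToString();
                throw new InvalidOperationException(
                    $"PUBACK out of order: expected {expected}, actual {packetId}");
            }
            _pending.Dequeue();
        }
    }
}
```

With two concurrently running callbacks, the callback for the later message can reach CompleteAsync first and hit exactly this kind of check. Completing before SendEventAsync narrows that window, and funnelling completions through one ordered queue removes it.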

@bastyuchenko
Copy link
Author

Hi @tmahmood-microsoft ,
Could you please take a look at the scenario I described above?

@tmahmood-microsoft
Contributor

Hi @bastyuchenko, I apologize for the delay. I am now looking into this and will have a fix for you soon.

@tmahmood-microsoft
Contributor

Hi @bastyuchenko, I have not been able to reproduce the second scenario. Could you please elaborate on how to reproduce it? While handling message 1 from the hub, you should complete it before handling message 2. Are you intentionally delaying the completion of message 1?

@bastyuchenko
Author

bastyuchenko commented Mar 23, 2023

@tmahmood-microsoft, why do you think I'm delaying completing the message? The code is the same as in the bug description above. Do you see any delay there?

I have two messages in IoT Hub, and it seems that in the Azure IoT SDK for .NET, deviceClient.SetReceiveMessageHandlerAsync(...) and its callback start receiving, handling, and completing these messages on different threads in parallel.

@tmahmood-microsoft
Contributor

Hi @bastyuchenko, I was just making sure I was not missing anything.
After more testing with 10+ messages in the queue, I am able to reproduce the issue on my end. I am currently working on a fix.

@tmahmood-microsoft
Contributor

Hi @bastyuchenko, this is a known issue with the MQTT specification as used in the SDK: messages must be acknowledged in the order in which they were received.
The workaround is to enqueue the received messages and process them one after another, which results in sequential processing.
I am attaching a sample code snippet with the suggested workaround.

Furthermore, I would highly recommend trying the v2/preview release of the SDK. The v2/preview version uses an improved MQTT implementation and does not require this workaround.

  using System.Collections.Concurrent;
  using System.Text;
  using Microsoft.Azure.Devices.Client;

  namespace TestIssueForGit
  {
      internal class Program
      {
          // The receive handler only enqueues; a single loop completes messages in arrival order.
          private static readonly ConcurrentQueue<Message> s_receivedMessagesQueue = new ConcurrentQueue<Message>();
          private static DeviceClient? s_deviceClient;

          private static async Task Main(string[] args)
          {
              using var cts = new CancellationTokenSource();
              s_deviceClient = DeviceClient.CreateFromConnectionString(
                  "HostName=.....",
                  TransportType.Mqtt_Tcp_Only);

              await s_deviceClient.SetReceiveMessageHandlerAsync(CloudToDeviceMessageHandler, s_deviceClient);
              await CompleteReceivedMessagesAsync(s_deviceClient, cts.Token);
          }

          // Do not send or complete inside the callback; just record the arrival order.
          private static Task CloudToDeviceMessageHandler(Message receivedMessage, object context)
          {
              s_receivedMessagesQueue.Enqueue(receivedMessage);
              return Task.CompletedTask;
          }

          private static async Task CompleteReceivedMessagesAsync(DeviceClient deviceClient, CancellationToken token)
          {
              while (!token.IsCancellationRequested)
              {
                  // Peek first so the message stays at the head of the queue until it has been completed.
                  if (s_receivedMessagesQueue.TryPeek(out Message messageToBeCompleted))
                  {
                      var messageD2C = new Message(Encoding.UTF8.GetBytes("MessageToCloud"));
                      await deviceClient.SendEventAsync(messageD2C, token);

                      // Completing the oldest message first keeps PUBACKs in arrival order.
                      await deviceClient.CompleteAsync(messageToBeCompleted, token);
                      s_receivedMessagesQueue.TryDequeue(out _);
                  }
                  else
                  {
                      // Avoid a busy-wait while the queue is empty.
                      await Task.Delay(TimeSpan.FromMilliseconds(100), token);
                  }
              }
          }
      }
  }

Please let me know if you still face any issues.

@bastyuchenko
Author

Hi @tmahmood-microsoft ,
I tried running your workaround above and caught an exception in the "catch" block:
Cannot access a disposed object.
Object name: 'System.Net.Security.SslStream'.

It looks like the only difference is that I use a self-signed X.509 certificate for authentication, whereas you use connection-string authentication.

@bastyuchenko
Author

bastyuchenko commented Mar 28, 2023

Another workaround I found is to call CompleteAsync on the received message before sending the new message with SendEventAsync within the ReceiveMessageHandler callback.

  using Microsoft.Azure.Devices.Client;
  using Microsoft.Azure.Devices.Provisioning.Client;
  using Microsoft.Azure.Devices.Provisioning.Client.Transport;
  using Microsoft.Azure.Devices.Shared;
  using System.Security.Cryptography.X509Certificates;
  using System.Text;

  namespace TestIssueForGit
  {
      internal class Program
      {
          static async Task Main(string[] args)
          {
              X509Certificate2 x509Certificate = Helper.LoadProvisioningCertificate();
              var security = new SecurityProviderX509Certificate(x509Certificate);
              var provClient = ProvisioningDeviceClient.Create(
                  "global.azure-devices-provisioning.net",
                  Helper.IdScope,
                  security,
                  new ProvisioningTransportHandlerAmqp());

              var result = await provClient.RegisterAsync();

              IAuthenticationMethod auth = new DeviceAuthenticationWithX509Certificate(
                  security.GetRegistrationID(),
                  x509Certificate);

              var deviceClient = DeviceClient.Create(result.AssignedHub, auth, TransportType.Mqtt);

              await deviceClient.SetReceiveMessageHandlerAsync(
                  async (Message messageC2D, object lbStatus) =>
                  {
                      try
                      {
                          // Complete (PUBACK) the received message first, before any other send.
                          await deviceClient.CompleteAsync(messageC2D);

                          var messageC2DText = Encoding.UTF8.GetString(messageC2D.GetBytes());
                          var messageD2C = new Message(Encoding.UTF8.GetBytes(messageC2DText + "_ToCloud"));
                          await deviceClient.SendEventAsync(messageD2C);
                      }
                      catch (Exception)
                      {
                          throw;
                      }
                  }, null);

              await Task.Delay(-1);
          }
      }
  }

@bastyuchenko
Author

Also, I tried the v2/preview release of the SDK. The v2 SDK seems to work perfectly for my task, so I may move to it.

@tmahmood-microsoft, thank you for your research.

BTW, is there any information about the release date of a stable v2 SDK? I'm looking forward to it.

@tmahmood-microsoft
Contributor

tmahmood-microsoft commented Mar 29, 2023

Hi @bastyuchenko, yes, completing the message before sending the event is the easiest solution to this issue.
Another workaround is to use a global lock to ensure that only one message is processed at a time: specifically, send the event and complete the message inside a semaphore lock.

We do not have an exact date for the v2 stable release, but it should be within a month.
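The global-lock workaround can be sketched as a minimal, self-contained simulation. The names `SerializedHandlerDemo`, `HandleAsync`, and the `sendEventAsync`/`completeAsync` delegates are hypothetical stand-ins for the callback body and for `DeviceClient.SendEventAsync`/`CompleteAsync`, so the pattern runs without an IoT hub. A `SemaphoreSlim` with a single permit ensures each send-then-complete pair finishes before the next one starts.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Sketch of the "global lock" workaround. The delegates are hypothetical
// stand-ins for DeviceClient.SendEventAsync / DeviceClient.CompleteAsync.
public static class SerializedHandlerDemo
{
    // One permit: at most one message is being sent and completed at a time.
    private static readonly SemaphoreSlim s_gate = new SemaphoreSlim(1, 1);

    public static async Task HandleAsync(
        int messageId,
        Func<int, Task> sendEventAsync,
        Func<int, Task> completeAsync)
    {
        await s_gate.WaitAsync();
        try
        {
            // Both operations run while holding the lock, so another
            // callback cannot interleave its send or complete here.
            await sendEventAsync(messageId);
            await completeAsync(messageId);
        }
        finally
        {
            // Always release, even if send or complete throws.
            s_gate.Release();
        }
    }
}
```

In the real callback, the two delegate calls would become `deviceClient.SendEventAsync(messageD2C)` and `deviceClient.CompleteAsync(messageC2D)`. The lock only serializes the pairs; as far as I know, `SemaphoreSlim` does not document a first-in, first-out guarantee for waiters, so completion order still relies on callbacks reaching `WaitAsync` in arrival order. The queue-based workaround earlier in the thread makes that order explicit.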

@bastyuchenko
Author

bastyuchenko commented Apr 12, 2024

Hi @tmahmood-microsoft,

I am still using the v2 SDK, but it is still in preview. Moreover, I don't see much activity in the preview/v2 branch, unlike the main branch with the v1 SDK. The v1 SDK has received many updates, fixes, and improvements, but I'm not sure all of them were merged into or implemented in the preview/v2 branch.

I'm not sure whether Microsoft is going to continue developing v2 or ever release a stable version.
Maybe it would be better for me to move back from preview/v2 to v1?

Could you please shed some light on the situation with SDK v2?

@bastyuchenko bastyuchenko reopened this Apr 17, 2024
@ryanwinter
Contributor

Hi @bastyuchenko,

We stopped development on the v2 SDK preview earlier this year and have decided to continue our efforts to support the v1 SDK. It was a hard decision, but we think it is the better one, as it avoids supporting multiple major versions.

Please use the v1 SDK, as it will continue to receive critical and security bug fixes. Thank you for your support in testing the v2 SDK preview.

Thanks
Ryan
