Kafka messages retrieved from the outbox are sent with null bodies if PublishRawJson is used #1111

Closed
WORMrus opened this issue Oct 31, 2024 · 1 comment · Fixed by #1120



WORMrus commented Oct 31, 2024

Describe the bug
Basically, the title says it all. I dug around in the sources and likely found the exact reason this happens; see below.

To Reproduce
Steps to reproduce the behavior:

  1. Set up an app with an outgoing Kafka message
  2. Enable the outbox for it
  3. Use PublishRawJson() for that message
  4. Send a message such that it gets written to the outbox but can't be delivered (e.g. stop Kafka)
  5. (Important) Restart the app so the message is retrieved from storage (I used Postgres, but nothing suggests that Postgres specifically matters)
  6. Reconnect the app with Kafka (e.g. restart Kafka)
  7. Inspect the delivered message: its body is null

Expected behavior
The message gets delivered with its actual content instead of null.

Note that it works if you skip step 5. In that case, the original in-memory object is used; it has the needed properties set, and everything works correctly.

Additional context
I believe the root cause lies in two places.

EnvelopeSerializer.Deserialize

At some point, MessageDatabase.LoadOutgoingAsync() checks the outbox and loads whatever is in there. To create Envelopes, it uses EnvelopeSerializer.Deserialize(). Digging deeper into its readSingle(BinaryReader br), we can see that the Envelope object is created as follows:

```csharp
var msg = new Envelope
{
    SentAt = DateTime.FromBinary(br.ReadInt64())
};
```

This object initializer sets only SentAt; it does not set the Message property, and nothing else in the deserializer appears to either. I put a conditional breakpoint on the Envelope.Message setter, and it is indeed never hit.

KafkaSenderProtocol.SendBatchAsync()

When SendBatchAsync sends a batch, it builds a Kafka Message from each Envelope in it, and here those envelopes were recovered from storage. With JsonOnlyMapper (the mapper in my setup), MapEnvelopeToOutgoing is called from this helper:

```csharp
internal static Message<string, string> CreateMessage(this IKafkaEnvelopeMapper mapper, Envelope envelope)
{
    var message = new Message<string, string>
    {
        Key = !string.IsNullOrEmpty(envelope.PartitionKey) ? envelope.PartitionKey : envelope.Id.ToString(),
        Value = Encoding.Default.GetString(envelope.Data),
        Headers = new Headers()
    };
    mapper.MapEnvelopeToOutgoing(envelope, message);
    return message;
}
```

JsonOnlyMapper.MapEnvelopeToOutgoing requires Envelope.Message to be set. Otherwise it puts null into the resulting Message.Value, overwriting the perfectly good (at least in my case) value that CreateMessage had just set from envelope.Data.

Perhaps the mapper should check Envelope.Message for null and fall back to Encoding.Default.GetString(envelope.Data), the way its caller does?
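A minimal sketch of that fallback, in C#. It does not use Wolverine's real types: `ResolveOutgoingValue` is a hypothetical helper that receives the envelope's `Message` object and `Data` bytes as plain parameters, and it assumes the mapper's job is to JSON-serialize `Message` with System.Text.Json. It decodes `Data` as UTF-8, which is what `Encoding.Default` returns on .NET Core and .NET 5+.

```csharp
// Hypothetical helper (not Wolverine's API): picks the string to put into the
// outgoing Kafka Message.Value for a raw-JSON envelope.
//   message: the in-memory Envelope.Message (null after reloading from the outbox)
//   data:    the serialized Envelope.Data bytes (restored from the outbox)
static string? ResolveOutgoingValue(object? message, byte[]? data)
{
    if (message is not null)
    {
        // Fresh envelope: serialize the live object as JSON using its runtime type
        return System.Text.Json.JsonSerializer.Serialize(message, message.GetType());
    }

    if (data is not null)
    {
        // Envelope recovered from storage: fall back to the persisted bytes
        // instead of overwriting the value with null
        return System.Text.Encoding.UTF8.GetString(data);
    }

    // Nothing to send
    return null;
}

var live = ResolveOutgoingValue(new { OrderId = 42 }, null);
var reloaded = ResolveOutgoingValue(null, System.Text.Encoding.UTF8.GetBytes("{\"OrderId\":42}"));
System.Console.WriteLine(live);     // {"OrderId":42}
System.Console.WriteLine(reloaded); // {"OrderId":42}
```

Both paths yield the same body, so a message sent before a restart and one reloaded from the outbox after it would be indistinguishable on the wire.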

jeremydmiller (Member) commented

@WORMrus Sigh, we had a similar issue recently with RabbitMQ, and I should have looked through all the transports. I'm rolling up a fix for 3.1.1 shortly. You did all the legwork here, though, so thanks for that!
