
Mqtt-kafka single group support cont #731

Merged
merged 12 commits into aklivity:feature/mqtt-kafka on Jan 17, 2024
Conversation

@akrambek (Contributor) commented Jan 11, 2024

Description

Mqtt-kafka single group support cont

Fixes #698

@akrambek changed the base branch from develop to feature/mqtt-kafka on January 11, 2024 at 23:54
@jfallows (Contributor) left a comment:

We also need scenario(s) to check the behavior when the producer id changes on the same client produce stream, either to a different producer id value or to no producer id.

This will happen frequently when multiple clients are producing at a mixture of QoS levels to the same Kafka topic, such as mqtt-messages.

@@ -542,11 +545,21 @@ private int flushRecordInit(
     final int maxEncodeableBytes = client.encodeSlotLimit + client.valueCompleteSize + produceRecordFramingSize;

     if (client.encodeSlot != NO_SLOT &&
-        maxEncodeableBytes > encodePool.slotCapacity())
+        (maxEncodeableBytes > encodePool.slotCapacity() ||
+        client.producerId != producerId && client.producerEpoch != producerEpoch))
@jfallows (Contributor) commented:

Suggested change:
-        client.producerId != producerId && client.producerEpoch != producerEpoch))
+        client.producerId != producerId || client.producerEpoch != producerEpoch))

Only one of these needs to differ to trigger the request, since both are expected to be consistent for the entire batch, right?
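
To see the difference, consider a hypothetical produce stream where the producer epoch is bumped but the producer id stays the same (the values below are illustrative, not taken from the PR):

    // Same producer id, bumped epoch: the pending batch must still be flushed.
    long clientProducerId = 42L, producerId = 42L;
    short clientProducerEpoch = 0, producerEpoch = 1;

    boolean flushWithAnd = clientProducerId != producerId && clientProducerEpoch != producerEpoch; // false: stale epoch gets batched
    boolean flushWithOr = clientProducerId != producerId || clientProducerEpoch != producerEpoch;  // true: batch is flushed first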

-            .baseSequence(RECORD_BATCH_SEQUENCE_NONE)
+            .producerId(client.producerId)
+            .producerEpoch(client.producerEpoch)
+            .baseSequence(sequence)
             .recordCount(encodeableRecordCount)
             .build();
@jfallows (Contributor) commented:

Do we need to reset producerId and producerEpoch after encoding the request, so that the next request can batch as expected?
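
For example, a minimal sketch of one way to do that reset (the exact hook point and the RECORD_BATCH_PRODUCER_EPOCH_NONE sentinel are assumptions, not code from this PR):

    // After the produce request has been encoded, clear the batch producer state
    // so the next request starts a fresh batch with its own producer id/epoch.
    client.producerId = RECORD_BATCH_PRODUCER_ID_NONE;
    client.producerEpoch = RECORD_BATCH_PRODUCER_EPOCH_NONE; // assumed sentinel, mirroring RECORD_BATCH_PRODUCER_ID_NONE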

{
client.doEncodeRequestIfNecessary(traceId, budgetId);
}

if (client.producerId == RECORD_BATCH_PRODUCER_ID_NONE ||
client.producerId != producerId)
@jfallows (Contributor) commented:

Should this be && instead?

Please remove extra whitespace around !=.

if (client.producerId == RECORD_BATCH_PRODUCER_ID_NONE ||
client.producerId != producerId)
{
client.sequence = sequence;
@jfallows (Contributor) commented:

Suggest we rename to client.baseSequence.

.typeId(zilla:id("kafka"))
.produce()
.topic("test")
.partition(1)
@jfallows (Contributor) commented:

We need to match producerId and producerEpoch here too, right?

@akrambek (Contributor, Author) replied:

It is part of dataEx; I have filled in the missing ones. We can't send producerId and producerEpoch as part of beginEx, since we don't know the partitions at that point, and we need the partitions to fetch them.
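
Roughly, the matching then lives on the data extension rather than the begin extension, e.g. (the .producerId(...)/.producerEpoch(...) matcher names are assumed from this discussion, not checked against the final scripts):

    .typeId(zilla:id("kafka"))
    .produce()
    .topic("test")
    .partition(1)
    .producerId(1)     # assumed: producer id carried per-partition in dataEx
    .producerEpoch(1)  # assumed: producer epoch carried per-partition in dataEx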

.typeId(zilla:id("kafka"))
.produce()
.topic("test")
.partition(0)
@jfallows (Contributor) commented:

producerId and producerEpoch?

@jfallows (Contributor) left a comment:

Looks pretty good.

Please add a negative test to reject a gap in the sequence for a non-zero producerId in the client produce factory.

Please add a test for replaying the sequence from an earlier value, but still with no gaps.
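
For reference, the invariant those tests would pin down looks roughly like this (a sketch with assumed names; lastRecordCount stands in for the size of the previous batch, and client.baseSequence follows the rename suggested above):

    // Hypothetical gap check on the client produce stream: with a non-zero
    // producerId, replaying an earlier baseSequence is fine, but skipping
    // past the next expected value is a gap and must be rejected.
    final long nextExpected = client.baseSequence + lastRecordCount;
    if (producerId != RECORD_BATCH_PRODUCER_ID_NONE && baseSequence > nextExpected)
    {
        // reject: abort the produce stream (assumed behavior under test)
    }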

.merged()
.produce()
.key("a")
.hashKey("key7")
@jfallows (Contributor) commented:

Does produce flush need both key and hashKey or just hashKey?

@akrambek (Contributor, Author) replied:

Yes, it needs both the key and the hashKey.

@jfallows merged commit 380e1a8 into aklivity:feature/mqtt-kafka on Jan 17, 2024
3 checks passed
@jfallows linked an issue on Jan 17, 2024 that may be closed by this pull request

Successfully merging this pull request may close these issues.

mqtt-kafka binding uses 2 different consumer groups per mqtt client