Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15755: LeaveGroupResponse v0 - v2 loses its member under certain error conditions #14635

Merged
merged 5 commits into from
Nov 16, 2023

Conversation

wolfchimneyrock
Copy link
Contributor

@wolfchimneyrock wolfchimneyrock commented Oct 25, 2023

KAFKA-14367 introduced this check, but we have seen since upgrading to broker 3.4.1 with both Sarama and Librdkafka clients when issuing LeaveGroup and getting an error, the single member gets lost somewhere in the broker between the request and the response.

instead of raising an exception when there are 0 members in the response, let it pass like it did before KAFKA-14367.

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

KIP-848 introduced this check, but we have seen with both Sarama and Librdkafka clients when issuing LeaveGroup and getting an error, the single member gets lost somewhere in the broker between the request and the response.

instead of raising an exception when there are 0 members in the response, let it pass like it did before KIP-848.
@dajac
Copy link
Contributor

dajac commented Oct 25, 2023

@wolfchimneyrock Thanks for the PR. I am not sure to fully understand the issue yet. Could you please elaborate a bit more about the condition leading to it?

@wolfchimneyrock
Copy link
Contributor Author

@wolfchimneyrock Thanks for the PR. I am not to fully understand the issue yet. Could you please elaborate a bit more about the condition leading to it?

there is more context here:

confluentinc/librdkafka#4402

@wolfchimneyrock
Copy link
Contributor Author

wolfchimneyrock commented Oct 25, 2023

I'm not sure I can elaborate on what the underlying error response that would have been sent with the LeaveGroup is, since the current code raises that exception without printing the topLevelError

also it is very rare, out of around 800k LeaveGroup requests per week since our update to broker 3.4.1 I have seen this 400 times.

@wolfchimneyrock
Copy link
Contributor Author

wolfchimneyrock commented Oct 25, 2023

Here is a broker trace:

[2023-10-24 01:17:17,214] ERROR [KafkaApi-28598] Unexpected error handling request RequestHeader(apiKey=LEAVE_GROUP, apiVersion=1, clientId=REDACTED, correlationId=116775, headerVersion=1) -- LeaveGroupRequestData(groupId=REDACTED, memberId='REDACTED-73967453-93c4-4f3f-bcef-32c1f280350f', members=[]) with context RequestContext(header=RequestHeader(apiKey=LEAVE_GROUP, apiVersion=1, clientId=REDACTED, correlationId=116775, headerVersion=1), connectionId='REDACTED', clientAddress=/REDACTED, principal=REDACTED, listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=confluent-kafka-python, softwareVersion=1.7.0-rdkafka-1.7.0), fromPrivilegedListener=false, principalSerde=Optional[REDACTED]) (kafka.server.KafkaApis)
java.util.concurrent.CompletionException: org.apache.kafka.common.errors.UnsupportedVersionException: LeaveGroup response version 1 can only contain one member, got 0 members.
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
	at java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:936)
	at java.base/java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:950)
	at java.base/java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2340)
	at kafka.server.KafkaApis.handleLeaveGroupRequest(KafkaApis.scala:1796)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:196)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: LeaveGroup response version 1 can only contain one member, got 0 members.

I see that the request has memberId set and members=[] as expected with an apiVersion=1 message ... maybe the LeaveGroupResponse just sees members=[] and isn't correctly populating the members list with the given memberId from the LeaveGroupRequest

@dajac
Copy link
Contributor

dajac commented Oct 26, 2023

Thanks @wolfchimneyrock. We will take a look into it.

@dajac dajac added the KIP-848 The Next Generation of the Consumer Rebalance Protocol label Oct 26, 2023
@dajac
Copy link
Contributor

dajac commented Oct 27, 2023

@wolfchimneyrock Thanks again for the PR. We verified and your suggestion makes sense. I have two asks:

  1. Could you please file a bug and use the the jira id in the PR?
  2. Would it be possible to add a small unit test for the fix?

@wolfchimneyrock
Copy link
Contributor Author

@wolfchimneyrock Thanks again for the PR. We verified and your suggestion makes sense. I have two asks:

  1. Could you please file a bug and use the the jira id in the PR?
  2. Would it be possible to add a small unit test for the fix?

sure, I've applied for an apache JIRA account, and I'll start on a unit test.

@wolfchimneyrock wolfchimneyrock changed the title MINOR: LeaveGroupResponse v0 - v2 loses its member under certain error conditions KAFKA-15755: LeaveGroupResponse v0 - v2 loses its member under certain error conditions Oct 30, 2023
Copy link
Contributor

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wolfchimneyrock Thanks for the PR! I left a few comments for consideration.

@@ -62,7 +62,7 @@ public LeaveGroupResponse(LeaveGroupResponseData data, short version) {
if (version >= 3) {
this.data = data;
} else {
if (data.members().size() != 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was looking a bit more into this issue. I think that we could harden a bit the logic because we could effectively get no members only when there is a top level error. How about this?

            if (data.errorCode() != Errors.NONE.code()) {
                this.data = new LeaveGroupResponseData().setErrorCode(data.errorCode());
            } else {
                if (data.members().size() != 1) {
                    throw new UnsupportedVersionException("LeaveGroup response version " + version +
                        " can only contain one member, got " + data.members().size() + " members.");
                }
                this.data = new LeaveGroupResponseData().setErrorCode(data.members().get(0).errorCode());
            }

@dongnuo123 What do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to use

if (data.errorCode() != Errors.NONE.code() && data.members().size() == 0) {
   this.data = new LeaveGroupResponseData().setErrorCode(data.errorCode());
} else {...

I can't think of a situation where the top level error code is set and the response has multiple members but just to make sure.

@@ -165,4 +167,33 @@ public void testEqualityWithMemberResponses() {
assertEquals(primaryResponse.hashCode(), reversedResponse.hashCode());
}
}

@Test
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: You could use the following to parameterized the test.

@ParameterizedTest
@ApiKeyVersionsSource(apiKey = ApiKeys.LEAVE_GROUP)

@dajac
Copy link
Contributor

dajac commented Nov 9, 2023

@wolfchimneyrock Would you have time to address my comments in the coming days?

@wolfchimneyrock
Copy link
Contributor Author

@wolfchimneyrock Would you have time to address my comments in the coming days?

yes, though I thought you were waiting on comment from @dongnuo123

@dajac
Copy link
Contributor

dajac commented Nov 9, 2023

@wolfchimneyrock Understood. If you agree with my suggestion, I think that we could just do it. Sorry for the confusion.

Copy link
Contributor

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wolfchimneyrock Thanks for the update! I left a few nits for consideration.


if (version < 3) {
assertThrows(UnsupportedVersionException.class,
() -> new LeaveGroupResponse(data, version));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we indent this line with 4 spaces in order to remain consistent with the existing code?

() -> new LeaveGroupResponse(data, version));
} else {
LeaveGroupResponse response = new LeaveGroupResponse(data, version);
assertEquals(Errors.NONE, response.topLevelError());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we also assert the members here?


if (version < 3) {
assertThrows(UnsupportedVersionException.class,
() -> new LeaveGroupResponse(data, version));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Could we also intend this one with 4 spaces?

() -> new LeaveGroupResponse(data, version));
} else {
LeaveGroupResponse response = new LeaveGroupResponse(data, version);
assertEquals(Errors.NONE, response.topLevelError());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Should we also assert members here?

@wolfchimneyrock
Copy link
Contributor Author

looks like all the jenkins agents ran out of disk space. After your comments, the tests still pass locally for me.

Copy link
Contributor

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! thanks for the patch, @wolfchimneyrock. Waiting on the build to merge it.

@dajac
Copy link
Contributor

dajac commented Nov 15, 2023

I cannot get a clean build for this one. @wolfchimneyrock Could you please merge trunk to your branch?

@dajac
Copy link
Contributor

dajac commented Nov 15, 2023

@wolfchimneyrock I just did it. Let's see how the next build goes.

@dajac dajac merged commit 3fd6293 into apache:trunk Nov 16, 2023
1 check failed
dajac pushed a commit that referenced this pull request Nov 16, 2023
…n error conditions (#14635)

This patch fixes a bug in the LeaveGroupResponse construction. Basically, when a top level error is set, no members are expected but the current check always requires one for versions prior to version 3.

Reviewers: David Jacot <djacot@confluent.io>
(cherry picked from commit 3fd6293)
dajac pushed a commit that referenced this pull request Nov 16, 2023
…n error conditions (#14635)

This patch fixes a bug in the LeaveGroupResponse construction. Basically, when a top level error is set, no members are expected but the current check always requires one for versions prior to version 3.

Reviewers: David Jacot <djacot@confluent.io>
(cherry picked from commit 3fd6293)
dajac pushed a commit that referenced this pull request Nov 16, 2023
…n error conditions (#14635)

This patch fixes a bug in the LeaveGroupResponse construction. Basically, when a top level error is set, no members are expected but the current check always requires one for versions prior to version 3.

Reviewers: David Jacot <djacot@confluent.io>
(cherry picked from commit 3fd6293)
@dajac
Copy link
Contributor

dajac commented Nov 16, 2023

Merged to trunk, 3.6, 3.5 and 3.4.

mjsax pushed a commit to confluentinc/kafka that referenced this pull request Nov 22, 2023
…n error conditions (apache#14635)

This patch fixes a bug in the LeaveGroupResponse construction. Basically, when a top level error is set, no members are expected but the current check always requires one for versions prior to version 3.

Reviewers: David Jacot <djacot@confluent.io>
(cherry picked from commit 3fd6293)
rreddy-22 pushed a commit to rreddy-22/kafka-rreddy that referenced this pull request Jan 2, 2024
…n error conditions (apache#14635)

This patch fixes a bug in the LeaveGroupResponse construction. Basically, when a top level error is set, no members are expected but the current check always requires one for versions prior to version 3.

Reviewers: David Jacot <djacot@confluent.io>
yyu1993 pushed a commit to yyu1993/kafka that referenced this pull request Feb 15, 2024
…n error conditions (apache#14635)

This patch fixes a bug in the LeaveGroupResponse construction. Basically, when a top level error is set, no members are expected but the current check always requires one for versions prior to version 3.

Reviewers: David Jacot <djacot@confluent.io>
clolov pushed a commit to clolov/kafka that referenced this pull request Apr 5, 2024
…n error conditions (apache#14635)

This patch fixes a bug in the LeaveGroupResponse construction. Basically, when a top level error is set, no members are expected but the current check always requires one for versions prior to version 3.

Reviewers: David Jacot <djacot@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
KIP-848 The Next Generation of the Consumer Rebalance Protocol
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants