Skip to content

Commit

Permalink
support LeaveGroupRequest version 3
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfchimneyrock committed Aug 22, 2023
1 parent 49f180a commit 1b309fa
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 9 deletions.
3 changes: 2 additions & 1 deletion src/rdkafka_cgrp.c
Original file line number Diff line number Diff line change
Expand Up @@ -878,7 +878,8 @@ static void rd_kafka_cgrp_leave(rd_kafka_cgrp_t *rkcg) {
rd_rkb_dbg(rkcg->rkcg_curr_coord, CONSUMER, "LEAVE",
"Leaving group");
rd_kafka_LeaveGroupRequest(
rkcg->rkcg_coord, rkcg->rkcg_group_id->str, member_id,
rkcg->rkcg_coord, rkcg->rkcg_group_id, member_id,
rkcg->rkcg_group_instance_id,
RD_KAFKA_REPLYQ(rkcg->rkcg_ops, 0),
rd_kafka_cgrp_handle_LeaveGroup, rkcg);
} else
Expand Down
31 changes: 25 additions & 6 deletions src/rdkafka_request.c
Original file line number Diff line number Diff line change
Expand Up @@ -1828,8 +1828,9 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb,
* Send LeaveGroupRequest
*/
void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb,
const char *group_id,
const char *member_id,
const rd_kafkap_str_t *group_id,
char *member_id,
const rd_kafkap_str_t *group_instance_id,
rd_kafka_replyq_t replyq,
rd_kafka_resp_cb_t *resp_cb,
void *opaque) {
Expand All @@ -1838,12 +1839,22 @@ void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb,
int features;

ApiVersion = rd_kafka_broker_ApiVersion_supported(
rkb, RD_KAFKAP_LeaveGroup, 0, 1, &features);
rkb, RD_KAFKAP_LeaveGroup, 0, 3, &features);

rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup, 1, 300);
rkbuf = rd_kafka_buf_new_request(rkb, RD_KAFKAP_LeaveGroup, 1,
RD_KAFKAP_STR_SIZE(group_id) +
4 /* array count members */ +
strlen(member_id) + 2 +
RD_KAFKAP_STR_SIZE(group_instance_id));

rd_kafka_buf_write_str(rkbuf, group_id, -1);
rd_kafka_buf_write_str(rkbuf, member_id, -1);
rd_kafka_buf_write_kstr(rkbuf, group_id);
if (ApiVersion <= 2) {
rd_kafka_buf_write_str(rkbuf, member_id, -1);
} else {
rd_kafka_buf_write_arraycnt(rkbuf, 1);
rd_kafka_buf_write_str(rkbuf, member_id, -1);
rd_kafka_buf_write_kstr(rkbuf, group_instance_id);
}

rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0);

Expand Down Expand Up @@ -1878,8 +1889,16 @@ void rd_kafka_handle_LeaveGroup(rd_kafka_t *rk,
goto err;
}


if (request->rkbuf_reqhdr.ApiVersion >= 1)
rd_kafka_buf_read_throttle_time(rkbuf);

rd_kafka_buf_read_i16(rkbuf, &ErrorCode);

/* for ApiVersion >= 3 a list of members who left the group is
* present in the rest of rkbuf, but isn't used here.
*/

err:
actions = rd_kafka_err_action(rkb, ErrorCode, request,
RD_KAFKA_ERR_ACTION_END);
Expand Down
5 changes: 3 additions & 2 deletions src/rdkafka_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,8 +199,9 @@ void rd_kafka_JoinGroupRequest(rd_kafka_broker_t *rkb,


void rd_kafka_LeaveGroupRequest(rd_kafka_broker_t *rkb,
const char *group_id,
const char *member_id,
const rd_kafkap_str_t *group_id,
char *member_id,
const rd_kafkap_str_t *group_instance_id,
rd_kafka_replyq_t replyq,
rd_kafka_resp_cb_t *resp_cb,
void *opaque);
Expand Down

0 comments on commit 1b309fa

Please sign in to comment.