Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed issues related to store_offset and next_offset during Barrier op in consume batch API #4208

Merged
merged 17 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ librdkafka v2.0.3 is a bugfix release:
the timeout to be reset (#4176).
* Fix seek partition timeout, was one thousand times lower than the passed
value (#4230).
* Batch consumer: fix wrong stored offset / next fetch offset when a
  barrier op (e.g. rebalance) interrupts a `consume_batch` call, and
  correctly account for control messages (#4208).


## Fixes
Expand Down
24 changes: 24 additions & 0 deletions src/rdkafka_offset.c
Original file line number Diff line number Diff line change
Expand Up @@ -1148,3 +1148,27 @@ void rd_kafka_offset_store_init(rd_kafka_toppar_t *rktp) {

rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_OFFSET_STORE;
}


/**
 * @brief Advance the toppar's application-consumed offset to \p offset,
 *        and persist it via the offset store when
 *        \c rk_conf.enable_auto_offset_store is set.
 *
 * @param rk      Client instance (read for the auto-offset-store config).
 * @param rktp    Toppar whose \c rktp_app_offset is updated.
 * @param offset  New application offset (next offset to consume).
 * @param do_lock RD_DO_LOCK to acquire the toppar lock here,
 *                RD_DONT_LOCK if the caller already holds it.
 *
 * @locks_acquired rd_kafka_toppar_lock() if \p do_lock.
 */
void rd_kafka_update_app_offset(rd_kafka_t *rk,
                                rd_kafka_toppar_t *rktp,
                                int64_t offset,
                                rd_dolock_t do_lock) {
        if (do_lock)
                rd_kafka_toppar_lock(rktp);

        rktp->rktp_app_offset = offset;

        /* Store the offset regardless of assignment state (force=true);
         * the toppar lock is already held at this point. */
        if (rk->rk_conf.enable_auto_offset_store)
                rd_kafka_offset_store0(rktp, offset, rd_true /*force*/,
                                       RD_DONT_LOCK);

        if (do_lock)
                rd_kafka_toppar_unlock(rktp);
}
5 changes: 5 additions & 0 deletions src/rdkafka_offset.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,9 @@ void rd_kafka_offset_reset(rd_kafka_toppar_t *rktp,

void rd_kafka_offset_query_tmr_cb(rd_kafka_timers_t *rkts, void *arg);

/**
 * @brief Update the toppar's app_offset to \p offset and, when
 *        \c rk_conf.enable_auto_offset_store is enabled, also store the
 *        offset. Pass RD_DO_LOCK unless the toppar lock is already held.
 */
void rd_kafka_update_app_offset(rd_kafka_t *rk,
rd_kafka_toppar_t *rktp,
int64_t offset,
rd_dolock_t do_lock);

#endif /* _RDKAFKA_OFFSET_H_ */
8 changes: 1 addition & 7 deletions src/rdkafka_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -923,11 +923,5 @@ void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk, rd_kafka_op_t *rko) {

offset = rko->rko_u.fetch.rkm.rkm_rkmessage.offset + 1;

rd_kafka_toppar_lock(rktp);
rktp->rktp_app_offset = offset;
if (rk->rk_conf.enable_auto_offset_store)
rd_kafka_offset_store0(rktp, offset,
/* force: ignore assignment state */
rd_true, RD_DONT_LOCK);
rd_kafka_toppar_unlock(rktp);
rd_kafka_update_app_offset(rk, rktp, offset, RD_DO_LOCK);
}
69 changes: 53 additions & 16 deletions src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -539,15 +539,17 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq,
*
* @locality Any thread.
*/
static size_t rd_kafka_purge_outdated_messages(rd_kafka_toppar_t *rktp,
int32_t version,
rd_kafka_message_t **rkmessages,
size_t cnt) {
static size_t
rd_kafka_purge_outdated_messages(rd_kafka_toppar_t *rktp,
int32_t version,
rd_kafka_message_t **rkmessages,
size_t cnt,
struct rd_kafka_op_tailq *ctrl_msg_q) {
size_t valid_count = 0;
size_t i;
rd_kafka_op_t *rko, *next;

for (i = 0; i < cnt; i++) {
rd_kafka_op_t *rko;
rko = rkmessages[i]->_private;
if (rko->rko_rktp == rktp &&
rd_kafka_op_version_outdated(rko, version)) {
Expand All @@ -559,6 +561,19 @@ static size_t rd_kafka_purge_outdated_messages(rd_kafka_toppar_t *rktp,
valid_count++;
}
}

/* Discard outdated control msgs ops */
next = TAILQ_FIRST(ctrl_msg_q);
while (next) {
rko = next;
next = TAILQ_NEXT(rko, rko_link);
if (rko->rko_rktp == rktp &&
rd_kafka_op_version_outdated(rko, version)) {
TAILQ_REMOVE(ctrl_msg_q, rko, rko_link);
rd_kafka_op_destroy(rko);
}
}

return valid_count;
}

Expand All @@ -577,10 +592,13 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
size_t rkmessages_size) {
unsigned int cnt = 0;
TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
struct rd_kafka_op_tailq ctrl_msg_q =
TAILQ_HEAD_INITIALIZER(ctrl_msg_q);
rd_kafka_op_t *rko, *next;
rd_kafka_t *rk = rkq->rkq_rk;
rd_kafka_q_t *fwdq;
struct timespec timeout_tspec;
int i;

mtx_lock(&rkq->rkq_lock);
if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
Expand Down Expand Up @@ -624,7 +642,8 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,

if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) {
cnt = (unsigned int)rd_kafka_purge_outdated_messages(
rko->rko_rktp, rko->rko_version, rkmessages, cnt);
rko->rko_rktp, rko->rko_version, rkmessages, cnt,
&ctrl_msg_q);
rd_kafka_op_destroy(rko);
continue;
}
Expand All @@ -649,22 +668,27 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
}
rd_dassert(res == RD_KAFKA_OP_RES_PASS);

if (!rko->rko_err && rko->rko_type == RD_KAFKA_OP_FETCH) {
/* Store offset, etc. */
rd_kafka_fetch_op_app_prepare(rk, rko);

/* If this is a control messages, don't return
* message to application, only store the offset */
if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
rd_kafka_op_destroy(rko);
continue;
}
/* If this is a control messages, don't return message to
* application. Add it to a tmp queue from where we can store
* the offset and destroy the op */
if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
TAILQ_INSERT_TAIL(&ctrl_msg_q, rko, rko_link);
continue;
}

/* Get rkmessage from rko and append to array. */
rkmessages[cnt++] = rd_kafka_message_get(rko);
}

for (i = cnt - 1; i >= 0; i--) {
rko = (rd_kafka_op_t *)rkmessages[i]->_private;
rd_kafka_toppar_t *rktp = rko->rko_rktp;
int64_t offset = rkmessages[i]->offset + 1;
if (unlikely(rktp->rktp_app_offset < offset))
rd_kafka_update_app_offset(rk, rktp, offset,
RD_DO_LOCK);
}

/* Discard non-desired and already handled ops */
next = TAILQ_FIRST(&tmpq);
while (next) {
Expand All @@ -673,6 +697,19 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
rd_kafka_op_destroy(rko);
}

/* Discard ctrl msgs */
next = TAILQ_FIRST(&ctrl_msg_q);
while (next) {
rko = next;
next = TAILQ_NEXT(next, rko_link);
rd_kafka_toppar_t *rktp = rko->rko_rktp;
int64_t offset = rko->rko_u.fetch.rkm.rkm_rkmessage.offset + 1;
if (rktp->rktp_app_offset < offset)
rd_kafka_update_app_offset(rk, rktp, offset,
RD_DO_LOCK);
rd_kafka_op_destroy(rko);
}

rd_kafka_app_polled(rk);

return cnt;
Expand Down
4 changes: 2 additions & 2 deletions tests/0080-admin_ut.c
Original file line number Diff line number Diff line change
Expand Up @@ -588,10 +588,10 @@ static void do_test_ListConsumerGroups(const char *what,
err ? errstr2 : "n/a");

errors = rd_kafka_ListConsumerGroups_result_errors(rkev, &errors_cnt);
TEST_ASSERT(errors_cnt == 1, "expected one error, got %" PRIu64,
TEST_ASSERT(errors_cnt == 1, "expected one error, got %" PRIusz,
errors_cnt);
rd_kafka_ListConsumerGroups_result_valid(rkev, &valid_cnt);
TEST_ASSERT(valid_cnt == 0, "expected zero valid groups, got %" PRIu64,
TEST_ASSERT(valid_cnt == 0, "expected zero valid groups, got %" PRIusz,
valid_cnt);

err = rd_kafka_error_code(errors[0]);
Expand Down
2 changes: 2 additions & 0 deletions tests/0122-buffer_cleaning_after_rebalance.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ static int consumer_batch_queue(void *arg) {
rd_kafka_message_destroy(rkmessage[i]);
}

free(rkmessage);

return 0;
}

Expand Down
Loading