Skip to content

Commit

Permalink
Fixed issues related to store_offset and app_offset during barrier op…
Browse files Browse the repository at this point in the history
… in consume batch API (#4208)

These offsets were set before the batch was completed, making it possible to commit
offsets that had not been received by the application, or to skip messages after a resume.
  • Loading branch information
pranavrth authored Mar 22, 2023
1 parent 2bdd39e commit 5139719
Show file tree
Hide file tree
Showing 8 changed files with 501 additions and 120 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ librdkafka v2.0.3 is a bugfix release:
the timeout to be reset (#4176).
* Fix seek partition timeout, was one thousand times lower than the passed
value (#4230).
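* Batch consumer fixes: `store_offset` and `app_offset` were set before the
batch was completed, making it possible to commit offsets that had not been
received by the application, or to skip messages after a resume (#4208).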


## Fixes
Expand Down
24 changes: 24 additions & 0 deletions src/rdkafka_offset.c
Original file line number Diff line number Diff line change
Expand Up @@ -1148,3 +1148,27 @@ void rd_kafka_offset_store_init(rd_kafka_toppar_t *rktp) {

rktp->rktp_flags |= RD_KAFKA_TOPPAR_F_OFFSET_STORE;
}


/**
 * @brief Set the toppar's application offset (rktp_app_offset) to
 *        \p offset and, when rk_conf.enable_auto_offset_store is set,
 *        also store \p offset through rd_kafka_offset_store0().
 *
 * @param rk      Client instance whose configuration is consulted.
 * @param rktp    Toppar to update.
 * @param offset  Offset value to assign (and possibly store).
 * @param do_lock When non-zero (e.g. RD_DO_LOCK) the toppar lock is
 *                acquired before, and released after, the update.
 *                Otherwise no locking is performed by this function.
 */
void rd_kafka_update_app_offset(rd_kafka_t *rk,
                                rd_kafka_toppar_t *rktp,
                                int64_t offset,
                                rd_dolock_t do_lock) {
        if (do_lock) {
                rd_kafka_toppar_lock(rktp);
        }

        rktp->rktp_app_offset = offset;

        if (rk->rk_conf.enable_auto_offset_store) {
                /* Any locking requested by the caller was done above,
                 * so the store itself is told not to lock. */
                rd_kafka_offset_store0(rktp, offset,
                                       /* force: ignore assignment state */
                                       rd_true, RD_DONT_LOCK);
        }

        if (do_lock) {
                rd_kafka_toppar_unlock(rktp);
        }
}
5 changes: 5 additions & 0 deletions src/rdkafka_offset.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,9 @@ void rd_kafka_offset_reset(rd_kafka_toppar_t *rktp,

void rd_kafka_offset_query_tmr_cb(rd_kafka_timers_t *rkts, void *arg);

/**
 * Update the toppar's app_offset to \p offset and, if
 * rk->rk_conf.enable_auto_offset_store is set, also store \p offset
 * with rd_kafka_offset_store0().
 *
 * If \p do_lock is non-zero the toppar lock is taken around the update.
 */
void rd_kafka_update_app_offset(rd_kafka_t *rk,
rd_kafka_toppar_t *rktp,
int64_t offset,
rd_dolock_t do_lock);

#endif /* _RDKAFKA_OFFSET_H_ */
8 changes: 1 addition & 7 deletions src/rdkafka_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -923,11 +923,5 @@ void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk, rd_kafka_op_t *rko) {

offset = rko->rko_u.fetch.rkm.rkm_rkmessage.offset + 1;

rd_kafka_toppar_lock(rktp);
rktp->rktp_app_offset = offset;
if (rk->rk_conf.enable_auto_offset_store)
rd_kafka_offset_store0(rktp, offset,
/* force: ignore assignment state */
rd_true, RD_DONT_LOCK);
rd_kafka_toppar_unlock(rktp);
rd_kafka_update_app_offset(rk, rktp, offset, RD_DO_LOCK);
}
69 changes: 53 additions & 16 deletions src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -539,15 +539,17 @@ int rd_kafka_q_serve(rd_kafka_q_t *rkq,
*
* @locality Any thread.
*/
static size_t rd_kafka_purge_outdated_messages(rd_kafka_toppar_t *rktp,
int32_t version,
rd_kafka_message_t **rkmessages,
size_t cnt) {
static size_t
rd_kafka_purge_outdated_messages(rd_kafka_toppar_t *rktp,
int32_t version,
rd_kafka_message_t **rkmessages,
size_t cnt,
struct rd_kafka_op_tailq *ctrl_msg_q) {
size_t valid_count = 0;
size_t i;
rd_kafka_op_t *rko, *next;

for (i = 0; i < cnt; i++) {
rd_kafka_op_t *rko;
rko = rkmessages[i]->_private;
if (rko->rko_rktp == rktp &&
rd_kafka_op_version_outdated(rko, version)) {
Expand All @@ -559,6 +561,19 @@ static size_t rd_kafka_purge_outdated_messages(rd_kafka_toppar_t *rktp,
valid_count++;
}
}

/* Discard outdated control msgs ops */
next = TAILQ_FIRST(ctrl_msg_q);
while (next) {
rko = next;
next = TAILQ_NEXT(rko, rko_link);
if (rko->rko_rktp == rktp &&
rd_kafka_op_version_outdated(rko, version)) {
TAILQ_REMOVE(ctrl_msg_q, rko, rko_link);
rd_kafka_op_destroy(rko);
}
}

return valid_count;
}

Expand All @@ -577,10 +592,13 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
size_t rkmessages_size) {
unsigned int cnt = 0;
TAILQ_HEAD(, rd_kafka_op_s) tmpq = TAILQ_HEAD_INITIALIZER(tmpq);
struct rd_kafka_op_tailq ctrl_msg_q =
TAILQ_HEAD_INITIALIZER(ctrl_msg_q);
rd_kafka_op_t *rko, *next;
rd_kafka_t *rk = rkq->rkq_rk;
rd_kafka_q_t *fwdq;
struct timespec timeout_tspec;
int i;

mtx_lock(&rkq->rkq_lock);
if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
Expand Down Expand Up @@ -624,7 +642,8 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,

if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) {
cnt = (unsigned int)rd_kafka_purge_outdated_messages(
rko->rko_rktp, rko->rko_version, rkmessages, cnt);
rko->rko_rktp, rko->rko_version, rkmessages, cnt,
&ctrl_msg_q);
rd_kafka_op_destroy(rko);
continue;
}
Expand All @@ -649,22 +668,27 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
}
rd_dassert(res == RD_KAFKA_OP_RES_PASS);

if (!rko->rko_err && rko->rko_type == RD_KAFKA_OP_FETCH) {
/* Store offset, etc. */
rd_kafka_fetch_op_app_prepare(rk, rko);

/* If this is a control messages, don't return
* message to application, only store the offset */
if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
rd_kafka_op_destroy(rko);
continue;
}
/* If this is a control message, don't return the message to the
* application. Add it to a temporary queue from which we can later
* store the offset and destroy the op. */
if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
TAILQ_INSERT_TAIL(&ctrl_msg_q, rko, rko_link);
continue;
}

/* Get rkmessage from rko and append to array. */
rkmessages[cnt++] = rd_kafka_message_get(rko);
}

for (i = cnt - 1; i >= 0; i--) {
rko = (rd_kafka_op_t *)rkmessages[i]->_private;
rd_kafka_toppar_t *rktp = rko->rko_rktp;
int64_t offset = rkmessages[i]->offset + 1;
if (unlikely(rktp->rktp_app_offset < offset))
rd_kafka_update_app_offset(rk, rktp, offset,
RD_DO_LOCK);
}

/* Discard non-desired and already handled ops */
next = TAILQ_FIRST(&tmpq);
while (next) {
Expand All @@ -673,6 +697,19 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
rd_kafka_op_destroy(rko);
}

/* Discard ctrl msgs */
next = TAILQ_FIRST(&ctrl_msg_q);
while (next) {
rko = next;
next = TAILQ_NEXT(next, rko_link);
rd_kafka_toppar_t *rktp = rko->rko_rktp;
int64_t offset = rko->rko_u.fetch.rkm.rkm_rkmessage.offset + 1;
if (rktp->rktp_app_offset < offset)
rd_kafka_update_app_offset(rk, rktp, offset,
RD_DO_LOCK);
rd_kafka_op_destroy(rko);
}

rd_kafka_app_polled(rk);

return cnt;
Expand Down
4 changes: 2 additions & 2 deletions tests/0080-admin_ut.c
Original file line number Diff line number Diff line change
Expand Up @@ -588,10 +588,10 @@ static void do_test_ListConsumerGroups(const char *what,
err ? errstr2 : "n/a");

errors = rd_kafka_ListConsumerGroups_result_errors(rkev, &errors_cnt);
TEST_ASSERT(errors_cnt == 1, "expected one error, got %" PRIu64,
TEST_ASSERT(errors_cnt == 1, "expected one error, got %" PRIusz,
errors_cnt);
rd_kafka_ListConsumerGroups_result_valid(rkev, &valid_cnt);
TEST_ASSERT(valid_cnt == 0, "expected zero valid groups, got %" PRIu64,
TEST_ASSERT(valid_cnt == 0, "expected zero valid groups, got %" PRIusz,
valid_cnt);

err = rd_kafka_error_code(errors[0]);
Expand Down
2 changes: 2 additions & 0 deletions tests/0122-buffer_cleaning_after_rebalance.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ static int consumer_batch_queue(void *arg) {
rd_kafka_message_destroy(rkmessage[i]);
}

free(rkmessage);

return 0;
}

Expand Down
Loading

0 comments on commit 5139719

Please sign in to comment.