Skip to content

Commit

Permalink
Moved store offsets call at the end of the consume batch api
Browse files Browse the repository at this point in the history
  • Loading branch information
pranavrth committed Feb 22, 2023
1 parent 076405e commit fa0a24e
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 8 deletions.
4 changes: 2 additions & 2 deletions src/rdkafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -3003,7 +3003,7 @@ static rd_kafka_op_res_t rd_kafka_consume_cb(rd_kafka_t *rk,

rkmessage = rd_kafka_message_get(rko);

rd_kafka_fetch_op_app_prepare(rk, rko);
rd_kafka_fetch_op_app_prepare(rk, rko, rd_true /*defautl*/);

ctx->consume_cb(rkmessage, ctx->opaque);

Expand Down Expand Up @@ -3136,7 +3136,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
rkmessage = rd_kafka_message_get(rko);

/* Store offset, etc */
rd_kafka_fetch_op_app_prepare(rk, rko);
rd_kafka_fetch_op_app_prepare(rk, rko, rd_true /*default*/);

rd_kafka_set_last_error(0, 0);

Expand Down
2 changes: 1 addition & 1 deletion src/rdkafka_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ const rd_kafka_message_t *rd_kafka_event_message_next(rd_kafka_event_t *rkev) {
return NULL;

/* Store offset, etc. */
rd_kafka_fetch_op_app_prepare(NULL, rko);
rd_kafka_fetch_op_app_prepare(NULL, rko, rd_true /* default */);

return rkmessage;

Expand Down
11 changes: 8 additions & 3 deletions src/rdkafka_op.c
Original file line number Diff line number Diff line change
Expand Up @@ -834,7 +834,7 @@ rd_kafka_op_res_t rd_kafka_op_handle_std(rd_kafka_t *rk,
else if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
/* Control messages must not be exposed to the application
* but we need to store their offsets. */
rd_kafka_fetch_op_app_prepare(rk, rko);
rd_kafka_fetch_op_app_prepare(rk, rko, rd_true /* default */);
return RD_KAFKA_OP_RES_HANDLED;
} else if (cb_type != RD_KAFKA_Q_CB_EVENT &&
rko->rko_type & RD_KAFKA_OP_CB)
Expand Down Expand Up @@ -903,13 +903,18 @@ rd_kafka_op_res_t rd_kafka_op_handle(rd_kafka_t *rk,
* This must be called just prior to passing/returning a consumed
* message to the application.
*
* @param update_store_offsets Added control for the caller to specify
* whether to store offset or not.
*
* Performs:
* - Store offset for fetched message + 1.
* - Updates the application offset (rktp_app_offset).
*
* @locks rktp_lock and rk_lock MUST NOT be held
*/
void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk, rd_kafka_op_t *rko) {
void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk,
rd_kafka_op_t *rko,
rd_bool_t update_store_offsets) {
rd_kafka_toppar_t *rktp;
int64_t offset;

Expand All @@ -925,7 +930,7 @@ void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk, rd_kafka_op_t *rko) {

rd_kafka_toppar_lock(rktp);
rktp->rktp_app_offset = offset;
if (rk->rk_conf.enable_auto_offset_store)
if (rk->rk_conf.enable_auto_offset_store && update_store_offsets)
rd_kafka_offset_store0(rktp, offset,
/* force: ignore assignment state */
rd_true, RD_DONT_LOCK);
Expand Down
4 changes: 3 additions & 1 deletion src/rdkafka_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -741,7 +741,9 @@ extern rd_atomic32_t rd_kafka_op_cnt;

void rd_kafka_op_print(FILE *fp, const char *prefix, rd_kafka_op_t *rko);

void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk, rd_kafka_op_t *rko);
void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk,
rd_kafka_op_t *rko,
rd_bool_t update_store_offsets);


#define rd_kafka_op_is_ctrl_msg(rko) \
Expand Down
17 changes: 16 additions & 1 deletion src/rdkafka_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
rd_kafka_t *rk = rkq->rkq_rk;
rd_kafka_q_t *fwdq;
struct timespec timeout_tspec;
int i;

mtx_lock(&rkq->rkq_lock);
if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
Expand Down Expand Up @@ -651,7 +652,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,

if (!rko->rko_err && rko->rko_type == RD_KAFKA_OP_FETCH) {
/* Store offset, etc. */
rd_kafka_fetch_op_app_prepare(rk, rko);
rd_kafka_fetch_op_app_prepare(rk, rko, rd_false);

/* If this is a control messages, don't return
* message to application, only store the offset */
Expand All @@ -665,6 +666,20 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
rkmessages[cnt++] = rd_kafka_message_get(rko);
}

if (rk->rk_conf.enable_auto_offset_store) {
for (i = cnt - 1; i >= 0; i--) {
rko = (rd_kafka_op_t *)rkmessages[i]->_private;
rd_kafka_toppar_t *rktp = rko->rko_rktp;
int64_t offset = rkmessages[i]->offset;
if (unlikely(rktp->rktp_stored_offset <= offset)) {
rd_kafka_offset_store0(
rko->rko_rktp, offset + 1,
/* force: ignore assignment state */
rd_true, RD_DO_LOCK);
}
}
}

/* Discard non-desired and already handled ops */
next = TAILQ_FIRST(&tmpq);
while (next) {
Expand Down
76 changes: 76 additions & 0 deletions tests/0137-barrier_batch_consume.c
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,84 @@ static void do_test_consume_batch_with_seek(void) {
// }


static void do_test_consume_batch_store_offset(void) {
rd_kafka_queue_t *rkq;
const char *topic;
rd_kafka_t *consumer;
int p;
int i;
uint64_t testid;
rd_kafka_conf_t *conf;
consumer_t consumer_args = RD_ZERO_INIT;
test_msgver_t mv;
thrd_t thread_id;
rd_kafka_error_t *err;
const int produce_partition_cnt = 1;
const int timeout_ms = 10000;
const int consume_msg_cnt = 4;
const int no_of_consume = 2;
const int produce_msg_cnt = 8;
const int expected_msg_cnt = produce_msg_cnt;

SUB_TEST();

test_conf_init(&conf, NULL, 60);
test_conf_set(conf, "enable.auto.commit", "false");
test_conf_set(conf, "enable.auto.offset.store", "true");
test_conf_set(conf, "auto.offset.reset", "earliest");

testid = test_id_generate();
test_msgver_init(&mv, testid);

/* Produce messages */
topic = test_mk_topic_name("0137-barrier_batch_consume", 1);

for (p = 0; p < produce_partition_cnt; p++)
test_produce_msgs_easy(topic, testid, p,
produce_msg_cnt / produce_partition_cnt);

for (i = 0; i < no_of_consume; i++) {

/* Create consumers */
consumer = test_create_consumer(topic, NULL,
rd_kafka_conf_dup(conf), NULL);
test_consumer_subscribe(consumer, topic);
test_consumer_wait_assignment(consumer, rd_false);

/* Create generic consume queue */
rkq = rd_kafka_queue_get_consumer(consumer);

consumer_args.what = "CONSUMER";
consumer_args.rkq = rkq;
consumer_args.timeout_ms = timeout_ms;
consumer_args.consume_msg_cnt = consume_msg_cnt;
consumer_args.expected_msg_cnt =
produce_msg_cnt / no_of_consume;
consumer_args.rk = consumer;
consumer_args.testid = testid;
consumer_args.mv = &mv;
consumer_args.test = test_curr;

consumer_batch_queue(&consumer_args);
rd_kafka_commit(consumer, NULL, rd_false);

rd_kafka_queue_destroy(rkq);
test_consumer_close(consumer);
rd_kafka_destroy(consumer);
}

test_msgver_verify("CONSUME", &mv,
TEST_MSGVER_ORDER | TEST_MSGVER_DUP |
TEST_MSGVER_BY_OFFSET,
0, expected_msg_cnt);

SUB_TEST_PASS();
}


int main_0137_barrier_batch_consume(int argc, char **argv) {
do_test_consume_batch_with_seek();
do_test_consume_batch_store_offset();
// FIXME: Run this test once consume batch is fully fixed.
// do_test_consume_batch_with_pause_and_resume();
return 0;
Expand Down

0 comments on commit fa0a24e

Please sign in to comment.