diff --git a/src/rdkafka.c b/src/rdkafka.c
index 8eedd9f94b..4cde8990d2 100644
--- a/src/rdkafka.c
+++ b/src/rdkafka.c
@@ -3003,7 +3003,7 @@ static rd_kafka_op_res_t rd_kafka_consume_cb(rd_kafka_t *rk,
 
         rkmessage = rd_kafka_message_get(rko);
 
-        rd_kafka_fetch_op_app_prepare(rk, rko);
+        rd_kafka_fetch_op_app_prepare(rk, rko, rd_true /* default */);
 
         ctx->consume_cb(rkmessage, ctx->opaque);
 
@@ -3136,7 +3136,7 @@ rd_kafka_consume0(rd_kafka_t *rk, rd_kafka_q_t *rkq, int timeout_ms) {
         rkmessage = rd_kafka_message_get(rko);
 
         /* Store offset, etc */
-        rd_kafka_fetch_op_app_prepare(rk, rko);
+        rd_kafka_fetch_op_app_prepare(rk, rko, rd_true /* default */);
 
         rd_kafka_set_last_error(0, 0);
 
diff --git a/src/rdkafka_event.c b/src/rdkafka_event.c
index ffd1a17805..f713cb4213 100644
--- a/src/rdkafka_event.c
+++ b/src/rdkafka_event.c
@@ -125,7 +125,7 @@ const rd_kafka_message_t *rd_kafka_event_message_next(rd_kafka_event_t *rkev) {
                 return NULL;
 
         /* Store offset, etc. */
-        rd_kafka_fetch_op_app_prepare(NULL, rko);
+        rd_kafka_fetch_op_app_prepare(NULL, rko, rd_true /* default */);
 
         return rkmessage;
 
diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c
index e1324c513f..cc82bd557e 100644
--- a/src/rdkafka_op.c
+++ b/src/rdkafka_op.c
@@ -834,7 +834,7 @@ rd_kafka_op_res_t rd_kafka_op_handle_std(rd_kafka_t *rk,
         else if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) {
                 /* Control messages must not be exposed to the application
                  * but we need to store their offsets. */
-                rd_kafka_fetch_op_app_prepare(rk, rko);
+                rd_kafka_fetch_op_app_prepare(rk, rko, rd_true /* default */);
                 return RD_KAFKA_OP_RES_HANDLED;
         } else if (cb_type != RD_KAFKA_Q_CB_EVENT &&
                    rko->rko_type & RD_KAFKA_OP_CB)
@@ -903,13 +903,18 @@ rd_kafka_op_res_t rd_kafka_op_handle(rd_kafka_t *rk,
  * This must be called just prior to passing/returning a consumed
  * message to the application.
  *
+ * @param update_store_offsets Whether to also store the offset of the
+ *        consumed message (only if enable.auto.offset.store is enabled).
+ *
  * Performs:
  *  - Store offset for fetched message + 1.
  *  - Updates the application offset (rktp_app_offset).
 *
 * @locks rktp_lock and rk_lock MUST NOT be held
 */
-void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk, rd_kafka_op_t *rko) {
+void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk,
+                                   rd_kafka_op_t *rko,
+                                   rd_bool_t update_store_offsets) {
        rd_kafka_toppar_t *rktp;
        int64_t offset;
 
@@ -925,7 +930,7 @@ void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk, rd_kafka_op_t *rko) {
 
         rd_kafka_toppar_lock(rktp);
         rktp->rktp_app_offset = offset;
-        if (rk->rk_conf.enable_auto_offset_store)
+        if (rk->rk_conf.enable_auto_offset_store && update_store_offsets)
                 rd_kafka_offset_store0(rktp, offset,
                                        /* force: ignore assignment state */
                                        rd_true, RD_DONT_LOCK);
diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h
index 05b967100a..4911858836 100644
--- a/src/rdkafka_op.h
+++ b/src/rdkafka_op.h
@@ -741,7 +741,9 @@ extern rd_atomic32_t rd_kafka_op_cnt;
 
 void rd_kafka_op_print(FILE *fp, const char *prefix, rd_kafka_op_t *rko);
 
-void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk, rd_kafka_op_t *rko);
+void rd_kafka_fetch_op_app_prepare(rd_kafka_t *rk,
+                                   rd_kafka_op_t *rko,
+                                   rd_bool_t update_store_offsets);
 
 
 #define rd_kafka_op_is_ctrl_msg(rko)                                          \
diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c
index 6a829c4515..0168b758d4 100644
--- a/src/rdkafka_queue.c
+++ b/src/rdkafka_queue.c
@@ -581,6 +581,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
         rd_kafka_t *rk = rkq->rkq_rk;
         rd_kafka_q_t *fwdq;
         struct timespec timeout_tspec;
+        int i;
 
         mtx_lock(&rkq->rkq_lock);
         if ((fwdq = rd_kafka_q_fwd_get(rkq, 0))) {
@@ -651,7 +652,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
 
                 if (!rko->rko_err && rko->rko_type == RD_KAFKA_OP_FETCH) {
                         /* Store offset, etc. */
-                        rd_kafka_fetch_op_app_prepare(rk, rko);
+                        rd_kafka_fetch_op_app_prepare(rk, rko, rd_false);
 
                         /* If this is a control messages, don't return
                          * message to application, only store the offset */
@@ -665,6 +666,20 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq,
                 rkmessages[cnt++] = rd_kafka_message_get(rko);
         }
 
+        if (rk->rk_conf.enable_auto_offset_store) {
+                for (i = cnt - 1; i >= 0; i--) {
+                        rko = (rd_kafka_op_t *)rkmessages[i]->_private;
+                        rd_kafka_toppar_t *rktp = rko->rko_rktp;
+                        int64_t offset          = rkmessages[i]->offset;
+                        if (unlikely(rktp->rktp_stored_offset <= offset)) {
+                                rd_kafka_offset_store0(
+                                    rko->rko_rktp, offset + 1,
+                                    /* force: ignore assignment state */
+                                    rd_true, RD_DO_LOCK);
+                        }
+                }
+        }
+
         /* Discard non-desired and already handled ops */
         next = TAILQ_FIRST(&tmpq);
         while (next) {
diff --git a/tests/0137-barrier_batch_consume.c b/tests/0137-barrier_batch_consume.c
index b8b4199b8e..86b90ef9ea 100644
--- a/tests/0137-barrier_batch_consume.c
+++ b/tests/0137-barrier_batch_consume.c
@@ -282,8 +282,82 @@ static void do_test_consume_batch_with_seek(void) {
 // }
 
 
+static void do_test_consume_batch_store_offset(void) {
+        rd_kafka_queue_t *rkq;
+        const char *topic;
+        rd_kafka_t *consumer;
+        int p;
+        int i;
+        uint64_t testid;
+        rd_kafka_conf_t *conf;
+        consumer_t consumer_args = RD_ZERO_INIT;
+        test_msgver_t mv;
+        const int produce_partition_cnt = 1;
+        const int timeout_ms            = 10000;
+        const int consume_msg_cnt       = 4;
+        const int no_of_consume         = 2;
+        const int produce_msg_cnt       = 8;
+        const int expected_msg_cnt      = produce_msg_cnt;
+
+        SUB_TEST();
+
+        test_conf_init(&conf, NULL, 60);
+        test_conf_set(conf, "enable.auto.commit", "false");
+        test_conf_set(conf, "enable.auto.offset.store", "true");
+        test_conf_set(conf, "auto.offset.reset", "earliest");
+
+        testid = test_id_generate();
+        test_msgver_init(&mv, testid);
+
+        /* Produce messages */
+        topic = test_mk_topic_name("0137-barrier_batch_consume", 1);
+
+        for (p = 0; p < produce_partition_cnt; p++)
+                test_produce_msgs_easy(topic, testid, p,
+                                       produce_msg_cnt / produce_partition_cnt);
+
+        for (i = 0; i < no_of_consume; i++) {
+
+                /* Create consumer */
+                consumer = test_create_consumer(topic, NULL,
+                                                rd_kafka_conf_dup(conf), NULL);
+                test_consumer_subscribe(consumer, topic);
+                test_consumer_wait_assignment(consumer, rd_false);
+
+                /* Create generic consume queue */
+                rkq = rd_kafka_queue_get_consumer(consumer);
+
+                consumer_args.what            = "CONSUMER";
+                consumer_args.rkq             = rkq;
+                consumer_args.timeout_ms      = timeout_ms;
+                consumer_args.consume_msg_cnt = consume_msg_cnt;
+                consumer_args.expected_msg_cnt =
+                    produce_msg_cnt / no_of_consume;
+                consumer_args.rk     = consumer;
+                consumer_args.testid = testid;
+                consumer_args.mv     = &mv;
+                consumer_args.test   = test_curr;
+
+                consumer_batch_queue(&consumer_args);
+                rd_kafka_commit(consumer, NULL, rd_false);
+
+                rd_kafka_queue_destroy(rkq);
+                test_consumer_close(consumer);
+                rd_kafka_destroy(consumer);
+        }
+
+        test_msgver_verify("CONSUME", &mv,
+                           TEST_MSGVER_ORDER | TEST_MSGVER_DUP |
+                               TEST_MSGVER_BY_OFFSET,
+                           0, expected_msg_cnt);
+
+        SUB_TEST_PASS();
+}
+
+
 int main_0137_barrier_batch_consume(int argc, char **argv) {
         do_test_consume_batch_with_seek();
+        do_test_consume_batch_store_offset();
         // FIXME: Run this test once consume batch is fully fixed.
         //        do_test_consume_batch_with_pause_and_resume();
         return 0;
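
For reviewers, a minimal application-side sketch (illustrative only, not part of the patch) of the public-API path this change affects. The setup is assumed: `rk` is an already-subscribed consumer configured with enable.auto.offset.store=true and enable.auto.commit=false, mirroring the test above; the helper name consume_one_batch and the batch size are placeholders. With the change, rd_kafka_q_serve_rkmessages() no longer stores offsets per message while draining the queue; it stores at most one offset per partition (highest returned offset + 1) after the batch has been assembled, so the commit below covers exactly the messages returned to the application.

/* Illustrative sketch -- not part of the patch. Uses the same public
 * batch-consume API that the new test exercises via consumer_batch_queue(). */
#include "rdkafka.h"

static void consume_one_batch(rd_kafka_t *rk) {
        rd_kafka_queue_t *rkq = rd_kafka_queue_get_consumer(rk);
        rd_kafka_message_t *rkmessages[4];
        ssize_t cnt, i;

        /* With enable.auto.offset.store=true, offsets for the returned
         * messages are stored once per partition (highest offset + 1)
         * before this call returns, rather than per message. */
        cnt = rd_kafka_consume_batch_queue(rkq, 10000 /* timeout (ms) */,
                                           rkmessages, 4);

        for (i = 0; i < cnt; i++) {
                /* ... process rkmessages[i] ... */
                rd_kafka_message_destroy(rkmessages[i]);
        }

        /* Synchronously commit the stored offsets, as the test does with
         * rd_kafka_commit(consumer, NULL, rd_false). */
        if (cnt > 0)
                rd_kafka_commit(rk, NULL, 0 /* sync */);

        rd_kafka_queue_destroy(rkq);
}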