diff --git a/src/rdkafka_queue.c b/src/rdkafka_queue.c index 7feae144f9..cc46ab4b1d 100644 --- a/src/rdkafka_queue.c +++ b/src/rdkafka_queue.c @@ -544,7 +544,7 @@ rd_kafka_purge_outdated_messages(rd_kafka_toppar_t *rktp, int32_t version, rd_kafka_message_t **rkmessages, size_t cnt, - struct rd_kafka_op_tailq ctrl_msg_q) { + struct rd_kafka_op_tailq *ctrl_msg_q) { size_t valid_count = 0; size_t i; rd_kafka_op_t *rko, *next; @@ -563,13 +563,15 @@ rd_kafka_purge_outdated_messages(rd_kafka_toppar_t *rktp, } /* Discard outdated control msgs ops */ - next = TAILQ_FIRST(&ctrl_msg_q); - while (next && next->rko_rktp == rktp && - rd_kafka_op_version_outdated(next, version)) { + next = TAILQ_FIRST(ctrl_msg_q); + while (next) { rko = next; next = TAILQ_NEXT(rko, rko_link); - TAILQ_REMOVE(&ctrl_msg_q, rko, rko_link); - rd_kafka_op_destroy(rko); + if (rko->rko_rktp == rktp && + rd_kafka_op_version_outdated(rko, version)) { + TAILQ_REMOVE(ctrl_msg_q, rko, rko_link); + rd_kafka_op_destroy(rko); + } } return valid_count; @@ -641,7 +643,7 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq, if (unlikely(rko->rko_type == RD_KAFKA_OP_BARRIER)) { cnt = (unsigned int)rd_kafka_purge_outdated_messages( rko->rko_rktp, rko->rko_version, rkmessages, cnt, - ctrl_msg_q); + &ctrl_msg_q); rd_kafka_op_destroy(rko); continue; } @@ -670,9 +672,6 @@ int rd_kafka_q_serve_rkmessages(rd_kafka_q_t *rkq, * application. Add it to a tmp queue from where we can store * the offset and destroy the op */ if (unlikely(rd_kafka_op_is_ctrl_msg(rko))) { - /* FIXME: Fix TAILQ_REMOVE call to handle this case */ - if (TAILQ_FIRST(&ctrl_msg_q) == NULL) - TAILQ_INIT(&ctrl_msg_q); TAILQ_INSERT_TAIL(&ctrl_msg_q, rko, rko_link); continue; } diff --git a/tests/0137-barrier_batch_consume.c b/tests/0137-barrier_batch_consume.c index fe9652ea98..c4fd8f0a4d 100644 --- a/tests/0137-barrier_batch_consume.c +++ b/tests/0137-barrier_batch_consume.c @@ -87,6 +87,8 @@ static int consumer_batch_queue(void *arg) { rd_kafka_message_destroy(rkmessage[i]); } + rd_free(rkmessage); + return 0; } @@ -130,8 +132,7 @@ static void do_test_consume_batch_with_seek(void) { produce_msg_cnt / partition_cnt); /* Create consumers */ - consumer = - test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL); + consumer = test_create_consumer(topic, NULL, conf, NULL); test_consumer_subscribe(consumer, topic); test_consumer_wait_assignment(consumer, rd_false); @@ -159,7 +160,7 @@ static void do_test_consume_batch_with_seek(void) { err = rd_kafka_seek_partitions(consumer, seek_toppars, 2000); TEST_ASSERT(!err, - "Failed to seek partition %d for topic %s to offset %lld", + "Failed to seek partition %d for topic %s to offset %ld", seek_partition, topic, seek_offset); thrd_join(thread_id, NULL); @@ -221,8 +222,7 @@ static void do_test_consume_batch_with_pause_and_resume_different_batch(void) { produce_msg_cnt / partition_cnt); /* Create consumers */ - consumer = - test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL); + consumer = test_create_consumer(topic, NULL, conf, NULL); test_consumer_subscribe(consumer, topic); test_consumer_wait_assignment(consumer, rd_false); @@ -327,8 +327,7 @@ static void do_test_consume_batch_with_pause_and_resume_same_batch(void) { produce_msg_cnt / partition_cnt); /* Create consumers */ - consumer = - test_create_consumer(topic, NULL, rd_kafka_conf_dup(conf), NULL); + consumer = test_create_consumer(topic, NULL, conf, NULL); test_consumer_subscribe(consumer, topic); test_consumer_wait_assignment(consumer, rd_false); @@ -458,6 +457,10 @@ static void do_test_consume_batch_store_offset(void) { TEST_MSGVER_BY_OFFSET, 0, expected_msg_cnt); + test_msgver_clear(&mv); + + rd_kafka_conf_destroy(conf); + SUB_TEST_PASS(); } @@ -577,7 +580,7 @@ static void do_test_consume_batch_control_msgs(void) { rd_kafka_committed(consumer, pause_partition_list, timeout_ms); TEST_ASSERT(pause_partition_list->elems[0].offset == expected_offset, - "Expected offset should be %lld, but it is %lld", + "Expected offset should be %ld, but it is %ld", expected_offset, pause_partition_list->elems[0].offset); rd_kafka_topic_partition_list_destroy(pause_partition_list);