diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp
index 2874a43ae0e4ad..11953772150856 100644
--- a/be/src/runtime/routine_load/data_consumer.cpp
+++ b/be/src/runtime/routine_load/data_consumer.cpp
@@ -260,6 +260,20 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
             break;
         }
             [[fallthrough]];
+        case RdKafka::ERR__PARTITION_EOF: {
+            LOG(INFO) << "consumer meet partition eof: " << _id
+                      << " partition offset: " << msg->offset();
+            _consuming_partition_ids.erase(msg->partition());
+            if (!queue->blocking_put(msg.get())) {
+                done = true;
+            } else if (_consuming_partition_ids.size() <= 0) {
+                msg.release();
+                done = true;
+            } else {
+                msg.release();
+            }
+            break;
+        }
         case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
             done = true;
             std::stringstream ss;
@@ -269,15 +283,6 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
             st = Status::InternalError(ss.str());
             break;
         }
-        case RdKafka::ERR__PARTITION_EOF: {
-            LOG(INFO) << "consumer meet partition eof: " << _id
-                      << " partition offset: " << msg->offset();
-            _consuming_partition_ids.erase(msg->partition());
-            if (_consuming_partition_ids.size() <= 0) {
-                done = true;
-            }
-            break;
-        }
         default:
             LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
             done = true;
diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp
index 27eea942664fe3..f429dac560caca 100644
--- a/be/src/runtime/routine_load/data_consumer_group.cpp
+++ b/be/src/runtime/routine_load/data_consumer_group.cpp
@@ -162,22 +162,28 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
                       << ", partition: " << msg->partition() << ", offset: " << msg->offset()
                       << ", len: " << msg->len();
 
-            Status st = (kafka_pipe.get()->*append_data)(static_cast<const char*>(msg->payload()),
-                                                         static_cast<size_t>(msg->len()));
-            if (st.ok()) {
-                left_rows--;
-                left_bytes -= msg->len();
-                cmt_offset[msg->partition()] = msg->offset();
-                VLOG_NOTICE << "consume partition[" << msg->partition() << " - " << msg->offset()
-                            << "]";
+            if (msg->err() == RdKafka::ERR__PARTITION_EOF) {
+                if (msg->offset() > 0) {
+                    cmt_offset[msg->partition()] = msg->offset() - 1;
+                }
             } else {
-                // failed to append this msg, we must stop
-                LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
-                eos = true;
-                {
-                    std::unique_lock<std::mutex> lock(_mutex);
-                    if (result_st.ok()) {
-                        result_st = st;
+                Status st = (kafka_pipe.get()->*append_data)(
+                        static_cast<const char*>(msg->payload()), static_cast<size_t>(msg->len()));
+                if (st.ok()) {
+                    left_rows--;
+                    left_bytes -= msg->len();
+                    cmt_offset[msg->partition()] = msg->offset();
+                    VLOG_NOTICE << "consume partition[" << msg->partition() << " - "
+                                << msg->offset() << "]";
+                } else {
+                    // failed to append this msg, we must stop
+                    LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
+                    eos = true;
+                    {
+                        std::unique_lock<std::mutex> lock(_mutex);
+                        if (result_st.ok()) {
+                            result_st = st;
+                        }
                     }
                 }
             }