diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index e61e1fdc2e07a7..92840721581671 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -264,8 +264,13 @@ Status KafkaDataConsumer::group_consume(BlockingQueue* queue, LOG(INFO) << "consumer meet partition eof: " << _id << " partition offset: " << msg->offset(); _consuming_partition_ids.erase(msg->partition()); - if (_consuming_partition_ids.size() <= 0) { + if (!queue->blocking_put(msg.get())) { done = true; + } else if (_consuming_partition_ids.size() <= 0) { + msg.release(); + done = true; + } else { + msg.release(); } break; } diff --git a/be/src/runtime/routine_load/data_consumer_group.cpp b/be/src/runtime/routine_load/data_consumer_group.cpp index 8d07b0ec81afe9..fc714fab6e0782 100644 --- a/be/src/runtime/routine_load/data_consumer_group.cpp +++ b/be/src/runtime/routine_load/data_consumer_group.cpp @@ -164,22 +164,28 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr ctx, << ", partition: " << msg->partition() << ", offset: " << msg->offset() << ", len: " << msg->len(); - Status st = (kafka_pipe.get()->*append_data)(static_cast(msg->payload()), - static_cast(msg->len())); - if (st.ok()) { - left_rows--; - left_bytes -= msg->len(); - cmt_offset[msg->partition()] = msg->offset(); - VLOG_NOTICE << "consume partition[" << msg->partition() << " - " << msg->offset() - << "]"; + if (msg->err() == RdKafka::ERR__PARTITION_EOF) { + if (msg->offset() > 0) { + cmt_offset[msg->partition()] = msg->offset() - 1; + } } else { - // failed to append this msg, we must stop - LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id; - eos = true; - { - std::unique_lock lock(_mutex); - if (result_st.ok()) { - result_st = st; + Status st = (kafka_pipe.get()->*append_data)( + static_cast(msg->payload()), static_cast(msg->len())); + if (st.ok()) { + left_rows--; + left_bytes -= msg->len(); + cmt_offset[msg->partition()] = msg->offset(); + VLOG_NOTICE << "consume partition[" << msg->partition() << " - " + << msg->offset() << "]"; + } else { + // failed to append this msg, we must stop + LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id; + eos = true; + { + std::unique_lock lock(_mutex); + if (result_st.ok()) { + result_st = st; + } } } }