[fix](routine-load) fix routine load pause when Kafka data deleted after TTL (apache#37288) (apache#39183)

pick (apache#37288)

When using routine load, the lag stays a positive number even after the data load has completed:
```
  Lag: {"0":16,"1":15,"2":16,"3":16,"4":16,"5":16,"6":15,"7":16,"8":16,"9":16,"10":15,"11":16,"12":15,"13":15,"14":16,"15":16,"16":17,"17":15,"18":16,"19":15,"20":16,"21":16,"22":16,"23":16,"24":15,"25":17,"26":17,"27":16,"28":16,"29":16,"30":16,"31":17,"32":14,"33":16,"34":17,"35":16,"36":15,"37":15,"38":15,"39":16,"40":16,"41":16,"42":15,"43":15,"44":17,"45":16,"46":15,"47":15,"48":16,"49":17,"50":16,"51":15,"52":16,"53":15,"54":15,"55":17,"56":16,"57":17,"58":16,"59":16,"60":15,"61":15,"62":16,"63":16,"64":17,"65":16,"66":15,"67":16,"68":17,"69":16,"70":15,"71":17}
```
and the routine load job is paused once the Kafka data reaches its TTL and is deleted, with the error `out of range`.

This happens because the EOF message carries its own offset, which must be accounted for in the offset statistics.
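
For background, here is a minimal librdkafka sketch (illustrative only, not part of this patch; `poll_once` is a made-up name) of how a partition-EOF event is delivered and why its offset matters: the EOF "message" has `err() == ERR__PARTITION_EOF` and its `offset()` is the end-of-partition offset, i.e. one past the last real record, so the last consumed offset is `offset() - 1`.
```cpp
#include <librdkafka/rdkafkacpp.h>

#include <iostream>
#include <memory>

// Hypothetical helper: poll one event from an already-subscribed consumer
// created with "enable.partition.eof" set to true.
void poll_once(RdKafka::KafkaConsumer* consumer) {
    std::unique_ptr<RdKafka::Message> msg(consumer->consume(/*timeout_ms=*/1000));
    switch (msg->err()) {
    case RdKafka::ERR_NO_ERROR:
        // A real record: offset() is the offset of this record itself.
        std::cout << "data at offset " << msg->offset() << "\n";
        break;
    case RdKafka::ERR__PARTITION_EOF:
        // EOF event: offset() is one past the last record, so the highest
        // offset actually consumed on this partition is offset() - 1.
        if (msg->offset() > 0) {
            std::cout << "partition " << msg->partition()
                      << " consumed up to offset " << msg->offset() - 1 << "\n";
        }
        break;
    default:
        std::cerr << "consume error: " << msg->errstr() << "\n";
        break;
    }
}
```
This is exactly the `msg->offset() - 1` bookkeeping the patch adds to `cmt_offset` below.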

**note(important):**
After this fix, if you set
```
"property.enable.partition.eof" = "false"
```
in your routine load job, the problem will occur again, because the fix relies on the offset carried by the EOF message. This config defaults to true in Doris.
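
As a sketch of the client-side flag behind this (assuming the usual routine-load behavior of forwarding `property.`-prefixed job properties to librdkafka, so `property.enable.partition.eof` maps to the config below):
```cpp
#include <librdkafka/rdkafkacpp.h>

#include <iostream>
#include <memory>
#include <string>

int main() {
    std::unique_ptr<RdKafka::Conf> conf(
            RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL));
    std::string errstr;
    // With this disabled, librdkafka never delivers an ERR__PARTITION_EOF
    // message, so no EOF offset is available for the fix to account for.
    if (conf->set("enable.partition.eof", "true", errstr) != RdKafka::Conf::CONF_OK) {
        std::cerr << "config error: " << errstr << "\n";
        return 1;
    }
    return 0;
}
```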
sollhui authored Aug 11, 2024
1 parent f2a82f6 commit e19f603
Showing 2 changed files with 35 additions and 24 deletions.
23 changes: 14 additions & 9 deletions be/src/runtime/routine_load/data_consumer.cpp
```diff
@@ -260,6 +260,20 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
                 break;
             }
             [[fallthrough]];
+        case RdKafka::ERR__PARTITION_EOF: {
+            LOG(INFO) << "consumer meet partition eof: " << _id
+                      << " partition offset: " << msg->offset();
+            _consuming_partition_ids.erase(msg->partition());
+            if (!queue->blocking_put(msg.get())) {
+                done = true;
+            } else if (_consuming_partition_ids.size() <= 0) {
+                msg.release();
+                done = true;
+            } else {
+                msg.release();
+            }
+            break;
+        }
         case RdKafka::ERR_OFFSET_OUT_OF_RANGE: {
             done = true;
             std::stringstream ss;
@@ -269,15 +283,6 @@ Status KafkaDataConsumer::group_consume(BlockingQueue<RdKafka::Message*>* queue,
             st = Status::InternalError<false>(ss.str());
             break;
         }
-        case RdKafka::ERR__PARTITION_EOF: {
-            LOG(INFO) << "consumer meet partition eof: " << _id
-                      << " partition offset: " << msg->offset();
-            _consuming_partition_ids.erase(msg->partition());
-            if (_consuming_partition_ids.size() <= 0) {
-                done = true;
-            }
-            break;
-        }
         default:
             LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
             done = true;
```
36 changes: 21 additions & 15 deletions be/src/runtime/routine_load/data_consumer_group.cpp
```diff
@@ -162,22 +162,28 @@ Status KafkaDataConsumerGroup::start_all(std::shared_ptr<StreamLoadContext> ctx,
                         << ", partition: " << msg->partition() << ", offset: " << msg->offset()
                         << ", len: " << msg->len();
 
-            Status st = (kafka_pipe.get()->*append_data)(static_cast<const char*>(msg->payload()),
-                                                         static_cast<size_t>(msg->len()));
-            if (st.ok()) {
-                left_rows--;
-                left_bytes -= msg->len();
-                cmt_offset[msg->partition()] = msg->offset();
-                VLOG_NOTICE << "consume partition[" << msg->partition() << " - " << msg->offset()
-                            << "]";
+            if (msg->err() == RdKafka::ERR__PARTITION_EOF) {
+                if (msg->offset() > 0) {
+                    cmt_offset[msg->partition()] = msg->offset() - 1;
+                }
             } else {
-                // failed to append this msg, we must stop
-                LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
-                eos = true;
-                {
-                    std::unique_lock<std::mutex> lock(_mutex);
-                    if (result_st.ok()) {
-                        result_st = st;
+                Status st = (kafka_pipe.get()->*append_data)(
+                        static_cast<const char*>(msg->payload()), static_cast<size_t>(msg->len()));
+                if (st.ok()) {
+                    left_rows--;
+                    left_bytes -= msg->len();
+                    cmt_offset[msg->partition()] = msg->offset();
+                    VLOG_NOTICE << "consume partition[" << msg->partition() << " - "
+                                << msg->offset() << "]";
+                } else {
+                    // failed to append this msg, we must stop
+                    LOG(WARNING) << "failed to append msg to pipe. grp: " << _grp_id;
+                    eos = true;
+                    {
+                        std::unique_lock<std::mutex> lock(_mutex);
+                        if (result_st.ok()) {
+                            result_st = st;
+                        }
                     }
                 }
             }
```
