Skip to content

Commit

Permalink
fix publish failed return ok
Browse files Browse the repository at this point in the history
  • Loading branch information
yujun777 committed Dec 14, 2023
1 parent 3688312 commit 32d8d10
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 12 deletions.
9 changes: 7 additions & 2 deletions be/src/http/action/report_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
#include "http/action/report_action.h"

#include "common/status.h"
#include "http/http_channel.h"
#include "http/http_status.h"
#include "olap/storage_engine.h"

namespace doris {

Expand All @@ -27,9 +30,11 @@ ReportAction::ReportAction(ExecEnv* exec_env, TPrivilegeHier::type hier, TPrivil

// Handles an HTTP report request.
//
// Asks the storage engine to notify the report listener named `_report_name`
// and sends exactly one reply on `req`:
//   - HttpStatus::OK with Status::OK() serialized as JSON when
//     notify_listener() returns true;
//   - HttpStatus::INTERNAL_SERVER_ERROR with an InternalError status
//     serialized as JSON when it returns false.
void ReportAction::handle(HttpRequest* req) {
    if (StorageEngine::instance()->notify_listener(_report_name)) {
        HttpChannel::send_reply(req, HttpStatus::OK, Status::OK().to_json());
    } else {
        // Only one reply per request: the error body is the JSON form of the status.
        HttpChannel::send_reply(
                req, HttpStatus::INTERNAL_SERVER_ERROR,
                Status::InternalError("unknown reporter with name: " + _report_name).to_json());
    }
}

Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1228,14 +1228,15 @@ void StorageEngine::notify_listeners() {
}

// Notifies every registered report listener whose name() equals `name`.
//
// The listener list is scanned while holding `_report_mtx`. The scan does not
// stop at the first match, so all listeners sharing the name are notified.
//
// Returns true if at least one listener matched, false otherwise.
bool StorageEngine::notify_listener(std::string_view name) {
    bool found = false;
    std::lock_guard<std::mutex> l(_report_mtx);
    for (auto& listener : _report_listeners) {
        if (listener->name() == name) {
            listener->notify();
            found = true;
        }
    }
    return found;
}

// check whether any unused rowsets's id equal to rowset_id
Expand Down
27 changes: 20 additions & 7 deletions be/src/olap/task/engine_publish_version_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ Status EnginePublishVersionTask::execute() {
}
#endif

std::vector<std::shared_ptr<TabletPublishTxnTask>> tablet_tasks;
// each partition
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
int64_t partition_id = par_ver_info.partition_id;
Expand Down Expand Up @@ -242,6 +243,7 @@ Status EnginePublishVersionTask::execute() {

auto tablet_publish_txn_ptr = std::make_shared<TabletPublishTxnTask>(
this, tablet, rowset, partition_id, transaction_id, version, tablet_info);
tablet_tasks.push_back(tablet_publish_txn_ptr);
auto submit_st = token->submit_func([=]() { tablet_publish_txn_ptr->handle(); });
#ifndef NDEBUG
LOG(INFO) << "transaction_id: " << transaction_id << ", partition id: " << partition_id
Expand All @@ -254,6 +256,15 @@ Status EnginePublishVersionTask::execute() {
}
token->wait();

if (res.ok()) {
for (const auto& tablet_task : tablet_tasks) {
res = tablet_task->result();
if (!res.ok()) {
break;
}
}
}

_succ_tablets->clear();
// check if the related tablet remained all have the version
for (auto& par_ver_info : _publish_version_req.partition_version_infos) {
Expand Down Expand Up @@ -343,24 +354,24 @@ void TabletPublishTxnTask::handle() {
rowset_update_lock.lock();
}
_stats.schedule_time_us = MonotonicMicros() - _stats.submit_time_us;
auto publish_status = StorageEngine::instance()->txn_manager()->publish_txn(
_result = StorageEngine::instance()->txn_manager()->publish_txn(
_partition_id, _tablet, _transaction_id, _version, &_stats);
if (!publish_status.ok()) {
if (!_result.ok()) {
LOG(WARNING) << "failed to publish version. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
<< ", res=" << publish_status;
<< ", res=" << _result;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}

// add visible rowset to tablet
int64_t t1 = MonotonicMicros();
publish_status = _tablet->add_inc_rowset(_rowset);
_result = _tablet->add_inc_rowset(_rowset);
_stats.add_inc_rowset_us = MonotonicMicros() - t1;
if (!publish_status.ok() && !publish_status.is<PUSH_VERSION_ALREADY_EXIST>()) {
if (!_result.ok() && !_result.is<PUSH_VERSION_ALREADY_EXIST>()) {
LOG(WARNING) << "fail to add visible rowset to tablet. rowset_id=" << _rowset->rowset_id()
<< ", tablet_id=" << _tablet_info.tablet_id << ", txn_id=" << _transaction_id
<< ", res=" << publish_status;
<< ", res=" << _result;
_engine_publish_version_task->add_error_tablet_id(_tablet_info.tablet_id);
return;
}
Expand All @@ -370,9 +381,11 @@ void TabletPublishTxnTask::handle() {
LOG(INFO) << "publish version successfully on tablet"
<< ", table_id=" << _tablet->table_id() << ", tablet=" << _tablet->tablet_id()
<< ", transaction_id=" << _transaction_id << ", version=" << _version.first
<< ", num_rows=" << _rowset->num_rows() << ", res=" << publish_status
<< ", num_rows=" << _rowset->num_rows() << ", res=" << _result
<< ", cost: " << cost_us << "(us) "
<< (cost_us > 500 * 1000 ? _stats.to_string() : "");

_result = Status::OK();
}

void AsyncTabletPublishTask::handle() {
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/task/engine_publish_version_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class TabletPublishTxnTask {
~TabletPublishTxnTask() = default;

void handle();
Status result() { return _result; }

private:
EnginePublishVersionTask* _engine_publish_version_task = nullptr;
Expand All @@ -80,6 +81,7 @@ class TabletPublishTxnTask {
Version _version;
TabletInfo _tablet_info;
TabletPublishStatistics _stats;
Status _result;
};

class EnginePublishVersionTask : public EngineTask {
Expand Down
1 change: 1 addition & 0 deletions be/src/service/http_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include "http/action/pipeline_task_action.h"
#include "http/action/pprof_actions.h"
#include "http/action/reload_tablet_action.h"
#include "http/action/report_action.h"
#include "http/action/reset_rpc_channel_action.h"
#include "http/action/restore_tablet_action.h"
#include "http/action/snapshot_action.h"
Expand Down
13 changes: 13 additions & 0 deletions regression-test/data/insert_p0/test_be_inject_publish_txn_fail.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_1 --

-- !select_2 --
100

-- !select_1 --
100

-- !select_2 --
100
200

2 changes: 1 addition & 1 deletion regression-test/plugins/plugin_curl_requester.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ logger.info("Added 'update_all_be_config' function to Suite")


// Issues an HTTP GET to /api/report/<reportName> on the backend at ip:port
// and validates the response with Http.checkHttpResult for a BE node.
Suite.metaClass._be_report = { String ip, int port, String reportName ->
    // Build the URL from the closure's own `ip` parameter.
    def url = "http://${ip}:${port}/api/report/${reportName}"
    def result = Http.GET(url, true)
    Http.checkHttpResult(result, NodeType.BE)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Regression suite: inject publish-txn failures on all BEs through debug
// points, check the query results while the debug point is enabled, then
// disable it, trigger a BE task report and check the results again.
suite('test_be_inject_publish_txn_fail', 'nonConcurrent') {
    def tbl = 'test_be_inject_publish_txn_fail_tbl'
    def dbug1 = 'TxnManager.publish_txn.random_failed_before_save_rs_meta'
    def dbug2 = 'TxnManager.publish_txn.random_failed_after_save_rs_meta'

    // Call be_report_task on every backend returned by getBackendIpHttpPort.
    def allBeReportTask = { ->
        def backendId_to_backendIP = [:]
        def backendId_to_backendHttpPort = [:]
        getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort)
        backendId_to_backendIP.each { beId, beIp ->
            def port = backendId_to_backendHttpPort.get(beId) as int
            be_report_task(beIp, port)
        }
    }

    // Insert `value` with debug point `dbug` enabled, then verify the table
    // contents before and after the BEs re-report their tasks.
    def testInsertValue = { dbug, value ->
        // insert succeeds but is not visible
        GetDebugPoint().enableDebugPointForAllBEs(dbug, [percent : 1.0])
        sql "INSERT INTO ${tbl} VALUES (${value})"
        sleep(6000)
        order_qt_select_1 "SELECT * FROM ${tbl}"

        GetDebugPoint().disableDebugPointForAllBEs(dbug)

        // The BE reports the publish failure to the FE, so the FE does not remove its task.
        // After the BE reports its tasks, the FE resends the publish version task to the BE.
        // The txn then becomes visible.
        allBeReportTask()
        sleep(8000)
        order_qt_select_2 "SELECT * FROM ${tbl}"
    }

    // Run one cleanup step; a failure here is deliberately ignored so that
    // the remaining cleanup steps still run.
    def bestEffort = { Closure step ->
        try {
            step()
        } catch (Throwable ignored) {
        }
    }

    try {
        sql "DROP TABLE IF EXISTS ${tbl} FORCE"
        sql "CREATE TABLE ${tbl} (k INT) DISTRIBUTED BY HASH(k) BUCKETS 5 PROPERTIES ('replication_num' = '1')"

        sql "ADMIN SET FRONTEND CONFIG ('agent_task_resend_wait_time_ms' = '1000')"
        sql "SET insert_visible_timeout_ms = 2000"

        testInsertValue dbug1, 100
        testInsertValue dbug2, 200
    } finally {
        // No catch on the test body: any failure above must propagate and fail
        // the suite. Cleanup runs regardless of the outcome.
        bestEffort { sql "ADMIN SET FRONTEND CONFIG ('agent_task_resend_wait_time_ms' = '5000')" }
        bestEffort { GetDebugPoint().disableDebugPointForAllBEs(dbug1) }
        bestEffort { GetDebugPoint().disableDebugPointForAllBEs(dbug2) }

        sql "DROP TABLE IF EXISTS ${tbl} FORCE"
    }
}

0 comments on commit 32d8d10

Please sign in to comment.