Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add more logs and metrics to trace the broker load process #1530

Merged
merged 5 commits into from
Jul 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions be/src/exec/tablet_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,10 @@ Status NodeChannel::_wait_in_flight_packet() {
<< ", error_text=" << _add_batch_closure->cntl.ErrorText();
return Status::InternalError("failed to send batch");
}

if (_add_batch_closure->result.has_execution_time_us()) {
_parent->increase_node_add_batch_time_us(_node_id, _add_batch_closure->result.execution_time_us());
}
return {_add_batch_closure->result.status()};
}

Expand Down Expand Up @@ -599,7 +603,7 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
for (auto channel : _channels) {
status = channel->close(state);
if (!status.ok()) {
LOG(WARNING) << "close channel failed, load_id=" << _load_id
LOG(WARNING) << "close channel failed, load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id;
}
}
Expand All @@ -612,8 +616,17 @@ Status OlapTableSink::close(RuntimeState* state, Status close_status) {
COUNTER_SET(_validate_data_timer, _validate_data_ns);
COUNTER_SET(_wait_in_flight_packet_timer, _wait_in_flight_packet_ns);
COUNTER_SET(_serialize_batch_timer, _serialize_batch_ns);

state->update_num_rows_load_filtered(_number_filtered_rows);

// print log of add batch time of all nodes, for tracing load performance easily
std::stringstream ss;
ss << "finished to close olap table sink. load_id=" << print_id(_load_id)
<< ", txn_id=" << _txn_id << ", node add batch time(ns): ";
for (auto const& pair: _node_add_batch_time_map) {
ss << "{" << pair.first << "=" << pair.second << "}";
}
LOG(INFO) << ss.str();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should VLOG(), and use be.conf to switch it to open.

and you should use if (LOG.level) to enclose string constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

VLOG can not be used online..
I think add one log for one load job is OK.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use pattern to make only this module

like http://rpg.ifi.uzh.ch/docs/glog.html#verbose


} else {
for (auto channel : _channels) {
channel->cancel();
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/tablet_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,9 @@ class OlapTableSink : public DataSink {
// at a time can modify them.
int64_t* mutable_wait_in_flight_packet_ns() { return &_wait_in_flight_packet_ns; }
int64_t* mutable_serialize_batch_ns() { return &_serialize_batch_ns; }
void increase_node_add_batch_time_us(int64_t be_id, int64_t time_ns) {
_node_add_batch_time_map[be_id] += time_ns;
}

private:
// convert input batch to output batch which will be loaded into OLAP table.
Expand Down Expand Up @@ -257,6 +260,9 @@ class OlapTableSink : public DataSink {
RuntimeProfile::Counter* _close_timer = nullptr;
RuntimeProfile::Counter* _wait_in_flight_packet_timer = nullptr;
RuntimeProfile::Counter* _serialize_batch_timer = nullptr;

// BE id -> execution time of add batch in us
std::unordered_map<int64_t, int64_t> _node_add_batch_time_map;
};

}
Expand Down
19 changes: 12 additions & 7 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,19 @@ void MemTable::insert(Tuple* tuple) {
}

OLAPStatus MemTable::flush(RowsetWriterSharedPtr rowset_writer) {
Table::Iterator it(_skip_list);
for (it.SeekToFirst(); it.Valid(); it.Next()) {
const char* row = it.key();
_schema->finalize(row);
RETURN_NOT_OK(rowset_writer->add_row(row, _schema));
int64_t duration_ns = 0;
{
SCOPED_RAW_TIMER(&duration_ns);
Table::Iterator it(_skip_list);
for (it.SeekToFirst(); it.Valid(); it.Next()) {
const char* row = it.key();
_schema->finalize(row);
RETURN_NOT_OK(rowset_writer->add_row(row, _schema));
}
RETURN_NOT_OK(rowset_writer->flush());
}

RETURN_NOT_OK(rowset_writer->flush());
DorisMetrics::memtable_flush_total.increment(1);
DorisMetrics::memtable_flush_duration_us.increment(duration_ns / 1000);
return OLAP_SUCCESS;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ OLAPStatus TxnManager::commit_txn(
&& load_info.rowset != nullptr
&& load_info.rowset->rowset_id() != rowset_ptr->rowset_id()) {
// find a rowset with different rowset id, then it should not happen, just return errors
LOG(WARNING) << "find transaction exists when add to engine."
LOG(WARNING) << "find transaction exists when add to engine. but rowset ids are not same."
<< "partition_id: " << key.first
<< ", transaction_id: " << key.second
<< ", tablet: " << tablet_info.to_string()
Expand Down
9 changes: 6 additions & 3 deletions be/src/runtime/tablet_writer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "runtime/mem_tracker.h"
#include "runtime/row_batch.h"
#include "runtime/tuple_row.h"
#include "service/backend_options.h"
#include "util/bitmap.h"
#include "util/stopwatch.hpp"
#include "olap/delta_writer.h"
Expand Down Expand Up @@ -152,9 +153,11 @@ Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& params) {
}
auto st = it->second->write(row_batch.get_row(i)->get_tuple(0));
if (st != OLAP_SUCCESS) {
LOG(WARNING) << "tablet writer writer failed, tablet_id=" << it->first
<< ", transaction_id=" << _txn_id;
return Status::InternalError("tablet writer write failed");
std::stringstream ss;
ss << "tablet writer write failed, tablet_id=" << it->first
<< ", transaction_id=" << _txn_id << ", status=" << st;
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str() + ", be: " + BackendOptions::get_localhost());
}
}
_next_seqs[params.sender_id()]++;
Expand Down
19 changes: 12 additions & 7 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,19 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
_tablet_worker_pool.offer(
[request, response, done, this] () {
brpc::ClosureGuard closure_guard(done);
auto st = _exec_env->tablet_writer_mgr()->add_batch(*request, response->mutable_tablet_vec());
if (!st.ok()) {
LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg()
<< ", id=" << request->id()
<< ", index_id=" << request->index_id()
<< ", sender_id=" << request->sender_id();
int64_t execution_time_ns = 0;
{
SCOPED_RAW_TIMER(&execution_time_ns);
auto st = _exec_env->tablet_writer_mgr()->add_batch(*request, response->mutable_tablet_vec());
if (!st.ok()) {
LOG(WARNING) << "tablet writer add batch failed, message=" << st.get_error_msg()
<< ", id=" << request->id()
<< ", index_id=" << request->index_id()
<< ", sender_id=" << request->sender_id();
}
st.to_protobuf(response->mutable_status());
}
st.to_protobuf(response->mutable_status());
response->set_execution_time_us(execution_time_ns / 1000);
});
}

Expand Down
6 changes: 6 additions & 0 deletions be/src/util/doris_metrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ IntCounter DorisMetrics::txn_exec_plan_total;
IntCounter DorisMetrics::stream_receive_bytes_total;
IntCounter DorisMetrics::stream_load_rows_total;

IntCounter DorisMetrics::memtable_flush_total;
IntCounter DorisMetrics::memtable_flush_duration_us;

// gauges
IntGauge DorisMetrics::memory_pool_bytes_total;
IntGauge DorisMetrics::process_thread_num;
Expand Down Expand Up @@ -138,6 +141,9 @@ void DorisMetrics::initialize(
REGISTER_DORIS_METRIC(query_scan_rows);
REGISTER_DORIS_METRIC(ranges_processed_total);

REGISTER_DORIS_METRIC(memtable_flush_total);
REGISTER_DORIS_METRIC(memtable_flush_duration_us);

// push request
_metrics->register_metric(
"push_requests_total", MetricLabels().add("status", "SUCCESS"),
Expand Down
3 changes: 3 additions & 0 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ class DorisMetrics {
static IntCounter stream_receive_bytes_total;
static IntCounter stream_load_rows_total;

static IntCounter memtable_flush_total;
static IntCounter memtable_flush_duration_us;

// Gauges
static IntGauge memory_pool_bytes_total;
static IntGauge process_thread_num;
Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/catalog/Catalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,7 @@ private Catalog() {
this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat);
this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat);

this.loadTaskScheduler = new MasterTaskExecutor(10);
this.loadTaskScheduler = new MasterTaskExecutor(Config.async_load_task_pool_size);
this.loadJobScheduler = new LoadJobScheduler();
this.loadManager = new LoadManager(loadJobScheduler);
this.loadTimeoutChecker = new LoadTimeoutChecker(loadManager);
Expand Down
21 changes: 14 additions & 7 deletions fe/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,6 @@ public class Config extends ConfigBase {
* minimal intervals between two publish version action
*/
@ConfField public static int publish_version_interval_ms = 100;

/*
* maximun concurrent running txn num including prepare, commit txns under a single db
* txn manager will reject coming txns
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_running_txn_num_per_db = 100;

/*
* Maximal wait seconds for straggler node in load
Expand Down Expand Up @@ -393,6 +386,20 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, masterOnly = true)
public static int desired_max_waiting_jobs = 100;

/*
* maximum concurrent running txn num including prepare, commit txns under a single db
* txn manager will reject coming txns
*/
@ConfField(mutable = true, masterOnly = true)
public static int max_running_txn_num_per_db = 100;

/*
* The load task executor pool size. This pool size limits the max running load tasks.
* Currently, it only limits the load tasks of broker load, in pending and loading phases.
* It should be less than 'max_running_txn_num_per_db'
*/
public static int async_load_task_pool_size = 10;

/*
* Same meaning as *tablet_create_timeout_second*, but used when delete a tablet.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,13 @@ public BrokerLoadPendingTask(BrokerLoadJob loadTaskCallback,

@Override
void executeTask() throws UserException {
LOG.info("begin to execute broker pending task. job: {}", callback.getCallbackId());
getAllFileStatus();
}

private void getAllFileStatus()
throws UserException {
long start = System.currentTimeMillis();
for (Map.Entry<Long, List<BrokerFileGroup>> entry : tableToBrokerFileList.entrySet()) {
long tableId = entry.getKey();

Expand All @@ -79,6 +81,8 @@ private void getAllFileStatus()
}

((BrokerPendingTaskAttachment) attachment).addFileStatus(tableId, fileStatusList);
LOG.info("get {} files to be loaded. cost: {} ms, job: {}",
fileStatusList.size(), (System.currentTimeMillis() - start), callback.getCallbackId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ public class LoadLoadingTask extends LoadTask {

private LoadingTaskPlanner planner;

private String errMsg;

public LoadLoadingTask(Database db, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> fileGroups,
long jobDeadlineMs, long execMemLimit, boolean strictMode,
Expand All @@ -75,12 +73,13 @@ public LoadLoadingTask(Database db, OlapTable table,
}

public void init(List<List<TBrokerFileStatus>> fileStatusList, int fileNum) throws UserException {
planner = new LoadingTaskPlanner(txnId, db.getId(), table, brokerDesc, fileGroups, strictMode);
planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, strictMode);
planner.plan(fileStatusList, fileNum);
}

@Override
protected void executeTask() throws Exception{
LOG.info("begin to execute loading task. job: {}. left retry: {}", callback.getCallbackId(), retryTime);
retryTime--;
executeOnce();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public abstract class LoadTask extends MasterTask {
protected FailMsg failMsg = new FailMsg();
protected int retryTime = 1;

public LoadTask(LoadTaskCallback callback){
public LoadTask(LoadTaskCallback callback) {
this.signature = Catalog.getCurrentCatalog().getNextId();
this.callback = callback;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class LoadingTaskPlanner {
private static final Logger LOG = LogManager.getLogger(LoadingTaskPlanner.class);

// Input params
private final long loadJobId;
private final long txnId;
private final long dbId;
private final OlapTable table;
Expand All @@ -77,9 +78,10 @@ public class LoadingTaskPlanner {

private int nextNodeId = 0;

public LoadingTaskPlanner(long txnId, long dbId, OlapTable table,
public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table,
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
boolean strictMode) {
this.loadJobId = loadJobId;
this.txnId = txnId;
this.dbId = dbId;
this.table = table;
Expand Down Expand Up @@ -108,7 +110,7 @@ public void plan(List<List<TBrokerFileStatus>> fileStatusesList, int filesAdded)
// 1. Broker scan node
BrokerScanNode scanNode = new BrokerScanNode(new PlanNodeId(nextNodeId++), tupleDesc, "BrokerScanNode",
fileStatusesList, filesAdded);
scanNode.setLoadInfo(table, brokerDesc, fileGroups, strictMode);
scanNode.setLoadInfo(loadJobId, txnId, table, brokerDesc, fileGroups, strictMode);
scanNode.init(analyzer);
scanNode.finalize(analyzer);
scanNodes.add(scanNode);
Expand Down
25 changes: 19 additions & 6 deletions fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@

package org.apache.doris.planner;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.ArithmeticExpr;
import org.apache.doris.analysis.BinaryPredicate;
Expand Down Expand Up @@ -66,6 +62,12 @@
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand Down Expand Up @@ -106,6 +108,8 @@ public int compare(TBrokerFileStatus o1, TBrokerFileStatus o2) {
private long bytesPerInstance;

// Parameters need to process
private long loadJobId = -1; // -1 means this scan node is not for a load job
private long txnId = -1;
private Table targetTable;
private BrokerDesc brokerDesc;
private List<BrokerFileGroup> fileGroups;
Expand Down Expand Up @@ -187,10 +191,14 @@ public void setLoadInfo(Table targetTable,
this.fileGroups = fileGroups;
}

public void setLoadInfo(Table targetTable,
public void setLoadInfo(long loadJobId,
long txnId,
Table targetTable,
BrokerDesc brokerDesc,
List<BrokerFileGroup> fileGroups,
boolean strictMode) {
this.loadJobId = loadJobId;
this.txnId = txnId;
this.targetTable = targetTable;
this.brokerDesc = brokerDesc;
this.fileGroups = fileGroups;
Expand Down Expand Up @@ -466,7 +474,6 @@ private TScanRangeLocations newLocations(TBrokerScanRangeParams params, String b
// Generate on broker scan range
TBrokerScanRange brokerScanRange = new TBrokerScanRange();
brokerScanRange.setParams(params);
// TODO(zc):
int numBroker = Math.min(3, numBe);
for (int i = 0; i < numBroker; ++i) {
FsBroker broker = null;
Expand Down Expand Up @@ -674,6 +681,12 @@ public void finalize(Analyzer analyzer) throws UserException {
LOG.debug("Scan range is {}", locations);
}
}

if (loadJobId != -1) {
LOG.info("broker load job {} with txn {} has {} scan range: {}",
loadJobId, txnId, locationsList.size(),
locationsList.stream().map(loc -> loc.locations.get(0).backend_id).toArray());
}
}

@Override
Expand Down
3 changes: 2 additions & 1 deletion fe/src/main/java/org/apache/doris/qe/QeProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.qe;

import org.apache.doris.common.UserException;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
import org.apache.doris.thrift.TUniqueId;
Expand All @@ -26,7 +27,7 @@

public interface QeProcessor {

TReportExecStatusResult reportExecStatus(TReportExecStatusParams params);
TReportExecStatusResult reportExecStatus(TReportExecStatusParams params, TNetworkAddress beAddr);

void registerQuery(TUniqueId queryId, Coordinator coord) throws UserException;

Expand Down
Loading