Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timeout in stream load planner #1480

Merged
merged 4 commits into from
Jul 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/http/action/mini_load.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ Status MiniLoadAction::_begin_mini_load(StreamLoadContext* ctx) {
ctx->txn_id = res.txn_id;
// txn has been begun in fe
ctx->need_rollback = true;
LOG(INFO) << "load:" << ctx->label << " txn:" << res.txn_id << " has been begun in fe";
return Status::OK();
}

Expand Down Expand Up @@ -709,6 +710,9 @@ Status MiniLoadAction::_process_put(HttpRequest* req, StreamLoadContext* ctx) {
if (column_separator_it != params.end()) {
put_request.__set_columnSeparator(column_separator_it->second);
}
if (ctx->timeout_second != -1) {
put_request.__set_timeout(ctx->timeout_second);
}

// plan this load
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/stream_load/stream_load_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ class StreamLoadContext {
// optional
std::string sub_label;
double max_filter_ratio = 0.0;
int64_t timeout_second = -1;
int32_t timeout_second = -1;
AuthInfo auth;

// the following members control the max progress of a consuming
Expand Down
3 changes: 3 additions & 0 deletions be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
// submit this params
#ifndef BE_TEST
ctx->ref();
LOG(INFO) << "begin to execute job:" << ctx->label
<< " with txn id:" << ctx->txn_id
<< " with query id:" << print_id(ctx->put_result.params.params.query_id);
auto st = _exec_env->fragment_mgr()->exec_plan_fragment(
ctx->put_result.params,
[ctx] (PlanFragmentExecutor* executor) {
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/tablet_writer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ TabletWriterMgr::~TabletWriterMgr() {

Status TabletWriterMgr::open(const PTabletWriterOpenRequest& params) {
TabletsChannelKey key(params.id(), params.index_id());
LOG(INFO) << "open tablets writer channel: " << key;
std::shared_ptr<TabletsChannel> channel;
{
std::lock_guard<std::mutex> l(_lock);
Expand Down
7 changes: 4 additions & 3 deletions fe/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -349,8 +349,9 @@ public class Config extends ConfigBase {
public static int pull_load_task_default_timeout_second = 14400; // 4 hour

/*
* Default mini load timeout
* Default non-streaming mini load timeout
*/
@Deprecated
@ConfField(mutable = true, masterOnly = true)
public static int mini_load_default_timeout_second = 3600; // 1 hour

Expand All @@ -361,10 +362,10 @@ public class Config extends ConfigBase {
public static int insert_load_default_timeout_second = 3600; // 1 hour

/*
* Default stream load timeout
* Default stream load and streaming mini load timeout
*/
@ConfField(mutable = true, masterOnly = true)
public static int stream_load_default_timeout_second = 300; // 300s
public static int stream_load_default_timeout_second = 600; // 300s

/*
* Max stream load timeout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public MiniLoadJob(long dbId, TMiniLoadBeginRequest request) throws MetaNotFound
if (request.isSetTimeout_second()) {
this.timeoutSecond = request.getTimeout_second();
} else {
this.timeoutSecond = Config.mini_load_default_timeout_second;
this.timeoutSecond = Config.stream_load_default_timeout_second;
}
if (request.isSetMax_filter_ratio()) {
this.maxFilterRatio = request.getMax_filter_ratio();
Expand Down Expand Up @@ -90,7 +90,7 @@ public void beginTxn() throws LabelAlreadyUsedException, BeginTransactionExcepti
transactionId = Catalog.getCurrentGlobalTransactionMgr()
.beginTransaction(dbId, label, -1, "FE: " + FrontendOptions.getLocalHostAddress(),
TransactionState.LoadJobSourceType.BACKEND_STREAMING, id,
timeoutSecond - 1);
timeoutSecond);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,16 @@

package org.apache.doris.planner;

import org.apache.doris.catalog.AggregateType;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.AggregateType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.LoadErrorHub;
import org.apache.doris.task.StreamLoadTask;
Expand Down Expand Up @@ -141,7 +140,7 @@ public TExecPlanFragmentParams plan() throws UserException {
params.setParams(execParams);
TQueryOptions queryOptions = new TQueryOptions();
queryOptions.setQuery_type(TQueryType.LOAD);
queryOptions.setQuery_timeout(Config.stream_load_default_timeout_second);
queryOptions.setQuery_timeout(streamLoadTask.getTimeout());
params.setQuery_options(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
queryGlobals.setNow_string(DATE_FORMAT.format(new Date()));
Expand Down
9 changes: 9 additions & 0 deletions fe/src/main/java/org/apache/doris/task/StreamLoadTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.doris.analysis.SqlParser;
import org.apache.doris.analysis.SqlScanner;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.UserException;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.thrift.TFileFormatType;
Expand Down Expand Up @@ -56,6 +57,7 @@ public class StreamLoadTask {
private String partitions;
private String path;
private boolean negative;
private int timeout = Config.stream_load_default_timeout_second;

public StreamLoadTask(TUniqueId id, long txnId, TFileType fileType, TFileFormatType formatType) {
this.id = id;
Expand Down Expand Up @@ -104,6 +106,10 @@ public boolean getNegative() {
return negative;
}

public int getTimeout() {
return timeout;
}

public static StreamLoadTask fromTStreamLoadPutRequest(TStreamLoadPutRequest request) throws UserException {
StreamLoadTask streamLoadTask = new StreamLoadTask(request.getLoadId(), request.getTxnId(),
request.getFileType(), request.getFormatType());
Expand Down Expand Up @@ -134,6 +140,9 @@ private void setOptionalFromTSLPutRequest(TStreamLoadPutRequest request) throws
if (request.isSetNegative()) {
negative = request.isNegative();
}
if (request.isSetTimeout()) {
timeout = request.getTimeout();
}
}

public static StreamLoadTask fromRoutineLoadJob(RoutineLoadJob routineLoadJob) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,9 @@ public void commitTransaction(long dbId, long transactionId, List<TabletCommitIn
Set<Long> tabletBackends = tablet.getBackendIds();
totalInvolvedBackends.addAll(tabletBackends);
Set<Long> commitBackends = tabletToBackends.get(tabletId);
// save the error replica ids for current tablet
// this param is used for log
Set<Long> errorBackendIdsForTablet = Sets.newHashSet();
for (long tabletBackend : tabletBackends) {
Replica replica = tabletInvertedIndex.getReplica(tabletId, tabletBackend);
if (replica == null) {
Expand Down Expand Up @@ -386,15 +389,18 @@ public void commitTransaction(long dbId, long transactionId, List<TabletCommitIn
}
}
} else {
errorBackendIdsForTablet.add(tabletBackend);
errorReplicaIds.add(replica.getId());
// not remove rollup task here, because the commit maybe failed
// remove rollup task when commit successfully
}
}
if (index.getState() != IndexState.ROLLUP && successReplicaNum < quorumReplicaNum) {
// not throw exception here, wait the upper application retry
LOG.info("Index [{}] success replica num is {} < quorum replica num {}",
index, successReplicaNum, quorumReplicaNum);
LOG.info("Tablet [{}] success replica num is {} < quorum replica num {} "
+ "while error backends {}",
tablet.getId(), successReplicaNum, quorumReplicaNum,
Joiner.on(",").join(errorBackendIdsForTablet));
return;
}
}
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ struct TLoadTxnBeginRequest {
7: required string label
8: optional i64 timestamp
9: optional i64 auth_code
// The real value of timeout should be i32. i64 ensures the compatibility of interface.
10: optional i64 timeout
}

Expand Down Expand Up @@ -521,6 +522,7 @@ struct TStreamLoadPutRequest {
15: optional string partitions
16: optional i64 auth_code
17: optional bool negative
18: optional i32 timeout
}

struct TStreamLoadPutResult {
Expand Down