Skip to content

Commit

Permalink
[Fix](group commit) Fix cloud group commit be select strategy (apache…
Browse files Browse the repository at this point in the history
…#39986)

In apache#35558, we optimized be select for group commit. However, we forgot
to apply this strategy to cloud. This PR applys it.

<!--Describe your changes.-->
  • Loading branch information
Yukang-Lian committed Sep 3, 2024
1 parent 0e9fa3d commit b963b09
Showing 1 changed file with 87 additions and 47 deletions.
134 changes: 87 additions & 47 deletions fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class LoadAction extends RestBaseController {

@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_load", method = RequestMethod.PUT)
public Object load(HttpServletRequest request, HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
Expand All @@ -87,20 +87,29 @@ public Object load(HttpServletRequest request, HttpServletResponse response,

@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load", method = RequestMethod.PUT)
public Object streamLoad(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db, @PathVariable(value = TABLE_KEY) String table) {
LOG.info("streamload action, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request));
boolean groupCommit = false;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
if (isGroupCommitBlock(db, table)) {
String msg = "insert table " + table + " is blocked on schema change";
return new RestBaseResult(msg);
if (groupCommitStr != null) {
if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode")
&& !groupCommitStr.equalsIgnoreCase("off_mode")) {
return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`.");
}
if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
groupCommit = true;
if (groupCommitStr.equalsIgnoreCase("async_mode")) {
try {
if (isGroupCommitBlock(db, table)) {
String msg = "insert table " + table + GroupCommitPlanner.SCHEMA_CHANGE;
return new RestBaseResult(msg);
}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
}
if (needRedirect(request.getScheme())) {
Expand Down Expand Up @@ -131,21 +140,32 @@ public Object streamLoadWithSql(HttpServletRequest request, HttpServletResponse
boolean groupCommit = false;
long tableId = -1;
String groupCommitStr = request.getHeader("group_commit");
if (groupCommitStr != null && groupCommitStr.equalsIgnoreCase("async_mode")) {
groupCommit = true;
try {
String[] pair = parseDbAndTb(sql);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
tableId = tbl.getId();
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + " is blocked on schema change";
return new RestBaseResult(msg);
if (groupCommitStr != null) {
if (!groupCommitStr.equalsIgnoreCase("async_mode") && !groupCommitStr.equalsIgnoreCase("sync_mode")
&& !groupCommitStr.equalsIgnoreCase("off_mode")) {
return new RestBaseResult("Header `group_commit` can only be `sync_mode`, `async_mode` or `off_mode`.");
}
if (!groupCommitStr.equalsIgnoreCase("off_mode")) {
try {
groupCommit = true;
String[] pair = parseDbAndTb(sql);
Database db = Env.getCurrentInternalCatalog()
.getDbOrException(pair[0], s -> new TException("database is invalid for dbName: " + s));
Table tbl = db.getTableOrException(pair[1], s -> new TException("table is invalid: " + s));
tableId = tbl.getId();

// async mode needs to write WAL, we need to block load during waiting WAL.
if (groupCommitStr.equalsIgnoreCase("async_mode")) {
if (isGroupCommitBlock(pair[0], pair[1])) {
String msg = "insert table " + pair[1] + GroupCommitPlanner.SCHEMA_CHANGE;
return new RestBaseResult(msg);
}

}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
} catch (Exception e) {
LOG.info("exception:" + e);
return new RestBaseResult(e.getMessage());
}
}
executeCheckPassword(request, response);
Expand Down Expand Up @@ -207,8 +227,9 @@ private String[] parseDbAndTb(String sql) throws Exception {

@RequestMapping(path = "/api/{" + DB_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
public Object streamLoad2PC(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db) {
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db) {
LOG.info("streamload action 2PC, db: {}, headers: {}", db, getAllHeaders(request));
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
Expand All @@ -219,9 +240,10 @@ public Object streamLoad2PC(HttpServletRequest request,

@RequestMapping(path = "/api/{" + DB_KEY + "}/{" + TABLE_KEY + "}/_stream_load_2pc", method = RequestMethod.PUT)
public Object streamLoad2PC_table(HttpServletRequest request,
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
HttpServletResponse response,
@PathVariable(value = DB_KEY) String db,
@PathVariable(value = TABLE_KEY) String table) {
LOG.info("streamload action 2PC, db: {}, tbl: {}, headers: {}", db, table, getAllHeaders(request));
if (needRedirect(request.getScheme())) {
return redirectToHttps(request);
}
Expand Down Expand Up @@ -361,21 +383,7 @@ private TNetworkAddress selectLocalRedirectBackend(boolean groupCommit, HttpServ
throw new LoadException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
}
if (groupCommit) {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setRemoteIP(request.getRemoteAddr());
// We set this variable to fulfill required field 'user' in
// TMasterOpRequest(FrontendService.thrift)
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setThreadLocalInfo();

try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(tableId, ctx, false);
} catch (DdlException e) {
throw new RuntimeException(e);
}
backend = selectBackendForGroupCommit("", request, tableId);
} else {
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
}
Expand All @@ -402,7 +410,7 @@ private boolean checkClusterToken(String token) {
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
private Object executeWithClusterToken(HttpServletRequest request, String db,
String table, boolean isStreamLoad) {
String table, boolean isStreamLoad) {
try {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
Expand Down Expand Up @@ -473,4 +481,36 @@ private Object executeWithClusterToken(HttpServletRequest request, String db,
ConnectContext.remove();
}
}

private String getAllHeaders(HttpServletRequest request) {
StringBuilder headers = new StringBuilder();
Enumeration<String> headerNames = request.getHeaderNames();
while (headerNames.hasMoreElements()) {
String headerName = headerNames.nextElement();
String headerValue = request.getHeader(headerName);
headers.append(headerName).append(":").append(headerValue).append(", ");
}
return headers.toString();
}

private Backend selectBackendForGroupCommit(String clusterName, HttpServletRequest req, long tableId)
throws LoadException {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setRemoteIP(req.getRemoteAddr());
// We set this variable to fulfill required field 'user' in
// TMasterOpRequest(FrontendService.thrift)
ctx.setQualifiedUser(Auth.ADMIN_USER);
ctx.setThreadLocalInfo();

Backend backend = null;
try {
backend = Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommit(tableId, ctx, isCloud);
} catch (DdlException e) {
throw new LoadException(e.getMessage(), e);
}
return backend;
}
}

0 comments on commit b963b09

Please sign in to comment.