Skip to content

Commit

Permalink
Forawrd some stmt to master (apache#944)
Browse files Browse the repository at this point in the history
1. SHOW PROC
2. SHOW PROC web action
3. ADMIN SHOW stmt
4. SHOW ROUTINE LOAD stmt
  • Loading branch information
morningman authored and EmmyMiao87 committed Apr 23, 2019
1 parent 5de31a6 commit 16f10a7
Show file tree
Hide file tree
Showing 10 changed files with 170 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,9 @@ public ShowResultSetMetaData getMetaData() {
}
return builder.build();
}

@Override
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,8 @@ public ShowResultSetMetaData getMetaData() {
return builder.build();
}

@Override
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
}
}
5 changes: 5 additions & 0 deletions fe/src/main/java/org/apache/doris/analysis/ShowProcStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,9 @@ public ShowResultSetMetaData getMetaData() {
}
return builder.build();
}

@Override
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ private void checkLabelName(Analyzer analyzer) throws AnalysisException {
name = labelName == null ? null : labelName.getLabelName();
}

public static List<String> getTitleNames() {
return TITLE_NAMES;
}

@Override
public ShowResultSetMetaData getMetaData() {
Expand All @@ -135,7 +138,8 @@ public ShowResultSetMetaData getMetaData() {
return builder.build();
}

public static List<String> getTitleNames() {
return TITLE_NAMES;
@Override
public RedirectStatus getRedirectStatus() {
return RedirectStatus.FORWARD_NO_SYNC;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
replica.setPathHash(backendTabletInfo.getPath_hash());
}

if (backendTabletInfo.isSetSchema_hash()
&& replica.getSchemaHash() != backendTabletInfo.getSchema_hash()) {
replica.setSchemaHash(backendTabletInfo.getSchema_hash());
}

if (needRecover(replica, tabletMeta.getOldSchemaHash(), backendTabletInfo)) {
LOG.warn("replica {} of tablet {} on backend {} need recovery. "
+ "replica in FE: {}, report version {}-{}, report schema hash: {},"
Expand Down
30 changes: 16 additions & 14 deletions fe/src/main/java/org/apache/doris/http/BaseAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,18 +47,20 @@
import io.netty.channel.ChannelProgressiveFutureListener;
import io.netty.channel.DefaultFileRegion;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.http.Cookie;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpChunkedInput;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.ServerCookieEncoder;
import io.netty.handler.codec.http.cookie.Cookie;
import io.netty.handler.codec.http.cookie.ServerCookieEncoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.stream.ChunkedFile;
import io.netty.util.CharsetUtil;
Expand Down Expand Up @@ -93,7 +95,7 @@ public void handleRequest(BaseRequest request) throws Exception {
} catch (Exception e) {
LOG.warn("fail to process url: {}", request.getRequest().uri(), e);
if (e instanceof UnauthorizedException) {
response.updateHeader(HttpHeaders.Names.WWW_AUTHENTICATE, "Basic realm=\"\"");
response.updateHeader(HttpHeaderNames.WWW_AUTHENTICATE.toString(), "Basic realm=\"\"");
writeResponse(request, response, HttpResponseStatus.UNAUTHORIZED);
} else {
writeResponse(request, response, HttpResponseStatus.NOT_FOUND);
Expand Down Expand Up @@ -123,17 +125,17 @@ protected void writeResponse(BaseRequest request, BaseResponse response, HttpRes

checkDefaultContentTypeHeader(response, responseObj);
if (!method.equals(HttpMethod.HEAD)) {
response.updateHeader(HttpHeaders.Names.CONTENT_LENGTH,
response.updateHeader(HttpHeaderNames.CONTENT_LENGTH.toString(),
String.valueOf(responseObj.content().readableBytes()));
}
writeCustomHeaders(response, responseObj);
writeCookies(response, responseObj);

boolean keepAlive = HttpHeaders.isKeepAlive(request.getRequest());
boolean keepAlive = HttpUtil.isKeepAlive(request.getRequest());
if (!keepAlive) {
request.getContext().write(responseObj).addListener(ChannelFutureListener.CLOSE);
} else {
responseObj.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
responseObj.headers().set(HttpHeaderNames.CONNECTION.toString(), HttpHeaderValues.KEEP_ALIVE.toString());
request.getContext().write(responseObj);
}
}
Expand All @@ -142,8 +144,8 @@ protected void writeFileResponse(BaseRequest request, BaseResponse response, Htt
File resFile) {
HttpResponse responseObj = new DefaultHttpResponse(HttpVersion.HTTP_1_1, status);

if (HttpHeaders.isKeepAlive(request.getRequest())) {
response.updateHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
if (HttpUtil.isKeepAlive(request.getRequest())) {
response.updateHeader(HttpHeaderNames.CONNECTION.toString(), HttpHeaderValues.KEEP_ALIVE.toString());
}

ChannelFuture sendFileFuture;
Expand All @@ -155,7 +157,7 @@ protected void writeFileResponse(BaseRequest request, BaseResponse response, Htt
rafFile = new RandomAccessFile(resFile, "r");
long fileLength = 0;
fileLength = rafFile.length();
response.updateHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(fileLength));
response.updateHeader(HttpHeaderNames.CONTENT_LENGTH.toString(), String.valueOf(fileLength));
writeCookies(response, responseObj);
writeCustomHeaders(response, responseObj);

Expand Down Expand Up @@ -205,7 +207,7 @@ public void operationComplete(ChannelProgressiveFuture future) {
});

// Decide whether to close the connection or not.
boolean keepAlive = HttpHeaders.isKeepAlive(request.getRequest());
boolean keepAlive = HttpUtil.isKeepAlive(request.getRequest());
if (!keepAlive) {
// Close the connection when the whole content is written out.
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
Expand All @@ -215,9 +217,9 @@ public void operationComplete(ChannelProgressiveFuture future) {
// Set 'CONTENT_TYPE' header if it havn't been set.
protected void checkDefaultContentTypeHeader(BaseResponse response, Object responseOj) {
if (!Strings.isNullOrEmpty(response.getContentType())) {
response.updateHeader(HttpHeaders.Names.CONTENT_TYPE, response.getContentType());
response.updateHeader(HttpHeaderNames.CONTENT_TYPE.toString(), response.getContentType());
} else {
response.updateHeader(HttpHeaders.Names.CONTENT_TYPE, "text/html");
response.updateHeader(HttpHeaderNames.CONTENT_TYPE.toString(), "text/html");
}
}

Expand All @@ -229,7 +231,7 @@ protected void writeCustomHeaders(BaseResponse response, HttpResponse responseOb

protected void writeCookies(BaseResponse response, HttpResponse responseObj) {
for (Cookie cookie : response.getCookies()) {
responseObj.headers().add(HttpHeaders.Names.SET_COOKIE, ServerCookieEncoder.encode(cookie));
responseObj.headers().add(HttpHeaderNames.SET_COOKIE.toString(), ServerCookieEncoder.LAX.encode(cookie));
}
}

Expand Down
2 changes: 1 addition & 1 deletion fe/src/main/java/org/apache/doris/http/BaseResponse.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.Map;
import java.util.Set;

import io.netty.handler.codec.http.Cookie;
import io.netty.handler.codec.http.cookie.Cookie;

public class BaseResponse {
private String contentType;
Expand Down
82 changes: 57 additions & 25 deletions fe/src/main/java/org/apache/doris/http/action/SystemAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.doris.http.action;

import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.proc.ProcDirInterface;
import org.apache.doris.common.proc.ProcNodeInterface;
Expand All @@ -25,10 +27,17 @@
import org.apache.doris.http.BaseRequest;
import org.apache.doris.http.BaseResponse;
import org.apache.doris.http.IllegalArgException;
import org.apache.doris.mysql.privilege.PaloAuth;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.MasterOpExecutor;
import org.apache.doris.qe.ShowResultSet;
import org.apache.doris.system.SystemInfoService;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;

import java.util.List;
import java.util.stream.Collectors;

import io.netty.handler.codec.http.HttpMethod;

Expand All @@ -47,50 +56,73 @@ public void executeGet(BaseRequest request, BaseResponse response) {
getPageHeader(request, response.getContent());

String currentPath = request.getSingleParameter("path");
ProcNodeInterface node = null;
// root path is default path
if (Strings.isNullOrEmpty(currentPath)) {
currentPath = "/";
}

node = getProcNode(currentPath);
appendSystemInfo(response.getContent(), node, currentPath);
appendSystemInfo(response.getContent(), currentPath, currentPath);

getPageFooter(response.getContent());
writeResponse(request, response);
}

private void appendSystemInfo(StringBuilder buffer, ProcNodeInterface procNode, String path) {
private void appendSystemInfo(StringBuilder buffer, String procPath, String path) {
buffer.append("<h2>System Info</h2>");
buffer.append("<p>This page lists the system info, like /proc in Linux.</p>");
buffer.append("<p class=\"text-info\"> Current path: " + path + "</p>");

ProcNodeInterface procNode = getProcNode(procPath);
if (procNode == null) {
buffer.append("<p class=\"text-error\"> No such proc path["
+ path
+ "]</p>");
buffer.append("<p class=\"text-error\"> No such proc path[" + path + "]</p>");
return;
}
boolean isDir = (procNode instanceof ProcDirInterface);

List<String> columnNames = null;
List<List<String>> rows = null;
if (!Catalog.getCurrentCatalog().isMaster()) {
// forward to master
String showProcStmt = "SHOW PROC \"" + procPath + "\"";
ConnectContext context = new ConnectContext(null);
context.setCatalog(Catalog.getCurrentCatalog());
context.setCluster(SystemInfoService.DEFAULT_CLUSTER);
context.setQualifiedUser(PaloAuth.ADMIN_USER);
MasterOpExecutor masterOpExecutor = new MasterOpExecutor(showProcStmt, context,
RedirectStatus.FORWARD_NO_SYNC);
try {
masterOpExecutor.execute();
} catch (Exception e) {
buffer.append("<p class=\"text-error\"> Failed to forward request to master</p>");
return;
}

boolean isDir = false;
if (procNode instanceof ProcDirInterface) {
isDir = true;
}
ShowResultSet resultSet = masterOpExecutor.getProxyResultSet();
if (resultSet == null) {
buffer.append("<p class=\"text-error\"> Failed to get result from master</p>");
return;
}

ProcResult result;
try {
result = procNode.fetchResult();
} catch (AnalysisException e) {
buffer.append("<p class=\"text-error\"> The result is null, "
+ "maybe haven't be implemented completely[" + e.getMessage() + "], please check.</p>");
buffer.append("<p class=\"text-info\"> "
+ "INFO: ProcNode type is [" + procNode.getClass().getName()
+ "]</p>");
return;
columnNames = resultSet.getMetaData().getColumns().stream().map(c -> c.getName()).collect(
Collectors.toList());
rows = resultSet.getResultRows();
} else {
ProcResult result;
try {
result = procNode.fetchResult();
} catch (AnalysisException e) {
buffer.append("<p class=\"text-error\"> The result is null, "
+ "maybe haven't be implemented completely[" + e.getMessage() + "], please check.</p>");
buffer.append("<p class=\"text-info\"> "
+ "INFO: ProcNode type is [" + procNode.getClass().getName()
+ "]</p>");
return;
}

columnNames = result.getColumnNames();
rows = result.getRows();
}

List<String> columnNames = result.getColumnNames();
List<List<String>> rows = result.getRows();
Preconditions.checkNotNull(columnNames);
Preconditions.checkNotNull(rows);

appendBackButton(buffer, path);
appendTableHeader(buffer, columnNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,22 @@

package org.apache.doris.http.rest;

import java.io.File;
import java.util.Set;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.apache.doris.common.Config;
import org.apache.doris.http.ActionController;
import org.apache.doris.http.BaseRequest;
import org.apache.doris.http.BaseResponse;
import org.apache.doris.http.IllegalArgException;

import com.google.common.base.Strings;
import com.google.common.collect.Sets;

import io.netty.handler.codec.http.HttpHeaders;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.File;
import java.util.Set;

import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;

Expand Down Expand Up @@ -82,7 +83,7 @@ public void execute(BaseRequest request, BaseResponse response) {
writeFileResponse(request, response, HttpResponseStatus.OK, logFile);
} else if (method.equals(HttpMethod.HEAD)) {
long fileLength = logFile.length();
response.updateHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(fileLength));
response.updateHeader(HttpHeaderNames.CONTENT_LENGTH.toString(), String.valueOf(fileLength));
writeResponse(request, response, HttpResponseStatus.OK);
} else {
response.appendContent(new RestBaseResult("HTTP method is not allowed.").toJson());
Expand Down
Loading

0 comments on commit 16f10a7

Please sign in to comment.