diff --git a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java index fa02f458076e6e..71afe476ee865c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java @@ -24,6 +24,7 @@ import org.apache.doris.common.LoadException; import org.apache.doris.httpv2.entity.ResponseEntityBuilder; import org.apache.doris.httpv2.entity.RestBaseResult; +import org.apache.doris.httpv2.exception.UnauthorizedException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.service.ExecuteEnv; @@ -43,6 +44,7 @@ import org.springframework.web.bind.annotation.RestController; import org.springframework.web.servlet.view.RedirectView; +import java.net.URI; import java.util.List; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -81,7 +83,20 @@ public Object streamLoad(HttpServletRequest request, return redirectToHttps(request); } - executeCheckPassword(request, response); + try { + executeCheckPassword(request, response); + } catch (UnauthorizedException unauthorizedException) { + if (LOG.isDebugEnabled()) { + LOG.debug("Check password failed, going to check auth token, request: {}", request.toString()); + } + + if (!checkClusterToken(request)) { + throw unauthorizedException; + } else { + return executeWithClusterToken(request, db, table, true); + } + } + return executeWithoutPassword(request, response, db, table, true); } @@ -224,4 +239,99 @@ private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadExc } return new TNetworkAddress(backend.getHost(), backend.getHttpPort()); } + + // NOTE: This function can only be used for AuditlogPlugin stream load for now. + // AuditlogPlugin should be re-disigned carefully, and blow method focuses on + // temporarily addressing the users' needs for audit logs. + // So this function is not widely tested under general scenario + private boolean checkClusterToken(HttpServletRequest request) { + if (LOG.isDebugEnabled()) { + LOG.debug("Checking cluser token, request {}", request.toString()); + } + + String authToken = request.getHeader("token"); + + if (Strings.isNullOrEmpty(authToken)) { + return false; + } + + return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(authToken); + } + + // NOTE: This function can only be used for AuditlogPlugin stream load for now. + // AuditlogPlugin should be re-disigned carefully, and blow method focuses on + // temporarily addressing the users' needs for audit logs. + // So this function is not widely tested under general scenario + private Object executeWithClusterToken(HttpServletRequest request, String db, + String table, boolean isStreamLoad) { + try { + ConnectContext ctx = new ConnectContext(); + ctx.setEnv(Env.getCurrentEnv()); + ctx.setThreadLocalInfo(); + ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER); + ctx.setRemoteIP(request.getRemoteAddr()); + + String dbName = db; + String tableName = table; + // A 'Load' request must have 100-continue header + if (request.getHeader(HttpHeaderNames.EXPECT.toString()) == null) { + return new RestBaseResult("There is no 100-continue header"); + } + + final String clusterName = ConnectContext.get().getClusterName(); + if (Strings.isNullOrEmpty(clusterName)) { + return new RestBaseResult("No cluster selected."); + } + + if (Strings.isNullOrEmpty(dbName)) { + return new RestBaseResult("No database selected."); + } + + if (Strings.isNullOrEmpty(tableName)) { + return new RestBaseResult("No table selected."); + } + + String label = request.getParameter(LABEL_KEY); + if (isStreamLoad) { + label = request.getHeader(LABEL_KEY); + } + + if (!isStreamLoad && Strings.isNullOrEmpty(label)) { + // for stream load, the label can be generated by system automatically + return new RestBaseResult("No label selected."); + } + + TNetworkAddress redirectAddr = selectRedirectBackend(clusterName); + + LOG.info("Redirect load action with auth token to destination={}," + + "stream: {}, db: {}, tbl: {}, label: {}", + redirectAddr.toString(), isStreamLoad, dbName, tableName, label); + + URI urlObj = null; + URI resultUriObj = null; + String urlStr = request.getRequestURI(); + String userInfo = null; + + try { + urlObj = new URI(urlStr); + resultUriObj = new URI("http", userInfo, redirectAddr.getHostname(), + redirectAddr.getPort(), urlObj.getPath(), "", null); + } catch (Exception e) { + throw new RuntimeException(e); + } + String redirectUrl = resultUriObj.toASCIIString(); + if (!Strings.isNullOrEmpty(request.getQueryString())) { + redirectUrl += request.getQueryString(); + } + LOG.info("Redirect url: {}", redirectUrl); + RedirectView redirectView = new RedirectView(redirectUrl); + redirectView.setContentType("text/html;charset=utf-8"); + redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT); + + return redirectView; + } catch (Exception e) { + LOG.warn("Failed to execute stream load with cluster token, {}", e); + return new RestBaseResult(e.getMessage()); + } + } } diff --git a/fe_plugins/auditloader/src/main/assembly/plugin.conf b/fe_plugins/auditloader/src/main/assembly/plugin.conf index 31f7bd3f35616f..aec8724fd9612c 100755 --- a/fe_plugins/auditloader/src/main/assembly/plugin.conf +++ b/fe_plugins/auditloader/src/main/assembly/plugin.conf @@ -51,3 +51,6 @@ user=root # Doris user's password password= +# Use doris cluster token for stream load authorization, if true, user and password will be ignored. +use_auth_token=false + diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java index 2aa1246dd6b56b..3cfb0eeeaee3a5 100755 --- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java +++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/AuditLoaderPlugin.java @@ -18,6 +18,7 @@ package org.apache.doris.plugin.audit; import org.apache.doris.common.Config; +import org.apache.doris.catalog.Env; import org.apache.doris.plugin.AuditEvent; import org.apache.doris.plugin.AuditPlugin; import org.apache.doris.plugin.Plugin; @@ -84,7 +85,6 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException { this.lastLoadTimeSlowLog = System.currentTimeMillis(); loadConfig(ctx, info.getProperties()); - this.auditEventQueue = Queues.newLinkedBlockingDeque(conf.maxQueueSize); this.streamLoader = new DorisStreamLoader(conf); this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread"); @@ -209,7 +209,16 @@ private void loadIfNecessary(DorisStreamLoader loader, boolean slowLog) { if (logBuffer.length() >= conf.maxBatchSize || currentTime - lastLoadTime >= conf.maxBatchIntervalSec * 1000) { // begin to load try { - DorisStreamLoader.LoadResponse response = loader.loadBatch(logBuffer, slowLog); + String token = ""; + if (conf.use_auth_token) { + try { + // Acquire token from master + token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken(); + } catch (Exception e) { + LOG.error("Failed to get auth token: {}", e); + } + } + DorisStreamLoader.LoadResponse response = loader.loadBatch(logBuffer, slowLog, token); LOG.debug("audit loader response: {}", response); } catch (Exception e) { LOG.debug("encounter exception when putting current audit batch, discard current batch", e); @@ -248,6 +257,7 @@ public static class AuditLoaderConf { public static final String PROP_ENABLE_SLOW_LOG = "enable_slow_log"; // the max stmt length to be loaded in audit table. public static final String MAX_STMT_LENGTH = "max_stmt_length"; + public static final String USE_AUTH_TOKEN = "use_auth_token"; public long maxBatchSize = 50 * 1024 * 1024; public long maxBatchIntervalSec = 60; @@ -262,6 +272,9 @@ public static class AuditLoaderConf { // the identity of FE which run this plugin public String feIdentity = ""; public int max_stmt_length = 4096; + // auth_token is not used by default + public boolean use_auth_token = false; + public void init(Map properties) throws PluginException { try { @@ -302,6 +315,9 @@ public void init(Map properties) throws PluginException { if (properties.containsKey(MAX_STMT_LENGTH)) { max_stmt_length = Integer.parseInt(properties.get(MAX_STMT_LENGTH)); } + if (properties.containsKey(USE_AUTH_TOKEN)) { + use_auth_token = Boolean.valueOf(properties.get(USE_AUTH_TOKEN)); + } } catch (Exception e) { throw new PluginException(e.getMessage()); } diff --git a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java index d2249a3ea7a2d1..d0d90d1bb58dcc 100644 --- a/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java +++ b/fe_plugins/auditloader/src/main/java/org/apache/doris/plugin/audit/DorisStreamLoader.java @@ -59,11 +59,12 @@ public DorisStreamLoader(AuditLoaderPlugin.AuditLoaderConf conf) { this.feIdentity = conf.feIdentity.replaceAll("\\.", "_"); } - private HttpURLConnection getConnection(String urlStr, String label) throws IOException { + private HttpURLConnection getConnection(String urlStr, String label, String clusterToken) throws IOException { URL url = new URL(urlStr); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setInstanceFollowRedirects(false); conn.setRequestMethod("PUT"); + conn.setRequestProperty("token", clusterToken); conn.setRequestProperty("Authorization", "Basic " + authEncoding); conn.addRequestProperty("Expect", "100-continue"); conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8"); @@ -114,7 +115,7 @@ private String getContent(HttpURLConnection conn) { return response.toString(); } - public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) { + public LoadResponse loadBatch(StringBuilder sb, boolean slowLog, String clusterToken) { Calendar calendar = Calendar.getInstance(); String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s", calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH), @@ -127,10 +128,10 @@ public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) { // build request and send to fe if (slowLog) { label = "slow" + label; - feConn = getConnection(slowLogLoadUrlStr, label); + feConn = getConnection(slowLogLoadUrlStr, label, clusterToken); } else { label = "audit" + label; - feConn = getConnection(auditLogLoadUrlStr, label); + feConn = getConnection(auditLogLoadUrlStr, label, clusterToken); } int status = feConn.getResponseCode(); // fe send back http response code TEMPORARY_REDIRECT 307 and new be location @@ -143,7 +144,7 @@ public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) { throw new Exception("redirect location is null"); } // build request and send to new be location - beConn = getConnection(location, label); + beConn = getConnection(location, label, clusterToken); // send data to be BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream()); bos.write(sb.toString().getBytes());