Skip to content

Commit

Permalink
[Feature](auditloader) Plugin auditloader use auth token to avoid usi…
Browse files Browse the repository at this point in the history
…ng cleartext passwords in config (apache#26278) (apache#26532)

Doris FE will check if stream load http request has auth token after checking password failed;
Plugin audit-log loader can use auth token if plugin config set use_auth_token to true

Co-authored-by: Mingyu Chen <morningman.cmy@gmail.com>
  • Loading branch information
2 people authored and gnehil committed Dec 4, 2023
1 parent 787629a commit 31df83e
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 8 deletions.
112 changes: 111 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.common.LoadException;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
Expand All @@ -43,6 +44,7 @@
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;

import java.net.URI;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
Expand Down Expand Up @@ -81,7 +83,20 @@ public Object streamLoad(HttpServletRequest request,
return redirectToHttps(request);
}

executeCheckPassword(request, response);
try {
executeCheckPassword(request, response);
} catch (UnauthorizedException unauthorizedException) {
if (LOG.isDebugEnabled()) {
LOG.debug("Check password failed, going to check auth token, request: {}", request.toString());
}

if (!checkClusterToken(request)) {
throw unauthorizedException;
} else {
return executeWithClusterToken(request, db, table, true);
}
}

return executeWithoutPassword(request, response, db, table, true);
}

Expand Down Expand Up @@ -224,4 +239,99 @@ private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadExc
}
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}

// NOTE: This function can only be used for AuditlogPlugin stream load for now.
// AuditlogPlugin should be re-disigned carefully, and blow method focuses on
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
private boolean checkClusterToken(HttpServletRequest request) {
if (LOG.isDebugEnabled()) {
LOG.debug("Checking cluser token, request {}", request.toString());
}

String authToken = request.getHeader("token");

if (Strings.isNullOrEmpty(authToken)) {
return false;
}

return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(authToken);
}

// NOTE: This function can only be used for AuditlogPlugin stream load for now.
// AuditlogPlugin should be re-disigned carefully, and blow method focuses on
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
private Object executeWithClusterToken(HttpServletRequest request, String db,
String table, boolean isStreamLoad) {
try {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
ctx.setRemoteIP(request.getRemoteAddr());

String dbName = db;
String tableName = table;
// A 'Load' request must have 100-continue header
if (request.getHeader(HttpHeaderNames.EXPECT.toString()) == null) {
return new RestBaseResult("There is no 100-continue header");
}

final String clusterName = ConnectContext.get().getClusterName();
if (Strings.isNullOrEmpty(clusterName)) {
return new RestBaseResult("No cluster selected.");
}

if (Strings.isNullOrEmpty(dbName)) {
return new RestBaseResult("No database selected.");
}

if (Strings.isNullOrEmpty(tableName)) {
return new RestBaseResult("No table selected.");
}

String label = request.getParameter(LABEL_KEY);
if (isStreamLoad) {
label = request.getHeader(LABEL_KEY);
}

if (!isStreamLoad && Strings.isNullOrEmpty(label)) {
// for stream load, the label can be generated by system automatically
return new RestBaseResult("No label selected.");
}

TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);

LOG.info("Redirect load action with auth token to destination={},"
+ "stream: {}, db: {}, tbl: {}, label: {}",
redirectAddr.toString(), isStreamLoad, dbName, tableName, label);

URI urlObj = null;
URI resultUriObj = null;
String urlStr = request.getRequestURI();
String userInfo = null;

try {
urlObj = new URI(urlStr);
resultUriObj = new URI("http", userInfo, redirectAddr.getHostname(),
redirectAddr.getPort(), urlObj.getPath(), "", null);
} catch (Exception e) {
throw new RuntimeException(e);
}
String redirectUrl = resultUriObj.toASCIIString();
if (!Strings.isNullOrEmpty(request.getQueryString())) {
redirectUrl += request.getQueryString();
}
LOG.info("Redirect url: {}", redirectUrl);
RedirectView redirectView = new RedirectView(redirectUrl);
redirectView.setContentType("text/html;charset=utf-8");
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);

return redirectView;
} catch (Exception e) {
LOG.warn("Failed to execute stream load with cluster token, {}", e);
return new RestBaseResult(e.getMessage());
}
}
}
3 changes: 3 additions & 0 deletions fe_plugins/auditloader/src/main/assembly/plugin.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ user=root
# Doris user's password
password=

# Use doris cluster token for stream load authorization, if true, user and password will be ignored.
use_auth_token=false

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.plugin.audit;

import org.apache.doris.common.Config;
import org.apache.doris.catalog.Env;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.AuditPlugin;
import org.apache.doris.plugin.Plugin;
Expand Down Expand Up @@ -84,7 +85,6 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException {
this.lastLoadTimeSlowLog = System.currentTimeMillis();

loadConfig(ctx, info.getProperties());

this.auditEventQueue = Queues.newLinkedBlockingDeque(conf.maxQueueSize);
this.streamLoader = new DorisStreamLoader(conf);
this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread");
Expand Down Expand Up @@ -209,7 +209,16 @@ private void loadIfNecessary(DorisStreamLoader loader, boolean slowLog) {
if (logBuffer.length() >= conf.maxBatchSize || currentTime - lastLoadTime >= conf.maxBatchIntervalSec * 1000) {
// begin to load
try {
DorisStreamLoader.LoadResponse response = loader.loadBatch(logBuffer, slowLog);
String token = "";
if (conf.use_auth_token) {
try {
// Acquire token from master
token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
} catch (Exception e) {
LOG.error("Failed to get auth token: {}", e);
}
}
DorisStreamLoader.LoadResponse response = loader.loadBatch(logBuffer, slowLog, token);
LOG.debug("audit loader response: {}", response);
} catch (Exception e) {
LOG.debug("encounter exception when putting current audit batch, discard current batch", e);
Expand Down Expand Up @@ -248,6 +257,7 @@ public static class AuditLoaderConf {
public static final String PROP_ENABLE_SLOW_LOG = "enable_slow_log";
// the max stmt length to be loaded in audit table.
public static final String MAX_STMT_LENGTH = "max_stmt_length";
public static final String USE_AUTH_TOKEN = "use_auth_token";

public long maxBatchSize = 50 * 1024 * 1024;
public long maxBatchIntervalSec = 60;
Expand All @@ -262,6 +272,9 @@ public static class AuditLoaderConf {
// the identity of FE which run this plugin
public String feIdentity = "";
public int max_stmt_length = 4096;
// auth_token is not used by default
public boolean use_auth_token = false;


public void init(Map<String, String> properties) throws PluginException {
try {
Expand Down Expand Up @@ -302,6 +315,9 @@ public void init(Map<String, String> properties) throws PluginException {
if (properties.containsKey(MAX_STMT_LENGTH)) {
max_stmt_length = Integer.parseInt(properties.get(MAX_STMT_LENGTH));
}
if (properties.containsKey(USE_AUTH_TOKEN)) {
use_auth_token = Boolean.valueOf(properties.get(USE_AUTH_TOKEN));
}
} catch (Exception e) {
throw new PluginException(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ public DorisStreamLoader(AuditLoaderPlugin.AuditLoaderConf conf) {
this.feIdentity = conf.feIdentity.replaceAll("\\.", "_");
}

private HttpURLConnection getConnection(String urlStr, String label) throws IOException {
private HttpURLConnection getConnection(String urlStr, String label, String clusterToken) throws IOException {
URL url = new URL(urlStr);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod("PUT");
conn.setRequestProperty("token", clusterToken);
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
Expand Down Expand Up @@ -114,7 +115,7 @@ private String getContent(HttpURLConnection conn) {
return response.toString();
}

public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) {
public LoadResponse loadBatch(StringBuilder sb, boolean slowLog, String clusterToken) {
Calendar calendar = Calendar.getInstance();
String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s",
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
Expand All @@ -127,10 +128,10 @@ public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) {
// build request and send to fe
if (slowLog) {
label = "slow" + label;
feConn = getConnection(slowLogLoadUrlStr, label);
feConn = getConnection(slowLogLoadUrlStr, label, clusterToken);
} else {
label = "audit" + label;
feConn = getConnection(auditLogLoadUrlStr, label);
feConn = getConnection(auditLogLoadUrlStr, label, clusterToken);
}
int status = feConn.getResponseCode();
// fe send back http response code TEMPORARY_REDIRECT 307 and new be location
Expand All @@ -143,7 +144,7 @@ public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) {
throw new Exception("redirect location is null");
}
// build request and send to new be location
beConn = getConnection(location, label);
beConn = getConnection(location, label, clusterToken);
// send data to be
BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
bos.write(sb.toString().getBytes());
Expand Down

0 comments on commit 31df83e

Please sign in to comment.