Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-2.0](pick#26278)(auditloader) Plugin auditloader use auth token to avoid using cleartext passwords in config #26532

Merged
merged 1 commit into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 111 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/httpv2/rest/LoadAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.common.LoadException;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.entity.RestBaseResult;
import org.apache.doris.httpv2.exception.UnauthorizedException;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
Expand All @@ -43,6 +44,7 @@
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.view.RedirectView;

import java.net.URI;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
Expand Down Expand Up @@ -81,7 +83,20 @@ public Object streamLoad(HttpServletRequest request,
return redirectToHttps(request);
}

executeCheckPassword(request, response);
try {
executeCheckPassword(request, response);
} catch (UnauthorizedException unauthorizedException) {
if (LOG.isDebugEnabled()) {
LOG.debug("Check password failed, going to check auth token, request: {}", request.toString());
}

if (!checkClusterToken(request)) {
throw unauthorizedException;
} else {
return executeWithClusterToken(request, db, table, true);
}
}

return executeWithoutPassword(request, response, db, table, true);
}

Expand Down Expand Up @@ -224,4 +239,99 @@ private TNetworkAddress selectRedirectBackend(String clusterName) throws LoadExc
}
return new TNetworkAddress(backend.getHost(), backend.getHttpPort());
}

// NOTE: This function can only be used for AuditlogPlugin stream load for now.
// AuditlogPlugin should be re-disigned carefully, and blow method focuses on
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
private boolean checkClusterToken(HttpServletRequest request) {
if (LOG.isDebugEnabled()) {
LOG.debug("Checking cluser token, request {}", request.toString());
}

String authToken = request.getHeader("token");

if (Strings.isNullOrEmpty(authToken)) {
return false;
}

return Env.getCurrentEnv().getLoadManager().getTokenManager().checkAuthToken(authToken);
}

// NOTE: This function can only be used for AuditlogPlugin stream load for now.
// AuditlogPlugin should be re-disigned carefully, and blow method focuses on
// temporarily addressing the users' needs for audit logs.
// So this function is not widely tested under general scenario
private Object executeWithClusterToken(HttpServletRequest request, String db,
String table, boolean isStreamLoad) {
try {
ConnectContext ctx = new ConnectContext();
ctx.setEnv(Env.getCurrentEnv());
ctx.setThreadLocalInfo();
ctx.setCluster(SystemInfoService.DEFAULT_CLUSTER);
ctx.setRemoteIP(request.getRemoteAddr());

String dbName = db;
String tableName = table;
// A 'Load' request must have 100-continue header
if (request.getHeader(HttpHeaderNames.EXPECT.toString()) == null) {
return new RestBaseResult("There is no 100-continue header");
}

final String clusterName = ConnectContext.get().getClusterName();
if (Strings.isNullOrEmpty(clusterName)) {
return new RestBaseResult("No cluster selected.");
}

if (Strings.isNullOrEmpty(dbName)) {
return new RestBaseResult("No database selected.");
}

if (Strings.isNullOrEmpty(tableName)) {
return new RestBaseResult("No table selected.");
}

String label = request.getParameter(LABEL_KEY);
if (isStreamLoad) {
label = request.getHeader(LABEL_KEY);
}

if (!isStreamLoad && Strings.isNullOrEmpty(label)) {
// for stream load, the label can be generated by system automatically
return new RestBaseResult("No label selected.");
}

TNetworkAddress redirectAddr = selectRedirectBackend(clusterName);

LOG.info("Redirect load action with auth token to destination={},"
+ "stream: {}, db: {}, tbl: {}, label: {}",
redirectAddr.toString(), isStreamLoad, dbName, tableName, label);

URI urlObj = null;
URI resultUriObj = null;
String urlStr = request.getRequestURI();
String userInfo = null;

try {
urlObj = new URI(urlStr);
resultUriObj = new URI("http", userInfo, redirectAddr.getHostname(),
redirectAddr.getPort(), urlObj.getPath(), "", null);
} catch (Exception e) {
throw new RuntimeException(e);
}
String redirectUrl = resultUriObj.toASCIIString();
if (!Strings.isNullOrEmpty(request.getQueryString())) {
redirectUrl += request.getQueryString();
}
LOG.info("Redirect url: {}", redirectUrl);
RedirectView redirectView = new RedirectView(redirectUrl);
redirectView.setContentType("text/html;charset=utf-8");
redirectView.setStatusCode(org.springframework.http.HttpStatus.TEMPORARY_REDIRECT);

return redirectView;
} catch (Exception e) {
LOG.warn("Failed to execute stream load with cluster token, {}", e);
return new RestBaseResult(e.getMessage());
}
}
}
3 changes: 3 additions & 0 deletions fe_plugins/auditloader/src/main/assembly/plugin.conf
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,6 @@ user=root
# Doris user's password
password=

# Use doris cluster token for stream load authorization, if true, user and password will be ignored.
use_auth_token=false

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.plugin.audit;

import org.apache.doris.common.Config;
import org.apache.doris.catalog.Env;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.plugin.AuditPlugin;
import org.apache.doris.plugin.Plugin;
Expand Down Expand Up @@ -84,7 +85,6 @@ public void init(PluginInfo info, PluginContext ctx) throws PluginException {
this.lastLoadTimeSlowLog = System.currentTimeMillis();

loadConfig(ctx, info.getProperties());

this.auditEventQueue = Queues.newLinkedBlockingDeque(conf.maxQueueSize);
this.streamLoader = new DorisStreamLoader(conf);
this.loadThread = new Thread(new LoadWorker(this.streamLoader), "audit loader thread");
Expand Down Expand Up @@ -209,7 +209,16 @@ private void loadIfNecessary(DorisStreamLoader loader, boolean slowLog) {
if (logBuffer.length() >= conf.maxBatchSize || currentTime - lastLoadTime >= conf.maxBatchIntervalSec * 1000) {
// begin to load
try {
DorisStreamLoader.LoadResponse response = loader.loadBatch(logBuffer, slowLog);
String token = "";
if (conf.use_auth_token) {
try {
// Acquire token from master
token = Env.getCurrentEnv().getLoadManager().getTokenManager().acquireToken();
} catch (Exception e) {
LOG.error("Failed to get auth token: {}", e);
}
}
DorisStreamLoader.LoadResponse response = loader.loadBatch(logBuffer, slowLog, token);
LOG.debug("audit loader response: {}", response);
} catch (Exception e) {
LOG.debug("encounter exception when putting current audit batch, discard current batch", e);
Expand Down Expand Up @@ -248,6 +257,7 @@ public static class AuditLoaderConf {
public static final String PROP_ENABLE_SLOW_LOG = "enable_slow_log";
// the max stmt length to be loaded in audit table.
public static final String MAX_STMT_LENGTH = "max_stmt_length";
public static final String USE_AUTH_TOKEN = "use_auth_token";

public long maxBatchSize = 50 * 1024 * 1024;
public long maxBatchIntervalSec = 60;
Expand All @@ -262,6 +272,9 @@ public static class AuditLoaderConf {
// the identity of FE which run this plugin
public String feIdentity = "";
public int max_stmt_length = 4096;
// auth_token is not used by default
public boolean use_auth_token = false;


public void init(Map<String, String> properties) throws PluginException {
try {
Expand Down Expand Up @@ -302,6 +315,9 @@ public void init(Map<String, String> properties) throws PluginException {
if (properties.containsKey(MAX_STMT_LENGTH)) {
max_stmt_length = Integer.parseInt(properties.get(MAX_STMT_LENGTH));
}
if (properties.containsKey(USE_AUTH_TOKEN)) {
use_auth_token = Boolean.valueOf(properties.get(USE_AUTH_TOKEN));
}
} catch (Exception e) {
throw new PluginException(e.getMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,12 @@ public DorisStreamLoader(AuditLoaderPlugin.AuditLoaderConf conf) {
this.feIdentity = conf.feIdentity.replaceAll("\\.", "_");
}

private HttpURLConnection getConnection(String urlStr, String label) throws IOException {
private HttpURLConnection getConnection(String urlStr, String label, String clusterToken) throws IOException {
URL url = new URL(urlStr);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setInstanceFollowRedirects(false);
conn.setRequestMethod("PUT");
conn.setRequestProperty("token", clusterToken);
conn.setRequestProperty("Authorization", "Basic " + authEncoding);
conn.addRequestProperty("Expect", "100-continue");
conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
Expand Down Expand Up @@ -114,7 +115,7 @@ private String getContent(HttpURLConnection conn) {
return response.toString();
}

public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) {
public LoadResponse loadBatch(StringBuilder sb, boolean slowLog, String clusterToken) {
Calendar calendar = Calendar.getInstance();
String label = String.format("_log_%s%02d%02d_%02d%02d%02d_%s",
calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
Expand All @@ -127,10 +128,10 @@ public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) {
// build request and send to fe
if (slowLog) {
label = "slow" + label;
feConn = getConnection(slowLogLoadUrlStr, label);
feConn = getConnection(slowLogLoadUrlStr, label, clusterToken);
} else {
label = "audit" + label;
feConn = getConnection(auditLogLoadUrlStr, label);
feConn = getConnection(auditLogLoadUrlStr, label, clusterToken);
}
int status = feConn.getResponseCode();
// fe send back http response code TEMPORARY_REDIRECT 307 and new be location
Expand All @@ -143,7 +144,7 @@ public LoadResponse loadBatch(StringBuilder sb, boolean slowLog) {
throw new Exception("redirect location is null");
}
// build request and send to new be location
beConn = getConnection(location, label);
beConn = getConnection(location, label, clusterToken);
// send data to be
BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
bos.write(sb.toString().getBytes());
Expand Down