Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Spark Load] Redirect the spark launcher's log to a separate log file #4470

Merged
merged 5 commits into from
Aug 30, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.doris.catalog;

import com.google.common.primitives.Longs;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.common.Pair;
Expand All @@ -28,9 +27,9 @@
import org.apache.doris.thrift.TTypeDesc;
import org.apache.doris.thrift.TTypeNode;
import org.apache.doris.thrift.TTypeNodeType;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down
6 changes: 6 additions & 0 deletions fe/fe-core/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,12 @@ public class Config extends ConfigBase {
@ConfField
public static String spark_resource_path = "";

/**
* The specified spark launcher log dir
*/
@ConfField
public static String spark_launcher_log_dir = sys_log_dir + "/spark_launcher_log";

/**
* Default yarn client path
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.apache.doris.transaction.GlobalTransactionMgr;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;

import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -379,6 +378,9 @@ public void removeOldLoadJob() {
&& ((currentTimeMs - job.getFinishTimestamp()) / 1000 > Config.label_keep_max_second)) {
iter.remove();
dbIdToLabelToLoadJobs.get(job.getDbId()).get(job.getLabel()).remove(job);
if (job instanceof SparkLoadJob) {
((SparkLoadJob) job).clearSparkLauncherLog();
}
}
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.spark.launcher.SparkLauncher;

import java.io.File;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.List;
Expand All @@ -66,6 +67,7 @@ public class SparkEtlJobHandler {
private static final String CONFIG_FILE_NAME = "jobconfig.json";
private static final String JOB_CONFIG_DIR = "configs";
private static final String ETL_JOB_NAME = "doris__%s";
private static final String LAUNCHER_LOG = "spark_launcher_%s_%s.log";
// 5min
private static final long GET_APPID_TIMEOUT_MS = 300000L;
// 30s
Expand All @@ -79,6 +81,11 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo
// delete outputPath
deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc);

// init local dir
if (!FeConstants.runningUnitTest) {
initLocalDir();
}

// prepare dpp archive
SparkRepository.SparkArchive archive = resource.prepareArchive();
SparkRepository.SparkLibrary dppLibrary = archive.getDppLibrary();
Expand All @@ -96,6 +103,8 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo
String jobArchiveHdfsPath = spark2xLibrary.remotePath;
// spark yarn stage dir
String jobStageHdfsPath = resource.getWorkingDir();
// spark launcher log path
String logFilePath = Config.spark_launcher_log_dir + "/" + String.format(LAUNCHER_LOG, loadJobId, loadLabel);

// update archive and stage configs here
Map<String, String> sparkConfigs = resource.getSparkConfigs();
Expand Down Expand Up @@ -143,6 +152,7 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo
if (!FeConstants.runningUnitTest) {
SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle);
logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
logMonitor.setRedirectLogPath(logFilePath);
logMonitor.start();
try {
logMonitor.join();
Expand Down Expand Up @@ -299,6 +309,14 @@ public Map<String, Long> getEtlFilePaths(String outputPath, BrokerDesc brokerDes
return filePathToSize;
}

/**
 * Ensures the local directory used for spark launcher log files
 * ({@code Config.spark_launcher_log_dir}) exists, creating it (and any
 * missing parents) on first use. Synchronized so concurrent job
 * submissions do not race on directory creation.
 */
public static synchronized void initLocalDir() {
    File launcherLogDir = new File(Config.spark_launcher_log_dir);
    if (!launcherLogDir.exists()) {
        launcherLogDir.mkdirs();
    }
}

public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) {
try {
BrokerUtil.deletePath(outputPath, brokerDesc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
import org.apache.logging.log4j.Logger;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -67,6 +70,7 @@ public static class LogMonitor extends Thread {
private SparkLoadAppHandle handle;
private long submitTimeoutMs;
private boolean isStop;
private OutputStream outputStream;

private static final String STATE = "state";
private static final String QUEUE = "queue";
Expand All @@ -89,6 +93,11 @@ public void setSubmitTimeoutMs(long submitTimeoutMs) {
this.submitTimeoutMs = submitTimeoutMs;
}

/**
 * Redirects the monitored spark launcher output to the given local log file
 * and records the path on the app handle so it can be cleaned up later.
 * The file is truncated if it already exists (append = false). Any stream
 * opened by a previous call is closed first to avoid leaking a file
 * descriptor.
 *
 * @param redirectLogPath local path of the log file to write launcher output to
 * @throws IOException if the log file cannot be opened for writing
 */
public void setRedirectLogPath(String redirectLogPath) throws IOException {
    if (this.outputStream != null) {
        // Close the stream from any earlier call before replacing it.
        this.outputStream.close();
    }
    this.outputStream = new FileOutputStream(redirectLogPath, false);
    this.handle.setLogPath(redirectLogPath);
}

// Normally, log monitor will automatically stop if the spark app state changes
// to RUNNING.
// But if the spark app state changes to FAILED/KILLED/LOST, log monitor will stop
Expand All @@ -103,7 +112,9 @@ public void run() {
try {
outReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
while (!isStop && (line = outReader.readLine()) != null) {
LOG.info("monitor log: " + line);
if (outputStream != null) {
outputStream.write((line + "\n").getBytes());
}
SparkLoadAppHandle.State oldState = handle.getState();
SparkLoadAppHandle.State newState = oldState;
// parse state and appId
Expand Down Expand Up @@ -186,6 +197,9 @@ else if (line.contains(QUEUE) || line.contains(START_TIME) || line.contains(FINA
if (outReader != null) {
outReader.close();
}
if (outputStream != null) {
outputStream.close();
}
} catch (IOException e) {
LOG.warn("close buffered reader error", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class SparkLoadAppHandle {
private FinalApplicationStatus finalStatus;
private String trackingUrl;
private String user;
private String logPath;

private List<Listener> listeners;

Expand Down Expand Up @@ -112,6 +113,8 @@ public void kill() {

public String getUser() { return this.user; }

// Local file path the launcher's output is redirected to; null until setLogPath is called.
public String getLogPath() { return this.logPath; }

public void setState(State state) {
this.state = state;
this.fireEvent(false);
Expand Down Expand Up @@ -147,6 +150,11 @@ public void setUser(String user) {
this.fireEvent(true);
}

/**
 * Records the local launcher log file path for this app handle and
 * notifies registered listeners that the job info changed.
 */
public void setLogPath(String logPath) {
    this.logPath = logPath;
    fireEvent(true);
}

private void fireEvent(boolean isInfoChanged) {
if (this.listeners != null) {
Iterator iterator = this.listeners.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@

import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -713,6 +714,18 @@ protected long getEtlStartTimestamp() {
return etlStartTimestamp;
}

/**
 * Deletes the local spark launcher log file recorded on this job's app
 * handle, if any. Best effort: a missing handle, an empty path, or an
 * already-absent file is silently ignored.
 */
public void clearSparkLauncherLog() {
    if (sparkLoadAppHandle == null) {
        return;
    }
    String launcherLogPath = sparkLoadAppHandle.getLogPath();
    if (Strings.isNullOrEmpty(launcherLogPath)) {
        return;
    }
    File launcherLog = new File(launcherLogPath);
    if (launcherLog.exists()) {
        launcherLog.delete();
    }
}

@Override
public void write(DataOutput out) throws IOException {
super.write(out);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.doris.load.loadv2;

import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.net.URL;

Expand All @@ -33,6 +35,7 @@ public class SparkLauncherMonitorTest {
private FinalApplicationStatus finalApplicationStatus;
private String trackingUrl;
private String user;
private String logPath;

@Before
public void setUp() {
Expand All @@ -43,6 +46,7 @@ public void setUp() {
finalApplicationStatus = FinalApplicationStatus.UNDEFINED;
trackingUrl = "http://myhost:8388/proxy/application_1573630236805_6864759/";
user = "testugi";
logPath = "./spark-launcher.log";
}

@Test
Expand All @@ -54,6 +58,7 @@ public void testLogMonitorNormal() {
Process process = Runtime.getRuntime().exec(cmd);
handle = new SparkLoadAppHandle(process);
SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle);
logMonitor.setRedirectLogPath(logPath);
logMonitor.start();
try {
logMonitor.join();
Expand All @@ -63,12 +68,25 @@ public void testLogMonitorNormal() {
Assert.fail();
}

// check values
Assert.assertEquals(appId, handle.getAppId());
Assert.assertEquals(state, handle.getState());
Assert.assertEquals(queue, handle.getQueue());
Assert.assertEquals(startTime, handle.getStartTime());
Assert.assertEquals(finalApplicationStatus, handle.getFinalStatus());
Assert.assertEquals(trackingUrl, handle.getUrl());
Assert.assertEquals(user, handle.getUser());

// check log
File file = new File(logPath);
Assert.assertTrue(file.exists());
}

@After
public void clear() {
    // Remove the log file written by the monitor so repeated test runs
    // always start from a clean state.
    File launcherLog = new File(logPath);
    if (launcherLog.exists()) {
        launcherLog.delete();
    }
}
}