From cf2de86e8129c3f690ba0dec010455eecbb8f0c5 Mon Sep 17 00:00:00 2001 From: xy720 Date: Thu, 27 Aug 2020 17:12:23 +0800 Subject: [PATCH 1/5] print log --- .../java/org/apache/doris/common/Config.java | 6 ++++++ .../doris/load/loadv2/SparkEtlJobHandler.java | 16 ++++++++++++++++ .../load/loadv2/SparkLauncherMonitor.java | 19 ++++++++++++++++++- 3 files changed, 40 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 2f53fc47e8887a..b88bc063559f58 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -543,6 +543,12 @@ public class Config extends ConfigBase { @ConfField public static String spark_resource_path = ""; + /** + * The specified spark launcher log dir + */ + @ConfField + public static String spark_launcher_log_dir = sys_log_dir + "/spark_launcher_log"; + /** * Default yarn client path */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index 96b7064453a5bf..f3009b2888a819 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -47,6 +47,7 @@ import org.apache.logging.log4j.Logger; import org.apache.spark.launcher.SparkLauncher; +import java.io.File; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.List; @@ -66,6 +67,7 @@ public class SparkEtlJobHandler { private static final String CONFIG_FILE_NAME = "jobconfig.json"; private static final String JOB_CONFIG_DIR = "configs"; private static final String ETL_JOB_NAME = "doris__%s"; + private static final String LAUNCHER_LOG = "spark_launcher_%s_%s.log"; // 5min private static final long GET_APPID_TIMEOUT_MS = 300000L; // 30s @@ -79,6 +81,9 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo // delete outputPath deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc); + // init local dir + initLocalDir(); + // prepare dpp archive SparkRepository.SparkArchive archive = resource.prepareArchive(); SparkRepository.SparkLibrary dppLibrary = archive.getDppLibrary(); @@ -96,6 +101,8 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo String jobArchiveHdfsPath = spark2xLibrary.remotePath; // spark yarn stage dir String jobStageHdfsPath = resource.getWorkingDir(); + // spark launcher log path + String logFilePath = Config.spark_launcher_log_dir + "/" + String.format(LAUNCHER_LOG, loadJobId, loadLabel); // update archive and stage configs here Map sparkConfigs = resource.getSparkConfigs(); @@ -143,6 +150,7 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo if (!FeConstants.runningUnitTest) { SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle); logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS); + logMonitor.setRedirectLogPath(logFilePath); logMonitor.start(); try { logMonitor.join(); @@ -299,6 +307,14 @@ public Map getEtlFilePaths(String outputPath, BrokerDesc brokerDes return filePathToSize; } + public static synchronized void initLocalDir() { + String logDir = Config.spark_launcher_log_dir; + File file = new File(logDir); + if (!file.exists()) { + file.mkdirs(); + } + } + public void deleteEtlOutputPath(String outputPath, BrokerDesc brokerDesc) { try { BrokerUtil.deletePath(outputPath, brokerDesc); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java index 628037d68895ce..6eb5a1e402717c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java @@ -27,8 +27,11 @@ import org.apache.logging.log4j.Logger; import java.io.BufferedReader; +import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStreamReader; +import java.io.OutputStream; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -67,6 +70,8 @@ public static class LogMonitor extends Thread { private SparkLoadAppHandle handle; private long submitTimeoutMs; private boolean isStop; + private StringBuffer outputBuffer; + private OutputStream outputStream; private static final String STATE = "state"; private static final String QUEUE = "queue"; @@ -82,6 +87,7 @@ public LogMonitor(SparkLoadAppHandle handle) { this.handle = handle; this.process = handle.getProcess(); this.isStop = false; + this.outputBuffer = new StringBuffer(); setSubmitTimeoutMs(DEFAULT_SUBMIT_TIMEOUT_MS); } @@ -89,6 +95,10 @@ public void setSubmitTimeoutMs(long submitTimeoutMs) { this.submitTimeoutMs = submitTimeoutMs; } + public void setRedirectLogPath(String redirectLogPath) throws IOException { + this.outputStream = new FileOutputStream(new File(redirectLogPath), false); + } + // Normally, log monitor will automatically stop if the spark app state changes // to RUNNING. // But if the spark app state changes to FAILED/KILLED/LOST, log monitor will stop @@ -103,7 +113,7 @@ public void run() { try { outReader = new BufferedReader(new InputStreamReader(process.getInputStream())); while (!isStop && (line = outReader.readLine()) != null) { - LOG.info("monitor log: " + line); + outputBuffer.append(line + '\n'); SparkLoadAppHandle.State oldState = handle.getState(); SparkLoadAppHandle.State newState = oldState; // parse state and appId @@ -179,6 +189,10 @@ else if (line.contains(QUEUE) || line.contains(START_TIME) || line.contains(FINA } } } + + if (outputStream != null) { + outputStream.write(outputBuffer.toString().getBytes()); + } } catch (Exception e) { LOG.warn("Exception monitoring process.", e); } finally { @@ -186,6 +200,9 @@ else if (line.contains(QUEUE) || line.contains(START_TIME) || line.contains(FINA if (outReader != null) { outReader.close(); } + if (outputStream != null) { + outputStream.close(); + } } catch (IOException e) { LOG.warn("close buffered reader error", e); } From 5e7bd51efe0f4515f6df65b8bf05ec5135729c9c Mon Sep 17 00:00:00 2001 From: xy720 Date: Thu, 27 Aug 2020 19:27:21 +0800 Subject: [PATCH 2/5] clear log --- .../java/org/apache/doris/catalog/Type.java | 2 +- .../apache/doris/load/loadv2/LoadManager.java | 4 +++- .../doris/load/loadv2/SparkEtlJobHandler.java | 4 +++- .../load/loadv2/SparkLauncherMonitor.java | 1 + .../doris/load/loadv2/SparkLoadAppHandle.java | 8 ++++++++ .../apache/doris/load/loadv2/SparkLoadJob.java | 15 +++++++++++++++ .../load/loadv2/SparkLauncherMonitorTest.java | 18 ++++++++++++++++++ 7 files changed, 49 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java index 6dcf51ed64b19f..d079785f850607 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java @@ -28,9 +28,9 @@ import org.apache.doris.thrift.TTypeDesc; import org.apache.doris.thrift.TTypeNode; import org.apache.doris.thrift.TTypeNodeType; - import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.primitives.Longs; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index 9b0741b835f1ac..5f6d6c31a1b966 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -45,7 +45,6 @@ import org.apache.doris.transaction.GlobalTransactionMgr; import org.apache.doris.transaction.TransactionState; import org.apache.doris.transaction.TransactionStatus; - import com.google.common.base.Strings; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -379,6 +378,9 @@ public void removeOldLoadJob() { && ((currentTimeMs - job.getFinishTimestamp()) / 1000 > Config.label_keep_max_second)) { iter.remove(); dbIdToLabelToLoadJobs.get(job.getDbId()).get(job.getLabel()).remove(job); + if (job instanceof SparkLoadJob) { + ((SparkLoadJob) job).clearSparkLauncherLog(); + } } } } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java index f3009b2888a819..f3e2f91150697b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java @@ -82,7 +82,9 @@ public void submitEtlJob(long loadJobId, String loadLabel, EtlJobConfig etlJobCo deleteEtlOutputPath(etlJobConfig.outputPath, brokerDesc); // init local dir - initLocalDir(); + if (!FeConstants.runningUnitTest) { + initLocalDir(); + } // prepare dpp archive SparkRepository.SparkArchive archive = resource.prepareArchive(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java index 6eb5a1e402717c..eebf1645d44a45 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java @@ -97,6 +97,7 @@ public void setSubmitTimeoutMs(long submitTimeoutMs) { public void setRedirectLogPath(String redirectLogPath) throws IOException { this.outputStream = new FileOutputStream(new File(redirectLogPath), false); + this.handle.setLogPath(redirectLogPath); } // Normally, log monitor will automatically stop if the spark app state changes diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java index 5d75ae9392a6f1..bfe335ad3d8e64 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadAppHandle.java @@ -40,6 +40,7 @@ public class SparkLoadAppHandle { private FinalApplicationStatus finalStatus; private String trackingUrl; private String user; + private String logPath; private List listeners; @@ -112,6 +113,8 @@ public void kill() { public String getUser() { return this.user; } + public String getLogPath() { return this.logPath; } + public void setState(State state) { this.state = state; this.fireEvent(false); @@ -147,6 +150,11 @@ public void setUser(String user) { this.fireEvent(true); } + public void setLogPath(String logPath) { + this.logPath = logPath; + this.fireEvent(true); + } + private void fireEvent(boolean isInfoChanged) { if (this.listeners != null) { Iterator iterator = this.listeners.iterator(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index d18416d5bf34d9..77215433e48436 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -94,6 +94,7 @@ import java.io.DataInput; import java.io.DataOutput; +import java.io.File; import java.io.IOException; import java.util.List; import java.util.Map; @@ -713,6 +714,20 @@ protected long getEtlStartTimestamp() { return etlStartTimestamp; } + public SparkLoadAppHandle getHandle() { + return sparkLoadAppHandle; + } + + public void clearSparkLauncherLog() { + String logPath = sparkLoadAppHandle.getLogPath(); + if (!Strings.isNullOrEmpty(logPath)) { + File file = new File(logPath); + if (file.exists()) { + file.delete(); + } + } + } + @Override public void write(DataOutput out) throws IOException { super.write(out); diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java index 8e55d4138db6bd..8f9761896a3ef0 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java @@ -18,10 +18,12 @@ package org.apache.doris.load.loadv2; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.net.URL; @@ -33,6 +35,7 @@ public class SparkLauncherMonitorTest { private FinalApplicationStatus finalApplicationStatus; private String trackingUrl; private String user; + private String logPath; @Before public void setUp() { @@ -43,6 +46,7 @@ public void setUp() { finalApplicationStatus = FinalApplicationStatus.UNDEFINED; trackingUrl = "http://myhost:8388/proxy/application_1573630236805_6864759/"; user = "testugi"; + logPath = "./spark-launcher.log"; } @Test @@ -54,6 +58,7 @@ public void testLogMonitorNormal() { Process process = Runtime.getRuntime().exec(cmd); handle = new SparkLoadAppHandle(process); SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle); + logMonitor.setRedirectLogPath(logPath); logMonitor.start(); try { logMonitor.join(); @@ -63,6 +68,7 @@ public void testLogMonitorNormal() { Assert.fail(); } + // check values Assert.assertEquals(appId, handle.getAppId()); Assert.assertEquals(state, handle.getState()); Assert.assertEquals(queue, handle.getQueue()); @@ -70,5 +76,17 @@ public void testLogMonitorNormal() { Assert.assertEquals(finalApplicationStatus, handle.getFinalStatus()); Assert.assertEquals(trackingUrl, handle.getUrl()); Assert.assertEquals(user, handle.getUser()); + + // check log + File file = new File(logPath); + Assert.assertTrue(file.exists()); + } + + @After + public void clear() { + File file = new File(logPath); + if (file.exists()) { + file.delete(); + } } } From 6fb5f313a2a632bf2ad0f01215cdf776294f46d6 Mon Sep 17 00:00:00 2001 From: xy720 Date: Thu, 27 Aug 2020 19:43:44 +0800 Subject: [PATCH 3/5] remove unrelevant --- fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java | 1 - 1 file changed, 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java index d079785f850607..eb827cbb92d8b7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Type.java @@ -17,7 +17,6 @@ package org.apache.doris.catalog; -import com.google.common.primitives.Longs; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.StringLiteral; import org.apache.doris.common.Pair; From 9f750f2de39491a2ffd1b8ebbc93bdf17c912313 Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 28 Aug 2020 11:19:53 +0800 Subject: [PATCH 4/5] fix null --- .../doris/load/loadv2/SparkLauncherMonitor.java | 10 +++------- .../apache/doris/load/loadv2/SparkLoadJob.java | 16 +++++++--------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java index eebf1645d44a45..72c70f8012e711 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java @@ -70,7 +70,6 @@ public static class LogMonitor extends Thread { private SparkLoadAppHandle handle; private long submitTimeoutMs; private boolean isStop; - private StringBuffer outputBuffer; private OutputStream outputStream; private static final String STATE = "state"; @@ -87,7 +86,6 @@ public LogMonitor(SparkLoadAppHandle handle) { this.handle = handle; this.process = handle.getProcess(); this.isStop = false; - this.outputBuffer = new StringBuffer(); setSubmitTimeoutMs(DEFAULT_SUBMIT_TIMEOUT_MS); } @@ -114,7 +112,9 @@ public void run() { try { outReader = new BufferedReader(new InputStreamReader(process.getInputStream())); while (!isStop && (line = outReader.readLine()) != null) { - outputBuffer.append(line + '\n'); + if (outputStream != null) { + outputStream.write(line.getBytes()); + } SparkLoadAppHandle.State oldState = handle.getState(); SparkLoadAppHandle.State newState = oldState; // parse state and appId @@ -190,10 +190,6 @@ else if (line.contains(QUEUE) || line.contains(START_TIME) || line.contains(FINA } } } - - if (outputStream != null) { - outputStream.write(outputBuffer.toString().getBytes()); - } } catch (Exception e) { LOG.warn("Exception monitoring process.", e); } finally { diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 77215433e48436..7927dc07bfeab5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -714,16 +714,14 @@ protected long getEtlStartTimestamp() { return etlStartTimestamp; } - public SparkLoadAppHandle getHandle() { - return sparkLoadAppHandle; - } - public void clearSparkLauncherLog() { - String logPath = sparkLoadAppHandle.getLogPath(); - if (!Strings.isNullOrEmpty(logPath)) { - File file = new File(logPath); - if (file.exists()) { - file.delete(); + if (sparkLoadAppHandle != null) { + String logPath = sparkLoadAppHandle.getLogPath(); + if (!Strings.isNullOrEmpty(logPath)) { + File file = new File(logPath); + if (file.exists()) { + file.delete(); + } } } } From 449a0545e80ffe97465c4b85e0137c02c6b71bc1 Mon Sep 17 00:00:00 2001 From: xy720 Date: Fri, 28 Aug 2020 11:31:33 +0800 Subject: [PATCH 5/5] \n --- .../java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java index 72c70f8012e711..9a664482ba7d5d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java @@ -113,7 +113,7 @@ public void run() { outReader = new BufferedReader(new InputStreamReader(process.getInputStream())); while (!isStop && (line = outReader.readLine()) != null) { if (outputStream != null) { - outputStream.write(line.getBytes()); + outputStream.write((line + "\n").getBytes()); } SparkLoadAppHandle.State oldState = handle.getState(); SparkLoadAppHandle.State newState = oldState;