diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 9645843d32e..e9ebdd13a85 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -107,7 +107,7 @@ public class RemoteInterpreterServer extends Thread private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterServer.class); - private static final int DEFAULT_SHUTDOWN_TIMEOUT = 2000; + public static final int DEFAULT_SHUTDOWN_TIMEOUT = 2000; private String interpreterGroupId; private InterpreterGroup interpreterGroup; diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java index abe6d0a2721..eb0b65b2dc5 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/util/ProcessLauncher.java @@ -197,7 +197,7 @@ protected void processLine(String s, int i) { try { redirectedContext.out.write(s + "\n"); } catch (IOException e) { - e.printStackTrace(); + LOGGER.error("unable to write to redirectedContext", e); } } } diff --git a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java index c9ae7f4ce9a..6feeec6b5a3 100644 --- a/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java +++ b/zeppelin-plugins/launcher/cluster/src/main/java/org/apache/zeppelin/interpreter/launcher/ClusterInterpreterProcess.java @@ -3,10 +3,10 @@ import java.io.IOException; import java.util.Map; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; -public class ClusterInterpreterProcess extends RemoteInterpreterManagedProcess { +public class ClusterInterpreterProcess extends ExecRemoteInterpreterProcess { public ClusterInterpreterProcess( String intpRunner, @@ -22,8 +22,7 @@ public ClusterInterpreterProcess( String interpreterGroupId, boolean isUserImpersonated) { - super(intpRunner, - intpEventServerPort, + super(intpEventServerPort, intpEventServerHost, interpreterPortRange, intpDir, @@ -33,7 +32,8 @@ public ClusterInterpreterProcess( connectionPoolSize, interpreterSettingName, interpreterGroupId, - isUserImpersonated); + isUserImpersonated, + intpRunner); } @Override diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java index d52276d716a..46caee95f02 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java @@ -24,12 +24,13 @@ import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.InterpreterRunner; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; import java.io.IOException; import java.util.Map; @@ -68,14 +69,14 @@ public InterpreterClient launchDirectly(InterpreterLaunchContext context) throws false); } else { // create new remote process - String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/" + String localRepoPath = zConf.getInterpreterLocalRepoPath() + File.separator + context.getInterpreterSettingId(); - return new RemoteInterpreterManagedProcess( - runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(), + return new ExecRemoteInterpreterProcess( context.getIntpEventServerPort(), context.getIntpEventServerHost(), zConf.getInterpreterPortRange(), zConf.getInterpreterDir() + "/" + groupName, localRepoPath, buildEnvFromProperties(context), connectTimeout, connectionPoolSize, name, - context.getInterpreterGroupId(), option.isUserImpersonate()); + context.getInterpreterGroupId(), option.isUserImpersonate(), + runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath()); } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java new file mode 100644 index 00000000000..11415133eb1 --- /dev/null +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ExecRemoteInterpreterProcess.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.interpreter.remote; + +import java.io.IOException; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.exec.CommandLine; +import org.apache.commons.exec.ExecuteException; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.zeppelin.interpreter.YarnAppMonitor; +import org.apache.zeppelin.interpreter.util.ProcessLauncher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +public class ExecRemoteInterpreterProcess extends RemoteInterpreterManagedProcess { + + private static final Logger LOGGER = LoggerFactory.getLogger(ExecRemoteInterpreterProcess.class); + + private static final Pattern YARN_APP_PATTER = Pattern.compile("Submitted application (\\w+)"); + + private final String interpreterRunner; + private InterpreterProcessLauncher interpreterProcessLauncher; + + public ExecRemoteInterpreterProcess( + int intpEventServerPort, + String intpEventServerHost, + String interpreterPortRange, + String intpDir, + String localRepoDir, + Map env, + int connectTimeout, + int connectionPoolSize, + String interpreterSettingName, + String interpreterGroupId, + boolean isUserImpersonated, + String intpRunner) { + super(intpEventServerPort, intpEventServerHost, interpreterPortRange, intpDir, localRepoDir, env, connectTimeout, + connectionPoolSize, interpreterSettingName, interpreterGroupId, isUserImpersonated); + this.interpreterRunner = intpRunner; + } + + @Override + public void start(String userName) throws IOException { + // start server process + CommandLine cmdLine = CommandLine.parse(interpreterRunner); + cmdLine.addArgument("-d", false); + cmdLine.addArgument(getInterpreterDir(), false); + cmdLine.addArgument("-c", false); + cmdLine.addArgument(getIntpEventServerHost(), false); + cmdLine.addArgument("-p", false); + cmdLine.addArgument(String.valueOf(intpEventServerPort), false); + cmdLine.addArgument("-r", false); + cmdLine.addArgument(getInterpreterPortRange(), false); + cmdLine.addArgument("-i", false); + cmdLine.addArgument(getInterpreterGroupId(), false); + if (isUserImpersonated() && !userName.equals("anonymous")) { + cmdLine.addArgument("-u", false); + cmdLine.addArgument(userName, false); + } + cmdLine.addArgument("-l", false); + cmdLine.addArgument(getLocalRepoDir(), false); + cmdLine.addArgument("-g", false); + cmdLine.addArgument(getInterpreterSettingName(), false); + + interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, getEnv()); + interpreterProcessLauncher.launch(); + interpreterProcessLauncher.waitForReady(getConnectTimeout()); + if (interpreterProcessLauncher.isLaunchTimeout()) { + throw new IOException( + String.format("Interpreter Process creation is time out in %d seconds", getConnectTimeout() / 1000) + "\n" + + "You can increase timeout threshold via " + + "setting zeppelin.interpreter.connect.timeout of this interpreter.\n" + + interpreterProcessLauncher.getErrorMessage()); + } + + if (!interpreterProcessLauncher.isRunning()) { + throw new IOException("Fail to launch interpreter process:\n" + interpreterProcessLauncher.getErrorMessage()); + } else { + String launchOutput = interpreterProcessLauncher.getProcessLaunchOutput(); + Matcher m = YARN_APP_PATTER.matcher(launchOutput); + if (m.find()) { + String appId = m.group(1); + LOGGER.info("Detected yarn app: {}, add it to YarnAppMonitor", appId); + YarnAppMonitor.get().addYarnApp(ConverterUtils.toApplicationId(appId), this); + } + } + } + + @Override + public void processStarted(int port, String host) { + super.processStarted(port, host); + // for yarn cluster it may be transitioned from COMPLETED to RUNNING. + interpreterProcessLauncher.onProcessRunning(); + } + + @Override + public void stop() { + if (isRunning()) { + super.stop(); + // wait for a clean shutdown + this.interpreterProcessLauncher.waitForShutdown(RemoteInterpreterServer.DEFAULT_SHUTDOWN_TIMEOUT + 500); + // kill process + this.interpreterProcessLauncher.stop(); + this.interpreterProcessLauncher = null; + LOGGER.info("Remote exec process of interpreter group: {} is terminated", getInterpreterGroupId()); + } else { + LOGGER.warn("Try to stop a not running interpreter process of interpreter group: {}", getInterpreterGroupId()); + } + } + + @VisibleForTesting + public String getInterpreterRunner() { + return interpreterRunner; + } + + @Override + public boolean isRunning() { + return interpreterProcessLauncher != null && interpreterProcessLauncher.isRunning(); + } + + @Override + public String getErrorMessage() { + return this.interpreterProcessLauncher != null + ? this.interpreterProcessLauncher.getErrorMessage() + : ""; + } + + private class InterpreterProcessLauncher extends ProcessLauncher { + + public InterpreterProcessLauncher(CommandLine commandLine, Map envs) { + super(commandLine, envs); + } + + public void waitForShutdown(int timeout) { + synchronized (this) { + long startTime = System.currentTimeMillis(); + long timeoutTime = startTime + timeout; + while (state == State.RUNNING && !Thread.currentThread().isInterrupted()) { + long timetoTimeout = timeoutTime - System.currentTimeMillis(); + if (timetoTimeout <= 0) { + LOGGER.warn("Shutdown timeout reached"); + break; + } + try { + wait(timetoTimeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("waitForShutdown interrupted", e); + } + } + } + } + + @Override + public void waitForReady(int timeout) { + synchronized (this) { + long startTime = System.currentTimeMillis(); + long timeoutTime = startTime + timeout; + while (state != State.RUNNING && !Thread.currentThread().isInterrupted()) { + long timetoTimeout = timeoutTime - System.currentTimeMillis(); + if (timetoTimeout <= 0) { + LOGGER.warn("Ready timeout reached"); + break; + } + try { + wait(timetoTimeout); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOGGER.error("waitForReady interrupted", e); + } + } + } + this.stopCatchLaunchOutput(); + if (state == State.LAUNCHED) { + onTimeout(); + } + } + + @Override + public void onProcessRunning() { + super.onProcessRunning(); + synchronized (this) { + notifyAll(); + } + } + + @Override + public void onProcessComplete(int exitValue) { + LOGGER.warn("Process is exited with exit value {}", exitValue); + if (getEnv().getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER", "false").equals("false")) { + // don't call notify in yarn-cluster mode + synchronized (this) { + notifyAll(); + } + } + // For yarn-cluster mode, client process will exit with exit value 0 + // after submitting spark app. So don't move to TERMINATED state when exitValue + // is 0. + if (exitValue != 0) { + transition(State.TERMINATED); + } else { + transition(State.COMPLETED); + } + } + + @Override + public void onProcessFailed(ExecuteException e) { + super.onProcessFailed(e); + synchronized (this) { + notifyAll(); + } + } + } +} diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 24360007dac..c2aca53b332 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -17,32 +17,21 @@ package org.apache.zeppelin.interpreter.remote; -import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.exec.CommandLine; -import org.apache.commons.exec.ExecuteException; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.zeppelin.interpreter.YarnAppMonitor; -import org.apache.zeppelin.interpreter.util.ProcessLauncher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; /** * This class manages start / stop of remote interpreter process */ -public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess { +public abstract class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess { private static final Logger LOGGER = LoggerFactory.getLogger( RemoteInterpreterManagedProcess.class); - private static final Pattern YARN_APP_PATTER = - Pattern.compile("Submitted application (\\w+)"); - private final String interpreterRunner; + private final String interpreterPortRange; - private InterpreterProcessLauncher interpreterProcessLauncher; + private String host = null; private int port = -1; private final String interpreterDir; @@ -55,7 +44,6 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess { private Map env; public RemoteInterpreterManagedProcess( - String intpRunner, int intpEventServerPort, String intpEventServerHost, String interpreterPortRange, @@ -68,7 +56,6 @@ public RemoteInterpreterManagedProcess( String interpreterGroupId, boolean isUserImpersonated) { super(connectTimeout, connectionPoolSize, intpEventServerHost, intpEventServerPort); - this.interpreterRunner = intpRunner; this.interpreterPortRange = interpreterPortRange; this.env = env; this.interpreterDir = intpDir; @@ -88,71 +75,18 @@ public int getPort() { return port; } - @Override - public void start(String userName) throws IOException { - // start server process - CommandLine cmdLine = CommandLine.parse(interpreterRunner); - cmdLine.addArgument("-d", false); - cmdLine.addArgument(interpreterDir, false); - cmdLine.addArgument("-c", false); - cmdLine.addArgument(intpEventServerHost, false); - cmdLine.addArgument("-p", false); - cmdLine.addArgument(String.valueOf(intpEventServerPort), false); - cmdLine.addArgument("-r", false); - cmdLine.addArgument(interpreterPortRange, false); - cmdLine.addArgument("-i", false); - cmdLine.addArgument(interpreterGroupId, false); - if (isUserImpersonated && !userName.equals("anonymous")) { - cmdLine.addArgument("-u", false); - cmdLine.addArgument(userName, false); - } - cmdLine.addArgument("-l", false); - cmdLine.addArgument(localRepoDir, false); - cmdLine.addArgument("-g", false); - cmdLine.addArgument(interpreterSettingName, false); - - interpreterProcessLauncher = new InterpreterProcessLauncher(cmdLine, env); - interpreterProcessLauncher.launch(); - interpreterProcessLauncher.waitForReady(getConnectTimeout()); - if (interpreterProcessLauncher.isLaunchTimeout()) { - throw new IOException(String.format("Interpreter Process creation is time out in %d seconds", - getConnectTimeout()/1000) + "\n" + "You can increase timeout threshold via " + - "setting zeppelin.interpreter.connect.timeout of this interpreter.\n" + - interpreterProcessLauncher.getErrorMessage()); - } - - if (!interpreterProcessLauncher.isRunning()) { - throw new IOException("Fail to launch interpreter process:\n" + - interpreterProcessLauncher.getErrorMessage()); - } else { - String launchOutput = interpreterProcessLauncher.getProcessLaunchOutput(); - Matcher m = YARN_APP_PATTER.matcher(launchOutput); - if (m.find()) { - String appId = m.group(1); - LOGGER.info("Detected yarn app: {}, add it to YarnAppMonitor", appId); - YarnAppMonitor.get().addYarnApp(ConverterUtils.toApplicationId(appId), this); - } - } - } - @Override public void stop() { - if (isRunning()) { - LOGGER.info("Kill interpreter process for interpreter group: {}", getInterpreterGroupId()); - try { - callRemoteFunction(client -> { - client.shutdown(); - return null; - }); - } catch (Exception e) { - LOGGER.warn("ignore the exception when shutting down", e); - } - + LOGGER.info("Stop interpreter process for interpreter group: {}", getInterpreterGroupId()); + try { + callRemoteFunction(client -> { + client.shutdown(); + return null; + }); // Shutdown connection shutdown(); - this.interpreterProcessLauncher.stop(); - this.interpreterProcessLauncher = null; - LOGGER.info("Remote process of interpreter group: {} is terminated", getInterpreterGroupId()); + } catch (Exception e) { + LOGGER.warn("ignore the exception when shutting down", e); } } @@ -160,8 +94,6 @@ public void stop() { public void processStarted(int port, String host) { this.port = port; this.host = host; - // for yarn cluster it may be transitioned from COMPLETED to RUNNING. - interpreterProcessLauncher.onProcessRunning(); } // called when remote interpreter process is stopped, e.g. YarnAppsMonitor will call this @@ -170,21 +102,26 @@ public void processStopped(String errorMessage) { this.errorMessage = errorMessage; } - @VisibleForTesting public Map getEnv() { return env; } - @VisibleForTesting public String getLocalRepoDir() { return localRepoDir; } - @VisibleForTesting public String getInterpreterDir() { return interpreterDir; } + public String getIntpEventServerHost() { + return intpEventServerHost; + } + + public String getInterpreterPortRange() { + return interpreterPortRange; + } + @Override public String getInterpreterSettingName() { return interpreterSettingName; @@ -195,85 +132,12 @@ public String getInterpreterGroupId() { return interpreterGroupId; } - @VisibleForTesting - public String getInterpreterRunner() { - return interpreterRunner; - } - - @VisibleForTesting public boolean isUserImpersonated() { return isUserImpersonated; } - @Override - public boolean isRunning() { - return interpreterProcessLauncher != null && interpreterProcessLauncher.isRunning() - && errorMessage == null; - } - @Override public String getErrorMessage() { - String interpreterProcessError = this.interpreterProcessLauncher != null - ? this.interpreterProcessLauncher.getErrorMessage() : ""; - return errorMessage != null ? errorMessage : interpreterProcessError; - } - - private class InterpreterProcessLauncher extends ProcessLauncher { - - public InterpreterProcessLauncher(CommandLine commandLine, - Map envs) { - super(commandLine, envs); - } - - @Override - public void waitForReady(int timeout) { - synchronized (this) { - if (state != State.RUNNING) { - try { - wait(timeout); - } catch (InterruptedException e) { - LOGGER.error("Remote interpreter is not accessible", e); - } - } - } - this.stopCatchLaunchOutput(); - if (state == State.LAUNCHED) { - onTimeout(); - } - } - - @Override - public void onProcessRunning() { - super.onProcessRunning(); - synchronized(this) { - notify(); - } - } - - @Override - public void onProcessComplete(int exitValue) { - LOGGER.warn("Process is exited with exit value " + exitValue); - if (env.getOrDefault("ZEPPELIN_SPARK_YARN_CLUSTER", "false").equals("false")) { - // don't call notify in yarn-cluster mode - synchronized (this) { - notify(); - } - } - // For yarn-cluster mode, client process will exit with exit value 0 - // after submitting spark app. So don't move to TERMINATED state when exitValue is 0. - if (exitValue != 0) { - transition(State.TERMINATED); - } else { - transition(State.COMPLETED); - } - } - - @Override - public void onProcessFailed(ExecuteException e) { - super.onProcessFailed(e); - synchronized (this) { - notify(); - } - } + return errorMessage; } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index 6aff86a40ea..5c997e19f3e 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -21,7 +21,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.integration.DownloadUtils; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess; import org.apache.zeppelin.util.Util; import org.junit.Before; import org.junit.Test; @@ -72,8 +72,8 @@ public void testConnectTimeOut() throws IOException { option.setUserImpersonate(true); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host"); InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertTrue(client instanceof ExecRemoteInterpreterProcess); + ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess) client; assertEquals("name", interpreterProcess.getInterpreterSettingName()); assertEquals(zeppelinHome + "/interpreter/groupName", interpreterProcess.getInterpreterDir()); assertEquals(zeppelinHome + "/local-repo/groupId", interpreterProcess.getLocalRepoDir()); @@ -98,8 +98,8 @@ public void testLocalMode() throws IOException { InterpreterOption option = new InterpreterOption(); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertTrue( client instanceof ExecRemoteInterpreterProcess); + ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess) client; assertEquals("spark", interpreterProcess.getInterpreterSettingName()); assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); @@ -126,8 +126,8 @@ public void testYarnClientMode_1() throws IOException { InterpreterOption option = new InterpreterOption(); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertTrue( client instanceof ExecRemoteInterpreterProcess); + ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess) client; assertEquals("spark", interpreterProcess.getInterpreterSettingName()); assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); @@ -159,8 +159,8 @@ public void testYarnClientMode_2() throws IOException { InterpreterOption option = new InterpreterOption(); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertTrue( client instanceof ExecRemoteInterpreterProcess); + ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess) client; assertEquals("spark", interpreterProcess.getInterpreterSettingName()); assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); @@ -192,8 +192,8 @@ public void testYarnClusterMode_1() throws IOException { InterpreterOption option = new InterpreterOption(); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertTrue( client instanceof ExecRemoteInterpreterProcess); + ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess) client; assertEquals("spark", interpreterProcess.getInterpreterSettingName()); assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); @@ -238,8 +238,8 @@ public void testYarnClusterMode_2() throws IOException { Files.createFile(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar")); InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertTrue(client instanceof ExecRemoteInterpreterProcess); + ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess) client; assertEquals("spark", interpreterProcess.getInterpreterSettingName()); assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); @@ -285,8 +285,8 @@ public void testYarnClusterMode_3() throws IOException { Files.createDirectories(localRepoPath); InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertTrue(client instanceof ExecRemoteInterpreterProcess); + ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess) client; assertEquals("spark", interpreterProcess.getInterpreterSettingName()); assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java index f0f60d9ba3d..8e695f36e41 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java @@ -19,7 +19,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterOption; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess; import org.junit.Before; import org.junit.Test; @@ -48,8 +48,8 @@ public void testLauncher() throws IOException { option.setUserImpersonate(true); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host"); InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertTrue(client instanceof ExecRemoteInterpreterProcess); + ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess) client; assertEquals("name", interpreterProcess.getInterpreterSettingName()); assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); @@ -73,8 +73,8 @@ public void testConnectTimeOut() throws IOException { option.setUserImpersonate(true); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host"); InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertTrue(client instanceof ExecRemoteInterpreterProcess); + ExecRemoteInterpreterProcess interpreterProcess = (ExecRemoteInterpreterProcess) client; assertEquals("name", interpreterProcess.getInterpreterSettingName()); assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir());