();
+
+ private Properties agentProps;
+ private File sinkOutputDir;
+
+ @Before
+ public void setUp() throws Exception {
+ /* Create 3 temp dirs, each used as value within agentProps */
+
+ final File sinkOutputDir = Files.createTempDir();
+ sinkOutputDir.setWritable(true, false);
+ sinkOutputDir.setReadable(true, false);
+ tempResources.add(sinkOutputDir.getCanonicalFile());
+ final String sinkOutputDirPath = sinkOutputDir.getCanonicalPath();
+ LOGGER.info("Created rolling file sink's output dir: " + sinkOutputDirPath);
+
+ final File channelCheckpointDir = Files.createTempDir();
+ channelCheckpointDir.setWritable(true, false);
+ channelCheckpointDir.setReadable(true, false);
+ tempResources.add(channelCheckpointDir.getCanonicalFile());
+ final String channelCheckpointDirPath = channelCheckpointDir.getCanonicalPath();
+ LOGGER.info("Created file channel's checkpoint dir: " + channelCheckpointDirPath);
+
+ final File channelDataDir = Files.createTempDir();
+ channelDataDir.setWritable(true, false);
+ channelDataDir.setReadable(true, false);
+ tempResources.add(channelDataDir.getCanonicalFile());
+ final String channelDataDirPath = channelDataDir.getCanonicalPath();
+ LOGGER.info("Created file channel's data dir: " + channelDataDirPath);
+
+ /* Build props to pass to flume agent */
+
+ Properties agentProps = new Properties();
+
+ // Active sets
+ agentProps.put("a1.channels", "c1");
+ agentProps.put("a1.sources", "r1");
+ agentProps.put("a1.sinks", "k1");
+
+ // c1
+ agentProps.put("a1.channels.c1.type", "FILE");
+ agentProps.put("a1.channels.c1.checkpointDir", channelCheckpointDirPath);
+ agentProps.put("a1.channels.c1.dataDirs", channelDataDirPath);
+
+ // r1
+ agentProps.put("a1.sources.r1.channels", "c1");
+ agentProps.put("a1.sources.r1.type", "EXEC");
+ agentProps.put("a1.sources.r1.command", "seq 1 100");
+
+ // k1
+ agentProps.put("a1.sinks.k1.channel", "c1");
+ agentProps.put("a1.sinks.k1.type", "FILE_ROLL");
+ agentProps.put("a1.sinks.k1.sink.directory", sinkOutputDirPath);
+ agentProps.put("a1.sinks.k1.sink.rollInterval", "0");
+
+ this.agentProps = agentProps;
+ this.sinkOutputDir = sinkOutputDir;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ DockerInstall.getInstance().stopAgent();
+ for (File tempResource : tempResources) {
+ tempResource.delete();
+ }
+ agentProps = null;
+ }
+
+ /**
+ * File channel in/out test. Verifies that all events inserted into the
+ * file channel are received by the sink in order.
+ *
+ * The EXEC source creates 100 events where the event bodies have
+ * sequential numbers. The source puts those events into the file channel,
+ * and the FILE_ROLL The sink is expected to take all 100 events in FIFO
+ * order.
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testInOut() throws Exception {
+ LOGGER.debug("testInOut() started.");
+
+
+ DockerInstall.getInstance().startAgent("a1", agentProps, tempResources);
+ TimeUnit.SECONDS.sleep(10); // Wait for source and sink to finish
+ // TODO make this more deterministic
+
+ /* Create expected output */
+ StringBuffer sb = new StringBuffer();
+ for (int i = 1; i <= 100; i++) {
+ sb.append(i).append("\n");
+ }
+ String expectedOutput = sb.toString();
+ LOGGER.info("Created expected output: " + expectedOutput);
+
+ /* Create actual output file */
+
+ File[] sinkOutputDirChildren = sinkOutputDir.listFiles();
+ // Only 1 file should be in FILE_ROLL sink's dir (rolling is disabled)
+ Assert.assertEquals("Expected FILE_ROLL sink's dir to have only 1 child," +
+ " but found " + sinkOutputDirChildren.length + " children.",
+ 1, sinkOutputDirChildren.length);
+ File actualOutput = sinkOutputDirChildren[0];
+
+ if (!Files.toString(actualOutput, Charsets.UTF_8).equals(expectedOutput)) {
+ LOGGER.error("Actual output doesn't match expected output.\n");
+ throw new AssertionError("FILE_ROLL sink's actual output doesn't " +
+ "match expected output.");
+ }
+
+ LOGGER.debug("testInOut() ended.");
+ }
+}
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/DockerInstall.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/DockerInstall.java
new file mode 100644
index 0000000000..715bf8170a
--- /dev/null
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/DockerInstall.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.test.util;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Files;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Attempts to setup a staged install using explicitly specified tar-ball
+ * distribution or by using relative path into the flume-ng-dist module.
+ */
+public class DockerInstall extends StagedInstall {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DockerInstall.class);
+
+ private String configFilePath;
+
+ private ProcessShutdownHook shutdownHook;
+ private ProcessInputStreamConsumer consumer;
+ private String containerId;
+
+ private static DockerInstall INSTANCE;
+ private String dockerImageId;
+
+ public static synchronized DockerInstall getInstance() throws Exception {
+ if (INSTANCE == null) {
+ INSTANCE = new DockerInstall();
+ }
+ return INSTANCE;
+ }
+
+ public synchronized boolean isRunning() {
+ return containerId != null;
+ }
+
+ public synchronized void stopAgent() throws Exception {
+ if (containerId == null) {
+ throw new Exception("Process not found");
+ }
+
+ LOGGER.info("Shutting down agent process");
+
+ ImmutableList.Builder builder = new ImmutableList.Builder<>();
+ builder.add("/bin/sh");
+ builder.add("-c");
+ builder.add("'docker kill " + containerId + "'");
+
+ List cmdArgs = builder.build();
+
+ File tempShellFileKiller = File.createTempFile("docker", ".sh");
+ tempShellFileKiller.setExecutable(true);
+ tempShellFileKiller.deleteOnExit();
+ Files.write(Joiner.on(" ").join(cmdArgs).getBytes(StandardCharsets.UTF_8), tempShellFileKiller);
+
+ ProcessBuilder processKiller = new ProcessBuilder(tempShellFileKiller.getAbsolutePath());
+
+ Process killer = processKiller.start();
+ killer.waitFor();
+
+ containerId = null;
+ consumer.interrupt();
+ consumer = null;
+ configFilePath = null;
+ Runtime.getRuntime().removeShutdownHook(shutdownHook);
+ shutdownHook = null;
+
+ Thread.sleep(3000); // sleep for 3s to let system shutdown
+ }
+
+ public synchronized void startAgent(String name, Properties properties) throws Exception {
+ startAgent(name, properties, new HashMap<>(), new HashMap<>(), new ArrayList<>());
+ }
+
+ public synchronized void startAgent(String name, Properties properties, List mountPoints) throws Exception {
+ startAgent(name, properties, new HashMap<>(), new HashMap<>(), mountPoints);
+ }
+
+ public synchronized void startAgent(
+ String name, Properties properties, Map environmentVariables,
+ Map commandOptions, List mountPoints)
+ throws Exception {
+ Preconditions.checkArgument(!name.isEmpty(), "agent name must not be empty");
+ Preconditions.checkNotNull(properties, "properties object must not be null");
+
+ // State per invocation - config file, process, shutdown hook
+ String agentName = name;
+
+ if (containerId != null) {
+ throw new Exception("A process is already running");
+ }
+ LOGGER.info("Starting process for agent: " + agentName + " using config: " + properties);
+
+ File configFile = createConfigurationFile(agentName, properties);
+ configFilePath = configFile.getCanonicalPath();
+
+ String configFileName = configFile.getName();
+ String logFileName = "flume-" + agentName + "-"
+ + configFileName.substring(0, configFileName.indexOf('.')) + ".log";
+
+ LOGGER.info("Created configuration file: " + configFilePath);
+
+ ImmutableList.Builder builder = new ImmutableList.Builder<>();
+ builder.add("/bin/sh");
+ builder.add("-c");
+
+
+ StringBuilder sb = new StringBuilder("'");
+ sb.append("docker run ");
+ sb.append(" --detach");
+ sb.append(" -v ").append(confDirPath).append(":").append(confDirPath);
+ sb.append(" -v ").append(getStageDir().toString()).append(":").append(getStageDir().toString());
+ sb.append(" -v ").append(logDirPath).append(":").append(logDirPath);
+
+ mountPoints.forEach(file -> sb.append(" -v ").append(file.toString()).append(":").append(file.toString()));
+
+ sb.append(" -t ").append(dockerImageId).append(" agent");
+ sb.append(" --conf ").append(confDirPath);
+ sb.append(" --conf-file ").append(configFilePath);
+ sb.append(" --name ").append(agentName);
+ sb.append(" -D").append(ENV_FLUME_LOG_DIR).append( "=").append(logDirPath);
+ sb.append(" -D" ).append( ENV_FLUME_ROOT_LOGGER ).append( "=" ).append( ENV_FLUME_ROOT_LOGGER_VALUE);
+ sb.append(" -D" ).append( ENV_FLUME_LOG_FILE ).append( "=").append(logFileName);
+ sb.append("'");
+
+ builder.add(sb.toString());
+
+ commandOptions.forEach((key, value) -> builder.add(key, value));
+
+ List cmdArgs = builder.build();
+
+ File tempShellFile = File.createTempFile("docker", ".sh");
+ tempShellFile.setExecutable(true);
+ tempShellFile.deleteOnExit();
+ Files.write(Joiner.on(" ").join(cmdArgs).getBytes(StandardCharsets.UTF_8), tempShellFile);
+
+ ProcessBuilder pb = new ProcessBuilder(tempShellFile.getAbsolutePath());
+
+ Map env = pb.environment();
+ env.putAll(environmentVariables);
+
+ pb.directory(baseDir);
+
+ Process process = pb.start();
+
+ BufferedReader reader =
+ new BufferedReader(new InputStreamReader(process.getInputStream()));
+ StringBuilder containerIdSb = new StringBuilder();
+ String line;
+ while ( (line = reader.readLine()) != null) {
+ containerIdSb.append(line);
+ }
+
+ containerId = containerIdSb.toString();
+
+ if (!process.isAlive()) {
+ throw new RuntimeException("Docker container did not start: " + process.exitValue() + " " + containerId);
+ }
+
+ ImmutableList.Builder logBuilder = new ImmutableList.Builder();
+ logBuilder.add("/bin/sh");
+ logBuilder.add("-c");
+ logBuilder.add("'docker logs --follow " + containerId + "'");
+
+ List logCmdArgs = logBuilder.build();
+
+ File tempLogShellFile = File.createTempFile("docker", ".sh");
+ tempLogShellFile.setExecutable(true);
+ tempLogShellFile.deleteOnExit();
+ Files.write(Joiner.on(" ").join(logCmdArgs).getBytes(StandardCharsets.UTF_8), tempLogShellFile);
+
+ ProcessBuilder logReaderPb = new ProcessBuilder(tempLogShellFile.getAbsolutePath());
+ Process logReaderProc = logReaderPb.start();
+
+ consumer = new ProcessInputStreamConsumer(logReaderProc.getInputStream());
+ consumer.start();
+
+ shutdownHook = new ProcessShutdownHook();
+ Runtime.getRuntime().addShutdownHook(shutdownHook);
+
+ Thread.sleep(3000); // sleep for 3s to let system initialize
+ }
+
+ private DockerInstall() throws Exception {
+ super();
+ dockerImageId = getDockerImageId();
+ }
+
+ private static String getDockerImageId() throws Exception {
+ File dockerImageIdFile = new File("../flume-ng-dist/target/docker/image-id");
+ return Files.readFirstLine(dockerImageIdFile, StandardCharsets.UTF_8);
+ }
+
+}
diff --git a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
index b75f7a809c..737ef25615 100644
--- a/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
+++ b/flume-ng-tests/src/test/java/org/apache/flume/test/util/StagedInstall.java
@@ -58,11 +58,11 @@ public class StagedInstall {
public static final String ENV_FLUME_ROOT_LOGGER_VALUE = "DEBUG,LOGFILE";
public static final String ENV_FLUME_LOG_FILE = "flume.log.file";
- private final File stageDir;
- private final File baseDir;
- private final String launchScriptPath;
- private final String confDirPath;
- private final String logDirPath;
+ final File stageDir;
+ final File baseDir;
+ final String launchScriptPath;
+ final String confDirPath;
+ final String logDirPath;
// State per invocation - config file, process, shutdown hook
private String agentName;
@@ -196,7 +196,7 @@ public synchronized File getStageDir() {
return stageDir;
}
- private File createConfigurationFile(String agentName, Properties properties)
+ File createConfigurationFile(String agentName, Properties properties)
throws Exception {
Preconditions.checkNotNull(properties, "properties object must not be null");
@@ -222,70 +222,73 @@ private File createConfigurationFile(String agentName, Properties properties)
return file;
}
- private StagedInstall() throws Exception {
-
- String tarballPath = System.getProperty(PROP_PATH_TO_DIST_TARBALL);
- if (tarballPath == null || tarballPath.trim().length() == 0) {
- LOGGER.info("No value specified for system property: "
- + PROP_PATH_TO_DIST_TARBALL
- + ". Will attempt to use relative path to locate dist tarball.");
+ StagedInstall() {
+ try {
+ String tarballPath = System.getProperty(PROP_PATH_TO_DIST_TARBALL);
+ if (tarballPath == null || tarballPath.trim().length() == 0) {
+ LOGGER.info("No value specified for system property: "
+ + PROP_PATH_TO_DIST_TARBALL
+ + ". Will attempt to use relative path to locate dist tarball.");
- tarballPath = getRelativeTarballPath();
- }
+ tarballPath = getRelativeTarballPath();
+ }
- if (tarballPath == null || tarballPath.trim().length() == 0) {
- throw new Exception("Failed to locate tar-ball distribution. "
- + "Please specify explicitly via system property: "
- + PROP_PATH_TO_DIST_TARBALL);
- }
+ if (tarballPath == null || tarballPath.trim().length() == 0) {
+ throw new RuntimeException("Failed to locate tar-ball distribution. "
+ + "Please specify explicitly via system property: "
+ + PROP_PATH_TO_DIST_TARBALL);
+ }
- // Validate
- File tarballFile = new File(tarballPath);
- if (!tarballFile.isFile() || !tarballFile.canRead()) {
- throw new Exception("The tarball distribution file is invalid: "
- + tarballPath + ". You can override this by explicitly setting the "
- + "system property: " + PROP_PATH_TO_DIST_TARBALL);
- }
+ // Validate
+ File tarballFile = new File(tarballPath);
+ if (!tarballFile.isFile() || !tarballFile.canRead()) {
+ throw new RuntimeException("The tarball distribution file is invalid: "
+ + tarballPath + ". You can override this by explicitly setting the "
+ + "system property: " + PROP_PATH_TO_DIST_TARBALL);
+ }
- LOGGER.info("Dist tarball to use: " + tarballPath);
+ LOGGER.info("Dist tarball to use: " + tarballPath);
- // Now set up a staging directory for this distribution
- stageDir = getStagingDirectory();
+ // Now set up a staging directory for this distribution
+ stageDir = getStagingDirectory();
- // Deflate the gzip compressed archive
- File tarFile = gunzipDistTarball(tarballFile, stageDir);
+ // Deflate the gzip compressed archive
+ File tarFile = gunzipDistTarball(tarballFile, stageDir);
- // Untar the deflated file
- untarTarFile(tarFile, stageDir);
+ // Untar the deflated file
+ untarTarFile(tarFile, stageDir);
- // Delete the tarfile
- tarFile.delete();
+ // Delete the tarfile
+ tarFile.delete();
- LOGGER.info("Dist tarball staged to: " + stageDir);
+ LOGGER.info("Dist tarball staged to: " + stageDir);
- File rootDir = stageDir;
- File[] listBaseDirs = stageDir.listFiles();
- if (listBaseDirs != null && listBaseDirs.length == 1
- && listBaseDirs[0].isDirectory()) {
- rootDir = listBaseDirs[0];
- }
- baseDir = rootDir;
+ File rootDir = stageDir;
+ File[] listBaseDirs = stageDir.listFiles();
+ if (listBaseDirs != null && listBaseDirs.length == 1
+ && listBaseDirs[0].isDirectory()) {
+ rootDir = listBaseDirs[0];
+ }
+ baseDir = rootDir;
- // Give execute permissions to the bin/flume-ng script
- File launchScript = new File(baseDir, "bin/flume-ng");
- giveExecutePermissions(launchScript);
+ // Give execute permissions to the bin/flume-ng script
+ File launchScript = new File(baseDir, "bin/flume-ng");
+ giveExecutePermissions(launchScript);
- launchScriptPath = launchScript.getCanonicalPath();
+ launchScriptPath = launchScript.getCanonicalPath();
- File confDir = new File(baseDir, "conf");
- confDirPath = confDir.getCanonicalPath();
+ File confDir = new File(baseDir, "conf");
+ confDirPath = confDir.getCanonicalPath();
- File logDir = new File(baseDir, "logs");
- logDir.mkdirs();
+ File logDir = new File(baseDir, "logs");
+ logDir.mkdirs();
- logDirPath = logDir.getCanonicalPath();
+ logDirPath = logDir.getCanonicalPath();
- LOGGER.info("Staged install root directory: " + rootDir.getCanonicalPath());
+ LOGGER.info("Staged install root directory: " + rootDir.getCanonicalPath());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
private void giveExecutePermissions(File file) throws Exception {
@@ -501,7 +504,7 @@ public static void waitUntilPortOpens(String host, int port, long timeout)
}
}
- private class ProcessShutdownHook extends Thread {
+ class ProcessShutdownHook extends Thread {
public void run() {
synchronized (StagedInstall.this) {
if (StagedInstall.this.process != null) {
@@ -511,10 +514,10 @@ public void run() {
}
}
- private static class ProcessInputStreamConsumer extends Thread {
+ static class ProcessInputStreamConsumer extends Thread {
private final InputStream is;
- private ProcessInputStreamConsumer(InputStream is) {
+ ProcessInputStreamConsumer(InputStream is) {
this.is = is;
this.setDaemon(true);
}
diff --git a/pom.xml b/pom.xml
index 8be1064995..85371d6b75 100644
--- a/pom.xml
+++ b/pom.xml
@@ -65,6 +65,8 @@ limitations under the License.
1.9
5.1.0
10.14.2.0
+ 1.4.13
+ false
4.1.18
2.13.2
2.13.2.1
@@ -253,6 +255,14 @@ limitations under the License.
+
+
+ skipDocker
+
+ true
+ **/*Docker.java
+
+
2009