diff --git a/hadoop-unit-standalone/pom.xml b/hadoop-unit-standalone/pom.xml index b0e8f64f..8a717de2 100644 --- a/hadoop-unit-standalone/pom.xml +++ b/hadoop-unit-standalone/pom.xml @@ -14,13 +14,6 @@ 3.5.0 1.1.0 - 2.11.3 - 1.0.4 - 3.3.0 - 4.2.0.2.5.3.0-37 - 1.2.3 - 1.4.0 - 2.9.0 @@ -76,115 +69,6 @@ org.apache.httpcomponents httpclient - - - fr.jetoile.hadoop - hadoop-unit-client-kafka - test - - - - fr.jetoile.hadoop - hadoop-unit-client-solrcloud - test - - - - fr.jetoile.hadoop - hadoop-unit-client-hdfs - test - - - - org.codehaus.jackson - jackson-core-asl - 1.9.11 - test - - - - fr.jetoile.hadoop - hadoop-unit-client-hive - test - - - - fr.jetoile.hadoop - hadoop-unit-client-alluxio - test - - - - org.neo4j.driver - neo4j-java-driver - ${neo4j-java-driver.version} - test - - - - org.mongodb - mongo-java-driver - ${mongo-java-driver.version} - test - - - - org.apache.hbase - hbase-client - ${hbase-client.version} - test - - - hadoop-auth - org.apache.hadoop - - - hadoop-yarn-api - org.apache.hadoop - - - - - - org.apache.oozie - oozie-client - ${oozie-client.version} - test - - - - com.datastax.cassandra - cassandra-driver-core - ${cassandra-driver-core.version} - test - - - - org.elasticsearch.client - rest - 5.4.3 - test - - - - redis.clients - jedis - ${jedis.version} - test - - - - org.apache.hadoop - hadoop-yarn-api - ${hadoop.version} - test - - - - fr.jetoile.hadoop - hadoop-unit-client-commons - test - diff --git a/hadoop-unit-standalone/src/test/java/com/google/common/base/Stopwatch.java b/hadoop-unit-standalone/src/test/java/com/google/common/base/Stopwatch.java deleted file mode 100644 index 7c6d8e0f..00000000 --- a/hadoop-unit-standalone/src/test/java/com/google/common/base/Stopwatch.java +++ /dev/null @@ -1,282 +0,0 @@ -/* - * Copyright (C) 2008 The Guava Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.common.base; - -import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; -import static java.util.concurrent.TimeUnit.MICROSECONDS; -import static java.util.concurrent.TimeUnit.MILLISECONDS; -import static java.util.concurrent.TimeUnit.NANOSECONDS; -import static java.util.concurrent.TimeUnit.SECONDS; - -import com.google.common.annotations.Beta; -import com.google.common.annotations.GwtCompatible; -import com.google.common.annotations.GwtIncompatible; - -import java.util.concurrent.TimeUnit; - -/** - * An object that measures elapsed time in nanoseconds. It is useful to measure - * elapsed time using this class instead of direct calls to {@link - * System#nanoTime} for a few reasons: - * - * - * - *

Basic usage: - *

- *   Stopwatch stopwatch = new Stopwatch().{@link #start start}();
- *   doSomething();
- *   stopwatch.{@link #stop stop}(); // optional
- *
- *   long millis = stopwatch.{@link #elapsedMillis elapsedMillis}();
- *
- *   log.info("that took: " + stopwatch); // formatted string like "12.3 ms"
- * 
- * - *

Stopwatch methods are not idempotent; it is an error to start or stop a - * stopwatch that is already in the desired state. - * - *

When testing code that uses this class, use the {@linkplain - * #Stopwatch(Ticker) alternate constructor} to supply a fake or mock ticker. - * This allows you to - * simulate any valid behavior of the stopwatch. - * - *

Note: This class is not thread-safe. - * - * @author Kevin Bourrillion - * @since 10.0 - */ - -/** - * for hbase test, guava 12 should be used because of conflict with cassandra driver - */ -@Beta -@GwtCompatible(emulated=true) -public final class Stopwatch { - private final Ticker ticker; - private boolean isRunning; - private long elapsedNanos; - private long startTick; - - /** - * Creates (but does not start) a new stopwatch using {@link System#nanoTime} - * as its time source. - */ - public Stopwatch() { - this(Ticker.systemTicker()); - } - - /** - * Creates (but does not start) a new stopwatch, using the specified time - * source. - */ - public Stopwatch(Ticker ticker) { - this.ticker = checkNotNull(ticker); - } - - /** - * Returns {@code true} if {@link #start()} has been called on this stopwatch, - * and {@link #stop()} has not been called since the last call to {@code - * start()}. - */ - public boolean isRunning() { - return isRunning; - } - - /** - * Starts the stopwatch. - * - * @return this {@code Stopwatch} instance - * @throws IllegalStateException if the stopwatch is already running. - */ - public Stopwatch start() { - checkState(!isRunning); - isRunning = true; - startTick = ticker.read(); - return this; - } - - /** - * Stops the stopwatch. Future reads will return the fixed duration that had - * elapsed up to this point. - * - * @return this {@code Stopwatch} instance - * @throws IllegalStateException if the stopwatch is already stopped. - */ - public Stopwatch stop() { - long tick = ticker.read(); - checkState(isRunning); - isRunning = false; - elapsedNanos += tick - startTick; - return this; - } - - /** - * Sets the elapsed time for this stopwatch to zero, - * and places it in a stopped state. - * - * @return this {@code Stopwatch} instance - */ - public Stopwatch reset() { - elapsedNanos = 0; - isRunning = false; - return this; - } - - private long elapsedNanos() { - return isRunning ? ticker.read() - startTick + elapsedNanos : elapsedNanos; - } - - /** - * Returns the current elapsed time shown on this stopwatch, expressed - * in the desired time unit, with any fraction rounded down. - * - *

Note that the overhead of measurement can be more than a microsecond, so - * it is generally not useful to specify {@link TimeUnit#NANOSECONDS} - * precision here. - */ - public long elapsedTime(TimeUnit desiredUnit) { - return desiredUnit.convert(elapsedNanos(), NANOSECONDS); - } - - /** - * Returns the current elapsed time shown on this stopwatch, expressed - * in milliseconds, with any fraction rounded down. This is identical to - * {@code elapsedTime(TimeUnit.MILLISECONDS}. - */ - public long elapsedMillis() { - return elapsedTime(MILLISECONDS); - } - - /** - * Returns a string representation of the current elapsed time; equivalent to - * {@code toString(4)} (four significant figures). - */ - @GwtIncompatible("String.format()") - @Override public String toString() { - return toString(4); - } - - /** - * Returns a string representation of the current elapsed time, choosing an - * appropriate unit and using the specified number of significant figures. - * For example, at the instant when {@code elapsedTime(NANOSECONDS)} would - * return {1234567}, {@code toString(4)} returns {@code "1.235 ms"}. - */ - @GwtIncompatible("String.format()") - public String toString(int significantDigits) { - long nanos = elapsedNanos(); - - TimeUnit unit = chooseUnit(nanos); - double value = (double) nanos / NANOSECONDS.convert(1, unit); - - // Too bad this functionality is not exposed as a regular method call - return String.format("%." + significantDigits + "g %s", - value, abbreviate(unit)); - } - - private static TimeUnit chooseUnit(long nanos) { - if (SECONDS.convert(nanos, NANOSECONDS) > 0) { - return SECONDS; - } - if (MILLISECONDS.convert(nanos, NANOSECONDS) > 0) { - return MILLISECONDS; - } - if (MICROSECONDS.convert(nanos, NANOSECONDS) > 0) { - return MICROSECONDS; - } - return NANOSECONDS; - } - - private static String abbreviate(TimeUnit unit) { - switch (unit) { - case NANOSECONDS: - return "ns"; - case MICROSECONDS: - return "\u03bcs"; // μs - case MILLISECONDS: - return "ms"; - case SECONDS: - return "s"; - default: - throw new AssertionError(); - } - } - - - /** - * Creates (but does not start) a new stopwatch using {@link System#nanoTime} - * as its time source. - * - * @since 15.0 - */ - public static Stopwatch createUnstarted() { - return new Stopwatch(); - } - - /** - * Creates (but does not start) a new stopwatch, using the specified time - * source. - * - * @since 15.0 - */ - public static Stopwatch createUnstarted(Ticker ticker) { - return new Stopwatch(ticker); - } - - /** - * Creates (and starts) a new stopwatch using {@link System#nanoTime} - * as its time source. - * - * @since 15.0 - */ - public static Stopwatch createStarted() { - return new Stopwatch().start(); - } - - /** - * Creates (and starts) a new stopwatch, using the specified time - * source. - * - * @since 15.0 - */ - public static Stopwatch createStarted(Ticker ticker) { - return new Stopwatch(ticker).start(); - } - - /** - * Returns the current elapsed time shown on this stopwatch, expressed - * in the desired time unit, with any fraction rounded down. - * - *

Note that the overhead of measurement can be more than a microsecond, so - * it is generally not useful to specify {@link TimeUnit#NANOSECONDS} - * precision here. - * - * @since 14.0 (since 10.0 as {@code elapsedTime()}) - */ - public long elapsed(TimeUnit desiredUnit) { - return desiredUnit.convert(elapsedNanos(), NANOSECONDS); - } -} \ No newline at end of file diff --git a/hadoop-unit-standalone/src/test/java/com/google/common/io/Closeables.java b/hadoop-unit-standalone/src/test/java/com/google/common/io/Closeables.java deleted file mode 100644 index ffb1c051..00000000 --- a/hadoop-unit-standalone/src/test/java/com/google/common/io/Closeables.java +++ /dev/null @@ -1,67 +0,0 @@ - -package com.google.common.io; - -import com.google.common.annotations.Beta; -import com.google.common.annotations.VisibleForTesting; -import java.io.Closeable; -import java.io.IOException; -import java.io.InputStream; -import java.io.Reader; -import java.util.logging.Level; -import java.util.logging.Logger; -import javax.annotation.Nullable; - -@Beta -public final class Closeables { - @VisibleForTesting - static final Logger logger = Logger.getLogger(Closeables.class.getName()); - - private Closeables() { - } - - public static void close(@Nullable Closeable closeable, boolean swallowIOException) throws IOException { - if(closeable != null) { - try { - closeable.close(); - } catch (IOException var3) { - if(!swallowIOException) { - throw var3; - } - - logger.log(Level.WARNING, "IOException thrown while closing Closeable.", var3); - } - - } - } - - public static void closeQuietly(@Nullable InputStream inputStream) { - try { - close(inputStream, true); - } catch (IOException var2) { - throw new AssertionError(var2); - } - } - - public static void closeQuietly(@Nullable Reader reader) { - try { - close(reader, true); - } catch (IOException var2) { - throw new AssertionError(var2); - } - } - - //HACK : backport this method for hbase (disable table) - /** - * Equivalent to calling {@code close(closeable, true)}, but with no - * IOException in the signature. - * @param closeable the {@code Closeable} object to be closed, or null, in - * which case this method does nothing - */ - public static void closeQuietly(@Nullable Closeable closeable) { - try { - close(closeable, true); - } catch (IOException e) { - logger.log(Level.SEVERE, "IOException should not have been thrown.", e); - } - } -} \ No newline at end of file diff --git a/hadoop-unit-standalone/src/test/java/fr/jetoile/hadoopunit/HadoopStandaloneBootstrapTest.java b/hadoop-unit-standalone/src/test/java/fr/jetoile/hadoopunit/HadoopStandaloneBootstrapTest.java deleted file mode 100644 index 94569710..00000000 --- a/hadoop-unit-standalone/src/test/java/fr/jetoile/hadoopunit/HadoopStandaloneBootstrapTest.java +++ /dev/null @@ -1,60 +0,0 @@ -package fr.jetoile.hadoopunit; - -import fr.jetoile.hadoopunit.exception.BootstrapException; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.junit.Test; - -import java.io.BufferedReader; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.util.Collections; -import java.util.Enumeration; -import java.util.Properties; -import java.util.stream.Collectors; -import java.util.stream.StreamSupport; - -import static fr.jetoile.hadoopunit.HadoopUnitConfig.DEFAULT_PROPS_FILE; - -public class HadoopStandaloneBootstrapTest { - - - @Test - public void test() throws BootstrapException, FileNotFoundException, ConfigurationException { - PropertiesConfiguration configuration = new PropertiesConfiguration("hadoop.properties"); - PropertiesConfiguration hadoopUnitConfiguration = new PropertiesConfiguration(DEFAULT_PROPS_FILE); - - - try { - BufferedReader br = new BufferedReader(new FileReader(configuration.getFile())); -// String line = null; -// while ((line = br.readLine()) != null) { -// -// } - - Properties p = new Properties(); - p.load(br); - - - Enumeration enumeration = (Enumeration) p.propertyNames(); - Enumeration enumeration2 = (Enumeration) p.propertyNames(); - - System.out.println("========"); - - while (enumeration2.hasMoreElements()) { - System.out.println(enumeration2.nextElement()); - } - - System.out.println("========"); - -// Collections.list(componentKeys).stream().map(String::toString).filter(c -> configuration.getBoolean(c)).collect(Collectors.toList()); - - - Collections.list(enumeration).stream().map(String::toString).filter(c -> configuration.getBoolean(c)).forEach(System.out::println); - - } catch (IOException e) { - e.printStackTrace(); - } - } -} \ No newline at end of file diff --git a/hadoop-unit-standalone/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java b/hadoop-unit-standalone/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java deleted file mode 100644 index 2dec6f52..00000000 --- a/hadoop-unit-standalone/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java +++ /dev/null @@ -1,814 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package fr.jetoile.hadoopunit.integrationtest; - - -import alluxio.AlluxioURI; -import alluxio.client.file.FileInStream; -import alluxio.client.file.URIStatus; -import alluxio.exception.AlluxioException; -import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.Session; -import com.mongodb.*; -import fr.jetoile.hadoopunit.exception.BootstrapException; -import fr.jetoile.hadoopunit.exception.NotFoundServiceException; -import fr.jetoile.hadoopunit.integrationtest.simpleyarnapp.Client; -import fr.jetoile.hadoopunit.test.alluxio.AlluxioUtils; -import fr.jetoile.hadoopunit.test.hdfs.HdfsUtils; -import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils; -import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils; -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.configuration.PropertiesConfiguration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.*; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.http.HttpEntity; -import org.apache.http.HttpHost; -import org.apache.http.entity.ContentType; -import org.apache.http.nio.entity.NStringEntity; -import org.apache.http.util.EntityUtils; -import org.apache.oozie.client.OozieClient; -import org.apache.oozie.client.WorkflowJob; -import org.apache.solr.client.solrj.SolrServerException; -import org.apache.solr.client.solrj.impl.CloudSolrClient; -import org.apache.solr.common.SolrDocument; -import org.apache.solr.common.SolrInputDocument; -import org.apache.zookeeper.KeeperException; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.elasticsearch.client.RestClient; -import org.junit.*; -import org.neo4j.driver.v1.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import redis.clients.jedis.Jedis; - -import javax.net.ssl.*; -import java.io.*; -import java.net.*; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.security.KeyManagementException; -import java.security.NoSuchAlgorithmException; -import java.security.cert.CertificateException; -import java.security.cert.X509Certificate; -import java.sql.Connection; -import java.sql.*; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Properties; - -import static fr.jetoile.hadoopunit.client.commons.HadoopUnitClientConfig.*; -import static org.fest.assertions.Assertions.assertThat; -import static org.junit.Assert.*; - -@Ignore -public class ManualIntegrationBootstrapTest { - - static private Configuration configuration; - - static private Logger LOGGER = LoggerFactory.getLogger(ManualIntegrationBootstrapTest.class); - - public static final int NB_FILE = 1; - public static final String PATH = "/fooDirectory"; - - @BeforeClass - public static void setup() throws BootstrapException { - try { - configuration = new PropertiesConfiguration(DEFAULT_PROPS_FILE); - } catch (ConfigurationException e) { - throw new BootstrapException("bad config", e); - } - } - - - @AfterClass - public static void tearDown() throws BootstrapException { - } - - @Test - public void solrCloudShouldStart() throws IOException, SolrServerException, KeeperException, InterruptedException { - - String collectionName = configuration.getString(SOLR_COLLECTION_NAME); - - String zkHostString = configuration.getString(ZOOKEEPER_HOST_KEY) + ":" + configuration.getInt(ZOOKEEPER_PORT_KEY); - CloudSolrClient client = new CloudSolrClient(zkHostString); - - for (int i = 0; i < 1000; ++i) { - SolrInputDocument doc = new SolrInputDocument(); - doc.addField("cat", "book"); - doc.addField("id", "book-" + i); - doc.addField("name", "The Legend of the Hobbit part " + i); - client.add(collectionName, doc); - if (i % 100 == 0) client.commit(collectionName); // periodically flush - } - client.commit("collection1"); - - SolrDocument collection1 = client.getById(collectionName, "book-1"); - - assertNotNull(collection1); - - assertThat(collection1.getFieldValue("name")).isEqualTo("The Legend of the Hobbit part 1"); - - - client.close(); - } - - @Test - public void kafkaShouldStart() throws Exception { - - // Producer - for (int i = 0; i < 10; i++) { - String payload = generateMessage(i); - KafkaProducerUtils.INSTANCE.produceMessages(configuration.getString(KAFKA_TEST_TOPIC_KEY), String.valueOf(i), payload); - } - - - // Consumer - KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(configuration.getString(KAFKA_TEST_TOPIC_KEY), 10); - - // Assert num of messages produced = num of message consumed - Assert.assertEquals(configuration.getLong(KAFKA_TEST_MESSAGE_COUNT_KEY), KafkaConsumerUtils.INSTANCE.getNumRead()); - } - - private String generateMessage(int i) { - JSONObject obj = new JSONObject(); - try { - obj.put("id", String.valueOf(i)); - obj.put("msg", "test-message" + 1); - } catch (JSONException e) { - e.printStackTrace(); - } - return obj.toString(); - } - - @Test - public void hiveServer2ShouldStart() throws InterruptedException, ClassNotFoundException, SQLException { - -// assertThat(Utils.available("127.0.0.1", 20103)).isFalse(); - - // Load the Hive JDBC driver - LOGGER.info("HIVE: Loading the Hive JDBC Driver"); - Class.forName("org.apache.hive.jdbc.HiveDriver"); - - // - // Create an ORC table and describe it - // - // Get the connection - Connection con = DriverManager.getConnection("jdbc:hive2://" + - configuration.getString(HIVE_SERVER2_HOSTNAME_KEY) + ":" + - configuration.getInt(HIVE_SERVER2_PORT_KEY) + "/" + - configuration.getString(HIVE_TEST_DATABASE_NAME_KEY), - "user", - "pass"); - - // Create the DB - Statement stmt; - try { - String createDbDdl = "CREATE DATABASE IF NOT EXISTS " + - configuration.getString(HIVE_TEST_DATABASE_NAME_KEY); - stmt = con.createStatement(); - LOGGER.info("HIVE: Running Create Database Statement: {}", createDbDdl); - stmt.execute(createDbDdl); - } catch (Exception e) { - e.printStackTrace(); - } - - // Drop the table incase it still exists - String dropDdl = "DROP TABLE " + configuration.getString(HIVE_TEST_DATABASE_NAME_KEY) + "." + - configuration.getString(HIVE_TEST_TABLE_NAME_KEY); - stmt = con.createStatement(); - LOGGER.info("HIVE: Running Drop Table Statement: {}", dropDdl); - stmt.execute(dropDdl); - - // Create the ORC table - String createDdl = "CREATE TABLE IF NOT EXISTS " + - configuration.getString(HIVE_TEST_DATABASE_NAME_KEY) + "." + - configuration.getString(HIVE_TEST_TABLE_NAME_KEY) + " (id INT, msg STRING) " + - "PARTITIONED BY (dt STRING) " + - "CLUSTERED BY (id) INTO 16 BUCKETS " + - "STORED AS ORC tblproperties(\"orc.compress\"=\"NONE\")"; - stmt = con.createStatement(); - LOGGER.info("HIVE: Running Create Table Statement: {}", createDdl); - stmt.execute(createDdl); - - // Issue a describe on the new table and display the output - LOGGER.info("HIVE: Validating Table was Created: "); - ResultSet resultSet = stmt.executeQuery("DESCRIBE FORMATTED " + - configuration.getString(HIVE_TEST_TABLE_NAME_KEY)); - int count = 0; - while (resultSet.next()) { - ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); - for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) { - System.out.print(resultSet.getString(i)); - } - System.out.println(); - count++; - } - assertEquals(33, count); - - // Drop the table - dropDdl = "DROP TABLE " + configuration.getString(HIVE_TEST_DATABASE_NAME_KEY) + "." + - configuration.getString(HIVE_TEST_TABLE_NAME_KEY); - stmt = con.createStatement(); - LOGGER.info("HIVE: Running Drop Table Statement: {}", dropDdl); - stmt.execute(dropDdl); - } - - - @Test - public void hdfsShouldStart() throws Exception { - FileSystem hdfsFsHandle = HdfsUtils.INSTANCE.getFileSystem(); - - - FSDataOutputStream writer = hdfsFsHandle.create(new Path(configuration.getString(HDFS_TEST_FILE_KEY))); - writer.writeUTF(configuration.getString(HDFS_TEST_STRING_KEY)); - writer.close(); - - // Read the file and compare to test string - FSDataInputStream reader = hdfsFsHandle.open(new Path(configuration.getString(HDFS_TEST_FILE_KEY))); - assertEquals(reader.readUTF(), configuration.getString(HDFS_TEST_STRING_KEY)); - reader.close(); - - URL url = new URL( - String.format("http://%s:%s/webhdfs/v1?op=GETHOMEDIRECTORY&user.name=guest", - configuration.getString(HDFS_NAMENODE_HOST_KEY), - configuration.getInt(HDFS_NAMENODE_HTTP_PORT_KEY))); - URLConnection connection = url.openConnection(); - connection.setRequestProperty("Accept-Charset", "UTF-8"); - BufferedReader response = new BufferedReader(new InputStreamReader(connection.getInputStream())); - String line = response.readLine(); - response.close(); - assertThat("{\"Path\":\"/user/guest\"}").isEqualTo(line); - - } - - @Test - public void hBaseShouldStart() throws Exception { - - String tableName = configuration.getString(HBASE_TEST_TABLE_NAME_KEY); - String colFamName = configuration.getString(HBASE_TEST_COL_FAMILY_NAME_KEY); - String colQualiferName = configuration.getString(HBASE_TEST_COL_QUALIFIER_NAME_KEY); - Integer numRowsToPut = configuration.getInt(HBASE_TEST_NUM_ROWS_TO_PUT_KEY); - - org.apache.hadoop.conf.Configuration hbaseConfiguration = HBaseConfiguration.create(); - hbaseConfiguration.set("hbase.zookeeper.quorum", configuration.getString(ZOOKEEPER_HOST_KEY)); - hbaseConfiguration.setInt("hbase.zookeeper.property.clientPort", configuration.getInt(ZOOKEEPER_PORT_KEY)); - hbaseConfiguration.set("hbase.master", "127.0.0.1:" + configuration.getInt(HBASE_MASTER_PORT_KEY)); - hbaseConfiguration.set("zookeeper.znode.parent", configuration.getString(HBASE_ZNODE_PARENT_KEY)); - - LOGGER.info("HBASE: Deleting table {}", tableName); - deleteHbaseTable(tableName, hbaseConfiguration); - - LOGGER.info("HBASE: Creating table {} with column family {}", tableName, colFamName); - createHbaseTable(tableName, colFamName, hbaseConfiguration); - - LOGGER.info("HBASE: Populate the table with {} rows.", numRowsToPut); - for (int i = 0; i < numRowsToPut; i++) { - putRow(tableName, colFamName, String.valueOf(i), colQualiferName, "row_" + i, hbaseConfiguration); - } - - LOGGER.info("HBASE: Fetching and comparing the results"); - for (int i = 0; i < numRowsToPut; i++) { - Result result = getRow(tableName, colFamName, String.valueOf(i), colQualiferName, hbaseConfiguration); - assertEquals("row_" + i, new String(result.value())); - } - - } - - - @Test - @Ignore - public void oozieWithRealHiveWorkflowShouldStart() throws Exception { - - LOGGER.info("OOZIE: Test Submit Workflow Start"); - - org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); - conf.set("fs.default.name", "hdfs://" + configuration.getString(HDFS_NAMENODE_HOST_KEY) + ":" + configuration.getInt(HDFS_NAMENODE_PORT_KEY)); - URI uri = URI.create("hdfs://" + configuration.getString(HDFS_NAMENODE_HOST_KEY) + ":" + configuration.getInt(HDFS_NAMENODE_PORT_KEY)); - FileSystem hdfsFs = FileSystem.get(uri, conf); - OozieClient oozieClient = new OozieClient("http://" + configuration.getString(OOZIE_HOST) + ":" + configuration.getInt(OOZIE_PORT) + "/oozie"); - - - hdfsFs.mkdirs(new Path("/khanh/test2")); - hdfsFs.mkdirs(new Path("/khanh/work2")); - hdfsFs.mkdirs(new Path("/khanh/etc2")); - hdfsFs.copyFromLocalFile(new Path(ManualIntegrationBootstrapTest.class.getClassLoader().getResource("workflow2.xml").toURI()), new Path("/khanh/test2/workflow.xml")); - hdfsFs.copyFromLocalFile(new Path(ManualIntegrationBootstrapTest.class.getClassLoader().getResource("hive-site.xml").toURI()), new Path("/khanh/etc2/hive-site.xml")); - hdfsFs.copyFromLocalFile(new Path(ManualIntegrationBootstrapTest.class.getClassLoader().getResource("test.csv").toURI()), new Path("/khanh/work2/test.csv")); - hdfsFs.copyFromLocalFile(new Path(ManualIntegrationBootstrapTest.class.getClassLoader().getResource("test.hql").toURI()), new Path("/khanh/etc2/test.hql")); - - //write job.properties - Properties oozieConf = oozieClient.createConfiguration(); - oozieConf.setProperty(OozieClient.APP_PATH, "hdfs://localhost:20112/khanh/test2/workflow.xml"); - oozieConf.setProperty(OozieClient.USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); - oozieConf.setProperty("jobTracker", "localhost:37001"); - oozieConf.setProperty("nameNode", "hdfs://localhost:20112"); - oozieConf.setProperty("hiveTry", "hdfs://localhost:20112/khanh/etc2/test.hql"); - - //submit and check - final String jobId = oozieClient.run(oozieConf); - - while (oozieClient.getJobInfo(jobId).getStatus() != WorkflowJob.Status.RUNNING) { - System.out.println("========== workflow job status " + oozieClient.getJobInfo(jobId).getStatus()); - Thread.sleep(1000); - } - - while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) { - System.out.println("========== workflow job status " + oozieClient.getJobInfo(jobId).getStatus()); - System.out.println("========== job is running"); - Thread.sleep(1000); - } - - System.out.println("=============== OOZIE: Final Workflow status" + oozieClient.getJobInfo(jobId).getStatus()); - WorkflowJob wf = oozieClient.getJobInfo(jobId); - System.out.println("=============== OOZIE: Workflow: {}" + wf.toString()); - - assertEquals(WorkflowJob.Status.SUCCEEDED, wf.getStatus()); - - hdfsFs.close(); - } - - @Test - @Ignore - public void oozieWithRealWorkflowShouldStart() throws Exception { - - LOGGER.info("OOZIE: Test Submit Workflow Start"); - - org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); - conf.set("fs.default.name", "hdfs://" + configuration.getString(HDFS_NAMENODE_HOST_KEY) + ":" + configuration.getInt(HDFS_NAMENODE_PORT_KEY)); - URI uri = URI.create("hdfs://" + configuration.getString(HDFS_NAMENODE_HOST_KEY) + ":" + configuration.getInt(HDFS_NAMENODE_PORT_KEY)); - FileSystem hdfsFs = FileSystem.get(uri, conf); - OozieClient oozieClient = new OozieClient("http://" + configuration.getString(OOZIE_HOST) + ":" + configuration.getInt(OOZIE_PORT) + "/oozie"); - - - String testInputFile = "test_input.txt"; - String testInputDir = "/tmp/test_input_dir"; - - // Setup input directory and file - hdfsFs.mkdirs(new Path(testInputDir)); - hdfsFs.copyFromLocalFile( - new Path(getClass().getClassLoader().getResource(testInputFile).toURI()), new Path(testInputDir)); - - hdfsFs.mkdirs(new Path("/khanh/test")); - hdfsFs.copyFromLocalFile(new Path(ManualIntegrationBootstrapTest.class.getClassLoader().getResource("workflow2.xml").toURI()), new Path("/khanh/test/workflow.xml")); - - //write job.properties - Properties oozieConf = oozieClient.createConfiguration(); - oozieConf.setProperty(OozieClient.APP_PATH, "hdfs://localhost:20112/khanh/test/workflow.xml"); - oozieConf.setProperty(OozieClient.USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); - oozieConf.setProperty("jobTracker", "localhost:37001"); - oozieConf.setProperty("nameNode", "hdfs://localhost:20112"); - oozieConf.setProperty("doOption", "true"); - - //submit and check - final String jobId = oozieClient.run(oozieConf); - WorkflowJob wf = oozieClient.getJobInfo(jobId); - assertNotNull(wf); - - LOGGER.info("OOZIE: Workflow: {}", wf.toString()); - - while (oozieClient.getJobInfo(jobId).getStatus() != WorkflowJob.Status.RUNNING) { - System.out.println("========== workflow job status " + oozieClient.getJobInfo(jobId).getStatus()); - Thread.sleep(1000); - } - - while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) { - System.out.println("========== workflow job status " + oozieClient.getJobInfo(jobId).getStatus()); - System.out.println("========== job is running"); - Thread.sleep(1000); - } - - System.out.println("=============== OOZIE: Final Workflow status" + oozieClient.getJobInfo(jobId).getStatus()); - wf = oozieClient.getJobInfo(jobId); - System.out.println("=============== OOZIE: Workflow: {}" + wf.toString()); - - assertEquals(WorkflowJob.Status.SUCCEEDED, wf.getStatus()); - - - hdfsFs.close(); - } - - @Test - public void oozieShouldStart() throws Exception { - - LOGGER.info("OOZIE: Test Submit Workflow Start"); - - org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); - conf.set("fs.default.name", "hdfs://" + configuration.getString(HDFS_NAMENODE_HOST_KEY) + ":" + configuration.getInt(HDFS_NAMENODE_PORT_KEY)); - - URI uri = URI.create("hdfs://" + configuration.getString(HDFS_NAMENODE_HOST_KEY) + ":" + configuration.getInt(HDFS_NAMENODE_PORT_KEY)); - - FileSystem hdfsFs = FileSystem.get(uri, conf); - - OozieClient oozieClient = new OozieClient("http://" + configuration.getString(OOZIE_HOST) + ":" + configuration.getInt(OOZIE_PORT) + "/oozie"); - - Path appPath = new Path(hdfsFs.getHomeDirectory(), "testApp"); - hdfsFs.mkdirs(new Path(appPath, "lib")); - Path workflow = new Path(appPath, "workflow.xml"); - - //write workflow.xml - String wfApp = "" + - " " + - " " + - ""; - - Writer writer = new OutputStreamWriter(hdfsFs.create(workflow)); - writer.write(wfApp); - writer.close(); - - //write job.properties - Properties oozieConf = oozieClient.createConfiguration(); - oozieConf.setProperty(OozieClient.APP_PATH, workflow.toString()); - oozieConf.setProperty(OozieClient.USER_NAME, UserGroupInformation.getCurrentUser().getUserName()); - - //submit and check - final String jobId = oozieClient.submit(oozieConf); - WorkflowJob wf = oozieClient.getJobInfo(jobId); - assertNotNull(wf); - assertEquals(WorkflowJob.Status.PREP, wf.getStatus()); - - LOGGER.info("OOZIE: Workflow: {}", wf.toString()); - hdfsFs.close(); - - } - - @Test - public void knoxWithWebhbaseShouldStart() throws Exception { - - String tableName = configuration.getString(HBASE_TEST_TABLE_NAME_KEY); - String colFamName = configuration.getString(HBASE_TEST_COL_FAMILY_NAME_KEY); - String colQualiferName = configuration.getString(HBASE_TEST_COL_QUALIFIER_NAME_KEY); - Integer numRowsToPut = configuration.getInt(HBASE_TEST_NUM_ROWS_TO_PUT_KEY); - - org.apache.hadoop.conf.Configuration hbaseConfiguration = HBaseConfiguration.create(); - hbaseConfiguration.set("hbase.zookeeper.quorum", configuration.getString(ZOOKEEPER_HOST_KEY)); - hbaseConfiguration.setInt("hbase.zookeeper.property.clientPort", configuration.getInt(ZOOKEEPER_PORT_KEY)); - hbaseConfiguration.set("hbase.master", "127.0.0.1:" + configuration.getInt(HBASE_MASTER_PORT_KEY)); - hbaseConfiguration.set("zookeeper.znode.parent", configuration.getString(HBASE_ZNODE_PARENT_KEY)); - - LOGGER.info("HBASE: Deleting table {}", tableName); - deleteHbaseTable(tableName, hbaseConfiguration); - - LOGGER.info("HBASE: Creating table {} with column family {}", tableName, colFamName); - createHbaseTable(tableName, colFamName, hbaseConfiguration); - - LOGGER.info("HBASE: Populate the table with {} rows.", numRowsToPut); - for (int i = 0; i < numRowsToPut; i++) { - putRow(tableName, colFamName, String.valueOf(i), colQualiferName, "row_" + i, hbaseConfiguration); - } - - URL url = new URL(String.format("http://%s:%s/", - configuration.getString(KNOX_HOST_KEY), - configuration.getString(HBASE_REST_PORT_KEY))); - URLConnection connection = url.openConnection(); - connection.setRequestProperty("Accept-Charset", "UTF-8"); - try (BufferedReader response = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { - String line = response.readLine(); - assertTrue(line.contains(tableName)); - } - - url = new URL(String.format("http://%s:%s/%s/schema", - configuration.getString(KNOX_HOST_KEY), - configuration.getString(HBASE_REST_PORT_KEY), - tableName)); - connection = url.openConnection(); - connection.setRequestProperty("Accept-Charset", "UTF-8"); - try (BufferedReader response = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { - String line = response.readLine(); - assertTrue(line.contains("{ NAME=> 'hbase_test_table', IS_META => 'false', COLUMNS => [ { NAME => 'cf1', BLOOMFILTER => 'ROW'")); - } - - // Knox clients need self trusted certificates in tests - defaultBlindTrust(); - - // Read the hbase throught Knox - url = new URL(String.format("https://%s:%s/gateway/mycluster/hbase", - configuration.getString(KNOX_HOST_KEY), - configuration.getString(KNOX_PORT_KEY))); - connection = url.openConnection(); - connection.setRequestProperty("Accept-Charset", "UTF-8"); - try (BufferedReader response = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { - String line = response.readLine(); - assertTrue(line.contains(tableName)); - } - - url = new URL(String.format("https://%s:%s/gateway/mycluster/hbase/%s/schema", - configuration.getString(KNOX_HOST_KEY), - configuration.getString(KNOX_PORT_KEY), - tableName)); - connection = url.openConnection(); - connection.setRequestProperty("Accept-Charset", "UTF-8"); - try (BufferedReader response = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { - String line = response.readLine(); - assertTrue(line.contains("{ NAME=> 'hbase_test_table', IS_META => 'false', COLUMNS => [ { NAME => 'cf1', BLOOMFILTER => 'ROW'")); - } - } - - - @Test - public void testStartAndStopServerMode() throws InterruptedException { - Jedis jedis = new Jedis("127.0.0.1", configuration.getInt(REDIS_PORT_KEY)); - Assert.assertNotNull(jedis.info()); - System.out.println(jedis.info()); - jedis.close(); - } - - @Test - public void alluxioShouldStart() throws IOException, AlluxioException, InterruptedException { - alluxio.client.file.FileSystem fs = AlluxioUtils.INSTANCE.getFileSystem(); - writeFile(fs); - - assertTrue(readFile(fs)); - - HdfsUtils.INSTANCE.getFileSystem().mkdirs(new Path("/khanh/alluxio")); - FSDataOutputStream writer = HdfsUtils.INSTANCE.getFileSystem().create(new Path("/khanh/alluxio/test.txt"), true); - writer.writeUTF(configuration.getString(HDFS_TEST_STRING_KEY)); - writer.close(); - - fs.mount(new AlluxioURI(PATH + "/hdfs"), new AlluxioURI("hdfs://localhost:20112/khanh/alluxio")); - assertTrue(fs.exists(new AlluxioURI(PATH + "/hdfs/test.txt"))); - - fs.unmount(new AlluxioURI(PATH + "/hdfs")); - assertFalse(fs.exists(new AlluxioURI(PATH + "/hdfs/test.txt"))); - } - - private boolean readFile(alluxio.client.file.FileSystem fs) throws IOException, AlluxioException { - boolean pass = true; - for (int i = 0; i < NB_FILE; i++) { - AlluxioURI filePath = new AlluxioURI(PATH + "/part-" + i); - LOGGER.debug("Reading data from {}", filePath); - - FileInStream is = fs.openFile(filePath); - URIStatus status = fs.getStatus(filePath); - ByteBuffer buf = ByteBuffer.allocate((int) status.getBlockSizeBytes()); - is.read(buf.array()); - buf.order(ByteOrder.nativeOrder()); - for (int k = 0; k < NB_FILE; k++) { - pass = pass && (buf.getInt() == k); - } - is.close(); - } - return pass; - } - - private void writeFile(alluxio.client.file.FileSystem fs) throws IOException, AlluxioException { - for (int i = 0; i < NB_FILE; i++) { - ByteBuffer buf = ByteBuffer.allocate(80); - buf.order(ByteOrder.nativeOrder()); - for (int k = 0; k < NB_FILE; k++) { - buf.putInt(k); - } - buf.flip(); - AlluxioURI filePath = new AlluxioURI(PATH + "/part-" + i); - LOGGER.debug("Writing data to {}", filePath); - - OutputStream os = fs.createFile(filePath); - os.write(buf.array()); - os.close(); - } - } - - private static void createHbaseTable(String tableName, String colFamily, - org.apache.hadoop.conf.Configuration configuration) throws Exception { - - final HBaseAdmin admin = new HBaseAdmin(configuration); - HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName)); - HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(colFamily); - - hTableDescriptor.addFamily(hColumnDescriptor); - admin.createTable(hTableDescriptor); - } - - private static void deleteHbaseTable(String tableName, org.apache.hadoop.conf.Configuration configuration) throws Exception { - - final HBaseAdmin admin = new HBaseAdmin(configuration); - if (admin.tableExists(tableName)) { - admin.disableTable(tableName); - admin.deleteTable(tableName); - } - } - - private static void putRow(String tableName, String colFamName, String rowKey, String colQualifier, String value, - org.apache.hadoop.conf.Configuration configuration) throws Exception { - HTable table = new HTable(configuration, tableName); - Put put = new Put(Bytes.toBytes(rowKey)); - put.add(Bytes.toBytes(colFamName), Bytes.toBytes(colQualifier), Bytes.toBytes(value)); - table.put(put); - table.flushCommits(); - table.close(); - } - - private static Result getRow(String tableName, String colFamName, String rowKey, String colQualifier, - org.apache.hadoop.conf.Configuration configuration) throws Exception { - Result result; - HTable table = new HTable(configuration, tableName); - Get get = new Get(Bytes.toBytes(rowKey)); - get.addColumn(Bytes.toBytes(colFamName), Bytes.toBytes(colQualifier)); - get.setMaxVersions(1); - result = table.get(get); - return result; - } - - private void defaultBlindTrust() throws NoSuchAlgorithmException, KeyManagementException { - TrustManager[] trustAllCerts = new TrustManager[]{ - new X509ExtendedTrustManager() { - @Override - public X509Certificate[] getAcceptedIssuers() { - return null; - } - - @Override - public void checkClientTrusted(X509Certificate[] certs, String authType) { - } - - @Override - public void checkServerTrusted(X509Certificate[] certs, String authType) { - } - - @Override - public void checkClientTrusted(X509Certificate[] xcs, String string, Socket socket) throws CertificateException { - - } - - @Override - public void checkServerTrusted(X509Certificate[] xcs, String string, Socket socket) throws CertificateException { - - } - - @Override - public void checkClientTrusted(X509Certificate[] xcs, String string, SSLEngine ssle) throws CertificateException { - - } - - @Override - public void checkServerTrusted(X509Certificate[] xcs, String string, SSLEngine ssle) throws CertificateException { - - } - - } - }; - SSLContext sc = SSLContext.getInstance("SSL"); - sc.init(null, trustAllCerts, new java.security.SecureRandom()); - HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory()); - HostnameVerifier allHostsValid = new HostnameVerifier() { - @Override - public boolean verify(String hostname, SSLSession session) { - return true; - } - }; - HttpsURLConnection.setDefaultHostnameVerifier(allHostsValid); - } - - @Test - public void mongodbShouldStart() throws UnknownHostException { - MongoClient mongo = new MongoClient(configuration.getString(MONGO_IP_KEY), configuration.getInt(MONGO_PORT_KEY)); - - DB db = mongo.getDB(configuration.getString(MONGO_DATABASE_NAME_KEY)); - DBCollection col = db.createCollection(configuration.getString(MONGO_COLLECTION_NAME_KEY), - new BasicDBObject()); - - col.save(new BasicDBObject("testDoc", new java.util.Date())); - LOGGER.info("MONGODB: Number of items in collection: {}", col.count()); - assertEquals(1, col.count()); - - DBCursor cursor = col.find(); - while (cursor.hasNext()) { - LOGGER.info("MONGODB: Document output: {}", cursor.next()); - } - cursor.close(); - } - - @Test - public void cassandraShouldStart() throws NotFoundServiceException { - Cluster cluster = Cluster.builder() - .addContactPoints(configuration.getString(CASSANDRA_IP_KEY)).withPort(configuration.getInt(CASSANDRA_PORT_KEY)).build(); - Session session = cluster.connect(); - - session.execute("create KEYSPACE test WITH replication = {'class': 'SimpleStrategy' , 'replication_factor': '1' }"); - session.execute("CREATE TABLE test.test (user text, value text, PRIMARY KEY (user))"); - session.execute("insert into test.test(user, value) values('user1', 'value1')"); - session.execute("insert into test.test(user, value) values('user2', 'value2')"); - - com.datastax.driver.core.ResultSet execute = session.execute("select * from test.test"); - - List res = execute.all(); - assertEquals(res.size(), 2); - assertEquals(res.get(0).getString("user"), "user2"); - assertEquals(res.get(0).getString("value"), "value2"); - assertEquals(res.get(1).getString("user"), "user1"); - - } - - @Test - public void elasticSearchShouldStart() throws NotFoundServiceException, IOException, JSONException { - - RestClient restClient = RestClient.builder( - new HttpHost(configuration.getString(ELASTICSEARCH_IP_KEY), configuration.getInt(ELASTICSEARCH_HTTP_PORT_KEY), "http")).build(); - - org.elasticsearch.client.Response response = restClient.performRequest("GET", "/", - Collections.singletonMap("pretty", "true")); - System.out.println(EntityUtils.toString(response.getEntity())); - - // indexing document - HttpEntity entity = new NStringEntity( - "{\n" + - " \"user\" : \"kimchy\",\n" + - " \"post_date\" : \"2009-11-15T14:12:12\",\n" + - " \"message\" : \"trying out Elasticsearch\"\n" + - "}", ContentType.APPLICATION_JSON); - - org.elasticsearch.client.Response indexResponse = restClient.performRequest( - "PUT", - "/twitter/tweet/1", - Collections.emptyMap(), - entity); - - response = restClient.performRequest("GET", "/_search", - Collections.singletonMap("pretty", "true")); - - - String result = EntityUtils.toString(response.getEntity()); - System.out.println(result); - JSONObject obj = new JSONObject(result); - int nbResult = obj.getJSONObject("hits").getInt("total"); - assertThat(nbResult).isEqualTo(1); - - restClient.close(); - } - - @Test - public void neo4jShouldStartWithRealDriver() { - - org.neo4j.driver.v1.Driver driver = GraphDatabase.driver( - "bolt://localhost:13533", - Config.build() - .withEncryptionLevel(Config.EncryptionLevel.NONE) - .toConfig() - ); - - List results = new ArrayList<>(); - try (org.neo4j.driver.v1.Session session = driver.session()) { - session.run("CREATE (person:Person {name: {name}, title:'King'})", Values.parameters("name", "Arthur")); - - StatementResult result = session.run("MATCH (a:Person) WHERE a.name = 'Arthur' RETURN a.name AS name, a.title AS title"); - while (result.hasNext()) { - Record record = result.next(); - results.add(record); - LOGGER.debug(record.get("title").asString() + " " + record.get("name").asString()); - } - } - - assertEquals(1, results.size()); - assertEquals("King", results.get(0).get("title").asString()); - assertEquals("Arthur", results.get(0).get("name").asString()); - } - - @Test - public void testYarnLocalClusterIntegrationTest() { - - String[] args = new String[7]; - args[0] = "whoami"; - args[1] = "1"; - args[2] = getClass().getClassLoader().getResource("simple-yarn-app-1.1.0.jar").toString(); - args[3] = configuration.getString(YARN_RESOURCE_MANAGER_ADDRESS_KEY); - args[4] = configuration.getString(YARN_RESOURCE_MANAGER_HOSTNAME_KEY); - args[5] = configuration.getString(YARN_RESOURCE_MANAGER_SCHEDULER_ADDRESS_KEY); - args[6] = configuration.getString(YARN_RESOURCE_MANAGER_RESOURCE_TRACKER_ADDRESS_KEY); - - - try { - Client.main(args); - } catch (Exception e) { - e.printStackTrace(); - } - - - } - - -} - diff --git a/hadoop-unit-standalone/src/test/java/fr/jetoile/hadoopunit/integrationtest/simpleyarnapp/Client.java b/hadoop-unit-standalone/src/test/java/fr/jetoile/hadoopunit/integrationtest/simpleyarnapp/Client.java deleted file mode 100644 index 869fdbb8..00000000 --- a/hadoop-unit-standalone/src/test/java/fr/jetoile/hadoopunit/integrationtest/simpleyarnapp/Client.java +++ /dev/null @@ -1,154 +0,0 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package fr.jetoile.hadoopunit.integrationtest.simpleyarnapp; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.yarn.api.ApplicationConstants; -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; -import org.apache.hadoop.yarn.api.records.*; -import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.client.api.YarnClientApplication; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.util.Apps; -import org.apache.hadoop.yarn.util.ConverterUtils; -import org.apache.hadoop.yarn.util.Records; - -import java.io.File; -import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - - -public class Client { - - Configuration conf = new YarnConfiguration(); - - public void run(String[] args) throws Exception { - final String command = args[0]; - final int n = Integer.valueOf(args[1]); - final Path jarPath = new Path(args[2]); - final String resourceManagerAddress = args[3]; - final String resourceManagerHostname = args[4]; - final String resourceManagerSchedulerAddress = args[5]; - final String resourceManagerResourceTrackerAddress = args[6]; - - // Create yarnClient - YarnConfiguration conf = new YarnConfiguration(); - conf.set("yarn.resourcemanager.address", resourceManagerAddress); - conf.set("yarn.resourcemanager.hostname", resourceManagerHostname); - conf.set("yarn.resourcemanager.scheduler.address", resourceManagerSchedulerAddress); - conf.set("yarn.resourcemanager.resource-tracker.address", resourceManagerResourceTrackerAddress); - YarnClient yarnClient = YarnClient.createYarnClient(); - yarnClient.init(conf); - yarnClient.start(); - - // Create application via yarnClient - YarnClientApplication app = yarnClient.createApplication(); - - // Set up the container launch context for the application master - ContainerLaunchContext amContainer = - Records.newRecord(ContainerLaunchContext.class); - amContainer.setCommands( - Collections.singletonList( - "$JAVA_HOME/bin/java" + - " -Xmx256M" + - " com.hortonworks.simpleyarnapp.ApplicationMaster" + - " " + command + - " " + String.valueOf(n) + - " " + resourceManagerAddress + - " " + resourceManagerHostname + - " " + resourceManagerSchedulerAddress + - " " + resourceManagerResourceTrackerAddress + - " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + - " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" - ) - ); - - // Setup jar for ApplicationMaster - LocalResource appMasterJar = Records.newRecord(LocalResource.class); - setupAppMasterJar(jarPath, appMasterJar); - amContainer.setLocalResources( - Collections.singletonMap("simple-yarn-app-1.1.0.jar", appMasterJar)); - - // Setup CLASSPATH for ApplicationMaster - Map appMasterEnv = new HashMap(); - setupAppMasterEnv(appMasterEnv); - amContainer.setEnvironment(appMasterEnv); - - // Set up resource type requirements for ApplicationMaster - Resource capability = Records.newRecord(Resource.class); - capability.setVirtualCores(1); - - // Finally, set-up ApplicationSubmissionContext for the application - ApplicationSubmissionContext appContext = - app.getApplicationSubmissionContext(); - appContext.setApplicationName("simple-yarn-app"); // application name - appContext.setAMContainerSpec(amContainer); - appContext.setResource(capability); - appContext.setQueue("default"); // queue - - // Submit application - ApplicationId appId = appContext.getApplicationId(); - System.out.println("Submitting application " + appId); - yarnClient.submitApplication(appContext); - - ApplicationReport appReport = yarnClient.getApplicationReport(appId); - YarnApplicationState appState = appReport.getYarnApplicationState(); - while (appState != YarnApplicationState.FINISHED && - appState != YarnApplicationState.KILLED && - appState != YarnApplicationState.FAILED) { - Thread.sleep(100); - appReport = yarnClient.getApplicationReport(appId); - appState = appReport.getYarnApplicationState(); - } - - System.out.println( - "Application " + appId + " finished with" + - " state " + appState + - " at " + appReport.getFinishTime()); - - } - - private void setupAppMasterJar(Path jarPath, LocalResource appMasterJar) throws IOException { - FileStatus jarStat = FileSystem.get(conf).getFileStatus(jarPath); - appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); - appMasterJar.setSize(jarStat.getLen()); - appMasterJar.setTimestamp(jarStat.getModificationTime()); - appMasterJar.setType(LocalResourceType.FILE); - appMasterJar.setVisibility(LocalResourceVisibility.PUBLIC); - } - - @SuppressWarnings("deprecation") - private void setupAppMasterEnv(Map appMasterEnv) { - for (String c : conf.getStrings( - YarnConfiguration.YARN_APPLICATION_CLASSPATH, - YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) { - Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), - c.trim()); - } - Apps.addToEnvironment(appMasterEnv, - Environment.CLASSPATH.name(), - Environment.PWD.$() + File.separator + "*"); - } - - public static void main(String[] args) throws Exception { - Client c = new Client(); - c.run(args); - } - -} diff --git a/pom.xml b/pom.xml index 3d4ad070..4716c84c 100755 --- a/pom.xml +++ b/pom.xml @@ -159,7 +159,6 @@ 2.1.0 4.4.1 - 18.0 1.4 9.3.21.v20170918 3.1.0 @@ -542,12 +541,6 @@ ${hadoop.version} - - com.google.guava - guava - ${guava.version} - - org.apache.httpcomponents httpclient