From c73f2421fc924652a42aa66a49333285b60c9e7a Mon Sep 17 00:00:00 2001
From: jetoile
Date: Wed, 6 Jan 2016 20:31:14 +0100
Subject: [PATCH] add TI for hdfs + hbase

---
 .../IntegrationBootstrapTest.java  | 111 +++++++++++++++++
 .../IntegrationBootstrapTest2.java | 117 +++++++++++++++++-
 2 files changed, 227 insertions(+), 1 deletion(-)

diff --git a/src/test/java/fr/jetoile/sample/integrationtest/IntegrationBootstrapTest.java b/src/test/java/fr/jetoile/sample/integrationtest/IntegrationBootstrapTest.java
index a67b8ccb..4bf653b7 100644
--- a/src/test/java/fr/jetoile/sample/integrationtest/IntegrationBootstrapTest.java
+++ b/src/test/java/fr/jetoile/sample/integrationtest/IntegrationBootstrapTest.java
@@ -3,6 +3,7 @@
 
 import com.github.sakserv.minicluster.config.ConfigVars;
 import fr.jetoile.sample.Utils;
+import fr.jetoile.sample.component.HdfsBootstrap;
 import fr.jetoile.sample.component.SolrCloudBootstrap;
 import fr.jetoile.sample.exception.BootstrapException;
 import fr.jetoile.sample.kafka.consumer.KafkaTestConsumer;
@@ -10,6 +11,11 @@
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
@@ -31,8 +37,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLConnection;
 import java.sql.*;
 import java.sql.Connection;
 import java.util.ArrayList;
@@ -196,4 +207,104 @@ public void hiveServer2ShouldStart() throws InterruptedException, ClassNotFoundE
         stmt.execute(dropDdl);
     }
+
+
+    @Test
+    public void hdfsShouldStart() throws Exception {
+
+        assertThat(Utils.available("127.0.0.1", configuration.getInt(ConfigVars.HDFS_NAMENODE_HTTP_PORT_KEY))).isFalse();
+
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        conf.set("fs.default.name", "hdfs://127.0.0.1:" + configuration.getInt(ConfigVars.HDFS_NAMENODE_PORT_KEY));
+
+        URI uri = URI.create ("hdfs://127.0.0.1:" + configuration.getInt(ConfigVars.HDFS_NAMENODE_PORT_KEY));
+
+        FileSystem hdfsFsHandle = FileSystem.get (uri, conf);
+
+        FSDataOutputStream writer = hdfsFsHandle.create(new Path(configuration.getString(ConfigVars.HDFS_TEST_FILE_KEY)));
+        writer.writeUTF(configuration.getString(ConfigVars.HDFS_TEST_STRING_KEY));
+        writer.close();
+
+        // Read the file and compare to test string
+        FSDataInputStream reader = hdfsFsHandle.open(new Path(configuration.getString(ConfigVars.HDFS_TEST_FILE_KEY)));
+        assertEquals(reader.readUTF(), configuration.getString(ConfigVars.HDFS_TEST_STRING_KEY));
+        reader.close();
+        hdfsFsHandle.close();
+
+        URL url = new URL(
+                String.format( "http://localhost:%s/webhdfs/v1?op=GETHOMEDIRECTORY&user.name=guest",
+                        configuration.getInt( ConfigVars.HDFS_NAMENODE_HTTP_PORT_KEY ) ) );
+        URLConnection connection = url.openConnection();
+        connection.setRequestProperty( "Accept-Charset", "UTF-8" );
+        BufferedReader response = new BufferedReader( new InputStreamReader( connection.getInputStream() ) );
+        String line = response.readLine();
+        response.close();
+        assertThat("{\"Path\":\"/user/guest\"}").isEqualTo(line);
+
+    }
+
+    @Test
+    public void hBaseShouldStart() throws Exception {
+
+        String tableName = configuration.getString(ConfigVars.HBASE_TEST_TABLE_NAME_KEY);
+        String colFamName = configuration.getString(ConfigVars.HBASE_TEST_COL_FAMILY_NAME_KEY);
+        String colQualiferName = configuration.getString(ConfigVars.HBASE_TEST_COL_QUALIFIER_NAME_KEY);
+        Integer numRowsToPut = configuration.getInt(ConfigVars.HBASE_TEST_NUM_ROWS_TO_PUT_KEY);
+
+        org.apache.hadoop.conf.Configuration hbaseConfiguration = HBaseConfiguration.create();
+        hbaseConfiguration.set("hbase.zookeeper.quorum", configuration.getString(ConfigVars.ZOOKEEPER_HOST_KEY));
+        hbaseConfiguration.setInt("hbase.zookeeper.property.clientPort", configuration.getInt(ConfigVars.ZOOKEEPER_PORT_KEY));
+        hbaseConfiguration.set("hbase.master", "127.0.0.1:" + configuration.getInt(ConfigVars.HBASE_MASTER_PORT_KEY));
+        hbaseConfiguration.set("zookeeper.znode.parent", configuration.getString(ConfigVars.HBASE_ZNODE_PARENT_KEY));
+
+
+        LOGGER.info("HBASE: Creating table {} with column family {}", tableName, colFamName);
+        createHbaseTable(tableName, colFamName, hbaseConfiguration);
+
+        LOGGER.info("HBASE: Populate the table with {} rows.", numRowsToPut);
+        for (int i = 0; i < numRowsToPut; i++) {
+            putRow(tableName, colFamName, String.valueOf(i), colQualiferName, "row_" + i, hbaseConfiguration);
+        }
+
+        LOGGER.info("HBASE: Fetching and comparing the results");
+        for (int i = 0; i < numRowsToPut; i++) {
+            Result result = getRow(tableName, colFamName, String.valueOf(i), colQualiferName, hbaseConfiguration);
+            assertEquals("row_" + i, new String(result.value()));
+        }
+
+    }
+
+    private static void createHbaseTable(String tableName, String colFamily,
+                                         org.apache.hadoop.conf.Configuration configuration) throws Exception {
+
+        final HBaseAdmin admin = new HBaseAdmin(configuration);
+        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
+        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(colFamily);
+
+        hTableDescriptor.addFamily(hColumnDescriptor);
+        admin.createTable(hTableDescriptor);
+    }
+
+    private static void putRow(String tableName, String colFamName, String rowKey, String colQualifier, String value,
+                               org.apache.hadoop.conf.Configuration configuration) throws Exception {
+        HTable table = new HTable(configuration, tableName);
+        Put put = new Put(Bytes.toBytes(rowKey));
+        put.add(Bytes.toBytes(colFamName), Bytes.toBytes(colQualifier), Bytes.toBytes(value));
+        table.put(put);
+        table.flushCommits();
+        table.close();
+    }
+
+    private static Result getRow(String tableName, String colFamName, String rowKey, String colQualifier,
+                                 org.apache.hadoop.conf.Configuration configuration) throws Exception {
+        Result result;
+        HTable table = new HTable(configuration, tableName);
+        Get get = new Get(Bytes.toBytes(rowKey));
+        get.addColumn(Bytes.toBytes(colFamName), Bytes.toBytes(colQualifier));
+        get.setMaxVersions(1);
+        result = table.get(get);
+        return result;
+    }
+
+
 
 }
diff --git a/src/test/java/fr/jetoile/sample/integrationtest/IntegrationBootstrapTest2.java b/src/test/java/fr/jetoile/sample/integrationtest/IntegrationBootstrapTest2.java
index f87f2560..449b841d 100644
--- a/src/test/java/fr/jetoile/sample/integrationtest/IntegrationBootstrapTest2.java
+++ b/src/test/java/fr/jetoile/sample/integrationtest/IntegrationBootstrapTest2.java
@@ -4,6 +4,7 @@
 import com.github.sakserv.minicluster.config.ConfigVars;
 import fr.jetoile.sample.Component;
 import fr.jetoile.sample.HadoopBootstrap;
+import fr.jetoile.sample.Utils;
 import fr.jetoile.sample.component.SolrCloudBootstrap;
 import fr.jetoile.sample.exception.BootstrapException;
 import fr.jetoile.sample.kafka.consumer.KafkaTestConsumer;
@@ -11,6 +12,16 @@
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.solr.client.solrj.SolrServerException;
 import org.apache.solr.client.solrj.impl.CloudSolrClient;
 import org.apache.solr.common.SolrDocument;
@@ -20,8 +31,14 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStreamReader;
+import java.net.URI;
+import java.net.URL;
+import java.net.URLConnection;
 import java.sql.*;
+import java.sql.Connection;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -46,7 +63,7 @@ public static void setup() throws BootstrapException {
             throw new BootstrapException("bad config", e);
         }
 
-        hadoopBootstrap = new HadoopBootstrap(Component.ZOOKEEPER, Component.KAFKA, Component.HIVEMETA, Component.HIVESERVER2, Component.SOLRCLOUD);
+        hadoopBootstrap = new HadoopBootstrap(Component.ZOOKEEPER, Component.HDFS, Component.KAFKA, Component.HIVEMETA, Component.HIVESERVER2, Component.HBASE, Component.SOLRCLOUD);
         hadoopBootstrap.startAll();
     }
 
@@ -186,4 +203,102 @@ public void hiveServer2ShouldStart() throws InterruptedException, ClassNotFoundE
         stmt.execute(dropDdl);
     }
+
+    @Test
+    public void hdfsShouldStart() throws Exception {
+
+        assertThat(Utils.available("127.0.0.1", configuration.getInt(ConfigVars.HDFS_NAMENODE_HTTP_PORT_KEY))).isFalse();
+
+        org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+        conf.set("fs.default.name", "hdfs://127.0.0.1:" + configuration.getInt(ConfigVars.HDFS_NAMENODE_PORT_KEY));
+
+        URI uri = URI.create ("hdfs://127.0.0.1:" + configuration.getInt(ConfigVars.HDFS_NAMENODE_PORT_KEY));
+
+        FileSystem hdfsFsHandle = FileSystem.get (uri, conf);
+
+        FSDataOutputStream writer = hdfsFsHandle.create(new Path(configuration.getString(ConfigVars.HDFS_TEST_FILE_KEY)));
+        writer.writeUTF(configuration.getString(ConfigVars.HDFS_TEST_STRING_KEY));
+        writer.close();
+
+        // Read the file and compare to test string
+        FSDataInputStream reader = hdfsFsHandle.open(new Path(configuration.getString(ConfigVars.HDFS_TEST_FILE_KEY)));
+        assertEquals(reader.readUTF(), configuration.getString(ConfigVars.HDFS_TEST_STRING_KEY));
+        reader.close();
+        hdfsFsHandle.close();
+
+        URL url = new URL(
+                String.format( "http://localhost:%s/webhdfs/v1?op=GETHOMEDIRECTORY&user.name=guest",
+                        configuration.getInt( ConfigVars.HDFS_NAMENODE_HTTP_PORT_KEY ) ) );
+        URLConnection connection = url.openConnection();
+        connection.setRequestProperty( "Accept-Charset", "UTF-8" );
+        BufferedReader response = new BufferedReader( new InputStreamReader( connection.getInputStream() ) );
+        String line = response.readLine();
+        response.close();
+        assertThat("{\"Path\":\"/user/guest\"}").isEqualTo(line);
+
+    }
+
+
+    @Test
+    public void hBaseShouldStart() throws Exception {
+
+        String tableName = configuration.getString(ConfigVars.HBASE_TEST_TABLE_NAME_KEY);
+        String colFamName = configuration.getString(ConfigVars.HBASE_TEST_COL_FAMILY_NAME_KEY);
+        String colQualiferName = configuration.getString(ConfigVars.HBASE_TEST_COL_QUALIFIER_NAME_KEY);
+        Integer numRowsToPut = configuration.getInt(ConfigVars.HBASE_TEST_NUM_ROWS_TO_PUT_KEY);
+
+        org.apache.hadoop.conf.Configuration hbaseConfiguration = HBaseConfiguration.create();
+        hbaseConfiguration.set("hbase.zookeeper.quorum", configuration.getString(ConfigVars.ZOOKEEPER_HOST_KEY));
+        hbaseConfiguration.setInt("hbase.zookeeper.property.clientPort", configuration.getInt(ConfigVars.ZOOKEEPER_PORT_KEY));
+        hbaseConfiguration.set("hbase.master", "127.0.0.1:" + configuration.getInt(ConfigVars.HBASE_MASTER_PORT_KEY));
+        hbaseConfiguration.set("zookeeper.znode.parent", configuration.getString(ConfigVars.HBASE_ZNODE_PARENT_KEY));
+
+
+        LOGGER.info("HBASE: Creating table {} with column family {}", tableName, colFamName);
+        createHbaseTable(tableName, colFamName, hbaseConfiguration);
+
+        LOGGER.info("HBASE: Populate the table with {} rows.", numRowsToPut);
+        for (int i = 0; i < numRowsToPut; i++) {
+            putRow(tableName, colFamName, String.valueOf(i), colQualiferName, "row_" + i, hbaseConfiguration);
+        }
+
+        LOGGER.info("HBASE: Fetching and comparing the results");
+        for (int i = 0; i < numRowsToPut; i++) {
+            Result result = getRow(tableName, colFamName, String.valueOf(i), colQualiferName, hbaseConfiguration);
+            assertEquals("row_" + i, new String(result.value()));
+        }
+
+    }
+
+    private static void createHbaseTable(String tableName, String colFamily,
+                                         org.apache.hadoop.conf.Configuration configuration) throws Exception {
+
+        final HBaseAdmin admin = new HBaseAdmin(configuration);
+        HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
+        HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(colFamily);
+
+        hTableDescriptor.addFamily(hColumnDescriptor);
+        admin.createTable(hTableDescriptor);
+    }
+
+    private static void putRow(String tableName, String colFamName, String rowKey, String colQualifier, String value,
+                               org.apache.hadoop.conf.Configuration configuration) throws Exception {
+        HTable table = new HTable(configuration, tableName);
+        Put put = new Put(Bytes.toBytes(rowKey));
+        put.add(Bytes.toBytes(colFamName), Bytes.toBytes(colQualifier), Bytes.toBytes(value));
+        table.put(put);
+        table.flushCommits();
+        table.close();
+    }
+
+    private static Result getRow(String tableName, String colFamName, String rowKey, String colQualifier,
+                                 org.apache.hadoop.conf.Configuration configuration) throws Exception {
+        Result result;
+        HTable table = new HTable(configuration, tableName);
+        Get get = new Get(Bytes.toBytes(rowKey));
+        get.addColumn(Bytes.toBytes(colFamName), Bytes.toBytes(colQualifier));
+        get.setMaxVersions(1);
+        result = table.get(get);
+        return result;
+    }
 
 }