add TI for hdfs + hbase
jetoile committed Jan 6, 2016
1 parent ce140ab commit c73f242
Showing 2 changed files with 227 additions and 1 deletion.
@@ -3,13 +3,19 @@

import com.github.sakserv.minicluster.config.ConfigVars;
import fr.jetoile.sample.Utils;
import fr.jetoile.sample.component.HdfsBootstrap;
import fr.jetoile.sample.component.SolrCloudBootstrap;
import fr.jetoile.sample.exception.BootstrapException;
import fr.jetoile.sample.kafka.consumer.KafkaTestConsumer;
import fr.jetoile.sample.kafka.producer.KafkaTestProducer;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
@@ -31,8 +37,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.sql.*;
import java.sql.Connection;
import java.util.ArrayList;
@@ -196,4 +207,104 @@ public void hiveServer2ShouldStart() throws InterruptedException, ClassNotFoundE
stmt.execute(dropDdl);
}



@Test
public void hdfsShouldStart() throws Exception {

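// Utils.available is assumed to report whether the port is still free; false here means the embedded NameNode HTTP server is already listening.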
assertThat(Utils.available("127.0.0.1", configuration.getInt(ConfigVars.HDFS_NAMENODE_HTTP_PORT_KEY))).isFalse();

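// Point a Hadoop client configuration at the in-process NameNode.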
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("fs.default.name", "hdfs://127.0.0.1:" + configuration.getInt(ConfigVars.HDFS_NAMENODE_PORT_KEY));

URI uri = URI.create("hdfs://127.0.0.1:" + configuration.getInt(ConfigVars.HDFS_NAMENODE_PORT_KEY));

FileSystem hdfsFsHandle = FileSystem.get(uri, conf);

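// Write the configured test string to a file on the embedded HDFS.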
FSDataOutputStream writer = hdfsFsHandle.create(new Path(configuration.getString(ConfigVars.HDFS_TEST_FILE_KEY)));
writer.writeUTF(configuration.getString(ConfigVars.HDFS_TEST_STRING_KEY));
writer.close();

// Read the file and compare to test string
FSDataInputStream reader = hdfsFsHandle.open(new Path(configuration.getString(ConfigVars.HDFS_TEST_FILE_KEY)));
assertEquals(reader.readUTF(), configuration.getString(ConfigVars.HDFS_TEST_STRING_KEY));
reader.close();
hdfsFsHandle.close();

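// Exercise the WebHDFS REST endpoint (GETHOMEDIRECTORY) over the NameNode HTTP port.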
URL url = new URL(
String.format( "http://localhost:%s/webhdfs/v1?op=GETHOMEDIRECTORY&user.name=guest",
configuration.getInt( ConfigVars.HDFS_NAMENODE_HTTP_PORT_KEY ) ) );
URLConnection connection = url.openConnection();
connection.setRequestProperty( "Accept-Charset", "UTF-8" );
BufferedReader response = new BufferedReader( new InputStreamReader( connection.getInputStream() ) );
String line = response.readLine();
response.close();
assertThat("{\"Path\":\"/user/guest\"}").isEqualTo(line);

}

@Test
public void hBaseShouldStart() throws Exception {

String tableName = configuration.getString(ConfigVars.HBASE_TEST_TABLE_NAME_KEY);
String colFamName = configuration.getString(ConfigVars.HBASE_TEST_COL_FAMILY_NAME_KEY);
String colQualifierName = configuration.getString(ConfigVars.HBASE_TEST_COL_QUALIFIER_NAME_KEY);
Integer numRowsToPut = configuration.getInt(ConfigVars.HBASE_TEST_NUM_ROWS_TO_PUT_KEY);

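// Build an HBase client configuration pointing at the embedded ZooKeeper quorum and HBase master.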
org.apache.hadoop.conf.Configuration hbaseConfiguration = HBaseConfiguration.create();
hbaseConfiguration.set("hbase.zookeeper.quorum", configuration.getString(ConfigVars.ZOOKEEPER_HOST_KEY));
hbaseConfiguration.setInt("hbase.zookeeper.property.clientPort", configuration.getInt(ConfigVars.ZOOKEEPER_PORT_KEY));
hbaseConfiguration.set("hbase.master", "127.0.0.1:" + configuration.getInt(ConfigVars.HBASE_MASTER_PORT_KEY));
hbaseConfiguration.set("zookeeper.znode.parent", configuration.getString(ConfigVars.HBASE_ZNODE_PARENT_KEY));


LOGGER.info("HBASE: Creating table {} with column family {}", tableName, colFamName);
createHbaseTable(tableName, colFamName, hbaseConfiguration);

LOGGER.info("HBASE: Populate the table with {} rows.", numRowsToPut);
for (int i = 0; i < numRowsToPut; i++) {
putRow(tableName, colFamName, String.valueOf(i), colQualifierName, "row_" + i, hbaseConfiguration);
}

LOGGER.info("HBASE: Fetching and comparing the results");
for (int i = 0; i < numRowsToPut; i++) {
Result result = getRow(tableName, colFamName, String.valueOf(i), colQualifierName, hbaseConfiguration);
assertEquals("row_" + i, new String(result.value()));
}

}

private static void createHbaseTable(String tableName, String colFamily,
org.apache.hadoop.conf.Configuration configuration) throws Exception {

final HBaseAdmin admin = new HBaseAdmin(configuration);
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(colFamily);

hTableDescriptor.addFamily(hColumnDescriptor);
admin.createTable(hTableDescriptor);
}

private static void putRow(String tableName, String colFamName, String rowKey, String colQualifier, String value,
org.apache.hadoop.conf.Configuration configuration) throws Exception {
HTable table = new HTable(configuration, tableName);
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes(colFamName), Bytes.toBytes(colQualifier), Bytes.toBytes(value));
table.put(put);
table.flushCommits();
table.close();
}

private static Result getRow(String tableName, String colFamName, String rowKey, String colQualifier,
org.apache.hadoop.conf.Configuration configuration) throws Exception {
Result result;
HTable table = new HTable(configuration, tableName);
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(colFamName), Bytes.toBytes(colQualifier));
get.setMaxVersions(1);
result = table.get(get);
return result;
}


}
@@ -4,13 +4,24 @@
import com.github.sakserv.minicluster.config.ConfigVars;
import fr.jetoile.sample.Component;
import fr.jetoile.sample.HadoopBootstrap;
import fr.jetoile.sample.Utils;
import fr.jetoile.sample.component.SolrCloudBootstrap;
import fr.jetoile.sample.exception.BootstrapException;
import fr.jetoile.sample.kafka.consumer.KafkaTestConsumer;
import fr.jetoile.sample.kafka.producer.KafkaTestProducer;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrDocument;
@@ -20,8 +31,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.sql.*;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;

@@ -46,7 +63,7 @@ public static void setup() throws BootstrapException {
throw new BootstrapException("bad config", e);
}

- hadoopBootstrap = new HadoopBootstrap(Component.ZOOKEEPER, Component.KAFKA, Component.HIVEMETA, Component.HIVESERVER2, Component.SOLRCLOUD);
+ hadoopBootstrap = new HadoopBootstrap(Component.ZOOKEEPER, Component.HDFS, Component.KAFKA, Component.HIVEMETA, Component.HIVESERVER2, Component.HBASE, Component.SOLRCLOUD);
hadoopBootstrap.startAll();
}

@@ -186,4 +203,102 @@ public void hiveServer2ShouldStart() throws InterruptedException, ClassNotFoundE
stmt.execute(dropDdl);
}


@Test
public void hdfsShouldStart() throws Exception {

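// Utils.available is assumed to report whether the port is still free; false here means the embedded NameNode HTTP server is already listening.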
assertThat(Utils.available("127.0.0.1", configuration.getInt(ConfigVars.HDFS_NAMENODE_HTTP_PORT_KEY))).isFalse();

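// Point a Hadoop client configuration at the in-process NameNode.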
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("fs.default.name", "hdfs://127.0.0.1:" + configuration.getInt(ConfigVars.HDFS_NAMENODE_PORT_KEY));

URI uri = URI.create("hdfs://127.0.0.1:" + configuration.getInt(ConfigVars.HDFS_NAMENODE_PORT_KEY));

FileSystem hdfsFsHandle = FileSystem.get(uri, conf);

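// Write the configured test string to a file on the embedded HDFS.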
FSDataOutputStream writer = hdfsFsHandle.create(new Path(configuration.getString(ConfigVars.HDFS_TEST_FILE_KEY)));
writer.writeUTF(configuration.getString(ConfigVars.HDFS_TEST_STRING_KEY));
writer.close();

// Read the file and compare to test string
FSDataInputStream reader = hdfsFsHandle.open(new Path(configuration.getString(ConfigVars.HDFS_TEST_FILE_KEY)));
assertEquals(reader.readUTF(), configuration.getString(ConfigVars.HDFS_TEST_STRING_KEY));
reader.close();
hdfsFsHandle.close();

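// Exercise the WebHDFS REST endpoint (GETHOMEDIRECTORY) over the NameNode HTTP port.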
URL url = new URL(
String.format( "http://localhost:%s/webhdfs/v1?op=GETHOMEDIRECTORY&user.name=guest",
configuration.getInt( ConfigVars.HDFS_NAMENODE_HTTP_PORT_KEY ) ) );
URLConnection connection = url.openConnection();
connection.setRequestProperty( "Accept-Charset", "UTF-8" );
BufferedReader response = new BufferedReader( new InputStreamReader( connection.getInputStream() ) );
String line = response.readLine();
response.close();
assertThat("{\"Path\":\"/user/guest\"}").isEqualTo(line);

}


@Test
public void hBaseShouldStart() throws Exception {

String tableName = configuration.getString(ConfigVars.HBASE_TEST_TABLE_NAME_KEY);
String colFamName = configuration.getString(ConfigVars.HBASE_TEST_COL_FAMILY_NAME_KEY);
String colQualifierName = configuration.getString(ConfigVars.HBASE_TEST_COL_QUALIFIER_NAME_KEY);
Integer numRowsToPut = configuration.getInt(ConfigVars.HBASE_TEST_NUM_ROWS_TO_PUT_KEY);

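// Build an HBase client configuration pointing at the embedded ZooKeeper quorum and HBase master.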
org.apache.hadoop.conf.Configuration hbaseConfiguration = HBaseConfiguration.create();
hbaseConfiguration.set("hbase.zookeeper.quorum", configuration.getString(ConfigVars.ZOOKEEPER_HOST_KEY));
hbaseConfiguration.setInt("hbase.zookeeper.property.clientPort", configuration.getInt(ConfigVars.ZOOKEEPER_PORT_KEY));
hbaseConfiguration.set("hbase.master", "127.0.0.1:" + configuration.getInt(ConfigVars.HBASE_MASTER_PORT_KEY));
hbaseConfiguration.set("zookeeper.znode.parent", configuration.getString(ConfigVars.HBASE_ZNODE_PARENT_KEY));


LOGGER.info("HBASE: Creating table {} with column family {}", tableName, colFamName);
createHbaseTable(tableName, colFamName, hbaseConfiguration);

LOGGER.info("HBASE: Populate the table with {} rows.", numRowsToPut);
for (int i = 0; i < numRowsToPut; i++) {
putRow(tableName, colFamName, String.valueOf(i), colQualifierName, "row_" + i, hbaseConfiguration);
}

LOGGER.info("HBASE: Fetching and comparing the results");
for (int i = 0; i < numRowsToPut; i++) {
Result result = getRow(tableName, colFamName, String.valueOf(i), colQualifierName, hbaseConfiguration);
assertEquals("row_" + i, new String(result.value()));
}

}

private static void createHbaseTable(String tableName, String colFamily,
org.apache.hadoop.conf.Configuration configuration) throws Exception {

final HBaseAdmin admin = new HBaseAdmin(configuration);
HTableDescriptor hTableDescriptor = new HTableDescriptor(TableName.valueOf(tableName));
HColumnDescriptor hColumnDescriptor = new HColumnDescriptor(colFamily);

hTableDescriptor.addFamily(hColumnDescriptor);
admin.createTable(hTableDescriptor);
}

private static void putRow(String tableName, String colFamName, String rowKey, String colQualifier, String value,
org.apache.hadoop.conf.Configuration configuration) throws Exception {
HTable table = new HTable(configuration, tableName);
Put put = new Put(Bytes.toBytes(rowKey));
put.add(Bytes.toBytes(colFamName), Bytes.toBytes(colQualifier), Bytes.toBytes(value));
table.put(put);
table.flushCommits();
table.close();
}

private static Result getRow(String tableName, String colFamName, String rowKey, String colQualifier,
org.apache.hadoop.conf.Configuration configuration) throws Exception {
Result result;
HTable table = new HTable(configuration, tableName);
Get get = new Get(Bytes.toBytes(rowKey));
get.addColumn(Bytes.toBytes(colFamName), Bytes.toBytes(colQualifier));
get.setMaxVersions(1);
result = table.get(get);
return result;
}
}
