add method for programmatic bootstrap
jetoile committed Jan 6, 2016
1 parent 0d5acf4 commit bc37034
Showing 2 changed files with 199 additions and 0 deletions.
10 changes: 10 additions & 0 deletions src/main/java/fr/jetoile/sample/HadoopBootstrap.java
@@ -23,6 +23,16 @@ public class HadoopBootstrap {
    private List<Component> componentsToStart = new ArrayList<>();
    private List<Component> componentsToStop = new ArrayList<>();

    public HadoopBootstrap(Component... components) throws BootstrapException {

        componentsToStart.addAll(Arrays.asList(components));

        componentsToStop = Lists.newArrayList(componentsToStart);
        Collections.reverse(componentsToStop);
    }

    public HadoopBootstrap() throws BootstrapException {
        try {
            configuration = new PropertiesConfiguration("hadoop.properties");
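The new varargs constructor lets calling code pick which components to launch, instead of reading the list from hadoop.properties as the no-arg constructor does. A minimal usage sketch (the component names and the startAll()/stopAll() calls are taken from the integration test below):

HadoopBootstrap bootstrap = new HadoopBootstrap(Component.ZOOKEEPER, Component.KAFKA);
bootstrap.startAll();
// ... exercise the embedded components ...
bootstrap.stopAll(); // stops in reverse start order, so ZooKeeper goes down last

Because componentsToStop is a reversed copy of componentsToStart, components that others depend on (ZooKeeper, for instance) are started first and stopped last.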
@@ -0,0 +1,189 @@
package fr.jetoile.sample.integrationtest;


import com.github.sakserv.minicluster.config.ConfigVars;
import fr.jetoile.sample.Component;
import fr.jetoile.sample.HadoopBootstrap;
import fr.jetoile.sample.component.SolrCloudBootstrap;
import fr.jetoile.sample.exception.BootstrapException;
import fr.jetoile.sample.kafka.consumer.KafkaTestConsumer;
import fr.jetoile.sample.kafka.producer.KafkaTestProducer;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.CloudSolrClient;
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.zookeeper.KeeperException;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;

import static junit.framework.TestCase.assertNotNull;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertEquals;

public class IntegrationBootstrapTest2 {

    private static Configuration configuration;

    private static HadoopBootstrap hadoopBootstrap;

    private static final Logger LOGGER = LoggerFactory.getLogger(IntegrationBootstrapTest2.class);


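    // Boot the mini-cluster once for the whole class: components start in the
    // order they are passed to the constructor, and stopAll() shuts them down
    // in reverse order.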
    @BeforeClass
    public static void setup() throws BootstrapException {
        try {
            configuration = new PropertiesConfiguration("default.properties");
        } catch (ConfigurationException e) {
            throw new BootstrapException("bad config", e);
        }

        hadoopBootstrap = new HadoopBootstrap(Component.ZOOKEEPER, Component.KAFKA, Component.HIVEMETA, Component.HIVESERVER2, Component.SOLRCLOUD);
        hadoopBootstrap.startAll();
    }

    @AfterClass
    public static void tearDown() throws BootstrapException {
        hadoopBootstrap.stopAll();
    }

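    // Index 1000 documents into the embedded SolrCloud collection, then fetch
    // one back by id to check that the collection is queryable.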
    @Test
    public void solrCloudShouldStart() throws IOException, SolrServerException, KeeperException, InterruptedException {

        String collectionName = configuration.getString(SolrCloudBootstrap.SOLR_COLLECTION_NAME);

        String zkHostString = configuration.getString(ConfigVars.ZOOKEEPER_HOST_KEY) + ":" + configuration.getInt(ConfigVars.ZOOKEEPER_PORT_KEY);
        CloudSolrClient client = new CloudSolrClient(zkHostString);

        for (int i = 0; i < 1000; ++i) {
            SolrInputDocument doc = new SolrInputDocument();
            doc.addField("cat", "book");
            doc.addField("id", "book-" + i);
            doc.addField("name", "The Legend of the Hobbit part " + i);
            client.add(collectionName, doc);
            if (i % 100 == 0) client.commit(collectionName); // periodically flush
        }
        client.commit(collectionName);

        SolrDocument collection1 = client.getById(collectionName, "book-1");

        assertNotNull(collection1);

        assertThat(collection1.getFieldValue("name")).isEqualTo("The Legend of the Hobbit part 1");

        client.close();
    }

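    // Produce the configured number of messages to the test topic, consume
    // them back, and check that the counts match.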
    @Test
    public void kafkaShouldStart() throws Exception {

        // Producer
        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
                .setKafkaHostname(configuration.getString(ConfigVars.KAFKA_HOSTNAME_KEY))
                .setKafkaPort(configuration.getInt(ConfigVars.KAFKA_PORT_KEY))
                .setTopic(configuration.getString(ConfigVars.KAFKA_TEST_TOPIC_KEY))
                .setMessageCount(configuration.getInt(ConfigVars.KAFKA_TEST_MESSAGE_COUNT_KEY))
                .build();
        kafkaTestProducer.produceMessages();

        // Consumer
        List<String> seeds = new ArrayList<String>();
        seeds.add(configuration.getString(ConfigVars.KAFKA_HOSTNAME_KEY));
        KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
        kafkaTestConsumer.consumeMessages(
                configuration.getInt(ConfigVars.KAFKA_TEST_MESSAGE_COUNT_KEY),
                configuration.getString(ConfigVars.KAFKA_TEST_TOPIC_KEY),
                0,
                seeds,
                configuration.getInt(ConfigVars.KAFKA_PORT_KEY));

        // Assert num of messages produced = num of messages consumed
        Assert.assertEquals(configuration.getLong(ConfigVars.KAFKA_TEST_MESSAGE_COUNT_KEY),
                kafkaTestConsumer.getNumRead());
    }

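    // Exercise HiveServer2 over JDBC: create a database and an ORC table,
    // validate the table with DESCRIBE FORMATTED, then drop it.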
    @Test
    public void hiveServer2ShouldStart() throws InterruptedException, ClassNotFoundException, SQLException {

//        assertThat(Utils.available("127.0.0.1", 20103)).isFalse();

        // Load the Hive JDBC driver
        LOGGER.info("HIVE: Loading the Hive JDBC Driver");
        Class.forName("org.apache.hive.jdbc.HiveDriver");

        //
        // Create an ORC table and describe it
        //
        // Get the connection
        Connection con = DriverManager.getConnection("jdbc:hive2://" +
                        configuration.getString(ConfigVars.HIVE_SERVER2_HOSTNAME_KEY) + ":" +
                        configuration.getInt(ConfigVars.HIVE_SERVER2_PORT_KEY) + "/" +
                        configuration.getString(ConfigVars.HIVE_TEST_DATABASE_NAME_KEY),
                "user",
                "pass");

        // Create the DB
        Statement stmt;
        try {
            String createDbDdl = "CREATE DATABASE IF NOT EXISTS " +
                    configuration.getString(ConfigVars.HIVE_TEST_DATABASE_NAME_KEY);
            stmt = con.createStatement();
            LOGGER.info("HIVE: Running Create Database Statement: {}", createDbDdl);
            stmt.execute(createDbDdl);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Drop the table in case it still exists
        String dropDdl = "DROP TABLE " + configuration.getString(ConfigVars.HIVE_TEST_DATABASE_NAME_KEY) + "." +
                configuration.getString(ConfigVars.HIVE_TEST_TABLE_NAME_KEY);
        stmt = con.createStatement();
        LOGGER.info("HIVE: Running Drop Table Statement: {}", dropDdl);
        stmt.execute(dropDdl);

        // Create the ORC table
        String createDdl = "CREATE TABLE IF NOT EXISTS " +
                configuration.getString(ConfigVars.HIVE_TEST_DATABASE_NAME_KEY) + "." +
                configuration.getString(ConfigVars.HIVE_TEST_TABLE_NAME_KEY) + " (id INT, msg STRING) " +
                "PARTITIONED BY (dt STRING) " +
                "CLUSTERED BY (id) INTO 16 BUCKETS " +
                "STORED AS ORC tblproperties(\"orc.compress\"=\"NONE\")";
        stmt = con.createStatement();
        LOGGER.info("HIVE: Running Create Table Statement: {}", createDdl);
        stmt.execute(createDdl);

        // Issue a describe on the new table and display the output
        LOGGER.info("HIVE: Validating Table was Created: ");
        ResultSet resultSet = stmt.executeQuery("DESCRIBE FORMATTED " +
                configuration.getString(ConfigVars.HIVE_TEST_TABLE_NAME_KEY));
        int count = 0;
        while (resultSet.next()) {
            ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
                System.out.print(resultSet.getString(i));
            }
            System.out.println();
            count++;
        }
        assertEquals(33, count);

        // Drop the table
        dropDdl = "DROP TABLE " + configuration.getString(ConfigVars.HIVE_TEST_DATABASE_NAME_KEY) + "." +
                configuration.getString(ConfigVars.HIVE_TEST_TABLE_NAME_KEY);
        stmt = con.createStatement();
        LOGGER.info("HIVE: Running Drop Table Statement: {}", dropDdl);
        stmt.execute(dropDdl);
    }

}
