diff --git a/pom.xml b/pom.xml
index 9fd187df..661c164a 100755
--- a/pom.xml
+++ b/pom.xml
@@ -35,6 +35,12 @@
+        <dependency>
+            <groupId>com.github.sakserv</groupId>
+            <artifactId>hadoop-mini-clusters-kafka</artifactId>
+            <version>0.1.3</version>
+        </dependency>
+
         <dependency>
             <groupId>com.github.sakserv</groupId>
             <artifactId>hadoop-mini-clusters-hivemetastore</artifactId>
diff --git a/src/main/java/fr/jetoile/sample/component/KafkaBootstrap.java b/src/main/java/fr/jetoile/sample/component/KafkaBootstrap.java
new file mode 100644
index 00000000..fda6c29e
--- /dev/null
+++ b/src/main/java/fr/jetoile/sample/component/KafkaBootstrap.java
@@ -0,0 +1,93 @@
+package fr.jetoile.sample.component;
+
+import com.github.sakserv.minicluster.config.ConfigVars;
+import com.github.sakserv.minicluster.impl.KafkaLocalBroker;
+import fr.jetoile.sample.BootstrapException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public enum KafkaBootstrap implements Bootstrap {
+    INSTANCE;
+
+    private final Logger LOGGER = LoggerFactory.getLogger(KafkaBootstrap.class);
+
+    private KafkaLocalBroker kafkaLocalCluster;
+
+    private Configuration configuration;
+    private String zookeeperConnectionString;
+    private String host;
+    private int port;
+    private int brokerId;
+    private String tmpDirectory;
+
+    KafkaBootstrap() {
+        if (kafkaLocalCluster == null) {
+            try {
+                loadConfig();
+            } catch (BootstrapException e) {
+                LOGGER.error("unable to load configuration", e);
+            }
+            init();
+            build();
+        }
+    }
+
+    private void init() {
+        // nothing to initialize for the Kafka broker
+    }
+
+    private void build() {
+        kafkaLocalCluster = new KafkaLocalBroker.Builder()
+                .setKafkaHostname(host)
+                .setKafkaPort(port)
+                .setKafkaBrokerId(brokerId)
+                .setKafkaProperties(new Properties())
+                .setKafkaTempDir(tmpDirectory)
+                .setZookeeperConnectionString(zookeeperConnectionString)
+                .build();
+    }
+
+    private void loadConfig() throws BootstrapException {
+        try {
+            configuration = new PropertiesConfiguration("default.properties");
+        } catch (ConfigurationException e) {
+            throw new BootstrapException("bad config", e);
+        }
+        host = configuration.getString(ConfigVars.KAFKA_HOSTNAME_KEY);
+        port = configuration.getInt(ConfigVars.KAFKA_PORT_KEY);
+        brokerId = configuration.getInt(ConfigVars.KAFKA_TEST_BROKER_ID_KEY);
+        tmpDirectory = configuration.getString(ConfigVars.KAFKA_TEST_TEMP_DIR_KEY);
+        zookeeperConnectionString = configuration.getString(ConfigVars.ZOOKEEPER_HOST_KEY) + ":" + configuration.getInt(ConfigVars.ZOOKEEPER_PORT_KEY);
+    }
+
+    @Override
+    public Bootstrap start() {
+        try {
+            kafkaLocalCluster.start();
+        } catch (Exception e) {
+            LOGGER.error("unable to start kafka", e);
+        }
+        return this;
+    }
+
+    @Override
+    public Bootstrap stop() {
+        try {
+            kafkaLocalCluster.stop(true);
+        } catch (Exception e) {
+            LOGGER.error("unable to stop kafka", e);
+        }
+        return this;
+    }
+
+    @Override
+    public org.apache.hadoop.conf.Configuration getConfiguration() {
+        throw new UnsupportedOperationException("the method getConfiguration can not be called on KafkaBootstrap");
+    }
+
+}
diff --git a/src/test/java/fr/jetoile/sample/component/KafkaBootstrapTest.java b/src/test/java/fr/jetoile/sample/component/KafkaBootstrapTest.java
new file mode 100644
index 00000000..b6279cbb
--- /dev/null
+++ b/src/test/java/fr/jetoile/sample/component/KafkaBootstrapTest.java
@@ -0,0 +1,71 @@
+package fr.jetoile.sample.component;
+
+import com.github.sakserv.minicluster.config.ConfigVars;
+import fr.jetoile.sample.BootstrapException;
+import fr.jetoile.sample.kafka.consumer.KafkaTestConsumer;
+import fr.jetoile.sample.kafka.producer.KafkaTestProducer;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class KafkaBootstrapTest {
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaBootstrapTest.class);
+
+    private static Bootstrap zookeeper;
+    private static Bootstrap kafka;
+    private static Configuration configuration;
+
+    @BeforeClass
+    public static void setup() throws Exception {
+        try {
+            configuration = new PropertiesConfiguration("default.properties");
+        } catch (ConfigurationException e) {
+            throw new BootstrapException("bad config", e);
+        }
+
+        zookeeper = ZookeeperBootstrap.INSTANCE.start();
+        kafka = KafkaBootstrap.INSTANCE.start();
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        kafka.stop();
+        zookeeper.stop();
+    }
+
+    @Test
+    public void kafkaShouldStart() throws Exception {
+        // Producer
+        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
+                .setKafkaHostname(configuration.getString(ConfigVars.KAFKA_HOSTNAME_KEY))
+                .setKafkaPort(configuration.getInt(ConfigVars.KAFKA_PORT_KEY))
+                .setTopic(configuration.getString(ConfigVars.KAFKA_TEST_TOPIC_KEY))
+                .setMessageCount(configuration.getInt(ConfigVars.KAFKA_TEST_MESSAGE_COUNT_KEY))
+                .build();
+        kafkaTestProducer.produceMessages();
+
+        // Consumer
+        List<String> seeds = new ArrayList<>();
+        seeds.add(configuration.getString(ConfigVars.KAFKA_HOSTNAME_KEY));
+        KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
+        kafkaTestConsumer.consumeMessages(
+                configuration.getInt(ConfigVars.KAFKA_TEST_MESSAGE_COUNT_KEY),
+                configuration.getString(ConfigVars.KAFKA_TEST_TOPIC_KEY),
+                0,
+                seeds,
+                configuration.getInt(ConfigVars.KAFKA_PORT_KEY));
+
+        // Assert num of messages produced = num of messages consumed
+        Assert.assertEquals(configuration.getLong(ConfigVars.KAFKA_TEST_MESSAGE_COUNT_KEY),
+                kafkaTestConsumer.getNumRead());
+    }
+}
diff --git a/src/test/java/fr/jetoile/sample/kafka/consumer/KafkaTestConsumer.java b/src/test/java/fr/jetoile/sample/kafka/consumer/KafkaTestConsumer.java
new file mode 100644
index 00000000..40c757be
--- /dev/null
+++ b/src/test/java/fr/jetoile/sample/kafka/consumer/KafkaTestConsumer.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package fr.jetoile.sample.kafka.consumer;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.*;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class KafkaTestConsumer {
+
+    // Logger
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTestConsumer.class);
+
+    private long numRead = 0;
+    private List<String> m_replicaBrokers = new ArrayList<>();
+
+    public void consumeMessages(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
+        // find the metadata about the topic and partition we are interested in
+        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
+        if (metadata == null) {
+            LOG.info("Can't find metadata for Topic and Partition. Exiting");
+            return;
+        }
+        if (metadata.leader() == null) {
+            LOG.info("Can't find Leader for Topic and Partition. Exiting");
+            return;
+        }
+        String leadBroker = metadata.leader().host();
+        String clientName = "Client_" + a_topic + "_" + a_partition;
+
+        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
+        long readOffset = 0L;
+
+        int numErrors = 0;
+        while (a_maxReads > 0) {
+            if (consumer == null) {
+                consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName);
+            }
+            FetchRequest req = new FetchRequestBuilder()
+                    .clientId(clientName)
+                    .addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka
+                    .build();
+            FetchResponse fetchResponse = consumer.fetch(req);
+
+            if (fetchResponse.hasError()) {
+                numErrors++;
+                // Something went wrong!
+                short code = fetchResponse.errorCode(a_topic, a_partition);
+                LOG.info("Error fetching data from the Broker: {} Reason: {}", leadBroker, code);
+                if (numErrors > 5) break;
+                if (code == ErrorMapping.OffsetOutOfRangeCode()) {
+                    // We asked for an invalid offset. For the simple case, reset to the earliest offset
+                    readOffset = 0L;
+                    continue;
+                }
+                consumer.close();
+                consumer = null;
+                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
+                continue;
+            }
+            numErrors = 0;
+
+            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
+                long currentOffset = messageAndOffset.offset();
+                if (currentOffset < readOffset) {
+                    LOG.info("Found an old offset: {} Expecting: {}", currentOffset, readOffset);
+                    continue;
+                }
+                readOffset = messageAndOffset.nextOffset();
+                ByteBuffer payload = messageAndOffset.message().payload();
+
+                byte[] bytes = new byte[payload.limit()];
+                payload.get(bytes);
+                LOG.info("Consumed: {}: {}", String.valueOf(messageAndOffset.offset()), new String(bytes, "UTF-8"));
+                numRead++;
+                a_maxReads--;
+            }
+
+            if (numRead == 0) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ie) {
+                    // ignore and retry
+                }
+            }
+        }
+        if (consumer != null) consumer.close();
+    }
+
+    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception {
+        for (int i = 0; i < 3; i++) {
+            boolean goToSleep = false;
+            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition);
+            if (metadata == null) {
+                goToSleep = true;
+            } else if (metadata.leader() == null) {
+                goToSleep = true;
+            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
+                // first time through if the leader hasn't changed give ZooKeeper a second to recover
+                // second time, assume the broker did recover before failover, or it was a non-Broker issue
+                goToSleep = true;
+            } else {
+                return metadata.leader().host();
+            }
+            if (goToSleep) {
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException ie) {
+                    // ignore and retry
+                }
+            }
+        }
+        LOG.info("Unable to find new leader after Broker failure. Exiting");
+        throw new Exception("Unable to find new leader after Broker failure. Exiting");
Exiting"); + } + + private PartitionMetadata findLeader(List a_seedBrokers, int a_port, String a_topic, int a_partition) { + PartitionMetadata returnMetaData = null; + loop: + for (String seed : a_seedBrokers) { + SimpleConsumer consumer = null; + try { + consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); + List topics = Collections.singletonList(a_topic); + TopicMetadataRequest req = new TopicMetadataRequest(topics); + TopicMetadataResponse resp = consumer.send(req); + + List metaData = resp.topicsMetadata(); + for (TopicMetadata item : metaData) { + for (PartitionMetadata part : item.partitionsMetadata()) { + if (part.partitionId() == a_partition) { + returnMetaData = part; + break loop; + } + } + } + } catch (Exception e) { + LOG.info("Error communicating with Broker [{}] to find Leader for [{},{}] Reason: {}", + seed, a_topic, a_partition, e); + } finally { + if (consumer != null) consumer.close(); + } + } + if (returnMetaData != null) { + m_replicaBrokers.clear(); + for (kafka.cluster.BrokerEndPoint replica : returnMetaData.replicas()) { + m_replicaBrokers.add(replica.host()); + } + } + return returnMetaData; + } + + public long getNumRead() { + return numRead; + } + +} diff --git a/src/test/java/fr/jetoile/sample/kafka/producer/KafkaTestProducer.java b/src/test/java/fr/jetoile/sample/kafka/producer/KafkaTestProducer.java new file mode 100644 index 00000000..ef16d02c --- /dev/null +++ b/src/test/java/fr/jetoile/sample/kafka/producer/KafkaTestProducer.java @@ -0,0 +1,123 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package fr.jetoile.sample.kafka.producer;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public class KafkaTestProducer {
+
+    // Logger
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTestProducer.class);
+
+    private String kafkaHostname;
+    private Integer kafkaPort;
+    private String topic;
+    private Integer messageCount;
+
+    private KafkaTestProducer(Builder builder) {
+        this.kafkaHostname = builder.kafkaHostname;
+        this.kafkaPort = builder.kafkaPort;
+        this.topic = builder.topic;
+        this.messageCount = builder.messageCount;
+    }
+
+    public String getKafkaHostname() {
+        return kafkaHostname;
+    }
+
+    public Integer getKafkaPort() {
+        return kafkaPort;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    public Integer getMessageCount() {
+        return messageCount;
+    }
+
+    public static class Builder {
+        private String kafkaHostname;
+        private Integer kafkaPort;
+        private String topic;
+        private Integer messageCount;
+
+        public Builder setKafkaHostname(String kafkaHostname) {
+            this.kafkaHostname = kafkaHostname;
+            return this;
+        }
+
+        public Builder setKafkaPort(Integer kafkaPort) {
+            this.kafkaPort = kafkaPort;
+            return this;
+        }
+
+        public Builder setTopic(String topic) {
+            this.topic = topic;
+            return this;
+        }
+
+        public Builder setMessageCount(Integer messageCount) {
+            this.messageCount = messageCount;
+            return this;
+        }
+
+        public KafkaTestProducer build() {
+            KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(this);
+            return kafkaTestProducer;
+        }
+
+    }
+
+    public void produceMessages() {
+        Properties props = new Properties();
+        props.put("metadata.broker.list", getKafkaHostname() + ":" + getKafkaPort());
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        ProducerConfig config = new ProducerConfig(props);
+        Producer<String, String> producer = new Producer<>(config);
+
+        // Send the configured number of messages to the local kafka broker:
+        LOG.info("KAFKA: Preparing to send {} initial messages", messageCount);
+        for (int i = 0; i < messageCount; i++) {
+
+            // Create the JSON object
+            JSONObject obj = new JSONObject();
+            try {
+                obj.put("id", String.valueOf(i));
+                obj.put("msg", "test-message" + i);
+//                obj.put("dt", GenerateRandomDay.genRandomDay());
+            } catch (JSONException e) {
+                LOG.error("unable to build the JSON payload", e);
+            }
+            String payload = obj.toString();
+
+            KeyedMessage<String, String> data = new KeyedMessage<>(getTopic(), null, payload);
+            producer.send(data);
+            LOG.info("Sent message: {}", data.toString());
+        }
+        LOG.info("KAFKA: Initial messages sent");
+
+        producer.close();
+    }
+
+}
diff --git a/src/test/resources/default.properties b/src/test/resources/default.properties
index ec0f93a6..9ef9935a 100644
--- a/src/test/resources/default.properties
+++ b/src/test/resources/default.properties
@@ -48,4 +48,14 @@ hbase.wal.replication.enabled=false
 hbase.test.table.name=hbase_test_table
 hbase.test.col.family.name=cf1
 hbase.test.col.qualifier.name=cq1
-hbase.test.num.rows.to.put=50
\ No newline at end of file
+hbase.test.num.rows.to.put=50
+
+# Kafka
+kafka.hostname=localhost
+kafka.port=20111
+
+# Kafka Test
+kafka.test.topic=testtopic
+kafka.test.message.count=10
+kafka.test.broker.id=1
+kafka.test.temp.dir=embedded_kafka
\ No newline at end of file
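
Usage note (not part of the patch): a minimal sketch of how another test could drive the embedded broker added above, assuming the existing ZookeeperBootstrap component and the default.properties values shown in this diff (kafka.hostname=localhost, kafka.port=20111):

    // Kafka depends on ZooKeeper, so start the ZooKeeper bootstrap first.
    Bootstrap zookeeper = ZookeeperBootstrap.INSTANCE.start();
    Bootstrap kafka = KafkaBootstrap.INSTANCE.start();

    // Produce a handful of test messages; the hardcoded values mirror default.properties.
    new KafkaTestProducer.Builder()
            .setKafkaHostname("localhost")  // kafka.hostname
            .setKafkaPort(20111)            // kafka.port
            .setTopic("testtopic")          // kafka.test.topic
            .setMessageCount(10)            // kafka.test.message.count
            .build()
            .produceMessages();

    // Stop in reverse order of startup.
    kafka.stop();
    zookeeper.stop();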