From 1e057b20c2915632c3ea1a9956d62d371f4a7816 Mon Sep 17 00:00:00 2001
From: jetoile
Date: Sun, 17 Apr 2016 19:38:00 +0200
Subject: [PATCH] kafka client unit add

---
 .../hadoop-unit-client-kafka/pom.xml               |  41 ++++++
 .../test/kafka/KafkaConsumerUtils.java             | 115 +++++++++++++++
 .../test/kafka/KafkaProducerUtils.java             |  63 ++++++++
 hadoop-unit-client/pom.xml                         |   1 +
 hadoop-unit-kafka/pom.xml                          |  19 +++
 .../component/KafkaBootstrapTest.java              |  46 +++---
 .../kafka/consumer/KafkaTestConsumer.java          |  95 ------------
 .../kafka/producer/KafkaTestProducer.java          | 135 ------------------
 .../IntegrationBootstrapTest.java                  |  42 +++---
 .../ManualIntegrationBootstrapTest.java            |  43 +++---
 .../kafka/consumer/KafkaTestConsumer.java          |  97 -------------
 .../kafka/producer/KafkaTestProducer.java          | 134 ----------------
 .../IntegrationBootstrapTest.java                  |  42 +++---
 .../ManualIntegrationBootstrapTest.java            |  43 +++---
 .../kafka/consumer/KafkaTestConsumer.java          |  97 -------------
 .../kafka/producer/KafkaTestProducer.java          | 134 ----------------
 hadoop-unit-standalone/pom.xml                     |   6 +
 pom.xml                                            |   7 +
 18 files changed, 363 insertions(+), 797 deletions(-)
 create mode 100644 hadoop-unit-client/hadoop-unit-client-kafka/pom.xml
 create mode 100644 hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaConsumerUtils.java
 create mode 100644 hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaProducerUtils.java
 delete mode 100644 hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
 delete mode 100644 hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
 delete mode 100644 hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
 delete mode 100644 hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
 delete mode 100644 hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
 delete mode 100644 hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java

diff --git a/hadoop-unit-client/hadoop-unit-client-kafka/pom.xml b/hadoop-unit-client/hadoop-unit-client-kafka/pom.xml
new file mode 100644
index 00000000..723faa08
--- /dev/null
+++ b/hadoop-unit-client/hadoop-unit-client-kafka/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>hadoop-unit-client</artifactId>
+        <groupId>fr.jetoile.hadoop</groupId>
+        <version>1.4-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>hadoop-unit-client-kafka</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>fr.jetoile.hadoop</groupId>
+            <artifactId>hadoop-unit-commons</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.10</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-configuration</groupId>
+            <artifactId>commons-configuration</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaConsumerUtils.java b/hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaConsumerUtils.java
new file mode 100644
index 00000000..c1f69a10
--- /dev/null
+++ b/hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaConsumerUtils.java
@@ -0,0 +1,115 @@
+package fr.jetoile.hadoopunit.test.kafka;
+
+import fr.jetoile.hadoopunit.HadoopUnitConfig;
+import fr.jetoile.hadoopunit.exception.ConfigException;
+import kafka.api.FetchRequestBuilder;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Properties;
+
+public enum KafkaConsumerUtils {
+    INSTANCE;
+
+    // Logger
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerUtils.class);
+
+    private String kafkaHostname;
+    private Integer kafkaPort;
+
+    private Configuration configuration;
+
+    long numRead = 0;
+
+    KafkaConsumerUtils() {
+        try {
+            loadConfig();
+        } catch (ConfigException e) {
+            System.exit(-1);
+        }
+    }
+
+
+    public void consumeMessagesWithOldApi(String topic, int nbMessageToRead) throws UnsupportedEncodingException {
+        SimpleConsumer simpleConsumer = new SimpleConsumer(kafkaHostname,
+                kafkaPort,
+                30000,
+                2,
+                "test");
+
+        System.out.println("Testing single fetch");
+        kafka.api.FetchRequest req = new FetchRequestBuilder()
+                .clientId("test")
+                .addFetch(topic, 0, 0L, 100)
+                .build();
+        while (numRead != nbMessageToRead) {
+            FetchResponse fetchResponse = simpleConsumer.fetch(req);
+            printMessages(fetchResponse.messageSet(topic, 0));
+            numRead++;
+        }
+    }
+
+
+    public void consumeMessagesWithNewApi(String topic, int nbMessageToRead) {
+        Properties props = new Properties();
+        props.put("bootstrap.servers", kafkaHostname + ":" + kafkaPort);
+        props.put("group.id", "test");
+        props.put("auto.offset.reset", "earliest");
+        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
+        consumer.subscribe(Arrays.asList(topic));
+
+        try {
+            while (numRead != nbMessageToRead) {
+                ConsumerRecords<String, String> records = consumer.poll(1000);
+                for (ConsumerRecord<String, String> record : records) {
+                    numRead++;
+//                    LOG.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
+                    LOG.debug(record.value());
+                }
+            }
+        } finally {
+            consumer.close();
+        }
+    }
+
+    public long getNumRead() {
+        return numRead;
+    }
+
+    private void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
+        for (MessageAndOffset messageAndOffset : messageSet) {
+            ByteBuffer payload = messageAndOffset.message().payload();
+            byte[] bytes = new byte[payload.limit()];
+            payload.get(bytes);
+            LOG.debug(new String(bytes, "UTF-8"));
+        }
+    }
+
+    private void loadConfig() throws ConfigException {
+        try {
+            configuration = new PropertiesConfiguration(HadoopUnitConfig.DEFAULT_PROPS_FILE);
+        } catch (ConfigurationException e) {
+            throw new ConfigException("bad config", e);
+        }
+
+        kafkaHostname = configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY);
+        kafkaPort = configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY);
+    }
+
+}
diff --git a/hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaProducerUtils.java b/hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaProducerUtils.java
new file mode 100644
index 00000000..4fab53ee
--- /dev/null
+++ b/hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaProducerUtils.java
@@ -0,0 +1,63 @@
+package fr.jetoile.hadoopunit.test.kafka;
+
+import fr.jetoile.hadoopunit.HadoopUnitConfig;
+import fr.jetoile.hadoopunit.exception.ConfigException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public enum KafkaProducerUtils {
+    INSTANCE;
+
+    // Logger
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerUtils.class);
+
+    private String kafkaHostname;
+    private Integer kafkaPort;
+    private Properties props;
+
+    private Configuration configuration;
+
+    KafkaProducerUtils() {
+        try {
+            loadConfig();
+            props = new Properties();
+            props.put("bootstrap.servers", kafkaHostname + ":" + kafkaPort);
+            props.put("acks", "all");
+            props.put("retries", 0);
+            props.put("batch.size", 10);
+            props.put("linger.ms", 1);
+            props.put("buffer.memory", 33554432);
+            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+        } catch (ConfigException e) {
+            System.exit(-1);
+        }
+    }
+
+    public void produceMessages(String topic, String key, String message) {
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
+        producer.send(new ProducerRecord<>(topic, key, message));
+        producer.close();
+    }
+
+    private void loadConfig() throws ConfigException {
+        try {
+            configuration = new PropertiesConfiguration(HadoopUnitConfig.DEFAULT_PROPS_FILE);
+        } catch (ConfigurationException e) {
+            throw new ConfigException("bad config", e);
+        }
+
+        kafkaHostname = configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY);
+        kafkaPort = configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY);
+    }
+
+}
diff --git a/hadoop-unit-client/pom.xml b/hadoop-unit-client/pom.xml
index fd81d3c7..76a5d77a 100755
--- a/hadoop-unit-client/pom.xml
+++ b/hadoop-unit-client/pom.xml
@@ -17,6 +17,7 @@
         <module>hadoop-unit-client-hdfs</module>
         <module>hadoop-unit-client-solrcloud</module>
         <module>hadoop-unit-client-spark</module>
+        <module>hadoop-unit-client-kafka</module>
     </modules>
 
 </project>
diff --git a/hadoop-unit-kafka/pom.xml b/hadoop-unit-kafka/pom.xml
index 4cacb096..33552cc1 100644
--- a/hadoop-unit-kafka/pom.xml
+++ b/hadoop-unit-kafka/pom.xml
@@ -1,4 +1,18 @@
+<!--
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
 <?xml version="1.0" encoding="UTF-8"?>
 <project xmlns="http://maven.apache.org/POM/4.0.0"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -33,5 +47,10 @@
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka_2.10</artifactId>
         </dependency>
+
+        <dependency>
+            <groupId>fr.jetoile.hadoop</groupId>
+            <artifactId>hadoop-unit-client-kafka</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/component/KafkaBootstrapTest.java b/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/component/KafkaBootstrapTest.java
index 21e699d0..21f918d9 100644
--- a/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/component/KafkaBootstrapTest.java
+++ b/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/component/KafkaBootstrapTest.java
@@ -18,18 +18,17 @@
 import fr.jetoile.hadoopunit.HadoopUnitConfig;
 import fr.jetoile.hadoopunit.HadoopBootstrap;
 import fr.jetoile.hadoopunit.exception.BootstrapException;
-import fr.jetoile.hadoopunit.kafka.consumer.KafkaTestConsumer;
-import fr.jetoile.hadoopunit.kafka.producer.KafkaTestProducer;
+import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.List;
-
 import static org.junit.Assert.assertEquals;
 
 public class KafkaBootstrapTest {
@@ -61,28 +60,29 @@ public static void tearDown() throws Exception {
     public void kafkaShouldStart() throws Exception {
 
         // Producer
-        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
-                .setKafkaHostname(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY))
-                .setKafkaPort(configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY))
-                .setTopic(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY))
-                .setMessageCount(configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY))
-                .build();
-        kafkaTestProducer.produceMessages();
+        for (int i = 0; i < 10; i++) {
+            String payload = generateMessage(i);
+            KafkaProducerUtils.INSTANCE.produceMessages(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), String.valueOf(i), payload);
+        }
+
         // Consumer
-        List<String> seeds = new ArrayList<String>();
-        seeds.add(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY));
-        KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
-        kafkaTestConsumer.consumeMessages2(
-                configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
-                configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY),
-                0,
-                seeds,
-                configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY));
+//        KafkaConsumerUtils.INSTANCE.consumeMessagesWithOldApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
+        KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
 
         // Assert num of messages produced = num of message consumed
-        Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
-                kafkaTestConsumer.getNumRead());
+        Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY), KafkaConsumerUtils.INSTANCE.getNumRead());
+    }
+
+    private String generateMessage(int i) {
+        JSONObject obj = new JSONObject();
+        try {
+            obj.put("id", String.valueOf(i));
+            obj.put("msg", "test-message" + i);
+        } catch (JSONException e) {
+            e.printStackTrace();
+        }
+        return obj.toString();
     }
 }
diff --git a/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java b/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
deleted file mode 100644
index 8e736095..00000000
--- a/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package fr.jetoile.hadoopunit.kafka.consumer;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-public class KafkaTestConsumer {
-
-    // Logger
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaTestConsumer.class);
-
-    long numRead = 0;
-    private List<String> m_replicaBrokers = new ArrayList<String>();
-
-
-    public void consumeMessages(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", a_seedBrokers.get(0) + ":" + a_port);
-//        props.put("metadata.broker.list", a_seedBrokers.get(0) + ":" + a_port);
-        props.put("group.id", "test");
-        props.put("enable.auto.commit", "true");
-        props.put("auto.commit.interval.ms", "100");
-        props.put("session.timeout.ms", "30000");
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList(a_topic));
-        while (numRead == 0) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            for (ConsumerRecord<String, String> record : records) {
-                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
-                numRead++;
-            }
-        }
-    }
-
-
-    public void consumeMessages2(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws UnsupportedEncodingException {
-        SimpleConsumer simpleConsumer = new SimpleConsumer(a_seedBrokers.get(0),
-                a_port,
-                30000,
-                2,
-                "test");
-
-        System.out.println("Testing single fetch");
-        kafka.api.FetchRequest req = new FetchRequestBuilder()
-                .clientId("test")
-                .addFetch(a_topic, 0, 0L, 100)
-                .build();
-        while (numRead != 10) {
-            FetchResponse fetchResponse = simpleConsumer.fetch(req);
-            printMessages(fetchResponse.messageSet(a_topic, 0));
-            numRead++;
-        }
-    }
-
-    public long getNumRead() {
-        return numRead;
-    }
-
-    private void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
-        for (MessageAndOffset messageAndOffset : messageSet) {
-            ByteBuffer payload = messageAndOffset.message().payload();
-            byte[] bytes = new byte[payload.limit()];
-            payload.get(bytes);
-            System.out.println(new String(bytes, "UTF-8"));
-        }
-    }
-
-}
diff --git a/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java b/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
deleted file mode 100644
index 2229d33c..00000000
--- a/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package fr.jetoile.hadoopunit.kafka.producer;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-public class KafkaTestProducer {
-
-    // Logger
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaTestProducer.class);
-
-    private String kafkaHostname;
-    private Integer kafkaPort;
-    private String topic;
-    private Integer messageCount;
-
-    private KafkaTestProducer(Builder builder) {
-        this.kafkaHostname = builder.kafkaHostname;
-        this.kafkaPort = builder.kafkaPort;
-        this.topic = builder.topic;
-        this.messageCount = builder.messageCount;
-    }
-
-    public String getKafkaHostname() {
-        return kafkaHostname;
-    }
-
-    public Integer getKafkaPort() {
-        return kafkaPort;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public Integer getMessageCount() {
-        return messageCount;
-    }
-
-    public static class Builder {
-        private String kafkaHostname;
-        private Integer kafkaPort;
-        private String topic;
-        private Integer messageCount;
-
-        public Builder setKafkaHostname(String kafkaHostname) {
-            this.kafkaHostname = kafkaHostname;
-            return this;
-        }
-
-        public Builder setKafkaPort(Integer kafkaPort) {
-            this.kafkaPort = kafkaPort;
-            return this;
-        }
-
-        public Builder setTopic(String topic) {
-            this.topic = topic;
-            return this;
-        }
-
-        public Builder setMessageCount(Integer messageCount) {
-            this.messageCount = messageCount;
-            return this;
-        }
-
-        public KafkaTestProducer build() {
-            KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(this);
-            return kafkaTestProducer;
-        }
-
-    }
-
-    public void produceMessages() {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getKafkaHostname() + ":" + getKafkaPort());
-        props.put("acks", "all");
-        props.put("retries", 0);
-        props.put("batch.size", 10);
-        props.put("linger.ms", 1);
-        props.put("buffer.memory", 33554432);
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-        Producer<String, String> producer = new KafkaProducer<String, String>(props);
-//        props.put("metadata.broker.list", getKafkaHostname() + ":" + getKafkaPort());
-//        props.put("serializer.class", "kafka.serializer.StringEncoder");
-//        ProducerConfig config = new ProducerConfig(props);
-//        Producer fr.jetoile.hadoop.kafka.producer = new Producer(config);
-
-        // Send 10 messages to the local kafka server:
-        LOG.info("KAFKA: Preparing to send {} initial messages", messageCount);
-        for (int i = 0; i < messageCount; i++) {
-
-            // Create the JSON object
-            JSONObject obj = new JSONObject();
-            try {
-                obj.put("id", String.valueOf(i));
-                obj.put("msg", "test-message" + 1);
-//                obj.put("dt", GenerateRandomDay.genRandomDay());
-            } catch (JSONException e) {
-                e.printStackTrace();
-            }
-            String payload = obj.toString();
-
-//            KeyedMessage data = new KeyedMessage(getTopic(), null, payload);
-//            fr.jetoile.hadoop.kafka.producer.send(data);
-            producer.send(new ProducerRecord<>(getTopic(), String.valueOf(i), obj.toString()));
-            LOG.info("Sent message: {}", obj.toString());
-        }
-        LOG.info("KAFKA: Initial messages sent");
-
-        producer.close();
-    }
-
-}
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java b/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java
index aeb9bacf..ff903192 100644
--- a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java
+++ b/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java
@@ -24,9 +24,9 @@
 import fr.jetoile.hadoopunit.component.OozieBootstrap;
 import fr.jetoile.hadoopunit.exception.BootstrapException;
 import fr.jetoile.hadoopunit.exception.NotFoundServiceException;
-import fr.jetoile.hadoopunit.kafka.consumer.KafkaTestConsumer;
-import fr.jetoile.hadoopunit.kafka.producer.KafkaTestProducer;
 import fr.jetoile.hadoopunit.test.hdfs.HdfsUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
@@ -44,6 +44,8 @@
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowJob;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
@@ -93,29 +95,29 @@ public static void tearDown() throws BootstrapException {
     public void kafkaShouldStart() throws Exception {
 
         // Producer
-        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
-                .setKafkaHostname(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY))
-                .setKafkaPort(configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY))
-                .setTopic(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY))
-                .setMessageCount(configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY))
-                .build();
-        kafkaTestProducer.produceMessages();
+        for (int i = 0; i < 10; i++) {
+            String payload = generateMessage(i);
+            KafkaProducerUtils.INSTANCE.produceMessages(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), String.valueOf(i), payload);
+        }
+
         // Consumer
-        List<String> seeds = new ArrayList<String>();
-        seeds.add(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY));
-        KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
-        kafkaTestConsumer.consumeMessages2(
-                configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
-                configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY),
-                0,
-                seeds,
-                configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY));
+        KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
 
         // Assert num of messages produced = num of message consumed
-        Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
-                kafkaTestConsumer.getNumRead());
+        Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY), KafkaConsumerUtils.INSTANCE.getNumRead());
+    }
+
+    private String generateMessage(int i) {
+        JSONObject obj = new JSONObject();
+        try {
+            obj.put("id", String.valueOf(i));
+            obj.put("msg", "test-message" + i);
+        } catch (JSONException e) {
+            e.printStackTrace();
+        }
+        return obj.toString();
     }
 
     @Test
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java b/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java
index 1bbc1750..e945b54f 100644
--- a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java
+++ b/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java
@@ -22,9 +22,9 @@
 import fr.jetoile.hadoopunit.component.OozieBootstrap;
 import fr.jetoile.hadoopunit.exception.BootstrapException;
 import fr.jetoile.hadoopunit.exception.NotFoundServiceException;
-import fr.jetoile.hadoopunit.kafka.consumer.KafkaTestConsumer;
-import fr.jetoile.hadoopunit.kafka.producer.KafkaTestProducer;
 import fr.jetoile.hadoopunit.test.hdfs.HdfsUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
@@ -42,6 +42,8 @@
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowJob;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.elasticsearch.action.get.GetResponse;
 import org.elasticsearch.action.index.IndexResponse;
 import org.elasticsearch.client.Client;
@@ -56,7 +58,6 @@
 import java.net.*;
 import java.sql.Connection;
 import java.sql.*;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
@@ -90,29 +91,29 @@ public static void tearDown() throws BootstrapException {
     public void kafkaShouldStart() throws Exception {
 
         // Producer
-        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
-                .setKafkaHostname(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY))
-                .setKafkaPort(configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY))
-                .setTopic(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY))
-                .setMessageCount(configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY))
-                .build();
-        kafkaTestProducer.produceMessages();
+        for (int i = 0; i < 10; i++) {
+            String payload = generateMessage(i);
+            KafkaProducerUtils.INSTANCE.produceMessages(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), String.valueOf(i), payload);
+        }
+
         // Consumer
-        List<String> seeds = new ArrayList<String>();
-        seeds.add(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY));
-        KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
-        kafkaTestConsumer.consumeMessages2(
-                configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
-                configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY),
-                0,
-                seeds,
-                configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY));
+        KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
 
         // Assert num of messages produced = num of message consumed
-        Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
-                kafkaTestConsumer.getNumRead());
+        Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY), KafkaConsumerUtils.INSTANCE.getNumRead());
+    }
+
+    private String generateMessage(int i) {
+        JSONObject obj = new JSONObject();
+        try {
+            obj.put("id", String.valueOf(i));
+            obj.put("msg", "test-message" + i);
+        } catch (JSONException e) {
+            e.printStackTrace();
+        }
+        return obj.toString();
     }
 
     @Test
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java b/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
deleted file mode 100644
index 5d787666..00000000
--- a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package fr.jetoile.hadoopunit.kafka.consumer;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-public class KafkaTestConsumer {
-
-    // Logger
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaTestConsumer.class);
-
-    long numRead = 0;
-    private List<String> m_replicaBrokers = new ArrayList<String>();
-
-
-    public void consumeMessages(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", a_seedBrokers.get(0) + ":" + a_port);
-//        props.put("metadata.broker.list", a_seedBrokers.get(0) + ":" + a_port);
-        props.put("group.id", "test");
-        props.put("enable.auto.commit", "true");
-        props.put("auto.commit.interval.ms", "100");
-        props.put("session.timeout.ms", "30000");
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList(a_topic));
-        while (numRead == 0) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            for (ConsumerRecord<String, String> record : records) {
-                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
-                numRead++;
-            }
-        }
-    }
-
-
-    public void consumeMessages2(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws UnsupportedEncodingException {
-        SimpleConsumer simpleConsumer = new SimpleConsumer(a_seedBrokers.get(0),
-                a_port,
-                30000,
-                2,
-                "test");
-
-        System.out.println("Testing single fetch");
-        kafka.api.FetchRequest req = new FetchRequestBuilder()
-                .clientId("test")
-                .addFetch(a_topic, 0, 0L, 100)
-                .build();
-        while (numRead != 10) {
-            FetchResponse fetchResponse = simpleConsumer.fetch(req);
-            printMessages(fetchResponse.messageSet(a_topic, 0));
-            numRead++;
-        }
-    }
-
-    public long getNumRead() {
-        return numRead;
-    }
-
-    private void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
-        for (MessageAndOffset messageAndOffset : messageSet) {
-            ByteBuffer payload = messageAndOffset.message().payload();
-            byte[] bytes = new byte[payload.limit()];
-            payload.get(bytes);
-            System.out.println(new String(bytes, "UTF-8"));
-        }
-    }
-
-}
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java b/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
deleted file mode 100644
index 1998fa16..00000000
--- a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package fr.jetoile.hadoopunit.kafka.producer;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-public class KafkaTestProducer {
-
-    // Logger
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaTestProducer.class);
-
-    private String kafkaHostname;
-    private Integer kafkaPort;
-    private String topic;
-    private Integer messageCount;
-
-    private KafkaTestProducer(Builder builder) {
-        this.kafkaHostname = builder.kafkaHostname;
-        this.kafkaPort = builder.kafkaPort;
-        this.topic = builder.topic;
-        this.messageCount = builder.messageCount;
-    }
-
-    public String getKafkaHostname() {
-        return kafkaHostname;
-    }
-
-    public Integer getKafkaPort() {
-        return kafkaPort;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public Integer getMessageCount() {
-        return messageCount;
-    }
-
-    public static class Builder {
-        private String kafkaHostname;
-        private Integer kafkaPort;
-        private String topic;
-        private Integer messageCount;
-
-        public Builder setKafkaHostname(String kafkaHostname) {
-            this.kafkaHostname = kafkaHostname;
-            return this;
-        }
-
-        public Builder setKafkaPort(Integer kafkaPort) {
-            this.kafkaPort = kafkaPort;
-            return this;
-        }
-
-        public Builder setTopic(String topic) {
-            this.topic = topic;
-            return this;
-        }
-
-        public Builder setMessageCount(Integer messageCount) {
-            this.messageCount = messageCount;
-            return this;
-        }
-
-        public KafkaTestProducer build() {
-            KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(this);
-            return kafkaTestProducer;
-        }
-
-    }
-
-    public void produceMessages() {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getKafkaHostname() + ":" + getKafkaPort());
-        props.put("acks", "all");
-        props.put("retries", 0);
-        props.put("batch.size", 10);
-        props.put("linger.ms", 1);
-        props.put("buffer.memory", 33554432);
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-        Producer<String, String> producer = new KafkaProducer<String, String>(props);
-//        props.put("metadata.broker.list", getKafkaHostname() + ":" + getKafkaPort());
-//        props.put("serializer.class", "kafka.serializer.StringEncoder");
-//        ProducerConfig config = new ProducerConfig(props);
-//        Producer fr.jetoile.hadoop.kafka.producer = new Producer(config);
-
-        // Send 10 messages to the local kafka server:
-        LOG.info("KAFKA: Preparing to send {} initial messages", messageCount);
-        for (int i = 0; i < messageCount; i++) {
-
-            // Create the JSON object
-            JSONObject obj = new JSONObject();
-            try {
-                obj.put("id", String.valueOf(i));
-                obj.put("msg", "test-message" + 1);
-//                obj.put("dt", GenerateRandomDay.genRandomDay());
-            } catch (JSONException e) {
-                e.printStackTrace();
-            }
-            String payload = obj.toString();
-
-//            KeyedMessage data = new KeyedMessage(getTopic(), null, payload);
-//            fr.jetoile.hadoop.kafka.producer.send(data);
-            producer.send(new ProducerRecord<>(getTopic(), String.valueOf(i), obj.toString()));
-            LOG.info("Sent message: {}", obj.toString());
-        }
-        LOG.info("KAFKA: Initial messages sent");
-
-        producer.close();
-    }
-
-}
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java b/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java
index 4bbbd7d5..d86b3533 100644
--- a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java
+++ b/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java
@@ -25,9 +25,9 @@
 import fr.jetoile.hadoopunit.component.SolrCloudBootstrap;
 import fr.jetoile.hadoopunit.exception.BootstrapException;
 import fr.jetoile.hadoopunit.exception.NotFoundServiceException;
-import fr.jetoile.hadoopunit.kafka.consumer.KafkaTestConsumer;
-import fr.jetoile.hadoopunit.kafka.producer.KafkaTestProducer;
 import fr.jetoile.hadoopunit.test.hdfs.HdfsUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
@@ -49,6 +49,8 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.zookeeper.KeeperException;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -120,29 +122,29 @@ public void solrCloudShouldStart() throws IOException, SolrServerException, Keep
     public void kafkaShouldStart() throws Exception {
 
         // Producer
-        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
-                .setKafkaHostname(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY))
-                .setKafkaPort(configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY))
-                .setTopic(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY))
-                .setMessageCount(configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY))
-                .build();
-        kafkaTestProducer.produceMessages();
+        for (int i = 0; i < 10; i++) {
+            String payload = generateMessage(i);
+            KafkaProducerUtils.INSTANCE.produceMessages(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), String.valueOf(i), payload);
+        }
+
         // Consumer
-        List<String> seeds = new ArrayList<String>();
-        seeds.add(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY));
-        KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
-        kafkaTestConsumer.consumeMessages2(
-                configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
-                configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY),
-                0,
-                seeds,
-                configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY));
+        KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
 
         // Assert num of messages produced = num of message consumed
-        Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
-                kafkaTestConsumer.getNumRead());
+        Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY), KafkaConsumerUtils.INSTANCE.getNumRead());
+    }
+
+    private String generateMessage(int i) {
+        JSONObject obj = new JSONObject();
+        try {
+            obj.put("id", String.valueOf(i));
+            obj.put("msg", "test-message" + i);
+        } catch (JSONException e) {
+            e.printStackTrace();
+        }
+        return obj.toString();
     }
 
     @Test
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java b/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java
index 7262fce8..51803a98 100644
--- a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java
+++ b/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java
@@ -23,9 +23,9 @@
 import fr.jetoile.hadoopunit.component.SolrCloudBootstrap;
 import fr.jetoile.hadoopunit.exception.BootstrapException;
 import fr.jetoile.hadoopunit.exception.NotFoundServiceException;
-import fr.jetoile.hadoopunit.kafka.consumer.KafkaTestConsumer;
-import fr.jetoile.hadoopunit.kafka.producer.KafkaTestProducer;
 import fr.jetoile.hadoopunit.test.hdfs.HdfsUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.PropertiesConfiguration;
@@ -47,6 +47,8 @@
 import org.apache.solr.common.SolrDocument;
 import org.apache.solr.common.SolrInputDocument;
 import org.apache.zookeeper.KeeperException;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
 import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -58,7 +60,6 @@
 import java.net.UnknownHostException;
 import java.sql.Connection;
 import java.sql.*;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
@@ -120,29 +121,29 @@ public void solrCloudShouldStart() throws IOException, SolrServerException, Keep
     public void kafkaShouldStart() throws Exception {
 
         // Producer
-        KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
-                .setKafkaHostname(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY))
-                .setKafkaPort(configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY))
-                .setTopic(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY))
-                .setMessageCount(configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY))
-                .build();
-        kafkaTestProducer.produceMessages();
+        for (int i = 0; i < 10; i++) {
+            String payload = generateMessage(i);
+            KafkaProducerUtils.INSTANCE.produceMessages(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), String.valueOf(i), payload);
+        }
+
         // Consumer
-        List<String> seeds = new ArrayList<String>();
-        seeds.add(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY));
-        KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
-        kafkaTestConsumer.consumeMessages2(
-                configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
-                configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY),
-                0,
-                seeds,
-                configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY));
+        KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
 
         // Assert num of messages produced = num of message consumed
-        Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
-                kafkaTestConsumer.getNumRead());
+        Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY), KafkaConsumerUtils.INSTANCE.getNumRead());
+    }
+
+    private String generateMessage(int i) {
+        JSONObject obj = new JSONObject();
+        try {
+            obj.put("id", String.valueOf(i));
+            obj.put("msg", "test-message" + i);
+        } catch (JSONException e) {
+            e.printStackTrace();
+        }
+        return obj.toString();
     }
 
     @Test
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java b/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
deleted file mode 100644
index 5d787666..00000000
--- a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package fr.jetoile.hadoopunit.kafka.consumer;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-public class KafkaTestConsumer {
-
-    // Logger
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaTestConsumer.class);
-
-    long numRead = 0;
-    private List<String> m_replicaBrokers = new ArrayList<String>();
-
-
-    public void consumeMessages(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", a_seedBrokers.get(0) + ":" + a_port);
-//        props.put("metadata.broker.list", a_seedBrokers.get(0) + ":" + a_port);
-        props.put("group.id", "test");
-        props.put("enable.auto.commit", "true");
-        props.put("auto.commit.interval.ms", "100");
-        props.put("session.timeout.ms", "30000");
-        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
-        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
-        consumer.subscribe(Arrays.asList(a_topic));
-        while (numRead == 0) {
-            ConsumerRecords<String, String> records = consumer.poll(100);
-            for (ConsumerRecord<String, String> record : records) {
-                System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
-                numRead++;
-            }
-        }
-    }
-
-
-    public void consumeMessages2(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws UnsupportedEncodingException {
-        SimpleConsumer simpleConsumer = new SimpleConsumer(a_seedBrokers.get(0),
-                a_port,
-                30000,
-                2,
-                "test");
-
-        System.out.println("Testing single fetch");
-        kafka.api.FetchRequest req = new FetchRequestBuilder()
-                .clientId("test")
-                .addFetch(a_topic, 0, 0L, 100)
-                .build();
-        while (numRead != 10) {
-            FetchResponse fetchResponse = simpleConsumer.fetch(req);
-            printMessages(fetchResponse.messageSet(a_topic, 0));
-            numRead++;
-        }
-    }
-
-    public long getNumRead() {
-        return numRead;
-    }
-
-    private void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
-        for (MessageAndOffset messageAndOffset : messageSet) {
-            ByteBuffer payload = messageAndOffset.message().payload();
-            byte[] bytes = new byte[payload.limit()];
-            payload.get(bytes);
-            System.out.println(new String(bytes, "UTF-8"));
-        }
-    }
-
-}
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java b/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
deleted file mode 100644
index 1998fa16..00000000
--- a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package fr.jetoile.hadoopunit.kafka.producer;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-public class KafkaTestProducer {
-
-    // Logger
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaTestProducer.class);
-
-    private String kafkaHostname;
-    private Integer kafkaPort;
-    private String topic;
-    private Integer messageCount;
-
-    private KafkaTestProducer(Builder builder) {
-        this.kafkaHostname = builder.kafkaHostname;
-        this.kafkaPort = builder.kafkaPort;
-        this.topic = builder.topic;
-        this.messageCount = builder.messageCount;
-    }
-
-    public String getKafkaHostname() {
-        return kafkaHostname;
-    }
-
-    public Integer getKafkaPort() {
-        return kafkaPort;
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public Integer getMessageCount() {
-        return messageCount;
-    }
-
-    public static class Builder {
-        private String kafkaHostname;
-        private Integer kafkaPort;
-        private String topic;
-        private Integer messageCount;
-
-        public Builder setKafkaHostname(String kafkaHostname) {
-            this.kafkaHostname = kafkaHostname;
-            return this;
-        }
-
-        public Builder setKafkaPort(Integer kafkaPort) {
-            this.kafkaPort = kafkaPort;
-            return this;
-        }
-
-        public Builder setTopic(String topic) {
-            this.topic = topic;
-            return this;
-        }
-
-        public Builder setMessageCount(Integer messageCount) {
-            this.messageCount = messageCount;
-            return this;
-        }
-
-        public KafkaTestProducer build() {
-            KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(this);
-            return kafkaTestProducer;
-        }
-
-    }
-
-    public void produceMessages() {
-        Properties props = new Properties();
-        props.put("bootstrap.servers", getKafkaHostname() + ":" + getKafkaPort());
-        props.put("acks", "all");
-        props.put("retries", 0);
-        props.put("batch.size", 10);
-        props.put("linger.ms", 1);
-        props.put("buffer.memory", 33554432);
-        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
-        Producer<String, String> producer = new KafkaProducer<String, String>(props);
-//        props.put("metadata.broker.list", getKafkaHostname() + ":" + getKafkaPort());
-//        props.put("serializer.class", "kafka.serializer.StringEncoder");
-//        ProducerConfig config = new ProducerConfig(props);
-//        Producer fr.jetoile.hadoop.kafka.producer = new Producer(config);
-
-        // Send 10 messages to the local kafka server:
-        LOG.info("KAFKA: Preparing to send {} initial messages", messageCount);
-        for (int i = 0; i < messageCount; i++) {
-
-            // Create the JSON object
-            JSONObject obj = new JSONObject();
-            try {
-                obj.put("id", String.valueOf(i));
-                obj.put("msg", "test-message" + 1);
-//                obj.put("dt", GenerateRandomDay.genRandomDay());
-            } catch (JSONException e) {
-                e.printStackTrace();
-            }
-            String payload = obj.toString();
-
-//            KeyedMessage data = new KeyedMessage(getTopic(), null, payload);
-//            fr.jetoile.hadoop.kafka.producer.send(data);
-            producer.send(new ProducerRecord<>(getTopic(), String.valueOf(i), obj.toString()));
-            LOG.info("Sent message: {}", obj.toString());
-        }
-        LOG.info("KAFKA: Initial messages sent");
-
-        producer.close();
-    }
-
-}
diff --git a/hadoop-unit-standalone/pom.xml b/hadoop-unit-standalone/pom.xml
index ce6befb4..0e2afcb9 100644
--- a/hadoop-unit-standalone/pom.xml
+++ b/hadoop-unit-standalone/pom.xml
@@ -86,6 +86,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>fr.jetoile.hadoop</groupId>
+            <artifactId>hadoop-unit-client-kafka</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>fr.jetoile.hadoop</groupId>
             <artifactId>hadoop-unit-client-solrcloud</artifactId>
diff --git a/pom.xml b/pom.xml
index 7ef580dc..252c4abe 100755
--- a/pom.xml
+++ b/pom.xml
@@ -200,6 +200,13 @@
                 <scope>test</scope>
             </dependency>
 
+            <dependency>
+                <groupId>fr.jetoile.hadoop</groupId>
+                <artifactId>hadoop-unit-client-kafka</artifactId>
+                <version>${hadoop-unit.version}</version>
+                <scope>test</scope>
+            </dependency>
+
             <dependency>
                 <groupId>fr.jetoile.hadoop</groupId>
                 <artifactId>hadoop-unit-commons</artifactId>
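
---

Usage note (editor's addition, not part of the patch): a minimal sketch of how a downstream test can drive the two new singletons introduced above. The topic name, message count, and an already-running Hadoop Unit Kafka broker are assumptions for illustration; host and port are resolved by the singletons themselves from the default hadoop-unit properties file, as shown in their loadConfig() methods.

    import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
    import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;

    public class KafkaClientUsageSketch {

        public static void main(String[] args) {
            String topic = "testtopic";   // hypothetical topic name
            int messageCount = 10;        // illustrative count

            // Produce one record per call; each call opens and closes a
            // KafkaProducer, which is acceptable in tests but costly in a hot loop.
            for (int i = 0; i < messageCount; i++) {
                KafkaProducerUtils.INSTANCE.produceMessages(topic, String.valueOf(i), "payload-" + i);
            }

            // Block until messageCount records have been read with the new consumer API,
            // then check the counter the way the tests in this patch do.
            KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(topic, messageCount);
            System.out.println("read " + KafkaConsumerUtils.INSTANCE.getNumRead() + " records");
        }
    }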