diff --git a/hadoop-unit-client/hadoop-unit-client-kafka/pom.xml b/hadoop-unit-client/hadoop-unit-client-kafka/pom.xml
new file mode 100644
index 00000000..723faa08
--- /dev/null
+++ b/hadoop-unit-client/hadoop-unit-client-kafka/pom.xml
@@ -0,0 +1,41 @@
+
+
+
+ hadoop-unit-client
+ fr.jetoile.hadoop
+ 1.4-SNAPSHOT
+
+ 4.0.0
+
+ hadoop-unit-client-kafka
+
+
+
+ fr.jetoile.hadoop
+ hadoop-unit-commons
+
+
+
+ org.apache.kafka
+ kafka_2.10
+
+
+
+ commons-configuration
+ commons-configuration
+
+
+
+ org.slf4j
+ slf4j-api
+
+
+
+ ch.qos.logback
+ logback-classic
+
+
+
+
\ No newline at end of file
diff --git a/hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaConsumerUtils.java b/hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaConsumerUtils.java
new file mode 100644
index 00000000..c1f69a10
--- /dev/null
+++ b/hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaConsumerUtils.java
@@ -0,0 +1,115 @@
+package fr.jetoile.hadoopunit.test.kafka;
+
+import fr.jetoile.hadoopunit.HadoopUnitConfig;
+import fr.jetoile.hadoopunit.exception.ConfigException;
+import kafka.api.FetchRequestBuilder;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.MessageAndOffset;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Properties;
+
+public enum KafkaConsumerUtils {
+ INSTANCE;
+
+ // Logger
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerUtils.class);
+
+ private String kafkaHostname;
+ private Integer kafkaPort;
+
+ private Configuration configuration;
+
+ long numRead = 0;
+
+ KafkaConsumerUtils() {
+ try {
+ loadConfig();
+ } catch (ConfigException e) {
+ System.exit(-1);
+ }
+ }
+
+
+ public void consumeMessagesWithOldApi(String topic, int nbMessageToRead) throws UnsupportedEncodingException {
+ SimpleConsumer simpleConsumer = new SimpleConsumer(kafkaHostname,
+ kafkaPort,
+ 30000,
+ 2,
+ "test");
+
+ System.out.println("Testing single fetch");
+ kafka.api.FetchRequest req = new FetchRequestBuilder()
+ .clientId("test")
+ .addFetch(topic, 0, 0L, 100)
+ .build();
+ while (numRead != nbMessageToRead) {
+ FetchResponse fetchResponse = simpleConsumer.fetch(req);
+ printMessages(fetchResponse.messageSet(topic, 0));
+ numRead++;
+ }
+ }
+
+
+ public void consumeMessagesWithNewApi(String topic, int nbMessageToRead) {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", kafkaHostname + ":" + kafkaPort);
+ props.put("group.id", "test");
+ props.put("auto.offset.reset", "earliest");
+ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+
+ KafkaConsumer consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Arrays.asList(topic));
+
+ try {
+ while (numRead != nbMessageToRead) {
+ ConsumerRecords records = consumer.poll(1000);
+ for (ConsumerRecord record : records) {
+ numRead++;
+// LOG.debug("topic = %s, partition = %s, offset = %d, customer = %s, country = %s\n", record.topic(), record.partition(), record.offset(), record.key(), record.value());
+ LOG.debug(record.value());
+ }
+ }
+ } finally {
+ consumer.close();
+ }
+ }
+
+ public long getNumRead() {
+ return numRead;
+ }
+
+ private void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
+ for (MessageAndOffset messageAndOffset : messageSet) {
+ ByteBuffer payload = messageAndOffset.message().payload();
+ byte[] bytes = new byte[payload.limit()];
+ payload.get(bytes);
+ LOG.debug(new String(bytes, "UTF-8"));
+ }
+ }
+
+ private void loadConfig() throws ConfigException {
+ try {
+ configuration = new PropertiesConfiguration(HadoopUnitConfig.DEFAULT_PROPS_FILE);
+ } catch (ConfigurationException e) {
+ throw new ConfigException("bad config", e);
+ }
+
+ kafkaHostname = configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY);
+ kafkaPort = configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY);
+ }
+
+}
diff --git a/hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaProducerUtils.java b/hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaProducerUtils.java
new file mode 100644
index 00000000..4fab53ee
--- /dev/null
+++ b/hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaProducerUtils.java
@@ -0,0 +1,63 @@
+package fr.jetoile.hadoopunit.test.kafka;
+
+import fr.jetoile.hadoopunit.HadoopUnitConfig;
+import fr.jetoile.hadoopunit.exception.ConfigException;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Properties;
+
+public enum KafkaProducerUtils {
+ INSTANCE;
+
+ // Logger
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerUtils.class);
+
+ private String kafkaHostname;
+ private Integer kafkaPort;
+ private Properties props;
+
+ private Configuration configuration;
+
+ KafkaProducerUtils() {
+ try {
+ loadConfig();
+ props = new Properties();
+ props.put("bootstrap.servers", kafkaHostname + ":" + kafkaPort);
+ props.put("acks", "all");
+ props.put("retries", 0);
+ props.put("batch.size", 10);
+ props.put("linger.ms", 1);
+ props.put("buffer.memory", 33554432);
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+ } catch (ConfigException e) {
+ System.exit(-1);
+ }
+ }
+
+ public void produceMessages(String topic, String key, String message) {
+ KafkaProducer producer = new KafkaProducer<>(props);
+ producer.send(new ProducerRecord<>(topic, key, message));
+ producer.close();
+ }
+
+ private void loadConfig() throws ConfigException {
+ try {
+ configuration = new PropertiesConfiguration(HadoopUnitConfig.DEFAULT_PROPS_FILE);
+ } catch (ConfigurationException e) {
+ throw new ConfigException("bad config", e);
+ }
+
+ kafkaHostname = configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY);
+ kafkaPort = configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY);
+ }
+
+}
diff --git a/hadoop-unit-client/pom.xml b/hadoop-unit-client/pom.xml
index fd81d3c7..76a5d77a 100755
--- a/hadoop-unit-client/pom.xml
+++ b/hadoop-unit-client/pom.xml
@@ -17,6 +17,7 @@
hadoop-unit-client-hdfs
hadoop-unit-client-solrcloud
hadoop-unit-client-spark
+ hadoop-unit-client-kafka
diff --git a/hadoop-unit-kafka/pom.xml b/hadoop-unit-kafka/pom.xml
index 4cacb096..33552cc1 100644
--- a/hadoop-unit-kafka/pom.xml
+++ b/hadoop-unit-kafka/pom.xml
@@ -1,4 +1,18 @@
+
+
@@ -33,5 +47,10 @@
org.apache.kafka
kafka_2.10
+
+
+ fr.jetoile.hadoop
+ hadoop-unit-client-kafka
+
\ No newline at end of file
diff --git a/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/component/KafkaBootstrapTest.java b/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/component/KafkaBootstrapTest.java
index 21e699d0..21f918d9 100644
--- a/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/component/KafkaBootstrapTest.java
+++ b/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/component/KafkaBootstrapTest.java
@@ -18,18 +18,17 @@
import fr.jetoile.hadoopunit.HadoopUnitConfig;
import fr.jetoile.hadoopunit.HadoopBootstrap;
import fr.jetoile.hadoopunit.exception.BootstrapException;
-import fr.jetoile.hadoopunit.kafka.consumer.KafkaTestConsumer;
-import fr.jetoile.hadoopunit.kafka.producer.KafkaTestProducer;
+import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-
import static org.junit.Assert.assertEquals;
public class KafkaBootstrapTest {
@@ -61,28 +60,29 @@ public static void tearDown() throws Exception {
public void kafkaShouldStart() throws Exception {
// Producer
- KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
- .setKafkaHostname(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY))
- .setKafkaPort(configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY))
- .setTopic(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY))
- .setMessageCount(configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY))
- .build();
- kafkaTestProducer.produceMessages();
+ for (int i = 0; i < 10; i++) {
+ String payload = generateMessage(i);
+ KafkaProducerUtils.INSTANCE.produceMessages(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), String.valueOf(i), payload);
+ }
+
// Consumer
- List seeds = new ArrayList();
- seeds.add(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY));
- KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
- kafkaTestConsumer.consumeMessages2(
- configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
- configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY),
- 0,
- seeds,
- configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY));
+// KafkaConsumerUtils.INSTANCE.consumeMessagesWithOldApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
+ KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
// Assert num of messages produced = num of message consumed
- Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
- kafkaTestConsumer.getNumRead());
+ Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY), KafkaConsumerUtils.INSTANCE.getNumRead());
+ }
+
+ private String generateMessage(int i) {
+ JSONObject obj = new JSONObject();
+ try {
+ obj.put("id", String.valueOf(i));
+ obj.put("msg", "test-message" + 1);
+ } catch (JSONException e) {
+ e.printStackTrace();
+ }
+ return obj.toString();
}
}
diff --git a/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java b/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
deleted file mode 100644
index 8e736095..00000000
--- a/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package fr.jetoile.hadoopunit.kafka.consumer;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.*;
-
-public class KafkaTestConsumer {
-
- // Logger
- private static final Logger LOG = LoggerFactory.getLogger(KafkaTestConsumer.class);
-
- long numRead = 0;
- private List m_replicaBrokers = new ArrayList();
-
-
- public void consumeMessages(long a_maxReads, String a_topic, int a_partition, List a_seedBrokers, int a_port) throws Exception {
- Properties props = new Properties();
- props.put("bootstrap.servers", a_seedBrokers.get(0) + ":" + a_port);
-// props.put("metadata.broker.list", a_seedBrokers.get(0) + ":" + a_port);
- props.put("group.id", "test");
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "100");
- props.put("session.timeout.ms", "30000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(a_topic));
- while (numRead == 0) {
- ConsumerRecords records = consumer.poll(100);
- for (ConsumerRecord record : records) {
- System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
- numRead++;
- }
- }
- }
-
-
- public void consumeMessages2(long a_maxReads, String a_topic, int a_partition, List a_seedBrokers, int a_port) throws UnsupportedEncodingException {
- SimpleConsumer simpleConsumer = new SimpleConsumer(a_seedBrokers.get(0),
- a_port,
- 30000,
- 2,
- "test");
-
- System.out.println("Testing single fetch");
- kafka.api.FetchRequest req = new FetchRequestBuilder()
- .clientId("test")
- .addFetch(a_topic, 0, 0L, 100)
- .build();
- while (numRead != 10) {
- FetchResponse fetchResponse = simpleConsumer.fetch(req);
- printMessages(fetchResponse.messageSet(a_topic, 0));
- numRead++;
- }
- }
-
- public long getNumRead() {
- return numRead;
- }
-
- private void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
- for(MessageAndOffset messageAndOffset: messageSet) {
- ByteBuffer payload = messageAndOffset.message().payload();
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
- System.out.println(new String(bytes, "UTF-8"));
- }
- }
-
-}
diff --git a/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java b/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
deleted file mode 100644
index 2229d33c..00000000
--- a/hadoop-unit-kafka/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package fr.jetoile.hadoopunit.kafka.producer;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-public class KafkaTestProducer {
-
- // Logger
- private static final Logger LOG = LoggerFactory.getLogger(KafkaTestProducer.class);
-
- private String kafkaHostname;
- private Integer kafkaPort;
- private String topic;
- private Integer messageCount;
-
- private KafkaTestProducer(Builder builder) {
- this.kafkaHostname = builder.kafkaHostname;
- this.kafkaPort = builder.kafkaPort;
- this.topic = builder.topic;
- this.messageCount = builder.messageCount;
- }
-
- public String getKafkaHostname() {
- return kafkaHostname;
- }
-
- public Integer getKafkaPort() {
- return kafkaPort;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public Integer getMessageCount() {
- return messageCount;
- }
-
- public static class Builder {
- private String kafkaHostname;
- private Integer kafkaPort;
- private String topic;
- private Integer messageCount;
-
- public Builder setKafkaHostname(String kafkaHostname) {
- this.kafkaHostname = kafkaHostname;
- return this;
- }
-
- public Builder setKafkaPort(Integer kafkaPort) {
- this.kafkaPort = kafkaPort;
- return this;
- }
-
- public Builder setTopic(String topic) {
- this.topic = topic;
- return this;
- }
-
- public Builder setMessageCount(Integer messageCount) {
- this.messageCount = messageCount;
- return this;
- }
-
- public KafkaTestProducer build() {
- KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(this);
- return kafkaTestProducer;
- }
-
- }
-
- public void produceMessages() {
- Properties props = new Properties();
- props.put("bootstrap.servers", getKafkaHostname() + ":" + getKafkaPort());
- props.put("acks", "all");
- props.put("retries", 0);
- props.put("batch.size", 10);
- props.put("linger.ms", 1);
- props.put("buffer.memory", 33554432);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- Producer producer = new KafkaProducer(props);
-// props.put("metadata.broker.list", getKafkaHostname() + ":" + getKafkaPort());
-// props.put("serializer.class", "kafka.serializer.StringEncoder");
-// ProducerConfig config = new ProducerConfig(props);
-// Producer fr.jetoile.hadoop.kafka.producer = new Producer(config);
-
- // Send 10 messages to the local kafka server:
- LOG.info("KAFKA: Preparing to send {} initial messages", messageCount);
- for (int i = 0; i < messageCount; i++) {
-
- // Create the JSON object
- JSONObject obj = new JSONObject();
- try {
- obj.put("id", String.valueOf(i));
- obj.put("msg", "test-message" + 1);
-// obj.put("dt", GenerateRandomDay.genRandomDay());
- } catch (JSONException e) {
- e.printStackTrace();
- }
- String payload = obj.toString();
-
-// KeyedMessage data = new KeyedMessage(getTopic(), null, payload);
-// fr.jetoile.hadoop.kafka.producer.send(data);
- producer.send(new ProducerRecord<>(getTopic(), String.valueOf(i), obj.toString()));
- LOG.info("Sent message: {}", obj.toString());
- }
- LOG.info("KAFKA: Initial messages sent");
-
- producer.close();
- }
-
-}
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java b/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java
index aeb9bacf..ff903192 100644
--- a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java
+++ b/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java
@@ -24,9 +24,9 @@
import fr.jetoile.hadoopunit.component.OozieBootstrap;
import fr.jetoile.hadoopunit.exception.BootstrapException;
import fr.jetoile.hadoopunit.exception.NotFoundServiceException;
-import fr.jetoile.hadoopunit.kafka.consumer.KafkaTestConsumer;
-import fr.jetoile.hadoopunit.kafka.producer.KafkaTestProducer;
import fr.jetoile.hadoopunit.test.hdfs.HdfsUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
@@ -44,6 +44,8 @@
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
@@ -93,29 +95,29 @@ public static void tearDown() throws BootstrapException {
public void kafkaShouldStart() throws Exception {
// Producer
- KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
- .setKafkaHostname(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY))
- .setKafkaPort(configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY))
- .setTopic(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY))
- .setMessageCount(configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY))
- .build();
- kafkaTestProducer.produceMessages();
+ for (int i = 0; i < 10; i++) {
+ String payload = generateMessage(i);
+ KafkaProducerUtils.INSTANCE.produceMessages(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), String.valueOf(i), payload);
+ }
+
// Consumer
- List seeds = new ArrayList();
- seeds.add(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY));
- KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
- kafkaTestConsumer.consumeMessages2(
- configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
- configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY),
- 0,
- seeds,
- configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY));
+ KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
// Assert num of messages produced = num of message consumed
- Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
- kafkaTestConsumer.getNumRead());
+ Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY), KafkaConsumerUtils.INSTANCE.getNumRead());
+ }
+
+ private String generateMessage(int i) {
+ JSONObject obj = new JSONObject();
+ try {
+ obj.put("id", String.valueOf(i));
+ obj.put("msg", "test-message" + 1);
+ } catch (JSONException e) {
+ e.printStackTrace();
+ }
+ return obj.toString();
}
@Test
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java b/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java
index 1bbc1750..e945b54f 100644
--- a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java
+++ b/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java
@@ -22,9 +22,9 @@
import fr.jetoile.hadoopunit.component.OozieBootstrap;
import fr.jetoile.hadoopunit.exception.BootstrapException;
import fr.jetoile.hadoopunit.exception.NotFoundServiceException;
-import fr.jetoile.hadoopunit.kafka.consumer.KafkaTestConsumer;
-import fr.jetoile.hadoopunit.kafka.producer.KafkaTestProducer;
import fr.jetoile.hadoopunit.test.hdfs.HdfsUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
@@ -42,6 +42,8 @@
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.WorkflowJob;
import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
@@ -56,7 +58,6 @@
import java.net.*;
import java.sql.Connection;
import java.sql.*;
-import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -90,29 +91,29 @@ public static void tearDown() throws BootstrapException {
public void kafkaShouldStart() throws Exception {
// Producer
- KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
- .setKafkaHostname(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY))
- .setKafkaPort(configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY))
- .setTopic(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY))
- .setMessageCount(configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY))
- .build();
- kafkaTestProducer.produceMessages();
+ for (int i = 0; i < 10; i++) {
+ String payload = generateMessage(i);
+ KafkaProducerUtils.INSTANCE.produceMessages(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), String.valueOf(i), payload);
+ }
+
// Consumer
- List seeds = new ArrayList();
- seeds.add(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY));
- KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
- kafkaTestConsumer.consumeMessages2(
- configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
- configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY),
- 0,
- seeds,
- configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY));
+ KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
// Assert num of messages produced = num of message consumed
- Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
- kafkaTestConsumer.getNumRead());
+ Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY), KafkaConsumerUtils.INSTANCE.getNumRead());
+ }
+
+ private String generateMessage(int i) {
+ JSONObject obj = new JSONObject();
+ try {
+ obj.put("id", String.valueOf(i));
+ obj.put("msg", "test-message" + 1);
+ } catch (JSONException e) {
+ e.printStackTrace();
+ }
+ return obj.toString();
}
@Test
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java b/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
deleted file mode 100644
index 5d787666..00000000
--- a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package fr.jetoile.hadoopunit.kafka.consumer;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-public class KafkaTestConsumer {
-
- // Logger
- private static final Logger LOG = LoggerFactory.getLogger(KafkaTestConsumer.class);
-
- long numRead = 0;
- private List m_replicaBrokers = new ArrayList();
-
-
- public void consumeMessages(long a_maxReads, String a_topic, int a_partition, List a_seedBrokers, int a_port) throws Exception {
- Properties props = new Properties();
- props.put("bootstrap.servers", a_seedBrokers.get(0) + ":" + a_port);
-// props.put("metadata.broker.list", a_seedBrokers.get(0) + ":" + a_port);
- props.put("group.id", "test");
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "100");
- props.put("session.timeout.ms", "30000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(a_topic));
- while (numRead == 0) {
- ConsumerRecords records = consumer.poll(100);
- for (ConsumerRecord record : records) {
- System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
- numRead++;
- }
- }
- }
-
-
- public void consumeMessages2(long a_maxReads, String a_topic, int a_partition, List a_seedBrokers, int a_port) throws UnsupportedEncodingException {
- SimpleConsumer simpleConsumer = new SimpleConsumer(a_seedBrokers.get(0),
- a_port,
- 30000,
- 2,
- "test");
-
- System.out.println("Testing single fetch");
- kafka.api.FetchRequest req = new FetchRequestBuilder()
- .clientId("test")
- .addFetch(a_topic, 0, 0L, 100)
- .build();
- while (numRead != 10) {
- FetchResponse fetchResponse = simpleConsumer.fetch(req);
- printMessages(fetchResponse.messageSet(a_topic, 0));
- numRead++;
- }
- }
-
- public long getNumRead() {
- return numRead;
- }
-
- private void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
- for(MessageAndOffset messageAndOffset: messageSet) {
- ByteBuffer payload = messageAndOffset.message().payload();
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
- System.out.println(new String(bytes, "UTF-8"));
- }
- }
-
-}
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java b/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
deleted file mode 100644
index 1998fa16..00000000
--- a/hadoop-unit-standalone/hadoop-unit-standalone-elasticsearch/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package fr.jetoile.hadoopunit.kafka.producer;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-public class KafkaTestProducer {
-
- // Logger
- private static final Logger LOG = LoggerFactory.getLogger(KafkaTestProducer.class);
-
- private String kafkaHostname;
- private Integer kafkaPort;
- private String topic;
- private Integer messageCount;
-
- private KafkaTestProducer(Builder builder) {
- this.kafkaHostname = builder.kafkaHostname;
- this.kafkaPort = builder.kafkaPort;
- this.topic = builder.topic;
- this.messageCount = builder.messageCount;
- }
-
- public String getKafkaHostname() {
- return kafkaHostname;
- }
-
- public Integer getKafkaPort() {
- return kafkaPort;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public Integer getMessageCount() {
- return messageCount;
- }
-
- public static class Builder {
- private String kafkaHostname;
- private Integer kafkaPort;
- private String topic;
- private Integer messageCount;
-
- public Builder setKafkaHostname(String kafkaHostname) {
- this.kafkaHostname = kafkaHostname;
- return this;
- }
-
- public Builder setKafkaPort(Integer kafkaPort) {
- this.kafkaPort = kafkaPort;
- return this;
- }
-
- public Builder setTopic(String topic) {
- this.topic = topic;
- return this;
- }
-
- public Builder setMessageCount(Integer messageCount) {
- this.messageCount = messageCount;
- return this;
- }
-
- public KafkaTestProducer build() {
- KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(this);
- return kafkaTestProducer;
- }
-
- }
-
- public void produceMessages() {
- Properties props = new Properties();
- props.put("bootstrap.servers", getKafkaHostname() + ":" + getKafkaPort());
- props.put("acks", "all");
- props.put("retries", 0);
- props.put("batch.size", 10);
- props.put("linger.ms", 1);
- props.put("buffer.memory", 33554432);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- Producer producer = new KafkaProducer(props);
-// props.put("metadata.broker.list", getKafkaHostname() + ":" + getKafkaPort());
-// props.put("serializer.class", "kafka.serializer.StringEncoder");
-// ProducerConfig config = new ProducerConfig(props);
-// Producer fr.jetoile.hadoop.kafka.producer = new Producer(config);
-
- // Send 10 messages to the local kafka server:
- LOG.info("KAFKA: Preparing to send {} initial messages", messageCount);
- for (int i = 0; i < messageCount; i++) {
-
- // Create the JSON object
- JSONObject obj = new JSONObject();
- try {
- obj.put("id", String.valueOf(i));
- obj.put("msg", "test-message" + 1);
-// obj.put("dt", GenerateRandomDay.genRandomDay());
- } catch (JSONException e) {
- e.printStackTrace();
- }
- String payload = obj.toString();
-
-// KeyedMessage data = new KeyedMessage(getTopic(), null, payload);
-// fr.jetoile.hadoop.kafka.producer.send(data);
- producer.send(new ProducerRecord<>(getTopic(), String.valueOf(i), obj.toString()));
- LOG.info("Sent message: {}", obj.toString());
- }
- LOG.info("KAFKA: Initial messages sent");
-
- producer.close();
- }
-
-}
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java b/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java
index 4bbbd7d5..d86b3533 100644
--- a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java
+++ b/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/IntegrationBootstrapTest.java
@@ -25,9 +25,9 @@
import fr.jetoile.hadoopunit.component.SolrCloudBootstrap;
import fr.jetoile.hadoopunit.exception.BootstrapException;
import fr.jetoile.hadoopunit.exception.NotFoundServiceException;
-import fr.jetoile.hadoopunit.kafka.consumer.KafkaTestConsumer;
-import fr.jetoile.hadoopunit.kafka.producer.KafkaTestProducer;
import fr.jetoile.hadoopunit.test.hdfs.HdfsUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
@@ -49,6 +49,8 @@
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.zookeeper.KeeperException;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,29 +122,29 @@ public void solrCloudShouldStart() throws IOException, SolrServerException, Keep
public void kafkaShouldStart() throws Exception {
// Producer
- KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
- .setKafkaHostname(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY))
- .setKafkaPort(configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY))
- .setTopic(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY))
- .setMessageCount(configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY))
- .build();
- kafkaTestProducer.produceMessages();
+ for (int i = 0; i < 10; i++) {
+ String payload = generateMessage(i);
+ KafkaProducerUtils.INSTANCE.produceMessages(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), String.valueOf(i), payload);
+ }
+
// Consumer
- List seeds = new ArrayList();
- seeds.add(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY));
- KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
- kafkaTestConsumer.consumeMessages2(
- configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
- configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY),
- 0,
- seeds,
- configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY));
+ KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
// Assert num of messages produced = num of message consumed
- Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
- kafkaTestConsumer.getNumRead());
+ Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY), KafkaConsumerUtils.INSTANCE.getNumRead());
+ }
+
+ private String generateMessage(int i) {
+ JSONObject obj = new JSONObject();
+ try {
+ obj.put("id", String.valueOf(i));
+ obj.put("msg", "test-message" + 1);
+ } catch (JSONException e) {
+ e.printStackTrace();
+ }
+ return obj.toString();
}
@Test
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java b/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java
index 7262fce8..51803a98 100644
--- a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java
+++ b/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/integrationtest/ManualIntegrationBootstrapTest.java
@@ -23,9 +23,9 @@
import fr.jetoile.hadoopunit.component.SolrCloudBootstrap;
import fr.jetoile.hadoopunit.exception.BootstrapException;
import fr.jetoile.hadoopunit.exception.NotFoundServiceException;
-import fr.jetoile.hadoopunit.kafka.consumer.KafkaTestConsumer;
-import fr.jetoile.hadoopunit.kafka.producer.KafkaTestProducer;
import fr.jetoile.hadoopunit.test.hdfs.HdfsUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
+import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
@@ -47,6 +47,8 @@
import org.apache.solr.common.SolrDocument;
import org.apache.solr.common.SolrInputDocument;
import org.apache.zookeeper.KeeperException;
+import org.codehaus.jettison.json.JSONException;
+import org.codehaus.jettison.json.JSONObject;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,7 +60,6 @@
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.*;
-import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -120,29 +121,29 @@ public void solrCloudShouldStart() throws IOException, SolrServerException, Keep
public void kafkaShouldStart() throws Exception {
// Producer
- KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
- .setKafkaHostname(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY))
- .setKafkaPort(configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY))
- .setTopic(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY))
- .setMessageCount(configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY))
- .build();
- kafkaTestProducer.produceMessages();
+ for (int i = 0; i < 10; i++) {
+ String payload = generateMessage(i);
+ KafkaProducerUtils.INSTANCE.produceMessages(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), String.valueOf(i), payload);
+ }
+
// Consumer
- List seeds = new ArrayList();
- seeds.add(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY));
- KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
- kafkaTestConsumer.consumeMessages2(
- configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
- configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY),
- 0,
- seeds,
- configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY));
+ KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
// Assert num of messages produced = num of message consumed
- Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
- kafkaTestConsumer.getNumRead());
+ Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY), KafkaConsumerUtils.INSTANCE.getNumRead());
+ }
+
+ private String generateMessage(int i) {
+ JSONObject obj = new JSONObject();
+ try {
+ obj.put("id", String.valueOf(i));
+ obj.put("msg", "test-message" + 1);
+ } catch (JSONException e) {
+ e.printStackTrace();
+ }
+ return obj.toString();
}
@Test
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java b/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
deleted file mode 100644
index 5d787666..00000000
--- a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/consumer/KafkaTestConsumer.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package fr.jetoile.hadoopunit.kafka.consumer;
-
-import kafka.api.FetchRequestBuilder;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-public class KafkaTestConsumer {
-
- // Logger
- private static final Logger LOG = LoggerFactory.getLogger(KafkaTestConsumer.class);
-
- long numRead = 0;
- private List m_replicaBrokers = new ArrayList();
-
-
- public void consumeMessages(long a_maxReads, String a_topic, int a_partition, List a_seedBrokers, int a_port) throws Exception {
- Properties props = new Properties();
- props.put("bootstrap.servers", a_seedBrokers.get(0) + ":" + a_port);
-// props.put("metadata.broker.list", a_seedBrokers.get(0) + ":" + a_port);
- props.put("group.id", "test");
- props.put("enable.auto.commit", "true");
- props.put("auto.commit.interval.ms", "100");
- props.put("session.timeout.ms", "30000");
- props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
- KafkaConsumer consumer = new KafkaConsumer<>(props);
- consumer.subscribe(Arrays.asList(a_topic));
- while (numRead == 0) {
- ConsumerRecords records = consumer.poll(100);
- for (ConsumerRecord record : records) {
- System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
- numRead++;
- }
- }
- }
-
-
- public void consumeMessages2(long a_maxReads, String a_topic, int a_partition, List a_seedBrokers, int a_port) throws UnsupportedEncodingException {
- SimpleConsumer simpleConsumer = new SimpleConsumer(a_seedBrokers.get(0),
- a_port,
- 30000,
- 2,
- "test");
-
- System.out.println("Testing single fetch");
- kafka.api.FetchRequest req = new FetchRequestBuilder()
- .clientId("test")
- .addFetch(a_topic, 0, 0L, 100)
- .build();
- while (numRead != 10) {
- FetchResponse fetchResponse = simpleConsumer.fetch(req);
- printMessages(fetchResponse.messageSet(a_topic, 0));
- numRead++;
- }
- }
-
- public long getNumRead() {
- return numRead;
- }
-
- private void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
- for(MessageAndOffset messageAndOffset: messageSet) {
- ByteBuffer payload = messageAndOffset.message().payload();
- byte[] bytes = new byte[payload.limit()];
- payload.get(bytes);
- System.out.println(new String(bytes, "UTF-8"));
- }
- }
-
-}
diff --git a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java b/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
deleted file mode 100644
index 1998fa16..00000000
--- a/hadoop-unit-standalone/hadoop-unit-standalone-solr/src/test/java/fr/jetoile/hadoopunit/kafka/producer/KafkaTestProducer.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package fr.jetoile.hadoopunit.kafka.producer;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.codehaus.jettison.json.JSONException;
-import org.codehaus.jettison.json.JSONObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Properties;
-
-public class KafkaTestProducer {
-
- // Logger
- private static final Logger LOG = LoggerFactory.getLogger(KafkaTestProducer.class);
-
- private String kafkaHostname;
- private Integer kafkaPort;
- private String topic;
- private Integer messageCount;
-
- private KafkaTestProducer(Builder builder) {
- this.kafkaHostname = builder.kafkaHostname;
- this.kafkaPort = builder.kafkaPort;
- this.topic = builder.topic;
- this.messageCount = builder.messageCount;
- }
-
- public String getKafkaHostname() {
- return kafkaHostname;
- }
-
- public Integer getKafkaPort() {
- return kafkaPort;
- }
-
- public String getTopic() {
- return topic;
- }
-
- public Integer getMessageCount() {
- return messageCount;
- }
-
- public static class Builder {
- private String kafkaHostname;
- private Integer kafkaPort;
- private String topic;
- private Integer messageCount;
-
- public Builder setKafkaHostname(String kafkaHostname) {
- this.kafkaHostname = kafkaHostname;
- return this;
- }
-
- public Builder setKafkaPort(Integer kafkaPort) {
- this.kafkaPort = kafkaPort;
- return this;
- }
-
- public Builder setTopic(String topic) {
- this.topic = topic;
- return this;
- }
-
- public Builder setMessageCount(Integer messageCount) {
- this.messageCount = messageCount;
- return this;
- }
-
- public KafkaTestProducer build() {
- KafkaTestProducer kafkaTestProducer = new KafkaTestProducer(this);
- return kafkaTestProducer;
- }
-
- }
-
- public void produceMessages() {
- Properties props = new Properties();
- props.put("bootstrap.servers", getKafkaHostname() + ":" + getKafkaPort());
- props.put("acks", "all");
- props.put("retries", 0);
- props.put("batch.size", 10);
- props.put("linger.ms", 1);
- props.put("buffer.memory", 33554432);
- props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
- props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
- Producer producer = new KafkaProducer(props);
-// props.put("metadata.broker.list", getKafkaHostname() + ":" + getKafkaPort());
-// props.put("serializer.class", "kafka.serializer.StringEncoder");
-// ProducerConfig config = new ProducerConfig(props);
-// Producer fr.jetoile.hadoop.kafka.producer = new Producer(config);
-
- // Send 10 messages to the local kafka server:
- LOG.info("KAFKA: Preparing to send {} initial messages", messageCount);
- for (int i = 0; i < messageCount; i++) {
-
- // Create the JSON object
- JSONObject obj = new JSONObject();
- try {
- obj.put("id", String.valueOf(i));
- obj.put("msg", "test-message" + 1);
-// obj.put("dt", GenerateRandomDay.genRandomDay());
- } catch (JSONException e) {
- e.printStackTrace();
- }
- String payload = obj.toString();
-
-// KeyedMessage data = new KeyedMessage(getTopic(), null, payload);
-// fr.jetoile.hadoop.kafka.producer.send(data);
- producer.send(new ProducerRecord<>(getTopic(), String.valueOf(i), obj.toString()));
- LOG.info("Sent message: {}", obj.toString());
- }
- LOG.info("KAFKA: Initial messages sent");
-
- producer.close();
- }
-
-}
diff --git a/hadoop-unit-standalone/pom.xml b/hadoop-unit-standalone/pom.xml
index ce6befb4..0e2afcb9 100644
--- a/hadoop-unit-standalone/pom.xml
+++ b/hadoop-unit-standalone/pom.xml
@@ -86,6 +86,12 @@
test
+
+ fr.jetoile.hadoop
+ hadoop-unit-client-kafka
+ test
+
+
fr.jetoile.hadoop
hadoop-unit-client-solrcloud
diff --git a/pom.xml b/pom.xml
index 7ef580dc..252c4abe 100755
--- a/pom.xml
+++ b/pom.xml
@@ -200,6 +200,13 @@
test
+
+ fr.jetoile.hadoop
+ hadoop-unit-client-kafka
+ ${hadoop-unit.version}
+ test
+
+
fr.jetoile.hadoop
hadoop-unit-commons