kafka client unit add #31

Merged 1 commit on Jul 4, 2016
41 changes: 41 additions & 0 deletions hadoop-unit-client/hadoop-unit-client-kafka/pom.xml
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>hadoop-unit-client</artifactId>
<groupId>fr.jetoile.hadoop</groupId>
<version>1.4-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>hadoop-unit-client-kafka</artifactId>

<dependencies>
<dependency>
<groupId>fr.jetoile.hadoop</groupId>
<artifactId>hadoop-unit-commons</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
</dependency>

<dependency>
<groupId>commons-configuration</groupId>
<artifactId>commons-configuration</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</dependency>
</dependencies>

</project>
115 changes: 115 additions & 0 deletions hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaConsumerUtils.java
@@ -0,0 +1,115 @@
package fr.jetoile.hadoopunit.test.kafka;

import fr.jetoile.hadoopunit.HadoopUnitConfig;
import fr.jetoile.hadoopunit.exception.ConfigException;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.javaapi.message.ByteBufferMessageSet;
import kafka.message.MessageAndOffset;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Properties;

public enum KafkaConsumerUtils {
INSTANCE;

// Logger
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerUtils.class);

private String kafkaHostname;
private Integer kafkaPort;

private Configuration configuration;

    // Accumulates across calls and is never reset; reuse the singleton with care.
    long numRead = 0;

KafkaConsumerUtils() {
try {
loadConfig();
} catch (ConfigException e) {
System.exit(-1);
}
}


public void consumeMessagesWithOldApi(String topic, int nbMessageToRead) throws UnsupportedEncodingException {
SimpleConsumer simpleConsumer = new SimpleConsumer(kafkaHostname,
kafkaPort,
30000,
2,
"test");

        LOG.debug("testing single fetch with the old SimpleConsumer API");
kafka.api.FetchRequest req = new FetchRequestBuilder()
.clientId("test")
.addFetch(topic, 0, 0L, 100)
.build();
        // NB: each iteration re-fetches from offset 0; numRead counts fetches, not messages.
        // '<' rather than '!=' so a leftover numRead from a prior call cannot spin forever.
        while (numRead < nbMessageToRead) {
FetchResponse fetchResponse = simpleConsumer.fetch(req);
printMessages(fetchResponse.messageSet(topic, 0));
numRead++;
}
}


public void consumeMessagesWithNewApi(String topic, int nbMessageToRead) {
Properties props = new Properties();
props.put("bootstrap.servers", kafkaHostname + ":" + kafkaPort);
props.put("group.id", "test");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));

try {
            // '<' rather than '!=': a single poll can return more records than remain to be read
            while (numRead < nbMessageToRead) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
numRead++;
                    LOG.debug("topic = {}, partition = {}, offset = {}, key = {}, value = {}",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value());
}
}
} finally {
consumer.close();
}
}

public long getNumRead() {
return numRead;
}

private void printMessages(ByteBufferMessageSet messageSet) throws UnsupportedEncodingException {
for (MessageAndOffset messageAndOffset : messageSet) {
ByteBuffer payload = messageAndOffset.message().payload();
byte[] bytes = new byte[payload.limit()];
payload.get(bytes);
LOG.debug(new String(bytes, "UTF-8"));
}
}

private void loadConfig() throws ConfigException {
try {
configuration = new PropertiesConfiguration(HadoopUnitConfig.DEFAULT_PROPS_FILE);
} catch (ConfigurationException e) {
throw new ConfigException("bad config", e);
}

kafkaHostname = configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY);
kafkaPort = configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY);
}

}
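
A minimal usage sketch for the new consumer utility. The class name ConsumerUtilsSketch, topic test_topic, and count of 10 are illustrative, not part of this PR; the broker host/port are read from the default props file:

import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;

public class ConsumerUtilsSketch {
    public static void main(String[] args) {
        // Blocks until 10 records have been read with the new consumer API.
        KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi("test_topic", 10);
        // numRead accumulates across calls on the singleton.
        System.out.println("records read: " + KafkaConsumerUtils.INSTANCE.getNumRead());
    }
}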
63 changes: 63 additions & 0 deletions hadoop-unit-client/hadoop-unit-client-kafka/src/main/java/fr/jetoile/hadoopunit/test/kafka/KafkaProducerUtils.java
@@ -0,0 +1,63 @@
package fr.jetoile.hadoopunit.test.kafka;

import fr.jetoile.hadoopunit.HadoopUnitConfig;
import fr.jetoile.hadoopunit.exception.ConfigException;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;

public enum KafkaProducerUtils {
INSTANCE;

// Logger
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerUtils.class);

private String kafkaHostname;
private Integer kafkaPort;
private Properties props;

private Configuration configuration;

KafkaProducerUtils() {
try {
loadConfig();
props = new Properties();
props.put("bootstrap.servers", kafkaHostname + ":" + kafkaPort);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 10);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

} catch (ConfigException e) {
System.exit(-1);
}
}

    // Creates and closes a producer on every call: simple and safe for tests, costly in a loop.
    public void produceMessages(String topic, String key, String message) {
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>(topic, key, message));
producer.close();
}

private void loadConfig() throws ConfigException {
try {
configuration = new PropertiesConfiguration(HadoopUnitConfig.DEFAULT_PROPS_FILE);
} catch (ConfigurationException e) {
throw new ConfigException("bad config", e);
}

kafkaHostname = configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY);
kafkaPort = configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY);
}

}
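
The producer counterpart, under the same caveats (the class name ProducerUtilsSketch and the topic, key, and payload are illustrative):

import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;

public class ProducerUtilsSketch {
    public static void main(String[] args) {
        // Sends one keyed message to the illustrative topic "test_topic";
        // broker host/port come from the default props file.
        KafkaProducerUtils.INSTANCE.produceMessages("test_topic", "key-1", "hello");
    }
}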
1 change: 1 addition & 0 deletions hadoop-unit-client/pom.xml
@@ -17,6 +17,7 @@
<module>hadoop-unit-client-hdfs</module>
<module>hadoop-unit-client-solrcloud</module>
<module>hadoop-unit-client-spark</module>
<module>hadoop-unit-client-kafka</module>
</modules>


19 changes: 19 additions & 0 deletions hadoop-unit-kafka/pom.xml
@@ -1,4 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -33,5 +47,10 @@
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
</dependency>

<dependency>
<groupId>fr.jetoile.hadoop</groupId>
<artifactId>hadoop-unit-client-kafka</artifactId>
</dependency>
</dependencies>
</project>
KafkaBootstrapTest.java (hadoop-unit-kafka)
@@ -18,18 +18,17 @@
import fr.jetoile.hadoopunit.HadoopUnitConfig;
import fr.jetoile.hadoopunit.HadoopBootstrap;
import fr.jetoile.hadoopunit.exception.BootstrapException;
import fr.jetoile.hadoopunit.kafka.consumer.KafkaTestConsumer;
import fr.jetoile.hadoopunit.kafka.producer.KafkaTestProducer;
import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.codehaus.jettison.json.JSONException;
import org.codehaus.jettison.json.JSONObject;
import org.junit.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class KafkaBootstrapTest {
@@ -61,28 +60,29 @@ public static void tearDown() throws Exception {
public void kafkaShouldStart() throws Exception {

// Producer
KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder()
.setKafkaHostname(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY))
.setKafkaPort(configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY))
.setTopic(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY))
.setMessageCount(configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY))
.build();
kafkaTestProducer.produceMessages();
for (int i = 0; i < 10; i++) {
String payload = generateMessage(i);
KafkaProducerUtils.INSTANCE.produceMessages(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), String.valueOf(i), payload);
        }

        // Consumer
List<String> seeds = new ArrayList<String>();
seeds.add(configuration.getString(HadoopUnitConfig.KAFKA_HOSTNAME_KEY));
KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer();
kafkaTestConsumer.consumeMessages2(
configuration.getInt(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY),
0,
seeds,
configuration.getInt(HadoopUnitConfig.KAFKA_PORT_KEY));
// KafkaConsumerUtils.INSTANCE.consumeMessagesWithOldApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);
KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi(configuration.getString(HadoopUnitConfig.KAFKA_TEST_TOPIC_KEY), 10);

// Assert num of messages produced = num of message consumed
Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY),
kafkaTestConsumer.getNumRead());
Assert.assertEquals(configuration.getLong(HadoopUnitConfig.KAFKA_TEST_MESSAGE_COUNT_KEY), KafkaConsumerUtils.INSTANCE.getNumRead());
}

private String generateMessage(int i) {
JSONObject obj = new JSONObject();
try {
obj.put("id", String.valueOf(i));
obj.put("msg", "test-message" + 1);
} catch (JSONException e) {
e.printStackTrace();
}
return obj.toString();
}
}
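
Taken together, the new utilities reduce the produce/consume round trip exercised by this test to a few lines. A sketch under the same assumptions as above (illustrative names; a Kafka broker must already be running, e.g. started through HadoopBootstrap):

import fr.jetoile.hadoopunit.test.kafka.KafkaConsumerUtils;
import fr.jetoile.hadoopunit.test.kafka.KafkaProducerUtils;

public class KafkaRoundTripSketch {
    public static void main(String[] args) {
        // Produce 10 keyed messages, then block until all 10 are read back.
        for (int i = 0; i < 10; i++) {
            KafkaProducerUtils.INSTANCE.produceMessages("test_topic", String.valueOf(i), "payload-" + i);
        }
        KafkaConsumerUtils.INSTANCE.consumeMessagesWithNewApi("test_topic", 10);
        System.out.println("read " + KafkaConsumerUtils.INSTANCE.getNumRead() + " messages");
    }
}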