-
Notifications
You must be signed in to change notification settings - Fork 15
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
500 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
99 changes: 99 additions & 0 deletions
99
src/main/java/fr/jetoile/sample/component/KafkaBootstrap.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,99 @@ | ||
package fr.jetoile.sample.component; | ||
|
||
import com.github.sakserv.minicluster.config.ConfigVars; | ||
import com.github.sakserv.minicluster.impl.HdfsLocalCluster; | ||
import com.github.sakserv.minicluster.impl.KafkaLocalBroker; | ||
import fr.jetoile.sample.BootstrapException; | ||
import org.apache.commons.configuration.Configuration; | ||
import org.apache.commons.configuration.ConfigurationException; | ||
import org.apache.commons.configuration.PropertiesConfiguration; | ||
import org.apache.hadoop.fs.FileSystem; | ||
import org.apache.hadoop.hdfs.HdfsConfiguration; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Properties; | ||
|
||
public enum KafkaBootstrap implements Bootstrap { | ||
INSTANCE; | ||
|
||
final private Logger LOGGER = LoggerFactory.getLogger(KafkaBootstrap.class); | ||
|
||
private KafkaLocalBroker kafkaLocalCluster; | ||
|
||
private Configuration configuration; | ||
private String zookeeperConnectionString; | ||
private String host; | ||
private int port; | ||
private int brokerId; | ||
private String tmpDirectory; | ||
|
||
|
||
KafkaBootstrap() { | ||
if (kafkaLocalCluster == null) { | ||
try { | ||
loadConfig(); | ||
} catch (BootstrapException e) { | ||
LOGGER.error("unable to load configuration", e); | ||
} | ||
init(); | ||
build(); | ||
} | ||
} | ||
|
||
private void init() { | ||
|
||
} | ||
|
||
private void build() { | ||
kafkaLocalCluster = new KafkaLocalBroker.Builder() | ||
.setKafkaHostname(host) | ||
.setKafkaPort(port) | ||
.setKafkaBrokerId(brokerId) | ||
.setKafkaProperties(new Properties()) | ||
.setKafkaTempDir(tmpDirectory) | ||
.setZookeeperConnectionString(zookeeperConnectionString) | ||
.build(); | ||
} | ||
|
||
private void loadConfig() throws BootstrapException { | ||
try { | ||
configuration = new PropertiesConfiguration("default.properties"); | ||
} catch (ConfigurationException e) { | ||
throw new BootstrapException("bad config", e); | ||
} | ||
host = configuration.getString(ConfigVars.KAFKA_HOSTNAME_KEY); | ||
port = configuration.getInt(ConfigVars.KAFKA_PORT_KEY); | ||
brokerId = configuration.getInt(ConfigVars.KAFKA_TEST_BROKER_ID_KEY); | ||
tmpDirectory = configuration.getString(ConfigVars.KAFKA_TEST_TEMP_DIR_KEY); | ||
zookeeperConnectionString = configuration.getString(ConfigVars.ZOOKEEPER_HOST_KEY) + ":" + configuration.getInt(ConfigVars.ZOOKEEPER_PORT_KEY); | ||
|
||
} | ||
|
||
@Override | ||
public Bootstrap start() { | ||
try { | ||
kafkaLocalCluster.start(); | ||
} catch (Exception e) { | ||
LOGGER.error("unable to start kafka", e); | ||
} | ||
return this; | ||
} | ||
|
||
@Override | ||
public Bootstrap stop() { | ||
try { | ||
kafkaLocalCluster.stop(true); | ||
} catch (Exception e) { | ||
LOGGER.error("unable to stop kafka", e); | ||
} | ||
return this; | ||
|
||
} | ||
|
||
@Override | ||
public org.apache.hadoop.conf.Configuration getConfiguration() { | ||
throw new UnsupportedOperationException("the method getConfiguration can not be called on KafkaBootstrap"); | ||
} | ||
|
||
} |
83 changes: 83 additions & 0 deletions
83
src/test/java/fr/jetoile/sample/component/KafkaBootstrapTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
package fr.jetoile.sample.component; | ||
|
||
|
||
import com.github.sakserv.minicluster.config.ConfigVars; | ||
import fr.jetoile.sample.BootstrapException; | ||
import fr.jetoile.sample.kafka.consumer.KafkaTestConsumer; | ||
import fr.jetoile.sample.kafka.producer.KafkaTestProducer; | ||
import org.apache.commons.configuration.Configuration; | ||
import org.apache.commons.configuration.ConfigurationException; | ||
import org.apache.commons.configuration.PropertiesConfiguration; | ||
import org.apache.hadoop.hbase.HColumnDescriptor; | ||
import org.apache.hadoop.hbase.HTableDescriptor; | ||
import org.apache.hadoop.hbase.TableName; | ||
import org.apache.hadoop.hbase.client.*; | ||
import org.apache.hadoop.hbase.util.Bytes; | ||
import org.junit.AfterClass; | ||
import org.junit.Assert; | ||
import org.junit.BeforeClass; | ||
import org.junit.Test; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
import static org.junit.Assert.assertEquals; | ||
|
||
public class KafkaBootstrapTest { | ||
static private Logger LOGGER = LoggerFactory.getLogger(KafkaBootstrapTest.class); | ||
|
||
static private Bootstrap zookeeper; | ||
static private Bootstrap kafka; | ||
static private Configuration configuration; | ||
|
||
|
||
@BeforeClass | ||
public static void setup() throws Exception { | ||
try { | ||
configuration = new PropertiesConfiguration("default.properties"); | ||
} catch (ConfigurationException e) { | ||
throw new BootstrapException("bad config", e); | ||
} | ||
|
||
zookeeper = ZookeeperBootstrap.INSTANCE.start(); | ||
kafka = KafkaBootstrap.INSTANCE.start(); | ||
} | ||
|
||
@AfterClass | ||
public static void tearDown() throws Exception { | ||
kafka.stop(); | ||
zookeeper.stop(); | ||
} | ||
|
||
|
||
@Test | ||
public void kafkaShouldStart() throws Exception { | ||
|
||
// Producer | ||
KafkaTestProducer kafkaTestProducer = new KafkaTestProducer.Builder() | ||
.setKafkaHostname(configuration.getString(ConfigVars.KAFKA_HOSTNAME_KEY)) | ||
.setKafkaPort(configuration.getInt(ConfigVars.KAFKA_PORT_KEY)) | ||
.setTopic(configuration.getString(ConfigVars.KAFKA_TEST_TOPIC_KEY)) | ||
.setMessageCount(configuration.getInt(ConfigVars.KAFKA_TEST_MESSAGE_COUNT_KEY)) | ||
.build(); | ||
kafkaTestProducer.produceMessages(); | ||
|
||
|
||
// Consumer | ||
List<String> seeds = new ArrayList<String>(); | ||
seeds.add(configuration.getString(ConfigVars.KAFKA_HOSTNAME_KEY)); | ||
KafkaTestConsumer kafkaTestConsumer = new KafkaTestConsumer(); | ||
kafkaTestConsumer.consumeMessages( | ||
configuration.getInt(ConfigVars.KAFKA_TEST_MESSAGE_COUNT_KEY), | ||
configuration.getString(ConfigVars.KAFKA_TEST_TOPIC_KEY), | ||
0, | ||
seeds, | ||
configuration.getInt(ConfigVars.KAFKA_PORT_KEY)); | ||
|
||
// Assert num of messages produced = num of message consumed | ||
Assert.assertEquals(configuration.getLong(ConfigVars.KAFKA_TEST_MESSAGE_COUNT_KEY), | ||
kafkaTestConsumer.getNumRead()); | ||
} | ||
} |
178 changes: 178 additions & 0 deletions
178
src/test/java/fr/jetoile/sample/kafka/consumer/KafkaTestConsumer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,178 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package fr.jetoile.sample.kafka.consumer; | ||
|
||
import kafka.api.FetchRequest; | ||
import kafka.api.FetchRequestBuilder; | ||
import kafka.common.ErrorMapping; | ||
import kafka.javaapi.*; | ||
import kafka.javaapi.consumer.SimpleConsumer; | ||
import kafka.message.MessageAndOffset; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.nio.ByteBuffer; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
|
||
public class KafkaTestConsumer { | ||
|
||
// Logger | ||
private static final Logger LOG = LoggerFactory.getLogger(KafkaTestConsumer.class); | ||
|
||
long numRead = 0; | ||
private List<String> m_replicaBrokers = new ArrayList<String>(); | ||
|
||
public void consumeMessages(long a_maxReads, String a_topic, int a_partition, List<String> a_seedBrokers, int a_port) throws Exception { | ||
// find the meta data about the topic and partition we are interested in | ||
// | ||
PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition); | ||
if (metadata == null) { | ||
LOG.info("Can't find metadata for Topic and Partition. Exiting"); | ||
return; | ||
} | ||
if (metadata.leader() == null) { | ||
LOG.info("Can't find Leader for Topic and Partition. Exiting"); | ||
return; | ||
} | ||
String leadBroker = metadata.leader().host(); | ||
String clientName = "Client_" + a_topic + "_" + a_partition; | ||
|
||
SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); | ||
long readOffset = 0L; | ||
|
||
int numErrors = 0; | ||
while (a_maxReads > 0) { | ||
if (consumer == null) { | ||
consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName); | ||
} | ||
FetchRequest req = new FetchRequestBuilder() | ||
.clientId(clientName) | ||
.addFetch(a_topic, a_partition, readOffset, 100000) // Note: this fetchSize of 100000 might need to be increased if large batches are written to Kafka | ||
.build(); | ||
FetchResponse fetchResponse = consumer.fetch(req); | ||
|
||
if (fetchResponse.hasError()) { | ||
numErrors++; | ||
// Something went wrong! | ||
short code = fetchResponse.errorCode(a_topic, a_partition); | ||
LOG.info("Error fetching data from the Broker: {} Reason: {}", leadBroker, code); | ||
if (numErrors > 5) break; | ||
if (code == ErrorMapping.OffsetOutOfRangeCode()) { | ||
// We asked for an invalid offset. For simple case ask for the last element to reset | ||
readOffset = 0L; | ||
continue; | ||
} | ||
consumer.close(); | ||
consumer = null; | ||
leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port); | ||
continue; | ||
} | ||
numErrors = 0; | ||
|
||
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) { | ||
long currentOffset = messageAndOffset.offset(); | ||
if (currentOffset < readOffset) { | ||
LOG.info("Found an old offset: {} Expecting: {}", currentOffset, readOffset); | ||
continue; | ||
} | ||
readOffset = messageAndOffset.nextOffset(); | ||
ByteBuffer payload = messageAndOffset.message().payload(); | ||
|
||
byte[] bytes = new byte[payload.limit()]; | ||
payload.get(bytes); | ||
LOG.info("Consumed: {}: {}", String.valueOf(messageAndOffset.offset()), new String(bytes, "UTF-8")); | ||
numRead++; | ||
a_maxReads--; | ||
} | ||
|
||
if (numRead == 0) { | ||
try { | ||
Thread.sleep(1000); | ||
} catch (InterruptedException ie) { | ||
} | ||
} | ||
} | ||
if (consumer != null) consumer.close(); | ||
} | ||
|
||
private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws Exception { | ||
for (int i = 0; i < 3; i++) { | ||
boolean goToSleep = false; | ||
PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition); | ||
if (metadata == null) { | ||
goToSleep = true; | ||
} else if (metadata.leader() == null) { | ||
goToSleep = true; | ||
} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) { | ||
// first time through if the leader hasn't changed give ZooKeeper a second to recover | ||
// second time, assume the broker did recover before failover, or it was a non-Broker issue | ||
// | ||
goToSleep = true; | ||
} else { | ||
return metadata.leader().host(); | ||
} | ||
if (goToSleep) { | ||
try { | ||
Thread.sleep(1000); | ||
} catch (InterruptedException ie) { | ||
} | ||
} | ||
} | ||
LOG.info("Unable to find new leader after Broker failure. Exiting"); | ||
throw new Exception("Unable to find new leader after Broker failure. Exiting"); | ||
} | ||
|
||
private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port, String a_topic, int a_partition) { | ||
PartitionMetadata returnMetaData = null; | ||
loop: | ||
for (String seed : a_seedBrokers) { | ||
SimpleConsumer consumer = null; | ||
try { | ||
consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup"); | ||
List<String> topics = Collections.singletonList(a_topic); | ||
TopicMetadataRequest req = new TopicMetadataRequest(topics); | ||
TopicMetadataResponse resp = consumer.send(req); | ||
|
||
List<TopicMetadata> metaData = resp.topicsMetadata(); | ||
for (TopicMetadata item : metaData) { | ||
for (PartitionMetadata part : item.partitionsMetadata()) { | ||
if (part.partitionId() == a_partition) { | ||
returnMetaData = part; | ||
break loop; | ||
} | ||
} | ||
} | ||
} catch (Exception e) { | ||
LOG.info("Error communicating with Broker [{}] to find Leader for [{},{}] Reason: {}", | ||
seed, a_topic, a_partition, e); | ||
} finally { | ||
if (consumer != null) consumer.close(); | ||
} | ||
} | ||
if (returnMetaData != null) { | ||
m_replicaBrokers.clear(); | ||
for (kafka.cluster.BrokerEndPoint replica : returnMetaData.replicas()) { | ||
m_replicaBrokers.add(replica.host()); | ||
} | ||
} | ||
return returnMetaData; | ||
} | ||
|
||
public long getNumRead() { | ||
return numRead; | ||
} | ||
|
||
} |
Oops, something went wrong.