diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java index 7fff64199e0fd..9645d7c20083e 100644 --- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java +++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java @@ -23,7 +23,10 @@ import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.Read.Unbounded; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark; @@ -34,10 +37,12 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.ExposedByteArrayInputStream; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PInput; import com.google.common.annotations.VisibleForTesting; @@ -56,10 +61,17 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.Serializer; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -86,8 +98,8 @@ import javax.annotation.Nullable; /** - * An unbounded source for Kafka topics. Kafka version 0.9 - * and above are supported. + * An unbounded source and a sink for Kafka topics. + * Kafka version 0.9 and above are supported. * *

  * <h3>Reading from Kafka topics</h3>

* @@ -146,25 +158,54 @@ * beginning by setting appropriate appropriate properties in {@link ConsumerConfig}, through * {@link Read#updateConsumerProperties(Map)}. * + *

+ * <h3>Writing to Kafka</h3>

+ *
+ * KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write
+ * just the values. To configure a Kafka sink, you must specify at the minimum Kafka
+ * bootstrapServers and the topic to write to. The following example illustrates various
+ * options for configuring the sink:
+ *
+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(...) // returns PCollection<KV<Long, String>>
+ *    .apply(KafkaIO.write()
+ *       .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *       .withTopic("results")
+ *
+ *       // set Coder for Key and Value
+ *       .withKeyCoder(BigEndianLongCoder.of())
+ *       .withValueCoder(StringUtf8Coder.of())
+ *
+ *       // you can further customize KafkaProducer used to write the records by adding more
+ *       // settings for ProducerConfig. e.g., to enable compression:
+ *       .updateProducerProperties(ImmutableMap.of("compression.type", "gzip"))
+ *    );
+ * }</pre>
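For reference, a configuration like the one above can be exercised end to end with a small self-contained pipeline. The following sketch is illustrative only (class name, in-memory input, and broker/topic strings are placeholders); it assumes nothing beyond the Write transform introduced in this change plus Beam's Create transform and standard coders.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.BigEndianLongCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;

public class KafkaSinkExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        // A tiny in-memory PCollection<KV<Long, String>> standing in for real data.
        .apply(Create.of(KV.of(1L, "one"), KV.of(2L, "two"))
            .withCoder(KvCoder.of(BigEndianLongCoder.of(), StringUtf8Coder.of())))
        // Write each KV as a Kafka record; the key's hash picks the partition.
        .apply(KafkaIO.write()
            .withBootstrapServers("broker_1:9092,broker_2:9092")
            .withTopic("results")
            .withKeyCoder(BigEndianLongCoder.of())
            .withValueCoder(StringUtf8Coder.of()));

    pipeline.run();
  }
}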
+ *
+ * Often you might want to write just values without any keys to Kafka. Use {@code values()} to
+ * write records with default empty (null) key:
+ *
+ * <pre>{@code
+ *  PCollection<String> strings = ...;
+ *  strings.apply(KafkaIO.write()
+ *      .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *      .withTopic("results")
+ *      .withValueCoder(StringUtf8Coder.of()) // just need coder for value
+ *      .values() // writes values to Kafka with default key
+ *    );
+ * }</pre>
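Under the hood, values() only pairs each value with a null key before handing the collection to the regular key-value sink (see KafkaValueWrite later in this change). A rough stand-alone equivalent of that expansion, sketched here for a String value coder with an illustrative class name, looks like this:

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

class NullKeyExpansion {
  // Pairs every value with a null key so the KV-based sink can write it.
  static PCollection<KV<Void, String>> withNullKeys(PCollection<String> strings) {
    return strings
        .apply("Pair with null key", ParDo.of(new DoFn<String, KV<Void, String>>() {
          @Override
          public void processElement(ProcessContext ctx) throws Exception {
            ctx.output(KV.<Void, String>of(null, ctx.element()));
          }
        }))
        // VoidCoder encodes the absent key; the value keeps its own coder.
        .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of()));
  }
}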
+ * *

  * <h3>Advanced Kafka Configuration</h3>

- * KafakIO allows setting most of the properties in {@link ConsumerConfig}. E.g. if you would like - * to enable offset auto commit (for external monitoring or other purposes), you can set + * KafakIO allows setting most of the properties in {@link ConsumerConfig} for source or in + * {@link ProducerConfig} for sink. E.g. if you would like to enable offset + * auto commit (for external monitoring or other purposes), you can set * "group.id", "enable.auto.commit", etc. */ public class KafkaIO { - private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class); - - private static class NowTimestampFn implements SerializableFunction { - @Override - public Instant apply(T input) { - return Instant.now(); - } - } - - /** - * Creates and uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka + * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka * configuration should set with {@link Read#withBootstrapServers(String)} and * {@link Read#withTopics(List)}. Other optional settings include key and value coders, * custom timestamp and watermark functions. @@ -181,6 +222,21 @@ public static Read read() { null); } + /** + * Creates an uninitialized {@link Write} {@link PTransform}. Before use, Kafka configuration + * should be set with {@link Write#withBootstrapServers(String)} and {@link Write#withTopic} + * along with {@link Coder}s for (optional) key and values. + */ + public static Write write() { + return new Write( + null, + ByteArrayCoder.of(), + ByteArrayCoder.of(), + TypedWrite.DEFAULT_PRODUCER_PROPERTIES); + } + + ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\ + /** * A {@link PTransform} to read from Kafka topics. See {@link KafkaIO} for more * information on usage and configuration. @@ -253,13 +309,9 @@ public Read withConsumerFactoryFn( * Update consumer configuration with new properties. */ public Read updateConsumerProperties(Map configUpdates) { - for (String key : configUpdates.keySet()) { - checkArgument(!IGNORED_CONSUMER_PROPERTIES.containsKey(key), - "No need to configure '%s'. %s", key, IGNORED_CONSUMER_PROPERTIES.get(key)); - } - Map config = new HashMap<>(consumerConfig); - config.putAll(configUpdates); + Map config = updateKafkaProperties(consumerConfig, + IGNORED_CONSUMER_PROPERTIES, configUpdates); return new Read(topics, topicPartitions, keyCoder, valueCoder, consumerFactoryFn, config, maxNumRecords, maxReadTime); @@ -305,8 +357,8 @@ private Read( * A set of properties that are not required or don't make sense for our consumer. */ private static final Map IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of( - ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDecoderFn instead", - ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDecoderFn instead" + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyCoder instead", + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueCoder instead" // "group.id", "enable.auto.commit", "auto.commit.interval.ms" : // lets allow these, applications can have better resume point for restarts. ); @@ -508,6 +560,37 @@ public void processElement(ProcessContext ctx) { } } + //////////////////////////////////////////////////////////////////////////////////////////////// + + private static final Logger LOG = LoggerFactory.getLogger(KafkaIO.class); + + /** + * Returns a new config map which is merge of current config and updates. + * Verifies the updates do not includes ignored properties. 
+ */ + private static Map updateKafkaProperties( + Map currentConfig, + Map ignoredProperties, + Map updates) { + + for (String key : updates.keySet()) { + checkArgument(!ignoredProperties.containsKey(key), + "No need to configure '%s'. %s", key, ignoredProperties.get(key)); + } + + Map config = new HashMap<>(currentConfig); + config.putAll(updates); + + return config; + } + + private static class NowTimestampFn implements SerializableFunction { + @Override + public Instant apply(T input) { + return Instant.now(); + } + } + /** Static class, prevent instantiation. */ private KafkaIO() {} @@ -719,7 +802,6 @@ private static class PartitionState { private double avgRecordSize = 0; private static final int movingAvgWindow = 1000; // very roughly avg of last 1000 elements - PartitionState(TopicPartition partition, long offset) { this.topicPartition = partition; this.consumedOffset = offset; @@ -1073,4 +1155,315 @@ public void close() throws IOException { Closeables.close(consumer, true); } } + + //////////////////////// Sink Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\ + + /** + * A {@link PTransform} to write to a Kafka topic. See {@link KafkaIO} for more + * information on usage and configuration. + */ + public static class Write extends TypedWrite { + + /** + * Returns a new {@link Write} transform with Kafka producer pointing to + * {@code bootstrapServers}. + */ + public Write withBootstrapServers(String bootstrapServers) { + return updateProducerProperties( + ImmutableMap.of( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)); + } + + /** + * Returns a new {@link Write} transform that write to given topic. + */ + public Write withTopic(String topic) { + return new Write(topic, keyCoder, valueCoder, producerConfig); + } + + /** + * Returns a new {@link Write} with {@link Coder} for serializing key (if any) to bytes. + * A key is optional while writing to Kafka. Note when a key is set, its hash is used to + * determine partition in Kafka (see {@link ProducerRecord} for more details). + */ + public Write withKeyCoder(Coder keyCoder) { + return new Write(topic, keyCoder, valueCoder, producerConfig); + } + + /** + * Returns a new {@link Write} with {@link Coder} for serializing value to bytes. + */ + public Write withValueCoder(Coder valueCoder) { + return new Write(topic, keyCoder, valueCoder, producerConfig); + } + + public Write updateProducerProperties(Map configUpdates) { + Map config = updateKafkaProperties(producerConfig, + TypedWrite.IGNORED_PRODUCER_PROPERTIES, configUpdates); + return new Write(topic, keyCoder, valueCoder, config); + } + + private Write( + String topic, + Coder keyCoder, + Coder valueCoder, + Map producerConfig) { + super(topic, keyCoder, valueCoder, producerConfig, + Optional., Producer>>absent()); + } + } + + /** + * A {@link PTransform} to write to a Kafka topic. See {@link KafkaIO} for more + * information on usage and configuration. + */ + public static class TypedWrite extends PTransform>, PDone> { + + /** + * Returns a new {@link Write} with a custom function to create Kafka producer. Primarily used + * for tests. Default is {@link KafkaProducer} + */ + public TypedWrite withProducerFactoryFn( + SerializableFunction, Producer> producerFactoryFn) { + return new TypedWrite(topic, keyCoder, valueCoder, producerConfig, + Optional.of(producerFactoryFn)); + } + + /** + * Returns a new transform that writes just the values to Kafka. This is useful for writing + * collections of values rather thank {@link KV}s. 
+ */ + @SuppressWarnings("unchecked") + public PTransform, PDone> values() { + return new KafkaValueWrite((TypedWrite) this); + // Any way to avoid casting here to TypedWrite? We can't create + // new TypedWrite without casting producerFactoryFn. + } + + @Override + public PDone apply(PCollection> input) { + input.apply(ParDo.of(new KafkaWriter( + topic, keyCoder, valueCoder, producerConfig, producerFactoryFnOpt))); + return PDone.in(input.getPipeline()); + } + + @Override + public void validate(PCollection> input) { + checkNotNull(producerConfig.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), + "Kafka bootstrap servers should be set"); + checkNotNull(topic, "Kafka topic should be set"); + } + + ////////////////////////////////////////////////////////////////////////////////////////// + + protected final String topic; + protected final Coder keyCoder; + protected final Coder valueCoder; + protected final Optional, Producer>> + producerFactoryFnOpt; + protected final Map producerConfig; + + protected TypedWrite( + String topic, + Coder keyCoder, + Coder valueCoder, + Map producerConfig, + Optional, Producer>> producerFactoryFnOpt) { + + this.topic = topic; + this.keyCoder = keyCoder; + this.valueCoder = valueCoder; + this.producerConfig = producerConfig; + this.producerFactoryFnOpt = producerFactoryFnOpt; + } + + // set config defaults + private static final Map DEFAULT_PRODUCER_PROPERTIES = + ImmutableMap.of( + ProducerConfig.RETRIES_CONFIG, 3, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class); + + /** + * A set of properties that are not required or don't make sense for our consumer. + */ + private static final Map IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Set keyCoder instead", + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Set valueCoder instead", + configForKeySerializer(), "Reserved for internal serializer", + configForValueSerializer(), "Reserved for internal serializer" + ); + } + + /** + * Same as Write without a Key. Null is used for key as it is the convention is Kafka + * when there is no key specified. Majority of Kafka writers don't specify a key. + */ + private static class KafkaValueWrite extends PTransform, PDone> { + + private final TypedWrite kvWriteTransform; + + private KafkaValueWrite(TypedWrite kvWriteTransform) { + this.kvWriteTransform = kvWriteTransform; + } + + @Override + public PDone apply(PCollection input) { + return input + .apply("Kafka values with default key", + ParDo.of(new DoFn>() { + @Override + public void processElement(ProcessContext ctx) throws Exception { + ctx.output(KV.of(null, ctx.element())); + } + })) + .setCoder(KvCoder.of(VoidCoder.of(), kvWriteTransform.valueCoder)) + .apply(kvWriteTransform); + } + } + + private static class KafkaWriter extends DoFn, Void> { + + @Override + public void startBundle(Context c) throws Exception { + // Producer initialization is fairly costly. Move this to future initialization api to avoid + // creating a producer for each bundle. 
+ if (producer == null) { + if (producerFactoryFnOpt.isPresent()) { + producer = producerFactoryFnOpt.get().apply(producerConfig); + } else { + producer = new KafkaProducer(producerConfig); + } + } + } + + @Override + public void processElement(ProcessContext ctx) throws Exception { + checkForFailures(); + + KV kv = ctx.element(); + producer.send( + new ProducerRecord(topic, kv.getKey(), kv.getValue()), + new SendCallback()); + } + + @Override + public void finishBundle(Context c) throws Exception { + producer.flush(); + producer.close(); + producer = null; + checkForFailures(); + } + + /////////////////////////////////////////////////////////////////////////////////// + + private final String topic; + private final Map producerConfig; + private final Optional, Producer>> + producerFactoryFnOpt; + + private transient Producer producer = null; + //private transient Callback sendCallback = new SendCallback(); + // first exception and number of failures since last invocation of checkForFailures(): + private transient Exception sendException = null; + private transient long numSendFailures = 0; + + KafkaWriter(String topic, + Coder keyCoder, + Coder valueCoder, + Map producerConfig, + Optional, Producer>> producerFactoryFnOpt) { + + this.topic = topic; + this.producerFactoryFnOpt = producerFactoryFnOpt; + + // Set custom kafka serializers. We can not serialize user objects then pass the bytes to + // producer. The key and value objects are used in kafka Partitioner interface. + // This does not matter for default partitioner in Kafka as it uses just the serialized + // key bytes to pick a partition. But are making sure user's custom partitioner would work + // as expected. + + this.producerConfig = new HashMap<>(producerConfig); + this.producerConfig.put(configForKeySerializer(), keyCoder); + this.producerConfig.put(configForValueSerializer(), valueCoder); + } + + private synchronized void checkForFailures() throws IOException { + if (numSendFailures == 0) { + return; + } + + String msg = String.format( + "KafkaWriter : failed to send %d records (since last report)", numSendFailures); + + Exception e = sendException; + sendException = null; + numSendFailures = 0; + + LOG.warn(msg); + throw new IOException(msg, e); + } + + private class SendCallback implements Callback { + @Override + public void onCompletion(RecordMetadata metadata, Exception exception) { + if (exception == null) { + return; + } + + synchronized (KafkaWriter.this) { + if (sendException == null) { + sendException = exception; + } + numSendFailures++; + } + // don't log exception stacktrace here, exception will be propagated up. + LOG.warn("KafkaWriter send failed : '{}'", exception.getMessage()); + } + } + } + + /** + * Implements Kafka's {@link Serializer} with a {@link Coder}. The coder is stored as serialized + * value in producer configuration map. + */ + public static class CoderBasedKafkaSerializer implements Serializer { + + @SuppressWarnings("unchecked") + @Override + public void configure(Map configs, boolean isKey) { + String configKey = isKey ? 
configForKeySerializer() : configForValueSerializer(); + coder = (Coder) configs.get(configKey); + checkNotNull(coder, "could not instantiate coder for Kafka serialization"); + } + + @Override + public byte[] serialize(String topic, @Nullable T data) { + if (data == null) { + return null; // common for keys to be null + } + + try { + return CoderUtils.encodeToByteArray(coder, data); + } catch (CoderException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + } + + private Coder coder = null; + private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.serializer"; + } + + + private static String configForKeySerializer() { + return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "key"); + } + + private static String configForValueSerializer() { + return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "value"); + } } diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java index 957271e291ff3..7d4337d2def95 100644 --- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java +++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java @@ -18,8 +18,12 @@ package org.apache.beam.sdk.io.kafka; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.Pipeline.PipelineExecutionException; +import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.BigEndianLongCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.io.UnboundedSource; @@ -49,20 +53,31 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.annotation.Nullable; @@ -79,6 +94,9 @@ public class KafkaIOTest { * - test KafkaRecordCoder */ + @Rule + public ExpectedException thrown = ExpectedException.none(); + // Update mock consumer with records distributed among the given topics, each with given number // of partitions. Records are assigned in round-robin order among the partitions. private static MockConsumer mkMockConsumer( @@ -113,8 +131,8 @@ private static MockConsumer mkMockConsumer( tp.topic(), tp.partition(), offsets[pIdx]++, - null, // key - ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id. 
+ ByteBuffer.wrap(new byte[8]).putInt(i).array(), // key is 4 byte record id + ByteBuffer.wrap(new byte[8]).putLong(i).array())); // value is 8 byte record id } MockConsumer consumer = @@ -161,16 +179,17 @@ public Consumer apply(Map config) { * Creates a consumer with two topics, with 5 partitions each. * numElements are (round-robin) assigned all the 10 partitions. */ - private static KafkaIO.TypedRead mkKafkaReadTransform( + private static KafkaIO.TypedRead mkKafkaReadTransform( int numElements, - @Nullable SerializableFunction, Instant> timestampFn) { + @Nullable SerializableFunction, Instant> timestampFn) { List topics = ImmutableList.of("topic_a", "topic_b"); - KafkaIO.Read reader = KafkaIO.read() + KafkaIO.Read reader = KafkaIO.read() .withBootstrapServers("none") .withTopics(topics) .withConsumerFactoryFn(new ConsumerFactoryFn(topics, 10, numElements)) // 20 partitions + .withKeyCoder(BigEndianIntegerCoder.of()) .withValueCoder(BigEndianLongCoder.of()) .withMaxNumRecords(numElements); @@ -305,9 +324,9 @@ public void testUnboundedSourceSplits() throws Exception { int numElements = 1000; int numSplits = 10; - UnboundedSource, ?> initial = + UnboundedSource, ?> initial = mkKafkaReadTransform(numElements, null).makeSource(); - List, ?>> splits = + List, ?>> splits = initial.generateInitialSplits(numSplits, p.getOptions()); assertEquals("Expected exact splitting", numSplits, splits.size()); @@ -317,7 +336,7 @@ public void testUnboundedSourceSplits() throws Exception { for (int i = 0; i < splits.size(); ++i) { pcollections = pcollections.and( p.apply("split" + i, Read.from(splits.get(i)).withMaxNumRecords(elementsPerSplit)) - .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata())) + .apply("Remove Metadata " + i, ParDo.of(new RemoveKafkaMetadata())) .apply("collection " + i, Values.create())); } PCollection input = pcollections.apply(Flatten.pCollections()); @@ -330,9 +349,9 @@ public void testUnboundedSourceSplits() throws Exception { * A timestamp function that uses the given value as the timestamp. */ private static class ValueAsTimestampFn - implements SerializableFunction, Instant> { + implements SerializableFunction, Instant> { @Override - public Instant apply(KV input) { + public Instant apply(KV input) { return new Instant(input.getValue()); } } @@ -352,13 +371,13 @@ public void testUnboundedSourceCheckpointMark() throws Exception { int numElements = 85; // 85 to make sure some partitions have more records than other. // create a single split: - UnboundedSource, KafkaCheckpointMark> source = + UnboundedSource, KafkaCheckpointMark> source = mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) .makeSource() .generateInitialSplits(1, PipelineOptionsFactory.fromArgs(new String[0]).create()) .get(0); - UnboundedReader> reader = source.createReader(null, null); + UnboundedReader> reader = source.createReader(null, null); final int numToSkip = 3; // advance numToSkip elements @@ -394,4 +413,261 @@ public void testUnboundedSourceCheckpointMark() throws Exception { } } } + + @Test + public void testSink() throws Exception { + // Simply read from kafka source and write to kafka sink. Then verify the records + // are correctly published to mock kafka producer. 
+ + int numElements = 1000; + + synchronized (MOCK_PRODUCER_LOCK) { + + MOCK_PRODUCER.clear(); + + ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start(); + + Pipeline pipeline = TestPipeline.create(); + String topic = "test"; + + pipeline + .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) + .withoutMetadata()) + .apply(KafkaIO.write() + .withBootstrapServers("none") + .withTopic(topic) + .withKeyCoder(BigEndianIntegerCoder.of()) + .withValueCoder(BigEndianLongCoder.of()) + .withProducerFactoryFn(new ProducerFactoryFn())); + + pipeline.run(); + + completionThread.shutdown(); + + verifyProducerRecords(topic, numElements, false); + } + } + + @Test + public void testValuesSink() throws Exception { + // similar to testSink(), but use values()' interface. + + int numElements = 1000; + + synchronized (MOCK_PRODUCER_LOCK) { + + MOCK_PRODUCER.clear(); + + ProducerSendCompletionThread completionThread = new ProducerSendCompletionThread().start(); + + Pipeline pipeline = TestPipeline.create(); + String topic = "test"; + + pipeline + .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) + .withoutMetadata()) + .apply(Values.create()) // there are no keys + .apply(KafkaIO.write() + .withBootstrapServers("none") + .withTopic(topic) + .withKeyCoder(BigEndianIntegerCoder.of()) + .withValueCoder(BigEndianLongCoder.of()) + .withProducerFactoryFn(new ProducerFactoryFn()) + .values()); + + pipeline.run(); + + completionThread.shutdown(); + + verifyProducerRecords(topic, numElements, true); + } + } + + @Test + public void testSinkWithSendErrors() throws Throwable { + // similar to testSink(), except that up to 10 of the send calls to producer will fail + // asynchronously. + + // TODO: Ideally we want the pipeline to run to completion by retrying bundles that fail. + // We limit the number of errors injected to 10 below. This would reflect a real streaming + // pipeline. But I am sure how to achieve that. 
For now expect an exception: + + thrown.expect(InjectedErrorException.class); + thrown.expectMessage("Injected Error #1"); + + int numElements = 1000; + + synchronized (MOCK_PRODUCER_LOCK) { + + MOCK_PRODUCER.clear(); + + Pipeline pipeline = TestPipeline.create(); + String topic = "test"; + + ProducerSendCompletionThread completionThreadWithErrors = + new ProducerSendCompletionThread(10, 100).start(); + + pipeline + .apply(mkKafkaReadTransform(numElements, new ValueAsTimestampFn()) + .withoutMetadata()) + .apply(KafkaIO.write() + .withBootstrapServers("none") + .withTopic(topic) + .withKeyCoder(BigEndianIntegerCoder.of()) + .withValueCoder(BigEndianLongCoder.of()) + .withProducerFactoryFn(new ProducerFactoryFn())); + + try { + pipeline.run(); + } catch (PipelineExecutionException e) { + // throwing inner exception helps assert that first exception is thrown from the Sink + throw e.getCause().getCause(); + } finally { + completionThreadWithErrors.shutdown(); + } + } + } + + private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent) { + + // verify that appropriate messages are written to kafka + List> sent = MOCK_PRODUCER.history(); + + // sort by values + Collections.sort(sent, new Comparator>() { + @Override + public int compare(ProducerRecord o1, ProducerRecord o2) { + return Long.compare(o1.value(), o2.value()); + } + }); + + for (int i = 0; i < numElements; i++) { + ProducerRecord record = sent.get(i); + assertEquals(topic, record.topic()); + if (keyIsAbsent) { + assertNull(record.key()); + } else { + assertEquals(i, record.key().intValue()); + } + assertEquals(i, record.value().longValue()); + } + } + + /** + * Singleton MockProudcer. Using a singleton here since we need access to the object to fetch + * the actual records published to the producer. This prohibits running the tests using + * the producer in parallel, but there are only one or two tests. + */ + private static final MockProducer MOCK_PRODUCER = + new MockProducer( + false, // disable synchronous completion of send. see ProducerSendCompletionThread below. + new KafkaIO.CoderBasedKafkaSerializer(), + new KafkaIO.CoderBasedKafkaSerializer()) { + + // override flush() so that it does not complete all the waiting sends, giving a chance to + // ProducerCompletionThread to inject errors. + + @Override + public void flush() { + while (completeNext()) { + // there are some uncompleted records. let the completion thread handle them. + try { + Thread.sleep(10); + } catch (InterruptedException e) { + } + } + } + }; + + // use a separate object serialize tests using MOCK_PRODUCER so that we don't interfere + // with Kafka MockProducer locking itself. + private static final Object MOCK_PRODUCER_LOCK = new Object(); + + private static class ProducerFactoryFn + implements SerializableFunction, Producer> { + + @Override + public Producer apply(Map config) { + return MOCK_PRODUCER; + } + } + + private static class InjectedErrorException extends RuntimeException { + public InjectedErrorException(String message) { + super(message); + } + } + + /** + * We start MockProducer with auto-completion disabled. That implies a record is not marked sent + * until #completeNext() is called on it. This class starts a thread to asynchronously 'complete' + * the the sends. During completion, we can also make those requests fail. This error injection + * is used in one of the tests. 
+ */ + private static class ProducerSendCompletionThread { + + private final int maxErrors; + private final int errorFrequency; + private final AtomicBoolean done = new AtomicBoolean(false); + private final ExecutorService injectorThread; + private int numCompletions = 0; + + ProducerSendCompletionThread() { + // complete everything successfully + this(0, 0); + } + + ProducerSendCompletionThread(final int maxErrors, final int errorFrequency) { + this.maxErrors = maxErrors; + this.errorFrequency = errorFrequency; + injectorThread = Executors.newSingleThreadExecutor(); + } + + ProducerSendCompletionThread start() { + injectorThread.submit(new Runnable() { + @Override + public void run() { + int errorsInjected = 0; + + while (!done.get()) { + boolean successful; + + if (errorsInjected < maxErrors && ((numCompletions + 1) % errorFrequency) == 0) { + successful = MOCK_PRODUCER.errorNext( + new InjectedErrorException("Injected Error #" + (errorsInjected + 1))); + + if (successful) { + errorsInjected++; + } + } else { + successful = MOCK_PRODUCER.completeNext(); + } + + if (successful) { + numCompletions++; + } else { + // wait a bit since there are no unsent records + try { + Thread.sleep(1); + } catch (InterruptedException e) { + } + } + } + } + }); + + return this; + } + + void shutdown() { + done.set(true); + injectorThread.shutdown(); + try { + assertTrue(injectorThread.awaitTermination(10, TimeUnit.SECONDS)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } }
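The coder-backed serializer above can also be sanity-checked on its own. This illustrative test (not part of the patch; the class and method names are placeholders) configures CoderBasedKafkaSerializer with the value-serializer config key that CONFIG_FORMAT produces and compares its output against a direct coder encoding:

import static org.junit.Assert.assertArrayEquals;

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.util.CoderUtils;
import org.junit.Test;

public class CoderBasedKafkaSerializerSketchTest {

  @Test
  public void serializesWithConfiguredCoder() throws Exception {
    KafkaIO.CoderBasedKafkaSerializer<String> serializer =
        new KafkaIO.CoderBasedKafkaSerializer<>();

    // "beam.coder.based.kafka.value.serializer" is CONFIG_FORMAT applied to "value".
    serializer.configure(
        ImmutableMap.<String, Object>of(
            "beam.coder.based.kafka.value.serializer", StringUtf8Coder.of()),
        false /* isKey */);

    byte[] bytes = serializer.serialize("any-topic", "hello");

    // The serializer should emit exactly what the coder emits.
    assertArrayEquals(
        CoderUtils.encodeToByteArray(StringUtf8Coder.of(), "hello"), bytes);
  }
}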