diff --git a/sdks/java/testing/load-tests/build.gradle b/sdks/java/testing/load-tests/build.gradle
index 50ee111d41ad1..a2f5d719d0385 100644
--- a/sdks/java/testing/load-tests/build.gradle
+++ b/sdks/java/testing/load-tests/build.gradle
@@ -55,11 +55,14 @@ configurations {
 }
 
 dependencies {
+  shadow library.java.kafka_clients
+
   shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
   shadow project(path: ":beam-runners-direct-java", configuration: "shadow")
   shadow project(path: ":beam-sdks-java-io-synthetic", configuration: "shadow")
   shadow project(path: ":beam-sdks-java-test-utils", configuration: "shadow")
   shadow project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadow")
+  shadow project(path: ":beam-sdks-java-io-kafka", configuration: "shadow")
 
   gradleRun project(path: project.path, configuration: "shadow")
   gradleRun project(path: runnerDependency, configuration: "shadow")
diff --git a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPubSubPublisher.java b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPublisher.java
similarity index 64%
rename from sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPubSubPublisher.java
rename to sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPublisher.java
index 99e1b7c9cf480..49298335ea563 100644
--- a/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPubSubPublisher.java
+++ b/sdks/java/testing/load-tests/src/main/java/org/apache/beam/sdk/loadtests/SyntheticDataPublisher.java
@@ -21,6 +21,7 @@
 import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
@@ -30,6 +31,7 @@
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
 import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
 import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
 import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
@@ -41,6 +43,8 @@
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.kafka.common.serialization.StringSerializer;
 
 /**
  * Pipeline that generates synthetic data and publishes it in PubSub topic.
  *
  * <pre>
  *  ./gradlew :beam-sdks-java-load-tests:run -PloadTest.args='
- *    --insertionPipelineTopic=TOPIC_NAME
+ *    --pubSubTopic=TOPIC_NAME
+ *    --kafkaBootstrapServerAddress=SERVER_ADDRESS
+ *    --kafkaTopic=KAFKA_TOPIC_NAME
  *    --sourceOptions={"numRecords":1000,...}'
- *    -PloadTest.mainClass="org.apache.beam.sdk.loadtests.SyntheticDataPubSubPublisher"
+ *    -PloadTest.mainClass="org.apache.beam.sdk.loadtests.SyntheticDataPublisher"
  *  </pre>
+ *
+ * <p>If parameters related to a specific sink (Kafka or PubSub) are provided, the pipeline
+ * writes to that sink. Writing to both sinks is also acceptable.
  */
-public class SyntheticDataPubSubPublisher {
+public class SyntheticDataPublisher {
 
   private static final KvCoder<byte[], byte[]> RECORD_CODER =
       KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of());
 
+  private static Options options;
+
   /** Options for the pipeline. */
   public interface Options extends PipelineOptions, ApplicationNameOptions {
@@ -69,26 +80,64 @@ public interface Options extends PipelineOptions, ApplicationNameOptions {
     void setSourceOptions(String sourceOptions);
 
     @Description("PubSub topic to publish to")
-    @Validation.Required
-    String getInsertionPipelineTopic();
+    String getPubSubTopic();
+
+    void setPubSubTopic(String topic);
+
+    @Description("Kafka server address")
+    String getKafkaBootstrapServerAddress();
 
-    void setInsertionPipelineTopic(String topic);
+    void setKafkaBootstrapServerAddress(String address);
+
+    @Description("Kafka topic")
+    String getKafkaTopic();
+
+    void setKafkaTopic(String topic);
   }
 
   public static void main(String[] args) throws IOException {
-    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
+    options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
 
     SyntheticSourceOptions sourceOptions =
         SyntheticOptions.fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);
 
     Pipeline pipeline = Pipeline.create(options);
+    PCollection<KV<byte[], byte[]>> syntheticData =
+        pipeline.apply("Read synthetic data", Read.from(new SyntheticBoundedSource(sourceOptions)));
+
+    if (options.getKafkaBootstrapServerAddress() != null && options.getKafkaTopic() != null) {
+      writeToKafka(syntheticData);
+    }
+    if (options.getPubSubTopic() != null) {
+      writeToPubSub(syntheticData);
+    }
+    pipeline.run().waitUntilFinish();
+  }
 
-    pipeline
-        .apply("Read synthetic data", Read.from(new SyntheticBoundedSource(sourceOptions)))
+  private static void writeToPubSub(PCollection<KV<byte[], byte[]>> collection) {
+    collection
         .apply("Map to PubSub messages", MapElements.via(new MapBytesToPubSubMessage()))
-        .apply("Write to PubSub", PubsubIO.writeMessages().to(options.getInsertionPipelineTopic()));
+        .apply("Write to PubSub", PubsubIO.writeMessages().to(options.getPubSubTopic()));
+  }
 
-    pipeline.run().waitUntilFinish();
+  private static void writeToKafka(PCollection<KV<byte[], byte[]>> collection) {
+    collection
+        .apply("Map to Kafka messages", MapElements.via(new MapKVToString()))
+        .apply(
+            "Write to Kafka",
+            KafkaIO.<Void, String>write()
+                .withBootstrapServers(options.getKafkaBootstrapServerAddress())
+                .withTopic(options.getKafkaTopic())
+                .withValueSerializer(StringSerializer.class)
+                .values());
+  }
+
+  private static class MapKVToString extends SimpleFunction<KV<byte[], byte[]>, String> {
+    @Override
+    public String apply(KV<byte[], byte[]> input) {
+      return String.format(
+          "{%s,%s}", Arrays.toString(input.getKey()), Arrays.toString(input.getValue()));
+    }
+  }
 
   private static class MapBytesToPubSubMessage
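
Note on the Kafka write path: `KafkaIO.<Void, String>write().values()` publishes value-only records, so the pipeline never has to configure a key type or key serializer. Below is a minimal standalone sketch of the same write, assuming the Beam Java SDK, the KafkaIO module, and a runner are on the classpath; the class name, server address, topic, and sample records are illustrative placeholders, not part of this diff.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaValuesWriteExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // A stand-in for the synthetic data already mapped to Strings.
    PCollection<String> records =
        pipeline.apply("Create sample records", Create.of("record-1", "record-2"));

    // values() writes value-only records, so only a value serializer is needed.
    records.apply(
        "Write to Kafka",
        KafkaIO.<Void, String>write()
            .withBootstrapServers("localhost:9092") // placeholder address
            .withTopic("synthetic-data") // placeholder topic
            .withValueSerializer(StringSerializer.class)
            .values());

    pipeline.run().waitUntilFinish();
  }
}
```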
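For reference, `MapKVToString` formats each synthetic `KV<byte[], byte[]>` record with `Arrays.toString` before it is handed to the Kafka sink. A tiny standalone illustration of the resulting payload format (the class name and sample bytes are hypothetical; no Beam required):

```java
import java.util.Arrays;

public class MapKVToStringFormatDemo {
  public static void main(String[] args) {
    byte[] key = {1, 2};
    byte[] value = {3, 4};
    // Same format string as MapKVToString.apply in the diff above.
    String payload =
        String.format("{%s,%s}", Arrays.toString(key), Arrays.toString(value));
    System.out.println(payload); // prints: {[1, 2],[3, 4]}
  }
}
```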