Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-6207] Added option to publish synthetic data to Kafka topic. #7612

Merged
merged 2 commits into from
Jan 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions sdks/java/testing/load-tests/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,14 @@ configurations {
}

dependencies {
shadow library.java.kafka_clients

shadow project(path: ":beam-sdks-java-core", configuration: "shadow")
shadow project(path: ":beam-runners-direct-java", configuration: "shadow")
shadow project(path: ":beam-sdks-java-io-synthetic", configuration: "shadow")
shadow project(path: ":beam-sdks-java-test-utils", configuration: "shadow")
shadow project(path: ":beam-sdks-java-io-google-cloud-platform", configuration: "shadow")
shadow project(path: ":beam-sdks-java-io-kafka", configuration: "shadow")

gradleRun project(path: project.path, configuration: "shadow")
gradleRun project(path: runnerDependency, configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
Expand All @@ -30,6 +31,7 @@
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.synthetic.SyntheticBoundedSource;
import org.apache.beam.sdk.io.synthetic.SyntheticOptions;
import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
Expand All @@ -41,6 +43,8 @@
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;

/**
* Pipeline that generates synthetic data and publishes it to a PubSub and/or Kafka topic.
Expand All @@ -49,16 +53,23 @@
*
* <pre>
* ./gradlew :beam-sdks-java-load-tests:run -PloadTest.args='
* --insertionPipelineTopic=TOPIC_NAME
* --pubSubTopic=TOPIC_NAME
* --kafkaBootstrapServerAddress=SERVER_ADDRESS
* --kafkaTopic=KAFKA_TOPIC_NAME
* --sourceOptions={"numRecords":1000,...}'
* -PloadTest.mainClass="org.apache.beam.sdk.loadtests.SyntheticDataPubSubPublisher"
* -PloadTest.mainClass="org.apache.beam.sdk.loadtests.SyntheticDataPublisher"
* </pre>
*
mwalenia marked this conversation as resolved.
Show resolved Hide resolved
* <p>If parameters related to a specific sink are provided (Kafka or PubSub), the pipeline writes
* to the sink. Writing to both sinks is also acceptable.
*/
public class SyntheticDataPubSubPublisher {
public class SyntheticDataPublisher {

private static final KvCoder<byte[], byte[]> RECORD_CODER =
KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of());

private static Options options;

/** Options for the pipeline. */
public interface Options extends PipelineOptions, ApplicationNameOptions {

Expand All @@ -69,26 +80,64 @@ public interface Options extends PipelineOptions, ApplicationNameOptions {
void setSourceOptions(String sourceOptions);

@Description("PubSub topic to publish to")
@Validation.Required
String getInsertionPipelineTopic();
String getPubSubTopic();

void setPubSubTopic(String topic);

@Description("Kafka server address")
String getKafkaBootstrapServerAddress();

void setInsertionPipelineTopic(String topic);
void setKafkaBootstrapServerAddress(String address);

@Description("Kafka topic")
String getKafkaTopic();

void setKafkaTopic(String topic);
}

/**
 * Entry point: builds and runs the synthetic data publishing pipeline.
 *
 * <p>Parses {@link Options} from the command line, reads records from a
 * {@link SyntheticBoundedSource} configured by the JSON in {@code sourceOptions}, and writes them
 * to Kafka and/or PubSub depending on which sink options are set. Blocks until the pipeline
 * finishes.
 *
 * @param args command-line arguments, parsed and validated into {@link Options}
 * @throws IOException declared by the signature; presumably raised while parsing the source
 *     options JSON (confirm against SyntheticOptions.fromJsonString)
 */
public static void main(String[] args) throws IOException {
// NOTE(review): the next two lines look like the removed and the added version of the same
// statement in the diff view. The second form stores the parsed options in the class-level
// static field, which the writeToKafka/writeToPubSub helpers read.
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

// The synthetic source is configured from the JSON string passed via --sourceOptions.
SyntheticSourceOptions sourceOptions =
SyntheticOptions.fromJsonString(options.getSourceOptions(), SyntheticSourceOptions.class);

Pipeline pipeline = Pipeline.create(options);
// Read once; the same collection is handed to every enabled sink below.
PCollection<KV<byte[], byte[]>> syntheticData =
mwalenia marked this conversation as resolved.
Show resolved Hide resolved
pipeline.apply("Read synthetic data", Read.from(new SyntheticBoundedSource(sourceOptions)));

// Kafka is written only when both the bootstrap server address and the topic are provided.
if (options.getKafkaBootstrapServerAddress() != null && options.getKafkaTopic() != null) {
writeToKafka(syntheticData);
}
// PubSub is written only when a topic is provided; the two sinks are independent, so both
// may be enabled in the same run.
if (options.getPubSubTopic() != null) {
writeToPubSub(syntheticData);
}
// Run the pipeline and block until it finishes.
pipeline.run().waitUntilFinish();
}

pipeline
.apply("Read synthetic data", Read.from(new SyntheticBoundedSource(sourceOptions)))
/**
 * Publishes the synthetic records to PubSub.
 *
 * <p>Each record is converted with {@code MapBytesToPubSubMessage} and the resulting messages
 * are written with {@link PubsubIO#writeMessages()} to the topic read from the static
 * {@code options}.
 *
 * @param collection synthetic key/value records to publish
 */
private static void writeToPubSub(PCollection<KV<byte[], byte[]>> collection) {
collection
.apply("Map to PubSub messages", MapElements.via(new MapBytesToPubSubMessage()))
// NOTE(review): the two "Write to PubSub" lines below look like the removed
// (getInsertionPipelineTopic) and added (getPubSubTopic) versions of one statement in the
// diff view; only one of them can be present in a compilable file.
.apply("Write to PubSub", PubsubIO.writeMessages().to(options.getInsertionPipelineTopic()));
.apply("Write to PubSub", PubsubIO.writeMessages().to(options.getPubSubTopic()));
}

pipeline.run().waitUntilFinish();
/**
 * Publishes the synthetic records to Kafka.
 *
 * <p>Each key/value record is first rendered to a {@code String} by {@link MapKVToString}. The
 * strings are then written through a values-only {@link KafkaIO} write, serialized with
 * {@link StringSerializer}, using the bootstrap servers and topic read from the static
 * {@code options}.
 *
 * @param collection synthetic key/value records to publish
 */
private static void writeToKafka(PCollection<KV<byte[], byte[]>> collection) {
  PCollection<String> kafkaMessages =
      collection.apply("Map to Kafka messages", MapElements.via(new MapKVToString()));

  String bootstrapServers = options.getKafkaBootstrapServerAddress();
  String topic = options.getKafkaTopic();

  kafkaMessages.apply(
      "Write to Kafka",
      KafkaIO.<Void, String>write()
          .withBootstrapServers(bootstrapServers)
          .withTopic(topic)
          .withValueSerializer(StringSerializer.class)
          .values());
}

/**
 * Renders a synthetic key/value record as a single string.
 *
 * <p>The output has the form <code>{KEY,VALUE}</code>, where each side is the
 * {@link Arrays#toString(byte[])} representation of the corresponding byte array.
 */
private static class MapKVToString extends SimpleFunction<KV<byte[], byte[]>, String> {
  @Override
  public String apply(KV<byte[], byte[]> input) {
    String renderedKey = Arrays.toString(input.getKey());
    String renderedValue = Arrays.toString(input.getValue());
    return "{" + renderedKey + "," + renderedValue + "}";
  }
}

private static class MapBytesToPubSubMessage
Expand Down