Skip to content

Commit

Permalink
[BEAM-2852] Fix minor code style issues
Browse files Browse the repository at this point in the history
  • Loading branch information
aromanenko-dev committed Apr 6, 2018
1 parent 8f7d724 commit ca3436c
Showing 1 changed file with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -820,18 +820,17 @@ public void processElement(ProcessContext c) {
private PCollection<Event> sourceEventsFromKafka(Pipeline p) {
NexmarkUtils.console("Reading events from Kafka Topic %s", options.getKafkaSourceTopic());

if (Strings.isNullOrEmpty(options.getBootstrapServers())) {
throw new RuntimeException("Missing --bootstrapServers");
}
checkArgument(!Strings.isNullOrEmpty(options.getBootstrapServers()),
"Missing --bootstrapServers");

KafkaIO.Read<Long, byte[]> io = KafkaIO.<Long, byte[]>read()
KafkaIO.Read<Long, byte[]> read = KafkaIO.<Long, byte[]>read()
.withBootstrapServers(options.getBootstrapServers())
.withTopic(options.getKafkaSourceTopic())
.withKeyDeserializer(LongDeserializer.class)
.withValueDeserializer(ByteArrayDeserializer.class);

return p
.apply(queryName + ".ReadKafkaEvents", io.withoutMetadata())
.apply(queryName + ".ReadKafkaEvents", read.withoutMetadata())
.apply(queryName + ".KafkaToEvents", ParDo.of(BYTEARRAY_TO_EVENT));
}

Expand Down

0 comments on commit ca3436c

Please sign in to comment.