diff --git a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java index d0788cad2ae1..e1868e2c8461 100644 --- a/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java +++ b/sdks/java/io/mqtt/src/main/java/org/apache/beam/sdk/io/mqtt/MqttIO.java @@ -610,27 +610,22 @@ public void populateDisplayData(DisplayData.Builder builder) { @Override public PDone expand(PCollection input) { checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null"); - final SerializableFunction topicFn; if (dynamic()) { checkArgument( connectionConfiguration().getTopic() == null, "DynamicWrite can not have static topic"); - topicFn = topicFn(); + checkArgument(topicFn() != null, "topicFn can not be null"); } else { checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null"); - final String topic = connectionConfiguration().getTopic(); - topicFn = ignore -> topic; } - - checkArgument(topicFn != null, "topicFn can not be null"); checkArgument(payloadFn() != null, "payloadFn can not be null"); - input.apply( - ParDo.of(new WriteFn<>(connectionConfiguration(), topicFn, payloadFn(), retained()))); + input.apply(ParDo.of(new WriteFn<>(this))); return PDone.in(input.getPipeline()); } private static class WriteFn extends DoFn { - private final ConnectionConfiguration connectionConfiguration; + + private final Write spec; private final SerializableFunction topicFn; private final SerializableFunction payloadFn; private final boolean retained; @@ -638,21 +633,22 @@ private static class WriteFn extends DoFn { private transient MQTT client; private transient BlockingConnection connection; - public WriteFn( - ConnectionConfiguration connectionConfiguration, - SerializableFunction topicFn, - SerializableFunction payloadFn, - boolean retained) { - this.connectionConfiguration = connectionConfiguration; - this.topicFn = topicFn; - this.payloadFn = payloadFn; - this.retained = retained; + public WriteFn(Write spec) { + this.spec = spec; + if (spec.dynamic()) { + this.topicFn = spec.topicFn(); + } else { + String topic = spec.connectionConfiguration().getTopic(); + this.topicFn = ignore -> topic; + } + this.payloadFn = spec.payloadFn(); + this.retained = spec.retained(); } @Setup public void createMqttClient() throws Exception { LOG.debug("Starting MQTT writer"); - this.client = this.connectionConfiguration.createClient(); + this.client = this.spec.connectionConfiguration().createClient(); LOG.debug("MQTT writer client ID is {}", client.getClientId()); this.connection = createConnection(client); }