Skip to content

Commit

Permalink
change WriteFn to use Write spec
Browse files Browse the repository at this point in the history
  • Loading branch information
twosom committed Sep 27, 2024
1 parent b785bd7 commit 1d91ec0
Showing 1 changed file with 15 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -610,49 +610,45 @@ public void populateDisplayData(DisplayData.Builder builder) {
@Override
public PDone expand(PCollection<InputT> input) {
checkArgument(connectionConfiguration() != null, "connectionConfiguration can not be null");
final SerializableFunction<InputT, String> topicFn;
if (dynamic()) {
checkArgument(
connectionConfiguration().getTopic() == null, "DynamicWrite can not have static topic");
topicFn = topicFn();
checkArgument(topicFn() != null, "topicFn can not be null");
} else {
checkArgument(connectionConfiguration().getTopic() != null, "topic can not be null");
final String topic = connectionConfiguration().getTopic();
topicFn = ignore -> topic;
}

checkArgument(topicFn != null, "topicFn can not be null");
checkArgument(payloadFn() != null, "payloadFn can not be null");

input.apply(
ParDo.of(new WriteFn<>(connectionConfiguration(), topicFn, payloadFn(), retained())));
input.apply(ParDo.of(new WriteFn<>(this)));
return PDone.in(input.getPipeline());
}

private static class WriteFn<InputT> extends DoFn<InputT, Void> {
private final ConnectionConfiguration connectionConfiguration;

private final Write<InputT> spec;
private final SerializableFunction<InputT, String> topicFn;
private final SerializableFunction<InputT, byte[]> payloadFn;
private final boolean retained;

private transient MQTT client;
private transient BlockingConnection connection;

public WriteFn(
ConnectionConfiguration connectionConfiguration,
SerializableFunction<InputT, String> topicFn,
SerializableFunction<InputT, byte[]> payloadFn,
boolean retained) {
this.connectionConfiguration = connectionConfiguration;
this.topicFn = topicFn;
this.payloadFn = payloadFn;
this.retained = retained;
public WriteFn(Write<InputT> spec) {
this.spec = spec;
if (spec.dynamic()) {
this.topicFn = spec.topicFn();
} else {
String topic = spec.connectionConfiguration().getTopic();
this.topicFn = ignore -> topic;
}
this.payloadFn = spec.payloadFn();
this.retained = spec.retained();
}

@Setup
public void createMqttClient() throws Exception {
LOG.debug("Starting MQTT writer");
this.client = this.connectionConfiguration.createClient();
this.client = this.spec.connectionConfiguration().createClient();
LOG.debug("MQTT writer client ID is {}", client.getClientId());
this.connection = createConnection(client);
}
Expand Down

0 comments on commit 1d91ec0

Please sign in to comment.