Skip to content

Commit

Permalink
Chain applies together
Browse files Browse the repository at this point in the history
  • Loading branch information
sarahcaseybot committed Oct 14, 2022
1 parent 29557d0 commit 458040e
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 16 deletions.
24 changes: 11 additions & 13 deletions core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,10 @@ private static KV<String, String> parseRow(Object[] row) {
static void saveToSql(
PCollection<KV<DomainNameInfo, ThreatMatch>> threatMatches, Spec11PipelineOptions options) {
LocalDate date = LocalDate.parse(options.getDate(), ISODateTimeFormat.date());
String transformId = "Spec11 Threat Matches";

PCollection<Spec11ThreatMatch> spec11ThreatMatches =
threatMatches.apply(
threatMatches
.apply(
"Construct objects",
ParDo.of(
new DoFn<KV<DomainNameInfo, ThreatMatch>, Spec11ThreatMatch>() {
Expand All @@ -179,17 +180,14 @@ public void processElement(
.build();
output.output(spec11ThreatMatch);
}
}));

spec11ThreatMatches.apply("Prevent Fusing", Reshuffle.viaRandomKey());
String transformId = "Spec11 Threat Matches";

spec11ThreatMatches.apply(
"Write to Sql: " + transformId,
RegistryJpaIO.<Spec11ThreatMatch>write()
.withName(transformId)
.withBatchSize(options.getSqlWriteBatchSize())
.withShards(options.getSqlWriteShards()));
}))
.apply("Prevent Fusing", Reshuffle.viaRandomKey())
.apply(
"Write to Sql: " + transformId,
RegistryJpaIO.<Spec11ThreatMatch>write()
.withName(transformId)
.withBatchSize(options.getSqlWriteBatchSize())
.withShards(options.getSqlWriteShards()));
}

static void saveToGcs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,10 +288,10 @@ private void verifySaveToCloudSql() {
jpaTm()
.transact(
() -> {
ImmutableList<Spec11ThreatMatch> sqlThreatMatches =
ImmutableList<Spec11ThreatMatch> spec11ThreatMatches =
Spec11ThreatMatchDao.loadEntriesByDate(jpaTm(), new LocalDate(2020, 1, 27));
assertThat(sqlThreatMatches)
.comparingElementsUsing(immutableObjectCorrespondence())
assertThat(spec11ThreatMatches)
.comparingElementsUsing(immutableObjectCorrespondence("id"))
.containsExactlyElementsIn(sqlThreatMatches);
});
}
Expand Down

0 comments on commit 458040e

Please sign in to comment.