Skip to content

Commit

Permalink
Prevent saving duplicate rows in spec11 pipeline (#1810)
Browse files Browse the repository at this point in the history
* Prevent saving duplicate rows in spec11 pipeline

* Chain applies together
  • Loading branch information
sarahcaseybot authored Dec 15, 2022
1 parent 2292bfc commit 4ede5f0
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 21 deletions.
49 changes: 31 additions & 18 deletions core/src/main/java/google/registry/beam/spec11/Spec11Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import google.registry.beam.common.RegistryJpaIO.Read;
import google.registry.beam.spec11.SafeBrowsingTransforms.EvaluateSafeBrowsingFn;
import google.registry.config.RegistryConfig.ConfigModule;
import google.registry.model.IdService;
import google.registry.model.domain.Domain;
import google.registry.model.reporting.Spec11ThreatMatch;
import google.registry.model.reporting.Spec11ThreatMatch.ThreatType;
Expand All @@ -45,6 +46,7 @@
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
Expand Down Expand Up @@ -154,25 +156,36 @@ private static KV<String, String> parseRow(Object[] row) {

static void saveToSql(
PCollection<KV<DomainNameInfo, ThreatMatch>> threatMatches, Spec11PipelineOptions options) {
String transformId = "Spec11 Threat Matches";
LocalDate date = LocalDate.parse(options.getDate(), ISODateTimeFormat.date());
threatMatches.apply(
"Write to Sql: " + transformId,
RegistryJpaIO.<KV<DomainNameInfo, ThreatMatch>>write()
.withName(transformId)
.withBatchSize(options.getSqlWriteBatchSize())
.withJpaConverter(
(kv) -> {
DomainNameInfo domainNameInfo = kv.getKey();
return new Spec11ThreatMatch.Builder()
.setThreatTypes(
ImmutableSet.of(ThreatType.valueOf(kv.getValue().threatType())))
.setCheckDate(date)
.setDomainName(domainNameInfo.domainName())
.setDomainRepoId(domainNameInfo.domainRepoId())
.setRegistrarId(domainNameInfo.registrarId())
.build();
}));
String transformId = "Spec11 Threat Matches";
threatMatches
.apply(
"Construct objects",
ParDo.of(
new DoFn<KV<DomainNameInfo, ThreatMatch>, Spec11ThreatMatch>() {
@ProcessElement
public void processElement(
@Element KV<DomainNameInfo, ThreatMatch> input,
OutputReceiver<Spec11ThreatMatch> output) {
Spec11ThreatMatch spec11ThreatMatch =
new Spec11ThreatMatch.Builder()
.setThreatTypes(
ImmutableSet.of(ThreatType.valueOf(input.getValue().threatType())))
.setCheckDate(date)
.setDomainName(input.getKey().domainName())
.setDomainRepoId(input.getKey().domainRepoId())
.setRegistrarId(input.getKey().registrarId())
.setId(IdService.allocateId())
.build();
output.output(spec11ThreatMatch);
}
}))
.apply("Prevent Fusing", Reshuffle.viaRandomKey())
.apply(
"Write to Sql: " + transformId,
RegistryJpaIO.<Spec11ThreatMatch>write()
.withName(transformId)
.withBatchSize(options.getSqlWriteBatchSize()));
}

static void saveToGcs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import google.registry.model.Buildable;
import google.registry.model.ImmutableObject;
import google.registry.util.DomainNameUtils;
import java.io.Serializable;
import java.util.Set;
import javax.persistence.Column;
import javax.persistence.Entity;
Expand All @@ -39,7 +40,7 @@
@Index(name = "spec11threatmatch_tld_idx", columnList = "tld"),
@Index(name = "spec11threatmatch_check_date_idx", columnList = "checkDate")
})
public class Spec11ThreatMatch extends ImmutableObject implements Buildable {
public class Spec11ThreatMatch extends ImmutableObject implements Buildable, Serializable {

/** The type of threat detected. */
public enum ThreatType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,9 @@ private void verifySaveToGcs() throws Exception {
private void verifySaveToCloudSql() {
tm().transact(
() -> {
ImmutableList<Spec11ThreatMatch> sqlThreatMatches =
ImmutableList<Spec11ThreatMatch> spec11ThreatMatches =
Spec11ThreatMatchDao.loadEntriesByDate(tm(), new LocalDate(2020, 1, 27));
assertThat(sqlThreatMatches)
assertThat(spec11ThreatMatches)
.comparingElementsUsing(immutableObjectCorrespondence("id"))
.containsExactlyElementsIn(sqlThreatMatches);
});
Expand Down

0 comments on commit 4ede5f0

Please sign in to comment.