Skip to content

Commit

Permalink
Remove support for redis deduplication (#1583)
Browse files Browse the repository at this point in the history
Co-authored-by: whd <whd@users.noreply.github.com>
  • Loading branch information
relud and whd authored Mar 11, 2021
1 parent 608089c commit c05fbab
Show file tree
Hide file tree
Showing 14 changed files with 16 additions and 608 deletions.
11 changes: 0 additions & 11 deletions ingestion-beam/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -124,11 +124,6 @@
<artifactId>geoip2</artifactId>
<version>2.15.0</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.5.1</version>
</dependency>
<!-- Following https://github.com/apache/beam/blob/v2.20.0/examples/java/build.gradle#L64 -->
<dependency>
<groupId>org.slf4j</groupId>
Expand All @@ -140,12 +135,6 @@
<artifactId>jose4j</artifactId>
<version>0.7.6</version>
</dependency>
<dependency>
<groupId>com.github.kstyrc</groupId>
<artifactId>embedded-redis</artifactId>
<version>0.6</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import com.mozilla.telemetry.decoder.AddMetadata;
import com.mozilla.telemetry.decoder.DecoderOptions;
import com.mozilla.telemetry.decoder.DecryptPioneerPayloads;
import com.mozilla.telemetry.decoder.Deduplicate;
import com.mozilla.telemetry.decoder.GeoCityLookup;
import com.mozilla.telemetry.decoder.GeoIspLookup;
import com.mozilla.telemetry.decoder.ParsePayload;
Expand Down Expand Up @@ -94,8 +93,6 @@ public static PipelineResult run(DecoderOptions.Parsed options) {
.apply(ParseUserAgent.of()) //
.apply(NormalizeAttributes.of()) //
.apply("AddMetadata", AddMetadata.of()).failuresTo(failureCollections) //
.apply(Deduplicate.removeDuplicates(options.getParsedRedisUri())) //
.sendDuplicateMetadataToErrors().failuresTo(failureCollections) //
.apply(options.getOutputType().write(options)).failuresTo(failureCollections));

// Error output
Expand Down
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
package com.mozilla.telemetry;

import com.mozilla.telemetry.decoder.Deduplicate;
import com.mozilla.telemetry.republisher.RandomSampler;
import com.mozilla.telemetry.republisher.RepublishPerChannel;
import com.mozilla.telemetry.republisher.RepublishPerDocType;
import com.mozilla.telemetry.republisher.RepublishPerNamespace;
import com.mozilla.telemetry.republisher.RepublisherOptions;
import com.mozilla.telemetry.transforms.PubsubConstraints;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

public class Republisher extends Sink {

Expand Down Expand Up @@ -51,18 +46,11 @@ public static PipelineResult run(RepublisherOptions.Parsed options) {
options.setOutputPubsubCompression(StaticValueProvider.of(Compression.UNCOMPRESSED));

final Pipeline pipeline = Pipeline.create(options);
final List<PCollection<PubsubMessage>> failuresCollections = new ArrayList<>();

// Trailing comments are used below to prevent re-wrapping by google-java-format.
PCollection<PubsubMessage> decoded = pipeline //
.apply(options.getInputType().read(options));

// Mark messages as seen in Redis.
decoded //
.apply("MarkAsSeen", Deduplicate.markAsSeen(options.getParsedRedisUri(),
options.getDeduplicateExpireSeconds()))
.failuresTo(failuresCollections);

// Republish debug messages.
if (options.getEnableDebugDestination()) {
RepublisherOptions.Parsed opts = options.as(RepublisherOptions.Parsed.class);
Expand Down Expand Up @@ -103,12 +91,6 @@ public static PipelineResult run(RepublisherOptions.Parsed options) {
decoded.apply(RepublishPerChannel.of(options));
}

// Write error output collections.
PCollectionList.of(failuresCollections) //
.apply("FlattenFailureCollections", Flatten.pCollections()) //
.apply("WriteErrorOutput", options.getErrorOutputType().write(options)) //
.output();

return pipeline.run();
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package com.mozilla.telemetry.decoder;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.mozilla.telemetry.options.SinkOptions;
import java.net.URI;
import java.util.Optional;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Hidden;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;

/**
* Options supported by {@code Decoder}.
Expand Down Expand Up @@ -39,11 +35,6 @@ public interface DecoderOptions extends SinkOptions, PipelineOptions {

void setGeoIspDatabase(ValueProvider<String> value);

@Description("URI of a redis server that will be used for deduplication; leave null to disable")
ValueProvider<String> getRedisUri();

void setRedisUri(ValueProvider<String> value);

@Description("If set to true, enable decryption of Pioneer payloads.")
@Default.Boolean(false)
Boolean getPioneerEnabled();
Expand Down Expand Up @@ -107,11 +98,6 @@ public interface DecoderOptions extends SinkOptions, PipelineOptions {
*/
@Hidden
interface Parsed extends DecoderOptions, SinkOptions.Parsed {

@JsonIgnore
ValueProvider<URI> getParsedRedisUri();

void setParsedRedisUri(ValueProvider<URI> value);
}

/**
Expand All @@ -129,8 +115,6 @@ static Parsed parseDecoderOptions(DecoderOptions options) {
*/
static void enrichDecoderOptions(Parsed options) {
SinkOptions.enrichSinkOptions(options);
options.setParsedRedisUri(NestedValueProvider.of(options.getRedisUri(),
s -> Optional.ofNullable(s).map(URI::create).orElse(null)));
}

}
Loading

0 comments on commit c05fbab

Please sign in to comment.