diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java
index c7791a266..611408fe0 100644
--- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java
+++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/config/SinkConfig.java
@@ -3,6 +3,7 @@
 import com.google.api.gax.batching.FlowControlSettings;
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import com.google.common.annotations.VisibleForTesting;
@@ -12,7 +13,7 @@
 import com.mozilla.telemetry.ingestion.sink.io.BigQuery.BigQueryErrors;
 import com.mozilla.telemetry.ingestion.sink.io.Gcs;
 import com.mozilla.telemetry.ingestion.sink.io.Pubsub;
-import com.mozilla.telemetry.ingestion.sink.transform.BlobInfoToPubsubMessage;
+import com.mozilla.telemetry.ingestion.sink.transform.BlobIdToPubsubMessage;
 import com.mozilla.telemetry.ingestion.sink.transform.CompressPayload;
 import com.mozilla.telemetry.ingestion.sink.transform.DecompressPayload;
 import com.mozilla.telemetry.ingestion.sink.transform.DocumentTypePredicate;
@@ -165,13 +166,27 @@ Output getOutput(Env env) {
 
     @Override
     Output getOutput(Env env) {
-      return new Output(env, this,
-          new BigQuery.Load(getBigQueryService(env), getGcsService(env),
-              env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
-              env.getInt(LOAD_MAX_FILES, DEFAULT_LOAD_MAX_FILES),
-              env.getDuration(LOAD_MAX_DELAY, DEFAULT_LOAD_MAX_DELAY), DEFAULT_EXECUTOR,
-              // don't delete files until successfully loaded
-              BigQuery.Load.Delete.onSuccess).withOpenCensusMetrics());
+      final Storage storage = getGcsService(env);
+      final Function<Blob, CompletableFuture<Void>> bigQueryLoad = new BigQuery.Load(
+          getBigQueryService(env), storage, env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
+          env.getInt(LOAD_MAX_FILES, DEFAULT_LOAD_MAX_FILES),
+          env.getDuration(LOAD_MAX_DELAY, DEFAULT_LOAD_MAX_DELAY), DEFAULT_EXECUTOR,
+          // don't delete files until successfully loaded
+          BigQuery.Load.Delete.onSuccess).withOpenCensusMetrics();
+      return new Output(env, this, message -> {
+        // Messages may be delivered more than once, so check whether the blob has been deleted.
+        // The blob is never deleted in this mode unless it has already been successfully loaded
+        // to BigQuery. If the blob does not exist, it must have been deleted, because Cloud
+        // Storage provides strong global consistency for read-after-write operations.
+        // https://cloud.google.com/storage/docs/consistency
+        Blob blob = storage.get(BlobIdToPubsubMessage.decode(message));
+        if (blob == null) {
+          // blob was deleted, so ack this message by returning a successfully completed future
+          // TODO measure the frequency of this
+          return CompletableFuture.completedFuture(null);
+        }
+        return bigQueryLoad.apply(blob);
+      });
     }
   },
 
@@ -189,7 +204,7 @@ Output getOutput(Env env) {
           DEFAULT_EXECUTOR, getFormat(env),
           // BigQuery Load API limits maximum load requests per table per day to 1,000 so send
           // blobInfo to pubsub and require loads be run separately to reduce maximum latency
-          blobInfo -> pubsubWrite.apply(BlobInfoToPubsubMessage.apply(blobInfo)))
+          blobInfo -> pubsubWrite.apply(BlobIdToPubsubMessage.encode(blobInfo.getBlobId())))
           .withOpenCensusMetrics());
     }
   },
@@ -214,11 +229,13 @@ Output getOutput(Env env) {
     Output getOutput(Env env) {
       final com.google.cloud.bigquery.BigQuery bigQuery = getBigQueryService(env);
       final Storage storage = getGcsService(env);
-      final Function<PubsubMessage, CompletableFuture<Void>> bigQueryLoad;
+      final Function<Blob, CompletableFuture<Void>> bigQueryLoad;
       if (env.containsKey(OUTPUT_TOPIC)) {
         // BigQuery Load API limits maximum load requests per table per day to 1,000 so if
         // OUTPUT_TOPIC is present send blobInfo to pubsub and run load jobs separately
-        bigQueryLoad = pubsub.getOutput(env);
+        final Function<PubsubMessage, CompletableFuture<Void>> pubsubOutput = pubsub
+            .getOutput(env);
+        bigQueryLoad = blob -> pubsubOutput.apply(BlobIdToPubsubMessage.encode(blob.getBlobId()));
       } else {
         bigQueryLoad = new BigQuery.Load(bigQuery, storage,
             env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
@@ -233,9 +250,7 @@ Output getOutput(Env env) {
           env.getInt(BATCH_MAX_MESSAGES, DEFAULT_BATCH_MAX_MESSAGES),
           env.getDuration(BATCH_MAX_DELAY, DEFAULT_BATCH_MAX_DELAY),
           PubsubMessageToTemplatedString.forBigQuery(getBigQueryOutputBucket(env)),
-          DEFAULT_EXECUTOR, getFormat(env),
-          blobInfo -> bigQueryLoad.apply(BlobInfoToPubsubMessage.apply(blobInfo)))
-              .withOpenCensusMetrics();
+          DEFAULT_EXECUTOR, getFormat(env), bigQueryLoad).withOpenCensusMetrics();
       // Like bigQueryStreaming, but use STREAMING_ prefix env vars for batch configuration
       Function<PubsubMessage, CompletableFuture<Void>> streamingOutput = new BigQuery.Write(
           bigQuery, env.getLong(STREAMING_BATCH_MAX_BYTES, DEFAULT_STREAMING_BATCH_MAX_BYTES),
diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/BigQuery.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/BigQuery.java
index 46e94978a..8f8eb2e45 100644
--- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/BigQuery.java
+++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/BigQuery.java
@@ -8,6 +8,7 @@
 import com.google.cloud.bigquery.JobStatus;
 import com.google.cloud.bigquery.LoadJobConfiguration;
 import com.google.cloud.bigquery.TableId;
+import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Storage;
 import com.google.common.annotations.VisibleForTesting;
@@ -16,7 +17,6 @@
 import com.mozilla.telemetry.ingestion.core.util.Json;
 import com.mozilla.telemetry.ingestion.sink.config.SinkConfig;
 import com.mozilla.telemetry.ingestion.sink.transform.BlobIdToString;
-import com.mozilla.telemetry.ingestion.sink.transform.BlobInfoToPubsubMessage;
 import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToObjectNode;
 import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToTemplatedString;
 import com.mozilla.telemetry.ingestion.sink.util.BatchWrite;
@@ -138,7 +138,7 @@ protected void checkResultFor(InsertAllResponse batchResult, int index) {
    * <p>GCS blobs should have a 7-day expiration policy if {@code Delete.onSuccess} is specified,
    * so that failed loads are deleted after they will no longer be retried.
    */
-  public static class Load extends BatchWrite<PubsubMessage, PubsubMessage, TableId, Void> {
+  public static class Load extends BatchWrite<Blob, Blob, TableId, Void> {
 
     public enum Delete {
       always, onSuccess
@@ -160,8 +160,8 @@ public Load(com.google.cloud.bigquery.BigQuery bigQuery, Storage storage, long m
     }
 
     @Override
-    protected TableId getBatchKey(PubsubMessage input) {
-      String sourceUri = input.getAttributesOrThrow(BlobInfoToPubsubMessage.NAME);
+    protected TableId getBatchKey(Blob input) {
+      String sourceUri = input.getName();
       final Matcher outputTableMatcher = OUTPUT_TABLE_PATTERN.matcher(sourceUri);
       if (!outputTableMatcher.matches()) {
         throw new IllegalArgumentException(
@@ -172,7 +172,7 @@ protected TableId getBatchKey(PubsubMessage input) {
     }
 
     @Override
-    protected PubsubMessage encodeInput(PubsubMessage input) {
+    protected Blob encodeInput(Blob input) {
       return input;
     }
 
@@ -182,7 +182,7 @@ protected Batch getBatch(TableId tableId) {
     }
 
     @VisibleForTesting
-    class Batch extends BatchWrite<PubsubMessage, PubsubMessage, TableId, Void>.Batch {
+    class Batch extends BatchWrite<Blob, Blob, TableId, Void>.Batch {
 
       @VisibleForTesting
       final List<BlobId> sourceBlobIds = new LinkedList<>();
@@ -231,15 +231,13 @@ protected CompletableFuture<Void> close() {
       }
 
       @Override
-      protected void write(PubsubMessage input) {
-        sourceBlobIds.add(BlobId.of(input.getAttributesOrThrow(BlobInfoToPubsubMessage.BUCKET),
-            input.getAttributesOrThrow(BlobInfoToPubsubMessage.NAME)));
+      protected void write(Blob input) {
+        sourceBlobIds.add(input.getBlobId());
       }
 
       @Override
-      protected long getByteSize(PubsubMessage input) {
-        // use blob size
-        return Integer.parseInt(input.getAttributesOrThrow(BlobInfoToPubsubMessage.SIZE));
+      protected long getByteSize(Blob input) {
+        return input.getSize();
       }
     }
   }
diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Gcs.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Gcs.java
index be516ed82..ba4aead4e 100644
--- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Gcs.java
+++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/io/Gcs.java
@@ -1,5 +1,6 @@
 package com.mozilla.telemetry.ingestion.sink.io;
 
+import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
@@ -39,7 +40,7 @@ public static class Ndjson extends Write {
     public Ndjson(Storage storage, long maxBytes, int maxMessages, Duration maxDelay,
         PubsubMessageToTemplatedString batchKeyTemplate, Executor executor,
         PubsubMessageToObjectNode encoder,
-        Function<BlobInfo, CompletableFuture<Void>> batchCloseHook) {
+        Function<Blob, CompletableFuture<Void>> batchCloseHook) {
       super(storage, maxBytes, maxMessages, maxDelay, batchKeyTemplate, executor, batchCloseHook);
       this.encoder = encoder;
     }
@@ -55,11 +56,11 @@ protected byte[] encodeInput(PubsubMessage input) {
   }
 
   private final Storage storage;
-  private final Function<BlobInfo, CompletableFuture<Void>> batchCloseHook;
+  private final Function<Blob, CompletableFuture<Void>> batchCloseHook;
 
   private Write(Storage storage, long maxBytes, int maxMessages, Duration maxDelay,
       PubsubMessageToTemplatedString batchKeyTemplate, Executor executor,
-      Function<BlobInfo, CompletableFuture<Void>> batchCloseHook) {
+      Function<Blob, CompletableFuture<Void>> batchCloseHook) {
     super(maxBytes, maxMessages, maxDelay, batchKeyTemplate, executor);
     this.storage = storage;
     this.batchCloseHook = batchCloseHook;
@@ -108,7 +109,6 @@ private Batch(String bucket, String keyPrefix) {
        */
      @Override
      protected CompletableFuture<Void> close() {
-
        return batchCloseHook.apply(
            storage.create(blobInfo, content.toByteArray(), BlobTargetOption.doesNotExist()));
      }
diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/transform/BlobIdToPubsubMessage.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/transform/BlobIdToPubsubMessage.java
new file mode 100644
index 000000000..07454bdb6
--- /dev/null
+++ b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/transform/BlobIdToPubsubMessage.java
@@ -0,0 +1,25 @@
+package com.mozilla.telemetry.ingestion.sink.transform;
+
+import com.google.cloud.storage.BlobId;
+import com.google.pubsub.v1.PubsubMessage;
+
+public class BlobIdToPubsubMessage {
+
+  public static final String BUCKET = "bucket";
+  public static final String NAME = "name";
+
+  /**
+   * Convert contents of a {@link BlobId} into a {@link PubsubMessage}.
+   */
+  public static PubsubMessage encode(BlobId blobId) {
+    return PubsubMessage.newBuilder().putAttributes(BUCKET, blobId.getBucket())
+        .putAttributes(NAME, blobId.getName()).build();
+  }
+
+  /**
+   * Read the contents of a {@link PubsubMessage} into a {@link BlobId}.
+   */
+  public static BlobId decode(PubsubMessage message) {
+    return BlobId.of(message.getAttributesOrThrow(BUCKET), message.getAttributesOrThrow(NAME));
+  }
+}
diff --git a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/transform/BlobInfoToPubsubMessage.java b/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/transform/BlobInfoToPubsubMessage.java
deleted file mode 100644
index a4c4cb924..000000000
--- a/ingestion-sink/src/main/java/com/mozilla/telemetry/ingestion/sink/transform/BlobInfoToPubsubMessage.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package com.mozilla.telemetry.ingestion.sink.transform;
-
-import com.google.cloud.storage.BlobInfo;
-import com.google.pubsub.v1.PubsubMessage;
-
-public class BlobInfoToPubsubMessage {
-
-  public static final String BUCKET = "bucket";
-  public static final String NAME = "name";
-  public static final String SIZE = "size";
-
-  /**
-   * Read the contents of a GCS blob into a {@link PubsubMessage}.
-   */
-  public static PubsubMessage apply(BlobInfo blobInfo) {
-    return PubsubMessage.newBuilder().putAttributes(BUCKET, blobInfo.getBucket())
-        .putAttributes(NAME, blobInfo.getName()).putAttributes(SIZE, blobInfo.getSize().toString())
-        .build();
-  }
-}
diff --git a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/BigQueryLoadIntegrationTest.java b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/BigQueryLoadIntegrationTest.java
index dec930db5..0642efb78 100644
--- a/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/BigQueryLoadIntegrationTest.java
+++ b/ingestion-sink/src/test/java/com/mozilla/telemetry/ingestion/sink/io/BigQueryLoadIntegrationTest.java
@@ -11,11 +11,10 @@
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
-import com.google.pubsub.v1.PubsubMessage;
 import com.mozilla.telemetry.ingestion.sink.io.BigQuery.BigQueryErrors;
-import com.mozilla.telemetry.ingestion.sink.transform.BlobInfoToPubsubMessage;
 import com.mozilla.telemetry.ingestion.sink.util.BigQueryDataset;
 import com.mozilla.telemetry.ingestion.sink.util.GcsBucket;
 import java.nio.charset.StandardCharsets;
@@ -47,26 +46,21 @@ private String createTable() {
     return tableId.getProject() + "." + tableId.getDataset() + "." + tableId.getTable();
   }
 
-  private BlobInfo generateBlobId(String outputTable) {
-    return BlobInfo
+  private Blob createBlob(String outputTable, String content) {
+    return gcs.storage.create(BlobInfo
         .newBuilder(BlobId.of(gcs.bucket,
             "OUTPUT_TABLE=" + outputTable + "/" + UUID.randomUUID().toString() + ".ndjson"))
-        .build();
-  }
-
-  private BlobInfo createBlob(String outputTable, String content) {
-    return gcs.storage.create(generateBlobId(outputTable),
-        content.getBytes(StandardCharsets.UTF_8));
+        .build(), content.getBytes(StandardCharsets.UTF_8));
   }
 
   @Test
   public void canLoad() throws Exception {
     final String outputTable = createTable();
-    final BlobInfo blobInfo = createBlob(outputTable, "{\"payload\":\"canLoad\"}");
+    final Blob blob = createBlob(outputTable, "{\"payload\":\"canLoad\"}");
     // load blob
     final BigQuery.Load loader = new BigQuery.Load(bq.bigquery, gcs.storage, 0, 0,
         Duration.ZERO, ForkJoinPool.commonPool(), BigQuery.Load.Delete.onSuccess);
-    loader.apply(BlobInfoToPubsubMessage.apply(blobInfo)).join();
+    loader.apply(blob).join();
     // check result
     final List<FieldValueList> actual = StreamSupport
         .stream(bq.bigquery.query(QueryJobConfiguration.of("SELECT * FROM `" + outputTable + "`"))
@@ -78,16 +72,14 @@ public void failsOnMissingBlob() {
     final String outputTable = createTable();
-    final BlobInfo missingBlob = generateBlobId(outputTable);
-    final BlobInfo presentBlob = createBlob(outputTable, "{}");
+    final Blob presentBlob = createBlob(outputTable, "{}");
+    final Blob missingBlob = createBlob(outputTable, "");
+    missingBlob.delete();
     // load single batch with missing and present blobs
     final BigQuery.Load loader = new BigQuery.Load(bq.bigquery, gcs.storage, 10, 2,
         Duration.ofMillis(500), ForkJoinPool.commonPool(), BigQuery.Load.Delete.onSuccess);
-    final CompletableFuture<Void> missing = loader
-        .apply(PubsubMessage.newBuilder().putAttributes("bucket", missingBlob.getBucket())
-            .putAttributes("name", missingBlob.getName()).putAttributes("size", "0").build());
-    final CompletableFuture<Void> present = loader
-        .apply(BlobInfoToPubsubMessage.apply(presentBlob));
+    final CompletableFuture<Void> present = loader.apply(presentBlob);
+    final CompletableFuture<Void> missing = loader.apply(missingBlob);
     // require single batch of both messages
     assertEquals(ImmutableList.of(2),
         loader.batches.values().stream().map(b -> b.size).collect(Collectors.toList()));
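
A note on the two behaviors this diff leans on: `BlobIdToPubsubMessage.encode`/`decode` must round-trip a `BlobId` through message attributes unchanged, and in the load-from-Pub/Sub mode a blob that `Storage#get` no longer finds has necessarily been deleted by an earlier successful load. A minimal sketch of both, not part of the change itself; the bucket/object names and the `loadOrAck` helper are illustrative only:

```java
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.pubsub.v1.PubsubMessage;
import com.mozilla.telemetry.ingestion.sink.transform.BlobIdToPubsubMessage;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class BlobIdRoundTripSketch {

  // encode writes the "bucket" and "name" attributes; decode reads them back,
  // so the BlobId survives the trip through Pub/Sub (values here are made up).
  static void roundTrip() {
    BlobId original = BlobId.of("example-bucket", "OUTPUT_TABLE=my.table/example.ndjson");
    PubsubMessage message = BlobIdToPubsubMessage.encode(original);
    BlobId decoded = BlobIdToPubsubMessage.decode(message);
    assert decoded.equals(original); // BlobId is a value type with equals()
  }

  // Mirrors the redelivery check added in SinkConfig: Storage.get returns null
  // for a missing blob, which in this mode can only mean a previous load
  // already succeeded, so the message is acked with a completed future.
  static CompletableFuture<Void> loadOrAck(Storage storage, PubsubMessage message,
      Function<Blob, CompletableFuture<Void>> bigQueryLoad) {
    Blob blob = storage.get(BlobIdToPubsubMessage.decode(message));
    if (blob == null) {
      return CompletableFuture.completedFuture(null);
    }
    return bigQueryLoad.apply(blob);
  }
}
```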