Skip deleted blobs before BigQuery.Load from PubSub (#1263)
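Messages that trigger BigQuery load jobs from Pub/Sub can be delivered more than once, and the referenced blob is deleted once a load succeeds, so a redelivered message could fail forever on a blob that is already gone. Look the blob up before loading and ack the message immediately when it no longer exists. To support this, BigQuery.Load now consumes Blob handles instead of PubsubMessage attributes, and BlobInfoToPubsubMessage is replaced by a narrower BlobIdToPubsubMessage that round-trips only bucket and name.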
relud authored Apr 24, 2020
1 parent 7859940 commit af01e7c
Showing 6 changed files with 79 additions and 69 deletions.
@@ -3,6 +3,7 @@
 import com.google.api.gax.batching.FlowControlSettings;
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.BigQueryOptions;
+import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.Storage;
 import com.google.cloud.storage.StorageOptions;
 import com.google.common.annotations.VisibleForTesting;
@@ -12,7 +13,7 @@
 import com.mozilla.telemetry.ingestion.sink.io.BigQuery.BigQueryErrors;
 import com.mozilla.telemetry.ingestion.sink.io.Gcs;
 import com.mozilla.telemetry.ingestion.sink.io.Pubsub;
-import com.mozilla.telemetry.ingestion.sink.transform.BlobInfoToPubsubMessage;
+import com.mozilla.telemetry.ingestion.sink.transform.BlobIdToPubsubMessage;
 import com.mozilla.telemetry.ingestion.sink.transform.CompressPayload;
 import com.mozilla.telemetry.ingestion.sink.transform.DecompressPayload;
 import com.mozilla.telemetry.ingestion.sink.transform.DocumentTypePredicate;
@@ -165,13 +166,27 @@ Output getOutput(Env env) {

     @Override
     Output getOutput(Env env) {
-      return new Output(env, this,
-          new BigQuery.Load(getBigQueryService(env), getGcsService(env),
-              env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
-              env.getInt(LOAD_MAX_FILES, DEFAULT_LOAD_MAX_FILES),
-              env.getDuration(LOAD_MAX_DELAY, DEFAULT_LOAD_MAX_DELAY), DEFAULT_EXECUTOR,
-              // don't delete files until successfully loaded
-              BigQuery.Load.Delete.onSuccess).withOpenCensusMetrics());
+      final Storage storage = getGcsService(env);
+      final Function<Blob, CompletableFuture<Void>> bigQueryLoad = new BigQuery.Load(
+          getBigQueryService(env), storage, env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
+          env.getInt(LOAD_MAX_FILES, DEFAULT_LOAD_MAX_FILES),
+          env.getDuration(LOAD_MAX_DELAY, DEFAULT_LOAD_MAX_DELAY), DEFAULT_EXECUTOR,
+          // don't delete files until successfully loaded
+          BigQuery.Load.Delete.onSuccess).withOpenCensusMetrics();
+      return new Output(env, this, message -> {
+        // Messages may be delivered more than once, so check whether the blob has been deleted.
+        // The blob is never deleted in this mode unless it has already been successfully loaded
+        // to BigQuery. If the blob does not exist, it must have been deleted, because Cloud
+        // Storage provides strong global consistency for read-after-write operations.
+        // https://cloud.google.com/storage/docs/consistency
+        Blob blob = storage.get(BlobIdToPubsubMessage.decode(message));
+        if (blob == null) {
+          // blob was deleted, so ack this message by returning a successfully completed future
+          // TODO measure the frequency of this
+          return CompletableFuture.completedFuture(null);
+        }
+        return bigQueryLoad.apply(blob);
+      });
     }
   },
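Note: the message handler above is the core of the fix. Storage.get(BlobId) returns null once an object has been deleted, and GCS strong read-after-write consistency makes that a safe signal that the load already succeeded. The same pattern as a standalone helper, a minimal sketch with illustrative class and method names:

package com.mozilla.telemetry.ingestion.sink;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.pubsub.v1.PubsubMessage;
import com.mozilla.telemetry.ingestion.sink.transform.BlobIdToPubsubMessage;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class SkipDeletedBlobsSketch {

  /** Wrap a Blob-consuming load so messages whose blob is already gone are acked, not failed. */
  public static Function<PubsubMessage, CompletableFuture<Void>> skippingDeleted(Storage storage,
      Function<Blob, CompletableFuture<Void>> load) {
    return message -> {
      Blob blob = storage.get(BlobIdToPubsubMessage.decode(message));
      if (blob == null) {
        // already loaded and deleted; complete successfully so the message is acked
        return CompletableFuture.completedFuture(null);
      }
      return load.apply(blob);
    };
  }
}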

@@ -189,7 +204,7 @@ Output getOutput(Env env) {
           DEFAULT_EXECUTOR, getFormat(env),
           // BigQuery Load API limits maximum load requests per table per day to 1,000 so send
           // blobInfo to pubsub and require loads be run separately to reduce maximum latency
-          blobInfo -> pubsubWrite.apply(BlobInfoToPubsubMessage.apply(blobInfo)))
+          blobInfo -> pubsubWrite.apply(BlobIdToPubsubMessage.encode(blobInfo.getBlobId())))
               .withOpenCensusMetrics());
     }
   },
@@ -214,11 +229,13 @@ Output getOutput(Env env) {
     Output getOutput(Env env) {
       final com.google.cloud.bigquery.BigQuery bigQuery = getBigQueryService(env);
       final Storage storage = getGcsService(env);
-      final Function<PubsubMessage, CompletableFuture<Void>> bigQueryLoad;
+      final Function<Blob, CompletableFuture<Void>> bigQueryLoad;
       if (env.containsKey(OUTPUT_TOPIC)) {
         // BigQuery Load API limits maximum load requests per table per day to 1,000 so if
         // OUTPUT_TOPIC is present send blobInfo to pubsub and run load jobs separately
-        bigQueryLoad = pubsub.getOutput(env);
+        final Function<PubsubMessage, CompletableFuture<Void>> pubsubOutput = pubsub
+            .getOutput(env);
+        bigQueryLoad = blob -> pubsubOutput.apply(BlobIdToPubsubMessage.encode(blob.getBlobId()));
       } else {
         bigQueryLoad = new BigQuery.Load(bigQuery, storage,
             env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
@@ -233,9 +250,7 @@ Output getOutput(Env env) {
           env.getInt(BATCH_MAX_MESSAGES, DEFAULT_BATCH_MAX_MESSAGES),
           env.getDuration(BATCH_MAX_DELAY, DEFAULT_BATCH_MAX_DELAY),
           PubsubMessageToTemplatedString.forBigQuery(getBigQueryOutputBucket(env)),
-          DEFAULT_EXECUTOR, getFormat(env),
-          blobInfo -> bigQueryLoad.apply(BlobInfoToPubsubMessage.apply(blobInfo)))
-              .withOpenCensusMetrics();
+          DEFAULT_EXECUTOR, getFormat(env), bigQueryLoad).withOpenCensusMetrics();
       // Like bigQueryStreaming, but use STREAMING_ prefix env vars for batch configuration
       Function<PubsubMessage, CompletableFuture<Void>> streamingOutput = new BigQuery.Write(
           bigQuery, env.getLong(STREAMING_BATCH_MAX_BYTES, DEFAULT_STREAMING_BATCH_MAX_BYTES),
@@ -8,6 +8,7 @@
 import com.google.cloud.bigquery.JobStatus;
 import com.google.cloud.bigquery.LoadJobConfiguration;
 import com.google.cloud.bigquery.TableId;
+import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.Storage;
 import com.google.common.annotations.VisibleForTesting;
@@ -16,7 +17,6 @@
 import com.mozilla.telemetry.ingestion.core.util.Json;
 import com.mozilla.telemetry.ingestion.sink.config.SinkConfig;
 import com.mozilla.telemetry.ingestion.sink.transform.BlobIdToString;
-import com.mozilla.telemetry.ingestion.sink.transform.BlobInfoToPubsubMessage;
 import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToObjectNode;
 import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToTemplatedString;
 import com.mozilla.telemetry.ingestion.sink.util.BatchWrite;
@@ -138,7 +138,7 @@ protected void checkResultFor(InsertAllResponse batchResult, int index) {
    * <p>GCS blobs should have a 7-day expiration policy if {@code Delete.onSuccess} is specified,
    * so that failed loads are deleted after they will no longer be retried.
    */
-  public static class Load extends BatchWrite<PubsubMessage, PubsubMessage, TableId, Void> {
+  public static class Load extends BatchWrite<Blob, Blob, TableId, Void> {

     public enum Delete {
       always, onSuccess
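Note: the 7-day expiration mentioned in the javadoc is a bucket lifecycle rule configured outside this code. One way to set it with the GCS Java client, a sketch with an illustrative bucket name:

import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.BucketInfo.LifecycleRule;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleAction;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleCondition;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableList;

public class SetLoadBucketExpiration {

  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // delete objects 7 days after creation, so blobs from failed loads don't accumulate
    storage.update(BucketInfo.newBuilder("example-load-bucket")
        .setLifecycleRules(ImmutableList.of(new LifecycleRule(LifecycleAction.newDeleteAction(),
            LifecycleCondition.newBuilder().setAge(7).build())))
        .build());
  }
}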
@@ -160,8 +160,8 @@ public Load(com.google.cloud.bigquery.BigQuery bigQuery, Storage storage, long m
     }

     @Override
-    protected TableId getBatchKey(PubsubMessage input) {
-      String sourceUri = input.getAttributesOrThrow(BlobInfoToPubsubMessage.NAME);
+    protected TableId getBatchKey(Blob input) {
+      String sourceUri = input.getName();
       final Matcher outputTableMatcher = OUTPUT_TABLE_PATTERN.matcher(sourceUri);
       if (!outputTableMatcher.matches()) {
         throw new IllegalArgumentException(
@@ -172,7 +172,7 @@ protected TableId getBatchKey(PubsubMessage input) {
     }

     @Override
-    protected PubsubMessage encodeInput(PubsubMessage input) {
+    protected Blob encodeInput(Blob input) {
       return input;
     }
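Note: blob names encode the destination table (the integration test below creates names like OUTPUT_TABLE=project.dataset.table/<uuid>.ndjson). OUTPUT_TABLE_PATTERN itself is defined elsewhere in this class; a hypothetical stand-in, for illustration only:

import com.google.cloud.bigquery.TableId;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class OutputTablePatternSketch {

  // hypothetical stand-in for OUTPUT_TABLE_PATTERN; dots split project, dataset, and table
  private static final Pattern PATTERN = Pattern
      .compile(".*OUTPUT_TABLE=([^/.]+)\\.([^/.]+)\\.([^/.]+)/.*");

  public static TableId tableIdFor(String blobName) {
    Matcher matcher = PATTERN.matcher(blobName);
    if (!matcher.matches()) {
      throw new IllegalArgumentException("blob name does not encode an output table: " + blobName);
    }
    return TableId.of(matcher.group(1), matcher.group(2), matcher.group(3));
  }
}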

@@ -182,7 +182,7 @@ protected Batch getBatch(TableId tableId) {
     }

     @VisibleForTesting
-    class Batch extends BatchWrite<PubsubMessage, PubsubMessage, TableId, Void>.Batch {
+    class Batch extends BatchWrite<Blob, Blob, TableId, Void>.Batch {

       @VisibleForTesting
       final List<BlobId> sourceBlobIds = new LinkedList<>();
@@ -231,15 +231,13 @@ protected CompletableFuture<Void> close() {
       }

       @Override
-      protected void write(PubsubMessage input) {
-        sourceBlobIds.add(BlobId.of(input.getAttributesOrThrow(BlobInfoToPubsubMessage.BUCKET),
-            input.getAttributesOrThrow(BlobInfoToPubsubMessage.NAME)));
+      protected void write(Blob input) {
+        sourceBlobIds.add(input.getBlobId());
       }

       @Override
-      protected long getByteSize(PubsubMessage input) {
-        // use blob size
-        return Integer.parseInt(input.getAttributesOrThrow(BlobInfoToPubsubMessage.SIZE));
+      protected long getByteSize(Blob input) {
+        return input.getSize();
       }
     }
   }
@@ -1,5 +1,6 @@
 package com.mozilla.telemetry.ingestion.sink.io;

+import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
@@ -39,7 +40,7 @@ public static class Ndjson extends Write {
     public Ndjson(Storage storage, long maxBytes, int maxMessages, Duration maxDelay,
         PubsubMessageToTemplatedString batchKeyTemplate, Executor executor,
         PubsubMessageToObjectNode encoder,
-        Function<BlobInfo, CompletableFuture<Void>> batchCloseHook) {
+        Function<Blob, CompletableFuture<Void>> batchCloseHook) {
       super(storage, maxBytes, maxMessages, maxDelay, batchKeyTemplate, executor, batchCloseHook);
       this.encoder = encoder;
     }
@@ -55,11 +56,11 @@ protected byte[] encodeInput(PubsubMessage input) {
     }

     private final Storage storage;
-    private final Function<BlobInfo, CompletableFuture<Void>> batchCloseHook;
+    private final Function<Blob, CompletableFuture<Void>> batchCloseHook;

     private Write(Storage storage, long maxBytes, int maxMessages, Duration maxDelay,
         PubsubMessageToTemplatedString batchKeyTemplate, Executor executor,
-        Function<BlobInfo, CompletableFuture<Void>> batchCloseHook) {
+        Function<Blob, CompletableFuture<Void>> batchCloseHook) {
       super(maxBytes, maxMessages, maxDelay, batchKeyTemplate, executor);
       this.storage = storage;
       this.batchCloseHook = batchCloseHook;
@@ -108,7 +109,6 @@ private Batch(String bucket, String keyPrefix) {
      */
     @Override
     protected CompletableFuture<Void> close() {
-
       return batchCloseHook.apply(
           storage.create(blobInfo, content.toByteArray(), BlobTargetOption.doesNotExist()));
     }
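Note: storage.create returns a Blob handle populated with the new object's metadata, which is why the batchCloseHook type can narrow from BlobInfo to Blob and why BigQuery.Load can read sizes without a separate attribute; BlobTargetOption.doesNotExist() makes the create fail rather than silently overwrite. A minimal sketch with illustrative names:

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.StorageOptions;
import java.nio.charset.StandardCharsets;

public class CreateBlobSketch {

  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    Blob created = storage.create(
        BlobInfo.newBuilder(BlobId.of("example-bucket", "example.ndjson")).build(),
        "{}\n".getBytes(StandardCharsets.UTF_8), BlobTargetOption.doesNotExist());
    // the returned handle carries object metadata, e.g. created.getSize() == 3
  }
}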
@@ -0,0 +1,25 @@
+package com.mozilla.telemetry.ingestion.sink.transform;
+
+import com.google.cloud.storage.BlobId;
+import com.google.pubsub.v1.PubsubMessage;
+
+public class BlobIdToPubsubMessage {
+
+  public static final String BUCKET = "bucket";
+  public static final String NAME = "name";
+
+  /**
+   * Convert contents of a {@link BlobId} into a {@link PubsubMessage}.
+   */
+  public static PubsubMessage encode(BlobId blobId) {
+    return PubsubMessage.newBuilder().putAttributes(BUCKET, blobId.getBucket())
+        .putAttributes(NAME, blobId.getName()).build();
+  }
+
+  /**
+   * Read the contents of a {@link PubsubMessage} into a {@link BlobId}.
+   */
+  public static BlobId decode(PubsubMessage message) {
+    return BlobId.of(message.getAttributesOrThrow(BUCKET), message.getAttributesOrThrow(NAME));
+  }
+}
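Note: usage is a simple round trip. Unlike the deleted BlobInfoToPubsubMessage, only bucket and name are carried, since the blob size is now read from the Blob handle itself. Values here are illustrative:

package com.mozilla.telemetry.ingestion.sink.transform;

import com.google.cloud.storage.BlobId;
import com.google.pubsub.v1.PubsubMessage;

public class BlobIdRoundTripExample {

  public static void main(String[] args) {
    BlobId original = BlobId.of("example-bucket",
        "OUTPUT_TABLE=my-project.my_dataset.my_table/0000.ndjson");
    PubsubMessage message = BlobIdToPubsubMessage.encode(original);
    // message carries attributes bucket=example-bucket and name=OUTPUT_TABLE=...
    BlobId decoded = BlobIdToPubsubMessage.decode(message);
    // decoded.equals(original) is true
  }
}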

This file was deleted.

@@ -11,11 +11,10 @@
 import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableInfo;
+import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobId;
 import com.google.cloud.storage.BlobInfo;
-import com.google.pubsub.v1.PubsubMessage;
 import com.mozilla.telemetry.ingestion.sink.io.BigQuery.BigQueryErrors;
-import com.mozilla.telemetry.ingestion.sink.transform.BlobInfoToPubsubMessage;
 import com.mozilla.telemetry.ingestion.sink.util.BigQueryDataset;
 import com.mozilla.telemetry.ingestion.sink.util.GcsBucket;
 import java.nio.charset.StandardCharsets;
@@ -47,26 +46,21 @@ private String createTable() {
     return tableId.getProject() + "." + tableId.getDataset() + "." + tableId.getTable();
   }

-  private BlobInfo generateBlobId(String outputTable) {
-    return BlobInfo
+  private Blob createBlob(String outputTable, String content) {
+    return gcs.storage.create(BlobInfo
         .newBuilder(BlobId.of(gcs.bucket,
             "OUTPUT_TABLE=" + outputTable + "/" + UUID.randomUUID().toString() + ".ndjson"))
-        .build();
-  }
-
-  private BlobInfo createBlob(String outputTable, String content) {
-    return gcs.storage.create(generateBlobId(outputTable),
-        content.getBytes(StandardCharsets.UTF_8));
+        .build(), content.getBytes(StandardCharsets.UTF_8));
   }

   @Test
   public void canLoad() throws Exception {
     final String outputTable = createTable();
-    final BlobInfo blobInfo = createBlob(outputTable, "{\"payload\":\"canLoad\"}");
+    final Blob blob = createBlob(outputTable, "{\"payload\":\"canLoad\"}");
     // load blob
     final BigQuery.Load loader = new BigQuery.Load(bq.bigquery, gcs.storage, 0, 0, Duration.ZERO,
         ForkJoinPool.commonPool(), BigQuery.Load.Delete.onSuccess);
-    loader.apply(BlobInfoToPubsubMessage.apply(blobInfo)).join();
+    loader.apply(blob).join();
     // check result
     final List<String> actual = StreamSupport
         .stream(bq.bigquery.query(QueryJobConfiguration.of("SELECT * FROM `" + outputTable + "`"))
@@ -78,16 +72,14 @@ public void canLoad() throws Exception {
   @Test
   public void failsOnMissingBlob() {
     final String outputTable = createTable();
-    final BlobInfo missingBlob = generateBlobId(outputTable);
-    final BlobInfo presentBlob = createBlob(outputTable, "{}");
+    final Blob presentBlob = createBlob(outputTable, "{}");
+    final Blob missingBlob = createBlob(outputTable, "");
+    missingBlob.delete();
     // load single batch with missing and present blobs
     final BigQuery.Load loader = new BigQuery.Load(bq.bigquery, gcs.storage, 10, 2,
         Duration.ofMillis(500), ForkJoinPool.commonPool(), BigQuery.Load.Delete.onSuccess);
-    final CompletableFuture<Void> missing = loader
-        .apply(PubsubMessage.newBuilder().putAttributes("bucket", missingBlob.getBucket())
-            .putAttributes("name", missingBlob.getName()).putAttributes("size", "0").build());
-    final CompletableFuture<Void> present = loader
-        .apply(BlobInfoToPubsubMessage.apply(presentBlob));
+    final CompletableFuture<Void> present = loader.apply(presentBlob);
+    final CompletableFuture<Void> missing = loader.apply(missingBlob);
     // require single batch of both messages
     assertEquals(ImmutableList.of(2),
         loader.batches.values().stream().map(b -> b.size).collect(Collectors.toList()));