Skip deleted blobs before BigQuery.Load from PubSub #1263

Merged 1 commit on Apr 24, 2020
@@ -3,6 +3,7 @@
import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.annotations.VisibleForTesting;
@@ -12,7 +13,7 @@
import com.mozilla.telemetry.ingestion.sink.io.BigQuery.BigQueryErrors;
import com.mozilla.telemetry.ingestion.sink.io.Gcs;
import com.mozilla.telemetry.ingestion.sink.io.Pubsub;
import com.mozilla.telemetry.ingestion.sink.transform.BlobInfoToPubsubMessage;
import com.mozilla.telemetry.ingestion.sink.transform.BlobIdToPubsubMessage;
import com.mozilla.telemetry.ingestion.sink.transform.CompressPayload;
import com.mozilla.telemetry.ingestion.sink.transform.DecompressPayload;
import com.mozilla.telemetry.ingestion.sink.transform.DocumentTypePredicate;
@@ -165,13 +166,27 @@ Output getOutput(Env env) {

@Override
Output getOutput(Env env) {
return new Output(env, this,
new BigQuery.Load(getBigQueryService(env), getGcsService(env),
env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
env.getInt(LOAD_MAX_FILES, DEFAULT_LOAD_MAX_FILES),
env.getDuration(LOAD_MAX_DELAY, DEFAULT_LOAD_MAX_DELAY), DEFAULT_EXECUTOR,
// don't delete files until successfully loaded
BigQuery.Load.Delete.onSuccess).withOpenCensusMetrics());
final Storage storage = getGcsService(env);
final Function<Blob, CompletableFuture<Void>> bigQueryLoad = new BigQuery.Load(
getBigQueryService(env), storage, env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
env.getInt(LOAD_MAX_FILES, DEFAULT_LOAD_MAX_FILES),
env.getDuration(LOAD_MAX_DELAY, DEFAULT_LOAD_MAX_DELAY), DEFAULT_EXECUTOR,
// don't delete files until successfully loaded
BigQuery.Load.Delete.onSuccess).withOpenCensusMetrics();
return new Output(env, this, message -> {
// Messages may be delivered more than once, so check whether the blob has been deleted.
// The blob is never deleted in this mode unless it has already been successfully loaded
// to BigQuery. If the blob does not exist, it must have been deleted, because Cloud
// Storage provides strong global consistency for read-after-write operations.
// https://cloud.google.com/storage/docs/consistency
Blob blob = storage.get(BlobIdToPubsubMessage.decode(message));
if (blob == null) {
// blob was deleted, so ack this message by returning a successfully completed future
// TODO measure the frequency of this
return CompletableFuture.completedFuture(null);
}
return bigQueryLoad.apply(blob);
});
}
},
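As a minimal standalone sketch of the existence check above (the bucket and object names are illustrative, and the load step is stubbed out): Storage.get returns null when the object has already been deleted, which is what allows a redelivered notification to be acked by completing the future immediately.

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.util.concurrent.CompletableFuture;

public class SkipDeletedBlobSketch {

  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // Illustrative BlobId, as it would be decoded from a (possibly redelivered) message.
    BlobId blobId = BlobId.of("example-staging-bucket",
        "OUTPUT_TABLE=my-project.my_dataset.my_table/example.ndjson");
    Blob blob = storage.get(blobId);
    final CompletableFuture<Void> result;
    if (blob == null) {
      // Blob already deleted, so it was already loaded; ack by completing immediately.
      result = CompletableFuture.completedFuture(null);
    } else {
      // Otherwise the blob would be handed to BigQuery.Load; stubbed out in this sketch.
      result = CompletableFuture.completedFuture(null);
    }
    result.join();
  }
}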

@@ -189,7 +204,7 @@ Output getOutput(Env env) {
DEFAULT_EXECUTOR, getFormat(env),
// BigQuery Load API limits maximum load requests per table per day to 1,000 so send
// blobInfo to pubsub and require loads be run separately to reduce maximum latency
blobInfo -> pubsubWrite.apply(BlobInfoToPubsubMessage.apply(blobInfo)))
blobInfo -> pubsubWrite.apply(BlobIdToPubsubMessage.encode(blobInfo.getBlobId())))
.withOpenCensusMetrics());
}
},
@@ -214,11 +229,13 @@ Output getOutput(Env env) {
Output getOutput(Env env) {
final com.google.cloud.bigquery.BigQuery bigQuery = getBigQueryService(env);
final Storage storage = getGcsService(env);
final Function<PubsubMessage, CompletableFuture<Void>> bigQueryLoad;
final Function<Blob, CompletableFuture<Void>> bigQueryLoad;
if (env.containsKey(OUTPUT_TOPIC)) {
// BigQuery Load API limits maximum load requests per table per day to 1,000 so if
// OUTPUT_TOPIC is present send blobInfo to pubsub and run load jobs separately
bigQueryLoad = pubsub.getOutput(env);
final Function<PubsubMessage, CompletableFuture<Void>> pubsubOutput = pubsub
.getOutput(env);
bigQueryLoad = blob -> pubsubOutput.apply(BlobIdToPubsubMessage.encode(blob.getBlobId()));
} else {
bigQueryLoad = new BigQuery.Load(bigQuery, storage,
env.getLong(LOAD_MAX_BYTES, DEFAULT_LOAD_MAX_BYTES),
@@ -233,9 +250,7 @@ Output getOutput(Env env) {
env.getInt(BATCH_MAX_MESSAGES, DEFAULT_BATCH_MAX_MESSAGES),
env.getDuration(BATCH_MAX_DELAY, DEFAULT_BATCH_MAX_DELAY),
PubsubMessageToTemplatedString.forBigQuery(getBigQueryOutputBucket(env)),
DEFAULT_EXECUTOR, getFormat(env),
blobInfo -> bigQueryLoad.apply(BlobInfoToPubsubMessage.apply(blobInfo)))
.withOpenCensusMetrics();
DEFAULT_EXECUTOR, getFormat(env), bigQueryLoad).withOpenCensusMetrics();
// Like bigQueryStreaming, but use STREAMING_ prefix env vars for batch configuration
Function<PubsubMessage, CompletableFuture<Void>> streamingOutput = new BigQuery.Write(
bigQuery, env.getLong(STREAMING_BATCH_MAX_BYTES, DEFAULT_STREAMING_BATCH_MAX_BYTES),
@@ -8,6 +8,7 @@
import com.google.cloud.bigquery.JobStatus;
import com.google.cloud.bigquery.LoadJobConfiguration;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.common.annotations.VisibleForTesting;
@@ -16,7 +17,6 @@
import com.mozilla.telemetry.ingestion.core.util.Json;
import com.mozilla.telemetry.ingestion.sink.config.SinkConfig;
import com.mozilla.telemetry.ingestion.sink.transform.BlobIdToString;
import com.mozilla.telemetry.ingestion.sink.transform.BlobInfoToPubsubMessage;
import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToObjectNode;
import com.mozilla.telemetry.ingestion.sink.transform.PubsubMessageToTemplatedString;
import com.mozilla.telemetry.ingestion.sink.util.BatchWrite;
@@ -138,7 +138,7 @@ protected void checkResultFor(InsertAllResponse batchResult, int index) {
* <p>GCS blobs should have a 7-day expiration policy if {@code Delete.onSuccess} is specified,
* so that failed loads are deleted after they will no longer be retried.
*/
public static class Load extends BatchWrite<PubsubMessage, PubsubMessage, TableId, Void> {
public static class Load extends BatchWrite<Blob, Blob, TableId, Void> {

public enum Delete {
always, onSuccess
@@ -160,8 +160,8 @@ public Load(com.google.cloud.bigquery.BigQuery bigQuery, Storage storage, long m
}

@Override
protected TableId getBatchKey(PubsubMessage input) {
String sourceUri = input.getAttributesOrThrow(BlobInfoToPubsubMessage.NAME);
protected TableId getBatchKey(Blob input) {
String sourceUri = input.getName();
final Matcher outputTableMatcher = OUTPUT_TABLE_PATTERN.matcher(sourceUri);
if (!outputTableMatcher.matches()) {
throw new IllegalArgumentException(
@@ -172,7 +172,7 @@ protected TableId getBatchKey(PubsubMessage input) {
}

@Override
protected PubsubMessage encodeInput(PubsubMessage input) {
protected Blob encodeInput(Blob input) {
return input;
}

Expand All @@ -182,7 +182,7 @@ protected Batch getBatch(TableId tableId) {
}

@VisibleForTesting
class Batch extends BatchWrite<PubsubMessage, PubsubMessage, TableId, Void>.Batch {
class Batch extends BatchWrite<Blob, Blob, TableId, Void>.Batch {

@VisibleForTesting
final List<BlobId> sourceBlobIds = new LinkedList<>();
@@ -231,15 +231,13 @@ protected CompletableFuture<Void> close() {
}

@Override
protected void write(PubsubMessage input) {
sourceBlobIds.add(BlobId.of(input.getAttributesOrThrow(BlobInfoToPubsubMessage.BUCKET),
input.getAttributesOrThrow(BlobInfoToPubsubMessage.NAME)));
protected void write(Blob input) {
sourceBlobIds.add(input.getBlobId());
}

@Override
protected long getByteSize(PubsubMessage input) {
// use blob size
return Integer.parseInt(input.getAttributesOrThrow(BlobInfoToPubsubMessage.SIZE));
protected long getByteSize(Blob input) {
return input.getSize();
}
}
}
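The Load javadoc above notes that GCS blobs should carry a 7-day expiration policy when Delete.onSuccess is used, so that blobs left behind by failed loads are eventually cleaned up. A minimal sketch of configuring such a lifecycle rule with the google-cloud-storage client follows; the bucket name is illustrative, and this setup is assumed to happen outside the sink itself.

import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.BucketInfo.LifecycleRule;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleAction;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleCondition;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableList;

public class ConfigureStagingBucketExpiration {

  public static void main(String[] args) {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // Delete objects 7 days after creation.
    LifecycleRule deleteAfterSevenDays = new LifecycleRule(LifecycleAction.newDeleteAction(),
        LifecycleCondition.newBuilder().setAge(7).build());
    // "example-staging-bucket" is an illustrative bucket name.
    storage.update(BucketInfo.newBuilder("example-staging-bucket")
        .setLifecycleRules(ImmutableList.of(deleteAfterSevenDays)).build());
  }
}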
@@ -1,5 +1,6 @@
package com.mozilla.telemetry.ingestion.sink.io;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
@@ -39,7 +40,7 @@ public static class Ndjson extends Write {
public Ndjson(Storage storage, long maxBytes, int maxMessages, Duration maxDelay,
PubsubMessageToTemplatedString batchKeyTemplate, Executor executor,
PubsubMessageToObjectNode encoder,
Function<BlobInfo, CompletableFuture<Void>> batchCloseHook) {
Function<Blob, CompletableFuture<Void>> batchCloseHook) {
super(storage, maxBytes, maxMessages, maxDelay, batchKeyTemplate, executor, batchCloseHook);
this.encoder = encoder;
}
@@ -55,11 +56,11 @@ protected byte[] encodeInput(PubsubMessage input) {
}

private final Storage storage;
private final Function<BlobInfo, CompletableFuture<Void>> batchCloseHook;
private final Function<Blob, CompletableFuture<Void>> batchCloseHook;

private Write(Storage storage, long maxBytes, int maxMessages, Duration maxDelay,
PubsubMessageToTemplatedString batchKeyTemplate, Executor executor,
Function<BlobInfo, CompletableFuture<Void>> batchCloseHook) {
Function<Blob, CompletableFuture<Void>> batchCloseHook) {
super(maxBytes, maxMessages, maxDelay, batchKeyTemplate, executor);
this.storage = storage;
this.batchCloseHook = batchCloseHook;
@@ -108,7 +109,6 @@ private Batch(String bucket, String keyPrefix) {
*/
@Override
protected CompletableFuture<Void> close() {

return batchCloseHook.apply(
storage.create(blobInfo, content.toByteArray(), BlobTargetOption.doesNotExist()));
}
@@ -0,0 +1,25 @@
package com.mozilla.telemetry.ingestion.sink.transform;

import com.google.cloud.storage.BlobId;
import com.google.pubsub.v1.PubsubMessage;

public class BlobIdToPubsubMessage {

public static final String BUCKET = "bucket";
public static final String NAME = "name";

/**
* Convert contents of a {@link BlobId} into a {@link PubsubMessage}.
*/
public static PubsubMessage encode(BlobId blobId) {
return PubsubMessage.newBuilder().putAttributes(BUCKET, blobId.getBucket())
.putAttributes(NAME, blobId.getName()).build();
}

/**
* Read the contents of a {@link PubsubMessage} into a {@link BlobId}.
*/
public static BlobId decode(PubsubMessage message) {
return BlobId.of(message.getAttributesOrThrow(BUCKET), message.getAttributesOrThrow(NAME));
}
}
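A quick round-trip sketch of the new transform (bucket and object name are illustrative). Unlike the removed BlobInfoToPubsubMessage, only the bucket and object name travel as message attributes; BigQuery.Load now reads the byte size from the fetched Blob rather than from a size attribute.

import com.google.cloud.storage.BlobId;
import com.google.pubsub.v1.PubsubMessage;
import com.mozilla.telemetry.ingestion.sink.transform.BlobIdToPubsubMessage;

public class BlobIdRoundTrip {

  public static void main(String[] args) {
    // Illustrative bucket and object name following the OUTPUT_TABLE= prefix convention.
    BlobId original = BlobId.of("example-staging-bucket",
        "OUTPUT_TABLE=my-project.my_dataset.my_table/0000.ndjson");
    PubsubMessage message = BlobIdToPubsubMessage.encode(original);
    BlobId decoded = BlobIdToPubsubMessage.decode(message);
    // The round trip preserves bucket and name, so the decoded id equals the original.
    System.out.println(original.equals(decoded));
  }
}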

This file was deleted.

@@ -11,11 +11,10 @@
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.pubsub.v1.PubsubMessage;
import com.mozilla.telemetry.ingestion.sink.io.BigQuery.BigQueryErrors;
import com.mozilla.telemetry.ingestion.sink.transform.BlobInfoToPubsubMessage;
import com.mozilla.telemetry.ingestion.sink.util.BigQueryDataset;
import com.mozilla.telemetry.ingestion.sink.util.GcsBucket;
import java.nio.charset.StandardCharsets;
@@ -47,26 +46,21 @@ private String createTable() {
return tableId.getProject() + "." + tableId.getDataset() + "." + tableId.getTable();
}

private BlobInfo generateBlobId(String outputTable) {
return BlobInfo
private Blob createBlob(String outputTable, String content) {
return gcs.storage.create(BlobInfo
.newBuilder(BlobId.of(gcs.bucket,
"OUTPUT_TABLE=" + outputTable + "/" + UUID.randomUUID().toString() + ".ndjson"))
.build();
}

private BlobInfo createBlob(String outputTable, String content) {
return gcs.storage.create(generateBlobId(outputTable),
content.getBytes(StandardCharsets.UTF_8));
.build(), content.getBytes(StandardCharsets.UTF_8));
}

@Test
public void canLoad() throws Exception {
final String outputTable = createTable();
final BlobInfo blobInfo = createBlob(outputTable, "{\"payload\":\"canLoad\"}");
final Blob blob = createBlob(outputTable, "{\"payload\":\"canLoad\"}");
// load blob
final BigQuery.Load loader = new BigQuery.Load(bq.bigquery, gcs.storage, 0, 0, Duration.ZERO,
ForkJoinPool.commonPool(), BigQuery.Load.Delete.onSuccess);
loader.apply(BlobInfoToPubsubMessage.apply(blobInfo)).join();
loader.apply(blob).join();
// check result
final List<String> actual = StreamSupport
.stream(bq.bigquery.query(QueryJobConfiguration.of("SELECT * FROM `" + outputTable + "`"))
@@ -78,16 +72,14 @@ public void canLoad() throws Exception {
@Test
public void failsOnMissingBlob() {
final String outputTable = createTable();
final BlobInfo missingBlob = generateBlobId(outputTable);
final BlobInfo presentBlob = createBlob(outputTable, "{}");
final Blob presentBlob = createBlob(outputTable, "{}");
final Blob missingBlob = createBlob(outputTable, "");
missingBlob.delete();
// load single batch with missing and present blobs
final BigQuery.Load loader = new BigQuery.Load(bq.bigquery, gcs.storage, 10, 2,
Duration.ofMillis(500), ForkJoinPool.commonPool(), BigQuery.Load.Delete.onSuccess);
final CompletableFuture<Void> missing = loader
.apply(PubsubMessage.newBuilder().putAttributes("bucket", missingBlob.getBucket())
.putAttributes("name", missingBlob.getName()).putAttributes("size", "0").build());
final CompletableFuture<Void> present = loader
.apply(BlobInfoToPubsubMessage.apply(presentBlob));
final CompletableFuture<Void> present = loader.apply(presentBlob);
final CompletableFuture<Void> missing = loader.apply(missingBlob);
// require single batch of both messages
assertEquals(ImmutableList.of(2),
loader.batches.values().stream().map(b -> b.size).collect(Collectors.toList()));