Skip to content

Commit

Permalink
[BEAM-50] Add function in BigQueryTableInserter to import files to BigQuery.
Browse files Browse the repository at this point in the history
  • Loading branch information
peihe committed Mar 15, 2016
1 parent 8bc0659 commit 362e2b6
Show file tree
Hide file tree
Showing 2 changed files with 325 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,18 @@

package com.google.cloud.dataflow.sdk.util;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.ExponentialBackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.Bigquery.Jobs;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfiguration;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobReference;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
Expand Down Expand Up @@ -73,6 +80,23 @@ public class BigQueryTableInserter {
// The initial backoff after a failure inserting rows into BigQuery.
private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L;

// The maximum number of load jobs that load() submits for one request before giving up.
// Each attempt is submitted under its own job id (the caller's id plus an attempt suffix).
private static final int MAX_RETRY_LOAD_JOBS = 3;

// The attempt bound given to the backoff that paces polling of a load job's status.
private static final int MAX_LOAD_JOB_POLL_RETRIES = 10;

// The initial backoff, in milliseconds, between polls of a load job's status (one minute).
private static final int INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS = 60000;

// The attempt bound given to the backoff that paces retries of the load job insert RPC.
private static final int MAX_LOAD_JOB_RPC_ATTEMPTS = 10;

// The initial backoff, in milliseconds, between retries of the load job insert RPC.
private static final int INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS = 1000;

// Shared helper used to classify API errors (e.g. "already exists", "not found", rate limiting).
private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor();

// The BigQuery API client through which all RPCs in this class are issued.
private final Bigquery client;
private final TableReference defaultRef;
private final long maxRowsPerBatch;
Expand Down Expand Up @@ -272,6 +296,137 @@ public List<TableDataInsertAllResponse.InsertErrors> call() throws IOException {
}
}

/**
 * Loads newline-delimited JSON files into a BigQuery table by running load jobs.
 *
 * <p>Up to {@code MAX_RETRY_LOAD_JOBS} jobs are submitted, one after another, each under the
 * id {@code jobId + "-" + attemptIndex}. The method returns as soon as one of them succeeds.
 * A job that finishes with errors triggers the next attempt.
 *
 * @param jobId base job id; an attempt suffix is appended for every submitted job
 * @param ref destination table; its project id is also used to submit and poll the jobs
 * @param gcsUris source URIs of the files to load
 * @param schema schema set on the load configuration
 * @param writeDisposition write disposition, passed to BigQuery by enum name
 * @param createDisposition create disposition, passed to BigQuery by enum name
 * @throws InterruptedException if interrupted while backing off between RPCs or polls
 * @throws IOException if a load job could not be inserted
 * @throws RuntimeException if a job's final status could not be determined, or if every
 *     attempted job failed
 */
public void load(
    String jobId,
    TableReference ref,
    List<String> gcsUris,
    TableSchema schema,
    WriteDisposition writeDisposition,
    CreateDisposition createDisposition) throws InterruptedException, IOException {
  JobConfigurationLoad configuration =
      new JobConfigurationLoad()
          .setSourceUris(gcsUris)
          .setDestinationTable(ref)
          .setSchema(schema)
          .setWriteDisposition(writeDisposition.name())
          .setCreateDisposition(createDisposition.name())
          .setSourceFormat("NEWLINE_DELIMITED_JSON");

  String project = ref.getProjectId();
  int attempt = 0;
  while (attempt < MAX_RETRY_LOAD_JOBS) {
    // Every attempt gets its own id so a failed job never collides with its replacement.
    String attemptJobId = jobId + "-" + attempt;
    insertLoadJob(attemptJobId, configuration);

    Status outcome = pollJobStatus(project, attemptJobId);
    if (outcome == Status.SUCCEEDED) {
      return;
    }
    if (outcome == Status.UNKNOWN) {
      // The job may still be running; do not start another one on top of it.
      throw new RuntimeException("Failed to poll the load job status.");
    }
    // Status.FAILED: fall through and submit the next attempt.
    ++attempt;
  }
  throw new RuntimeException(
      "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS);
}

/**
 * Submits a BigQuery load job under the given id, retrying the insert RPC with backoff.
 *
 * <p>The job is created in the project of {@code loadConfig}'s destination table. An
 * "already exists" response is treated as success (presumably because an earlier attempt
 * with the same id reached the service even though its response was lost).
 *
 * @param jobId id assigned to the load job
 * @param loadConfig load configuration; its destination table supplies the project id
 * @throws InterruptedException if interrupted while backing off between attempts
 * @throws IOException if no attempt succeeded before the backoff was exhausted; the cause
 *     is the failure of the last attempt
 */
@VisibleForTesting
void insertLoadJob(String jobId, JobConfigurationLoad loadConfig)
    throws InterruptedException, IOException {
  TableReference ref = loadConfig.getDestinationTable();
  String projectId = ref.getProjectId();

  Job job = new Job();
  JobReference jobRef = new JobReference();
  jobRef.setProjectId(projectId);
  jobRef.setJobId(jobId);
  job.setJobReference(jobRef);
  JobConfiguration config = new JobConfiguration();
  config.setLoad(loadConfig);
  job.setConfiguration(config);

  // Both catch clauses below only ever store IOException subtypes.
  IOException lastException = null;
  Sleeper sleeper = Sleeper.DEFAULT;
  BackOff backoff = new AttemptBoundedExponentialBackOff(
      MAX_LOAD_JOB_RPC_ATTEMPTS, INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS);
  do {
    try {
      client.jobs().insert(projectId, job).execute();
      return; // SUCCEEDED
    } catch (GoogleJsonResponseException e) {
      if (errorExtractor.itemAlreadyExists(e)) {
        return; // SUCCEEDED
      }
      // ignore and retry
      LOG.warn("Ignore the error and retry inserting the job.", e);
      lastException = e;
    } catch (IOException e) {
      // ignore and retry
      LOG.warn("Ignore the error and retry inserting the job.", e);
      lastException = e;
    }
  } while (nextBackOff(sleeper, backoff));
  // String.format uses %-style specifiers; the SLF4J-style "{}" placeholders used previously
  // were emitted literally and dropped the job id and attempt count from the message.
  throw new IOException(
      String.format(
          "Unable to insert job: %s, aborting after %d retries.",
          jobId, MAX_LOAD_JOB_RPC_ATTEMPTS),
      lastException);
}

/** Outcome of polling a load job, as reported by {@code pollJobStatus}. */
@VisibleForTesting
enum Status {
  /** The job reached the DONE state without an error result or error list. */
  SUCCEEDED,

  /** The job reached the DONE state with an error result or a non-empty error list. */
  FAILED,

  /** Polling gave up before the job was observed in the DONE state. */
  UNKNOWN
}

/**
 * Polls the given job with the default sleeper and an attempt-bounded exponential backoff
 * configured from {@code MAX_LOAD_JOB_POLL_RETRIES} and
 * {@code INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS}.
 */
private Status pollJobStatus(String projectId, String jobId) throws InterruptedException {
  return pollJobStatus(
      projectId,
      jobId,
      Sleeper.DEFAULT,
      new AttemptBoundedExponentialBackOff(
          MAX_LOAD_JOB_POLL_RETRIES, INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS));
}

/**
 * Repeatedly fetches a job until it is DONE or the backoff is exhausted.
 *
 * <p>I/O errors while fetching the job are logged and retried like a not-yet-finished job.
 *
 * @param projectId project that owns the job
 * @param jobId id of the job to poll
 * @param sleeper sleeper used to wait between polls
 * @param backoff backoff policy deciding how long to wait and when to stop
 * @return {@code SUCCEEDED} or {@code FAILED} once the job is DONE, depending on whether it
 *     reports an error result or a non-empty error list; {@code UNKNOWN} if polling stopped
 *     before the job was seen in the DONE state
 * @throws InterruptedException if interrupted while waiting between polls
 */
@VisibleForTesting
Status pollJobStatus(
    String projectId,
    String jobId,
    Sleeper sleeper,
    BackOff backoff) throws InterruptedException {
  do {
    try {
      JobStatus jobStatus = client.jobs().get(projectId, jobId).execute().getStatus();
      boolean finished = jobStatus != null && "DONE".equals(jobStatus.getState());
      if (finished) {
        boolean failed =
            jobStatus.getErrorResult() != null
                || (jobStatus.getErrors() != null && !jobStatus.getErrors().isEmpty());
        return failed ? Status.FAILED : Status.SUCCEEDED;
      }
      // Not DONE yet: wait for the next backoff interval and poll again.
    } catch (IOException e) {
      // Treat a failed poll like an unfinished job and try again.
      LOG.warn("Ignore the error and retry polling job status.", e);
    }
  } while (nextBackOff(sleeper, backoff));

  LOG.warn("Unable to poll job status: {}, aborting after {} retries.",
      jobId, MAX_LOAD_JOB_POLL_RETRIES);
  return Status.UNKNOWN;
}

/**
 * Delegates to {@link BackOffUtils#next}, rethrowing its checked {@link IOException} through
 * {@code Throwables.propagate} so callers do not have to declare it.
 *
 * @return the value returned by {@link BackOffUtils#next}
 * @throws InterruptedException if {@link BackOffUtils#next} is interrupted
 */
private boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException {
  final boolean keepGoing;
  try {
    keepGoing = BackOffUtils.next(sleeper, backoff);
  } catch (IOException e) {
    throw Throwables.propagate(e);
  }
  return keepGoing;
}

/**
* Retrieves or creates the table.
*
Expand Down Expand Up @@ -302,7 +457,6 @@ public Table getOrCreateTable(
try {
table = get.execute();
} catch (IOException e) {
ApiErrorExtractor errorExtractor = new ApiErrorExtractor();
if (!errorExtractor.itemNotFound(e) ||
createDisposition != CreateDisposition.CREATE_IF_NEEDED) {
// Rethrow.
Expand Down Expand Up @@ -402,11 +556,10 @@ Table tryCreateTable(
try {
return client.tables().insert(projectId, datasetId, table).execute();
} catch (IOException e) {
ApiErrorExtractor extractor = new ApiErrorExtractor();
if (extractor.itemAlreadyExists(e)) {
if (errorExtractor.itemAlreadyExists(e)) {
// The table already exists, nothing to return.
return null;
} else if (extractor.rateLimited(e)) {
} else if (errorExtractor.rateLimited(e)) {
// The request failed because we hit a temporary quota. Back off and try again.
try {
if (BackOffUtils.next(sleeper, backoff)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@
import com.google.api.client.util.BackOff;
import com.google.api.client.util.Sleeper;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.Job;
import com.google.api.services.bigquery.model.JobConfigurationLoad;
import com.google.api.services.bigquery.model.JobStatus;
import com.google.api.services.bigquery.model.Table;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
import com.google.cloud.dataflow.sdk.testing.ExpectedLogs;
import com.google.cloud.hadoop.util.RetryBoundedBackOff;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -236,4 +243,165 @@ public void testCreateTableDoesNotRetry() throws IOException {
throw e;
}
}

/**
 * Verifies that {@link BigQueryTableInserter#insertLoadJob} returns normally when the insert
 * RPC answers with HTTP 200, and that the mocked response is read exactly once.
 */
@Test
public void testInsertLoadJobSucceeds() throws IOException, InterruptedException {
  Job emptyJob = new Job();
  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(200);
  when(response.getContent()).thenReturn(toStream(emptyJob));

  // Only the project id of the destination table is needed to submit the job.
  JobConfigurationLoad config =
      new JobConfigurationLoad()
          .setDestinationTable(new TableReference().setProjectId("projectId"));

  new BigQueryTableInserter(bigquery).insertLoadJob("jobId", config);

  verify(response, times(1)).getStatusCode();
  verify(response, times(1)).getContent();
  verify(response, times(1)).getContentType();
}

/**
 * Verifies that {@link BigQueryTableInserter#insertLoadJob} treats an "already exists"
 * (HTTP 409) response as success and does not retry.
 */
@Test
public void testInsertLoadJobSucceedsAlreadyExists() throws IOException, InterruptedException {
  // 409 is the status code reported for a job that already exists.
  when(response.getStatusCode()).thenReturn(409);

  JobConfigurationLoad config =
      new JobConfigurationLoad()
          .setDestinationTable(new TableReference().setProjectId("projectId"));

  new BigQueryTableInserter(bigquery).insertLoadJob("jobId", config);

  verify(response, times(1)).getStatusCode();
  verify(response, times(1)).getContent();
  verify(response, times(1)).getContentType();
}

/**
 * Verifies that {@link BigQueryTableInserter#insertLoadJob} retries after a failed insert
 * RPC and succeeds on the second attempt.
 */
@Test
public void testInsertLoadJobRetry() throws IOException, InterruptedException {
  Job emptyJob = new Job();

  // Attempt 1: 403 rate limited. Attempt 2: 200 with a valid job payload.
  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(403).thenReturn(200);
  when(response.getContent())
      .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403)))
      .thenReturn(toStream(emptyJob));

  JobConfigurationLoad config =
      new JobConfigurationLoad()
          .setDestinationTable(new TableReference().setProjectId("projectId"));

  new BigQueryTableInserter(bigquery).insertLoadJob("jobId", config);

  // Two reads of the mocked response: the failed attempt plus the successful retry.
  verify(response, times(2)).getStatusCode();
  verify(response, times(2)).getContent();
  verify(response, times(2)).getContentType();
}

/**
 * Verifies that {@link BigQueryTableInserter#pollJobStatus} reports SUCCEEDED for a job that
 * is DONE and carries no errors.
 */
@Test
public void testPollJobStatusSucceeds() throws IOException, InterruptedException {
  Job doneJob = new Job().setStatus(new JobStatus().setState("DONE"));

  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(200);
  when(response.getContent()).thenReturn(toStream(doneJob));

  BigQueryTableInserter.Status actual =
      new BigQueryTableInserter(bigquery)
          .pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);

  assertEquals(BigQueryTableInserter.Status.SUCCEEDED, actual);
  verify(response, times(1)).getStatusCode();
  verify(response, times(1)).getContent();
  verify(response, times(1)).getContentType();
}

/**
 * Verifies that {@link BigQueryTableInserter#pollJobStatus} reports FAILED for a job that is
 * DONE but carries an error result.
 */
@Test
public void testPollJobStatusFailed() throws IOException, InterruptedException {
  JobStatus doneWithError = new JobStatus().setState("DONE").setErrorResult(new ErrorProto());
  Job failedJob = new Job().setStatus(doneWithError);

  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(200);
  when(response.getContent()).thenReturn(toStream(failedJob));

  BigQueryTableInserter.Status actual =
      new BigQueryTableInserter(bigquery)
          .pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF);

  assertEquals(BigQueryTableInserter.Status.FAILED, actual);
  verify(response, times(1)).getStatusCode();
  verify(response, times(1)).getContent();
  verify(response, times(1)).getContentType();
}

/**
 * Verifies that {@link BigQueryTableInserter#pollJobStatus} reports UNKNOWN when the job is
 * not DONE and the backoff allows no further polls.
 */
@Test
public void testPollJobStatusUnknown() throws IOException, InterruptedException {
  // A status without a state is never considered DONE.
  Job unfinishedJob = new Job().setStatus(new JobStatus());

  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(200);
  when(response.getContent()).thenReturn(toStream(unfinishedJob));

  // STOP_BACKOFF ends polling after the first attempt.
  BigQueryTableInserter.Status actual =
      new BigQueryTableInserter(bigquery)
          .pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.STOP_BACKOFF);

  assertEquals(BigQueryTableInserter.Status.UNKNOWN, actual);
  verify(response, times(1)).getStatusCode();
  verify(response, times(1)).getContent();
  verify(response, times(1)).getContentType();
}

/**
 * Tests that {@link BigQueryTableInserter#load} succeeds when both the job insert RPC and the
 * subsequent status poll return a job that is DONE without errors.
 */
@Test
public void testLoadSucceeds() throws IOException, InterruptedException {
  Job testJob = new Job();
  testJob.setStatus(new JobStatus().setState("DONE"));

  // Two responses are consumed: one by the insert RPC, one by the status poll. Each needs
  // its own content stream because a stream can only be read once.
  when(response.getContentType()).thenReturn(Json.MEDIA_TYPE).thenReturn(Json.MEDIA_TYPE);
  when(response.getStatusCode()).thenReturn(200).thenReturn(200);
  when(response.getContent()).thenReturn(toStream(testJob)).thenReturn(toStream(testJob));

  // load() builds its own JobConfigurationLoad, so only the destination table is needed here.
  TableReference ref = new TableReference();
  ref.setProjectId("projectId");

  BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery);
  inserter.load(
      "jobId", ref, ImmutableList.of("uri"), new TableSchema(),
      WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_NEVER);

  verify(response, times(2)).getStatusCode();
  verify(response, times(2)).getContent();
  verify(response, times(2)).getContentType();
}
}

0 comments on commit 362e2b6

Please sign in to comment.