From 362e2b67b5b195ae09d4873f30abbacf570258de Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 14 Mar 2016 21:26:19 -0700 Subject: [PATCH] [BEAM-50] Add function in BigQueryTableInserter to import files to BigQuery. --- .../sdk/util/BigQueryTableInserter.java | 161 ++++++++++++++++- .../sdk/util/BigQueryTableInserterTest.java | 168 ++++++++++++++++++ 2 files changed, 325 insertions(+), 4 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java index cd51062756467..e93d996a03160 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserter.java @@ -16,11 +16,18 @@ package com.google.cloud.dataflow.sdk.util; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.util.BackOff; import com.google.api.client.util.BackOffUtils; import com.google.api.client.util.ExponentialBackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.Bigquery.Jobs; +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfiguration; +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobReference; +import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableDataInsertAllRequest; import com.google.api.services.bigquery.model.TableDataInsertAllResponse; @@ -73,6 +80,23 @@ public class BigQueryTableInserter { // The initial backoff after a failure inserting rows into BigQuery. private static final long INITIAL_INSERT_BACKOFF_INTERVAL_MS = 200L; + // The maximum number of retry load jobs. + private static final int MAX_RETRY_LOAD_JOBS = 3; + + // The maximum number of retries to poll the status of a load job. + private static final int MAX_LOAD_JOB_POLL_RETRIES = 10; + + // The initial backoff for polling the status of a load job. + private static final int INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS = 60000; + + // The maximum number of attempts to execute a load job RPC. + private static final int MAX_LOAD_JOB_RPC_ATTEMPTS = 10; + + // The initial backoff for executing a load job RPC. + private static final int INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS = 1000; + + private final ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); + private final Bigquery client; private final TableReference defaultRef; private final long maxRowsPerBatch; @@ -272,6 +296,137 @@ public List call() throws IOException { } } + /** + * Import files into BigQuery with load jobs. + */ + public void load( + String jobId, + TableReference ref, + List gcsUris, + TableSchema schema, + WriteDisposition writeDisposition, + CreateDisposition createDisposition) throws InterruptedException, IOException { + JobConfigurationLoad loadConfig = new JobConfigurationLoad(); + loadConfig.setSourceUris(gcsUris); + loadConfig.setDestinationTable(ref); + loadConfig.setSchema(schema); + loadConfig.setWriteDisposition(writeDisposition.name()); + loadConfig.setCreateDisposition(createDisposition.name()); + loadConfig.setSourceFormat("NEWLINE_DELIMITED_JSON"); + + String projectId = ref.getProjectId(); + for (int i = 0; i < MAX_RETRY_LOAD_JOBS; ++i) { + String retryingJobId = jobId + "-" + i; + insertLoadJob(retryingJobId, loadConfig); + Status jobStatus = pollJobStatus(projectId, retryingJobId); + if (jobStatus == Status.SUCCEEDED) { + return; + } else if (jobStatus == Status.UNKNOWN) { + throw new RuntimeException("Failed to poll the load job status."); + } + // continue for Status.FAILED + } + throw new RuntimeException( + "Failed to create the load job, reached max retries: " + MAX_RETRY_LOAD_JOBS); + } + + @VisibleForTesting + void insertLoadJob(String jobId, JobConfigurationLoad loadConfig) + throws InterruptedException, IOException { + TableReference ref = loadConfig.getDestinationTable(); + String projectId = ref.getProjectId(); + + Job job = new Job(); + JobReference jobRef = new JobReference(); + jobRef.setProjectId(projectId); + jobRef.setJobId(jobId); + job.setJobReference(jobRef); + JobConfiguration config = new JobConfiguration(); + config.setLoad(loadConfig); + job.setConfiguration(config); + + Exception lastException = null; + Sleeper sleeper = Sleeper.DEFAULT; + BackOff backoff = new AttemptBoundedExponentialBackOff( + MAX_LOAD_JOB_RPC_ATTEMPTS, INITIAL_LOAD_JOB_RPC_BACKOFF_MILLIS); + do { + try { + client.jobs().insert(projectId, job).execute(); + return; // SUCCEEDED + } catch (GoogleJsonResponseException e) { + if (errorExtractor.itemAlreadyExists(e)) { + return; // SUCCEEDED + } + // ignore and retry + LOG.warn("Ignore the error and retry inserting the job.", e); + lastException = e; + } catch (IOException e) { + // ignore and retry + LOG.warn("Ignore the error and retry inserting the job.", e); + lastException = e; + } + } while (nextBackOff(sleeper, backoff)); + throw new IOException( + String.format( + "Unable to insert job: {}, aborting after {} retries.", + jobId, MAX_LOAD_JOB_RPC_ATTEMPTS), + lastException); + } + + @VisibleForTesting + enum Status { + SUCCEEDED, + FAILED, + UNKNOWN, + } + + private Status pollJobStatus(String projectId, String jobId) throws InterruptedException { + BackOff backoff = new AttemptBoundedExponentialBackOff( + MAX_LOAD_JOB_POLL_RETRIES, INITIAL_LOAD_JOB_POLL_BACKOFF_MILLIS); + return pollJobStatus(projectId, jobId, Sleeper.DEFAULT, backoff); + } + + @VisibleForTesting + Status pollJobStatus( + String projectId, + String jobId, + Sleeper sleeper, + BackOff backoff) throws InterruptedException { + do { + try { + JobStatus status = client.jobs().get(projectId, jobId).execute().getStatus(); + if (status != null && status.getState() != null && status.getState().equals("DONE")) { + if (status.getErrorResult() != null) { + return Status.FAILED; + } else if (status.getErrors() != null && !status.getErrors().isEmpty()) { + return Status.FAILED; + } else { + return Status.SUCCEEDED; + } + } + // The job is not DONE, wait longer and retry. + } catch (IOException e) { + // ignore and retry + LOG.warn("Ignore the error and retry polling job status.", e); + } + } while (nextBackOff(sleeper, backoff)); + LOG.warn("Unable to poll job status: {}, aborting after {} retries.", + jobId, MAX_LOAD_JOB_POLL_RETRIES); + return Status.UNKNOWN; + } + + /** + * Identical to {@link BackOffUtils#next} but without checked IOException. + * @throws InterruptedException + */ + private boolean nextBackOff(Sleeper sleeper, BackOff backoff) throws InterruptedException { + try { + return BackOffUtils.next(sleeper, backoff); + } catch (IOException e) { + throw Throwables.propagate(e); + } + } + /** * Retrieves or creates the table. * @@ -302,7 +457,6 @@ public Table getOrCreateTable( try { table = get.execute(); } catch (IOException e) { - ApiErrorExtractor errorExtractor = new ApiErrorExtractor(); if (!errorExtractor.itemNotFound(e) || createDisposition != CreateDisposition.CREATE_IF_NEEDED) { // Rethrow. @@ -402,11 +556,10 @@ Table tryCreateTable( try { return client.tables().insert(projectId, datasetId, table).execute(); } catch (IOException e) { - ApiErrorExtractor extractor = new ApiErrorExtractor(); - if (extractor.itemAlreadyExists(e)) { + if (errorExtractor.itemAlreadyExists(e)) { // The table already exists, nothing to return. return null; - } else if (extractor.rateLimited(e)) { + } else if (errorExtractor.rateLimited(e)) { // The request failed because we hit a temporary quota. Back off and try again. try { if (BackOffUtils.next(sleeper, backoff)) { diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java index d53315ba66885..31aa3f2516d56 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/BigQueryTableInserterTest.java @@ -39,8 +39,15 @@ import com.google.api.client.util.BackOff; import com.google.api.client.util.Sleeper; import com.google.api.services.bigquery.Bigquery; +import com.google.api.services.bigquery.model.ErrorProto; +import com.google.api.services.bigquery.model.Job; +import com.google.api.services.bigquery.model.JobConfigurationLoad; +import com.google.api.services.bigquery.model.JobStatus; import com.google.api.services.bigquery.model.Table; import com.google.api.services.bigquery.model.TableReference; +import com.google.api.services.bigquery.model.TableSchema; +import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition; +import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition; import com.google.cloud.dataflow.sdk.testing.ExpectedLogs; import com.google.cloud.hadoop.util.RetryBoundedBackOff; import com.google.common.collect.ImmutableList; @@ -236,4 +243,165 @@ public void testCreateTableDoesNotRetry() throws IOException { throw e; } } + + /** + * Tests that {@link BigQueryTableInserter#insertLoadJob} succeeds. + */ + @Test + public void testInsertLoadJobSucceeds() throws IOException, InterruptedException { + Job testJob = new Job(); + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + TableReference ref = new TableReference(); + ref.setProjectId("projectId"); + JobConfigurationLoad loadConfig = new JobConfigurationLoad(); + loadConfig.setDestinationTable(ref); + + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + inserter.insertLoadJob("jobId", loadConfig); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryTableInserter#insertLoadJob} succeeds with an already exist job. + */ + @Test + public void testInsertLoadJobSucceedsAlreadyExists() throws IOException, InterruptedException { + when(response.getStatusCode()).thenReturn(409); // 409 means already exists + + TableReference ref = new TableReference(); + ref.setProjectId("projectId"); + JobConfigurationLoad loadConfig = new JobConfigurationLoad(); + loadConfig.setDestinationTable(ref); + + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + inserter.insertLoadJob("jobId", loadConfig); + + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryTableInserter#insertLoadJob} succeeds with a retry. + */ + @Test + public void testInsertLoadJobRetry() throws IOException, InterruptedException { + Job testJob = new Job(); + + // First response is 403 rate limited, second response has valid payload. + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(403).thenReturn(200); + when(response.getContent()) + .thenReturn(toStream(errorWithReasonAndStatus("rateLimitExceeded", 403))) + .thenReturn(toStream(testJob)); + + TableReference ref = new TableReference(); + ref.setProjectId("projectId"); + JobConfigurationLoad loadConfig = new JobConfigurationLoad(); + loadConfig.setDestinationTable(ref); + + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + inserter.insertLoadJob("jobId", loadConfig); + verify(response, times(2)).getStatusCode(); + verify(response, times(2)).getContent(); + verify(response, times(2)).getContentType(); + } + + /** + * Tests that {@link BigQueryTableInserter#pollJobStatus} succeeds. + */ + @Test + public void testPollJobStatusSucceeds() throws IOException, InterruptedException { + Job testJob = new Job(); + testJob.setStatus(new JobStatus().setState("DONE")); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter.Status status = + inserter.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + + assertEquals(BigQueryTableInserter.Status.SUCCEEDED, status); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryTableInserter#pollJobStatus} fails. + */ + @Test + public void testPollJobStatusFailed() throws IOException, InterruptedException { + Job testJob = new Job(); + testJob.setStatus(new JobStatus().setState("DONE").setErrorResult(new ErrorProto())); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter.Status status = + inserter.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.ZERO_BACKOFF); + + assertEquals(BigQueryTableInserter.Status.FAILED, status); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryTableInserter#pollJobStatus} returns UNKNOWN. + */ + @Test + public void testPollJobStatusUnknown() throws IOException, InterruptedException { + Job testJob = new Job(); + testJob.setStatus(new JobStatus()); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)); + + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + BigQueryTableInserter.Status status = + inserter.pollJobStatus("projectId", "jobId", Sleeper.DEFAULT, BackOff.STOP_BACKOFF); + + assertEquals(BigQueryTableInserter.Status.UNKNOWN, status); + verify(response, times(1)).getStatusCode(); + verify(response, times(1)).getContent(); + verify(response, times(1)).getContentType(); + } + + /** + * Tests that {@link BigQueryTableInserter#load} succeeds. + */ + @Test + public void testLoadSucceeds() throws IOException, InterruptedException { + Job testJob = new Job(); + testJob.setStatus(new JobStatus().setState("DONE")); + + when(response.getContentType()).thenReturn(Json.MEDIA_TYPE).thenReturn(Json.MEDIA_TYPE); + when(response.getStatusCode()).thenReturn(200).thenReturn(200); + when(response.getContent()).thenReturn(toStream(testJob)).thenReturn(toStream(testJob)); + + TableReference ref = new TableReference(); + ref.setProjectId("projectId"); + JobConfigurationLoad loadConfig = new JobConfigurationLoad(); + loadConfig.setDestinationTable(ref); + + BigQueryTableInserter inserter = new BigQueryTableInserter(bigquery); + inserter.load( + "jobId", ref, ImmutableList.of("uri"), new TableSchema(), + WriteDisposition.WRITE_EMPTY, CreateDisposition.CREATE_NEVER); + + verify(response, times(2)).getStatusCode(); + verify(response, times(2)).getContent(); + verify(response, times(2)).getContentType(); + } }