Skip to content

Commit

Permalink
Added retry with exponential backoff
Browse files Browse the repository at this point in the history
  • Loading branch information
VeronicaWasson committed Dec 31, 2020
1 parent ac98d25 commit 5cffc9c
Showing 1 changed file with 40 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.example.bigquerystorage;

import com.google.api.client.util.*;
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.*;
import com.google.cloud.bigquery.storage.v1beta2.*;
Expand All @@ -27,10 +28,20 @@

public class WriteCommittedStream {

public static Status.Code getStatusCode(StatusRuntimeException e) {
static Status.Code getStatusCode(StatusRuntimeException e) {
return e.getStatus().getCode();
}

// Returns true if the operation should be retried.
static Boolean isRetryable(ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof StatusRuntimeException) {
Status status = ((StatusRuntimeException)cause).getStatus();
return (status == Status.ABORTED);
}
return false;
}

public static void runWriteCommittedStream() {
// TODO(developer): Replace these variables before running the sample.
String projectId = "MY_PROJECT_ID";
Expand Down Expand Up @@ -85,7 +96,7 @@ public static void writeCommittedStream(String projectId, String datasetName, St
System.out.println("Appended records successfully.");

} catch (Exception e) {
System.out.println("Failed to append records. \n" + e.toString());
System.out.println("Failed to append records.\n" + e.toString());
}
}

Expand All @@ -100,17 +111,41 @@ public static void writeToDefaultStream(String projectId, String datasetName, St
try (JsonStreamWriter writer =
JsonStreamWriter.newBuilder(parent.toString(), schema).createDefaultStream().build()) {

ExponentialBackOff backoff = new ExponentialBackOff();

for (int i = 0; i < 10; i++) {
JSONObject record = new JSONObject();
record.put("col1", String.format("record %03d", i));
JSONArray jsonArr = new JSONArray();
jsonArr.put(record);

ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
AppendRowsResponse response = future.get();
backoff.reset();
Boolean retry = true;
while (retry) {
try {

ApiFuture<AppendRowsResponse> future = writer.append(jsonArr);
AppendRowsResponse response = future.get();
retry = false;

} catch (ExecutionException ex) {
// If the error is retryable, retry the operation with exponential backoff.
// Don't retry past the maximum retry interval.
long backOffMillis = backoff.nextBackOffMillis();
if (isRetryable(ex) && backOffMillis != BackOff.STOP) {
Thread.sleep(backOffMillis);
}
else {
throw ex;
}
}
}
}

System.out.println("Appended records successfully.");

} catch (Exception e) {
System.out.println(e);
System.out.println("Failed to append records.\n" + e.toString());
}
}
}

0 comments on commit 5cffc9c

Please sign in to comment.