diff --git a/pom.xml b/pom.xml
index ba130d25a3d25..dcc31a2b7bdbc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -70,6 +70,7 @@
1.7.7
v2-rev248-1.21.0
0.2.3
+ v2-rev6-1.21.0
v1b3-rev19-1.21.0
0.5.160222
v1beta2-rev1-4.0.0
diff --git a/sdk/pom.xml b/sdk/pom.xml
index c49c175709013..9be93db77d5c0 100644
--- a/sdk/pom.xml
+++ b/sdk/pom.xml
@@ -502,6 +502,20 @@
+
+ com.google.apis
+ google-api-services-clouddebugger
+ ${clouddebugger.version}
+
+
+
+ com.google.guava
+ guava-jdk5
+
+
+
+
com.google.apis
google-api-services-pubsub
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
index 62be4c9ec2e22..2e1ad9451fa0a 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/options/CloudDebuggerOptions.java
@@ -16,8 +16,11 @@
package com.google.cloud.dataflow.sdk.options;
+import com.google.api.services.clouddebugger.v2.model.Debuggee;
import com.google.cloud.dataflow.sdk.annotations.Experimental;
+import javax.annotation.Nullable;
+
/**
* Options for controlling Cloud Debugger.
*/
@@ -32,5 +35,9 @@ public interface CloudDebuggerOptions {
@Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
boolean getEnableCloudDebugger();
void setEnableCloudDebugger(boolean enabled);
-}
+  /**
+   * The Cloud Debugger debuggee to associate with, or {@code null} when none has been set.
+   *
+   * <p>As the option description states, this value should not be set directly.
+   */
+  @Description("The Cloud Debugger debuggee to associate with. This should not be set directly.")
+  @Hidden
+  @Nullable Debuggee getDebuggee();
+  void setDebuggee(Debuggee debuggee);
+}
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
index 6eb6c2f7ad968..0612cca4d0fda 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/DataflowPipelineRunner.java
@@ -23,6 +23,10 @@
import static com.google.common.base.Preconditions.checkState;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
+import com.google.api.services.clouddebugger.v2.Clouddebugger;
+import com.google.api.services.clouddebugger.v2.model.Debuggee;
+import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
+import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
@@ -168,6 +172,8 @@
import java.util.SortedSet;
import java.util.TreeSet;
+import javax.annotation.Nullable;
+
/**
* A {@link PipelineRunner} that executes the operations in the
* pipeline by first translating them to the Dataflow representation
@@ -420,6 +426,43 @@ private PCollection applyWindow(
return super.apply(new AssignWindows<>(transform), input);
}
+  /**
+   * Registers a debuggee and records it on {@code options} when Cloud Debugger is enabled.
+   *
+   * <p>Does nothing if {@code options.getEnableCloudDebugger()} is false. Otherwise a
+   * Cloud Debugger client is built from {@code options}, a debuggee is registered using
+   * {@code uniquifier}, and the result is stored via {@code options.setDebuggee}.
+   *
+   * @param options pipeline options that are read and, on success, updated with the debuggee
+   * @param uniquifier value passed through to the debuggee registration
+   * @throws RuntimeException if Cloud Debugger is enabled and a debuggee is already set on
+   *     {@code options}
+   */
+  private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) {
+    if (options.getEnableCloudDebugger()) {
+      // The debuggee is produced by registration below; a pre-populated value is rejected.
+      if (options.getDebuggee() != null) {
+        throw new RuntimeException("Should not specify the debuggee");
+      }
+
+      Clouddebugger client = Transport.newClouddebuggerClient(options).build();
+      options.setDebuggee(registerDebuggee(client, uniquifier));
+    }
+  }
+
+  /**
+   * Registers a new debuggee through the Cloud Debugger controller API.
+   *
+   * <p>The request uses the project from this runner's options, and {@code uniquifier} as both
+   * the debuggee's uniquifier and its description.
+   *
+   * @param debuggerClient client used to issue the register request
+   * @param uniquifier value identifying this registration
+   * @return the debuggee contained in the service response
+   * @throws RuntimeException if the request fails with an {@link IOException}, the response
+   *     contains no debuggee, or the returned debuggee carries an error status
+   */
+  private Debuggee registerDebuggee(Clouddebugger debuggerClient, String uniquifier) {
+    RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest();
+    registerReq.setDebuggee(new Debuggee()
+        .setProject(options.getProject())
+        .setUniquifier(uniquifier)
+        .setDescription(uniquifier)
+        .setAgentVersion("google.com/cloud-dataflow-java/v1"));
+
+    try {
+      RegisterDebuggeeResponse registerResponse =
+          debuggerClient.controller().debuggees().register(registerReq).execute();
+      Debuggee debuggee = registerResponse.getDebuggee();
+      if (debuggee == null) {
+        // Fail with a descriptive message instead of a bare NullPointerException below.
+        throw new RuntimeException(
+            "Unable to register with the debugger: no debuggee was returned");
+      }
+
+      // Compare against Boolean.TRUE rather than auto-unboxing: the error flag may be null
+      // when the field is absent from the response, and unboxing null would throw.
+      if (debuggee.getStatus() != null
+          && Boolean.TRUE.equals(debuggee.getStatus().getIsError())) {
+        String detail = debuggee.getStatus().getDescription() == null
+            ? "unknown error"
+            : debuggee.getStatus().getDescription().getFormat();
+        throw new RuntimeException("Unable to register with the debugger: " + detail);
+      }
+
+      return debuggee;
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to register with the debugger: ", e);
+    }
+  }
+
@Override
public DataflowPipelineJob run(Pipeline pipeline) {
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
@@ -428,9 +471,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
+ "related to Google Compute Engine usage and other Google Cloud Services.");
List packages = options.getStager().stageFiles();
- JobSpecification jobSpecification =
- translator.translate(pipeline, this, packages);
- Job newJob = jobSpecification.getJob();
+
// Set a unique client_request_id in the CreateJob request.
// This is used to ensure idempotence of job creation across retried
@@ -442,6 +483,15 @@ public DataflowPipelineJob run(Pipeline pipeline) {
int randomNum = new Random().nextInt(9000) + 1000;
String requestId = DateTimeFormat.forPattern("YYYYMMddHHmmssmmm").withZone(DateTimeZone.UTC)
.print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum;
+
+ // Try to create a debuggee ID. This must happen before the job is translated since it may
+ // update the options.
+ DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
+ maybeRegisterDebuggee(dataflowOptions, requestId);
+
+ JobSpecification jobSpecification =
+ translator.translate(pipeline, this, packages);
+ Job newJob = jobSpecification.getJob();
newJob.setClientRequestId(requestId);
String version = DataflowReleaseInfo.getReleaseInfo().getVersion();
@@ -450,7 +500,6 @@ public DataflowPipelineJob run(Pipeline pipeline) {
newJob.getEnvironment().setUserAgent(DataflowReleaseInfo.getReleaseInfo());
// The Dataflow Service may write to the temporary directory directly, so
// must be verified.
- DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
if (!Strings.isNullOrEmpty(options.getTempLocation())) {
newJob.getEnvironment().setTempStoragePrefix(
dataflowOptions.getPathValidator().verifyPath(options.getTempLocation()));
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
index 7735a9e01fcc0..15fe2863395a9 100644
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
+++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/Transport.java
@@ -23,6 +23,7 @@
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
+import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.storage.Storage;
@@ -148,6 +149,14 @@ public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
}
+  /**
+   * Returns a Cloud Debugger client builder.
+   *
+   * <p>Requests are initialized with the GCP credential from {@code options} chained with a
+   * {@link RetryHttpRequestInitializer}. The application name and the Google client request
+   * initializer are also taken from {@code options}.
+   */
+  public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
+    Clouddebugger.Builder builder = new Clouddebugger.Builder(
+        getTransport(),
+        getJsonFactory(),
+        chainHttpRequestInitializer(
+            options.getGcpCredential(), new RetryHttpRequestInitializer()));
+    builder.setApplicationName(options.getAppName());
+    builder.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
+    return builder;
+  }
+
/**
* Returns a Dataflow client that does not automatically retry failed
* requests.