
Commit

This closes #15
davorbonaci committed Mar 8, 2016
2 parents 0be5a97 + 890e3a0 commit b864ce8
Showing 6 changed files with 83 additions and 7 deletions.
1 change: 1 addition & 0 deletions pom.xml
@@ -70,6 +70,7 @@
<avro.version>1.7.7</avro.version>
<bigquery.version>v2-rev248-1.21.0</bigquery.version>
<bigtable.version>0.2.3</bigtable.version>
<clouddebugger.version>v2-rev6-1.21.0</clouddebugger.version>
<dataflow.version>v1b3-rev22-1.21.0</dataflow.version>
<dataflow.proto.version>0.5.160222</dataflow.proto.version>
<datastore.version>v1beta2-rev1-4.0.0</datastore.version>
14 changes: 14 additions & 0 deletions sdk/pom.xml
@@ -496,6 +496,20 @@
</exclusions>
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-clouddebugger</artifactId>
<version>${clouddebugger.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>

<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
@@ -17,8 +17,6 @@

import static com.google.common.base.Preconditions.checkArgument;

import com.google.api.services.datastore.DatastoreV1;
import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
import com.google.cloud.dataflow.sdk.coders.CannotProvideCoderException;
import com.google.cloud.dataflow.sdk.coders.Coder;
@@ -16,8 +16,11 @@

package com.google.cloud.dataflow.sdk.options;

import com.google.api.services.clouddebugger.v2.model.Debuggee;
import com.google.cloud.dataflow.sdk.annotations.Experimental;

import javax.annotation.Nullable;

/**
* Options for controlling Cloud Debugger.
*/
@@ -32,5 +35,9 @@ public interface CloudDebuggerOptions {
@Description("Whether to enable the Cloud Debugger snapshot agent for the current job.")
boolean getEnableCloudDebugger();
void setEnableCloudDebugger(boolean enabled);
}

@Description("The Cloud Debugger debugee to associate with. This should not be set directly.")
@Hidden
@Nullable Debuggee getDebuggee();
void setDebuggee(Debuggee debuggee);
}
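
The two new properties ride on the SDK's standard PipelineOptions mechanism, so the agent is switched on with an ordinary command-line flag while the debuggee itself is reserved for the runner. A minimal sketch of the assumed usage, not part of this commit (flag parsing via PipelineOptionsFactory; other required pipeline flags omitted):

import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;

// Illustrative only: the --enableCloudDebugger flag name is derived from getEnableCloudDebugger().
String[] args = new String[] {"--enableCloudDebugger=true"};
DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
// The runner populates the debuggee during job submission; user code should not call setDebuggee(...).
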
@@ -23,6 +23,10 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.clouddebugger.v2.model.Debuggee;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeRequest;
import com.google.api.services.clouddebugger.v2.model.RegisterDebuggeeResponse;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.dataflow.model.DataflowPackage;
import com.google.api.services.dataflow.model.Job;
@@ -420,6 +424,43 @@ private <T> PCollection<T> applyWindow(
return super.apply(new AssignWindows<>(transform), input);
}

private void maybeRegisterDebuggee(DataflowPipelineOptions options, String uniquifier) {
if (!options.getEnableCloudDebugger()) {
return;
}

if (options.getDebuggee() != null) {
throw new RuntimeException("Should not specify the debuggee");
}

Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build();
Debuggee debuggee = registerDebuggee(debuggerClient, uniquifier);
options.setDebuggee(debuggee);
}

private Debuggee registerDebuggee(Clouddebugger debuggerClient, String uniquifier) {
RegisterDebuggeeRequest registerReq = new RegisterDebuggeeRequest();
registerReq.setDebuggee(new Debuggee()
.setProject(options.getProject())
.setUniquifier(uniquifier)
.setDescription(uniquifier)
.setAgentVersion("google.com/cloud-dataflow-java/v1"));

try {
RegisterDebuggeeResponse registerResponse =
debuggerClient.controller().debuggees().register(registerReq).execute();
Debuggee debuggee = registerResponse.getDebuggee();
if (debuggee.getStatus() != null && debuggee.getStatus().getIsError()) {
throw new RuntimeException("Unable to register with the debugger: " +
debuggee.getStatus().getDescription().getFormat());
}

return debuggee;
} catch (IOException e) {
throw new RuntimeException("Unable to register with the debugger: ", e);
}
}

@Override
public DataflowPipelineJob run(Pipeline pipeline) {
logWarningIfPCollectionViewHasNonDeterministicKeyCoder(pipeline);
@@ -428,9 +469,7 @@ public DataflowPipelineJob run(Pipeline pipeline) {
+ "related to Google Compute Engine usage and other Google Cloud Services.");

List<DataflowPackage> packages = options.getStager().stageFiles();
JobSpecification jobSpecification =
translator.translate(pipeline, this, packages);
Job newJob = jobSpecification.getJob();


// Set a unique client_request_id in the CreateJob request.
// This is used to ensure idempotence of job creation across retried
@@ -442,6 +481,15 @@ public DataflowPipelineJob run(Pipeline pipeline) {
int randomNum = new Random().nextInt(9000) + 1000;
String requestId = DateTimeFormat.forPattern("YYYYMMddHHmmssmmm").withZone(DateTimeZone.UTC)
.print(DateTimeUtils.currentTimeMillis()) + "_" + randomNum;

// Try to create a debuggee ID. This must happen before the job is translated since it may
// update the options.
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
maybeRegisterDebuggee(dataflowOptions, requestId);

JobSpecification jobSpecification =
translator.translate(pipeline, this, packages);
Job newJob = jobSpecification.getJob();
newJob.setClientRequestId(requestId);

String version = DataflowReleaseInfo.getReleaseInfo().getVersion();
@@ -450,7 +498,6 @@
newJob.getEnvironment().setUserAgent(DataflowReleaseInfo.getReleaseInfo());
// The Dataflow Service may write to the temporary directory directly, so
// must be verified.
DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
if (!Strings.isNullOrEmpty(options.getTempLocation())) {
newJob.getEnvironment().setTempStoragePrefix(
dataflowOptions.getPathValidator().verifyPath(options.getTempLocation()));
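
Registration is deliberately done before translation (see the comment above) so that the resulting Debuggee travels with the pipeline options. A minimal sketch of reading it back afterwards, assuming registration succeeded; the surrounding variable names are illustrative and not part of this commit:

// Illustrative only: after maybeRegisterDebuggee(...) has run, the registered
// Debuggee is available on the options.
CloudDebuggerOptions debuggerOptions = options.as(CloudDebuggerOptions.class);
Debuggee registered = debuggerOptions.getDebuggee();
if (registered != null) {
  // getId() is expected to carry the identifier assigned by the Controller API.
  String debuggeeId = registered.getId();
}
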
@@ -23,6 +23,7 @@
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson2.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.clouddebugger.v2.Clouddebugger;
import com.google.api.services.dataflow.Dataflow;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.storage.Storage;
@@ -148,6 +149,14 @@ public static Dataflow.Builder newDataflowClient(DataflowPipelineOptions options
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
}

public static Clouddebugger.Builder newClouddebuggerClient(DataflowPipelineOptions options) {
return new Clouddebugger.Builder(getTransport(),
getJsonFactory(),
chainHttpRequestInitializer(options.getGcpCredential(), new RetryHttpRequestInitializer()))
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace());
}

/**
* Returns a Dataflow client that does not automatically retry failed
* requests.
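
newClouddebuggerClient mirrors the existing newDataflowClient builder: same transport, JSON factory, credential, and retry initializer. A minimal usage sketch, assuming a configured DataflowPipelineOptions instance named options; the overridden application name is a hypothetical value, not part of this commit:

// Illustrative only: build the Cloud Debugger client from pipeline options, as the runner does.
Clouddebugger debuggerClient = Transport.newClouddebuggerClient(options).build();

// The Builder can be customized further before build(), e.g. with a different application name.
Clouddebugger renamedClient =
    Transport.newClouddebuggerClient(options)
        .setApplicationName("my-debugger-client")
        .build();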
