Skip to content

Commit

Permalink
[GOBBLIN-1816] Add job properties and GaaS instance ID to observability event (#3676)
Browse files Browse the repository at this point in the history

* Add job properties and GaaS instance ID to observability event

* Fix conflicts
  • Loading branch information
Will-Lo authored Apr 20, 2023
1 parent 22cfc5c commit 9640ef0
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class ServiceConfigKeys {
public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled";
public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled";
public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "scheduler.enabled";

public static final String GOBBLIN_SERVICE_INSTANCE_NAME = GOBBLIN_SERVICE_PREFIX + "instance.name";

public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled";
public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,22 @@
"doc": "The ID of the spec executor that ran or would have run the job",
"compliance": "NONE"
},
{
"name": "gaasId",
"type": [
"null",
"string"
],
"doc": "The instance of GaaS that is sending the event (if multiple GaaS instances are running)"
},
{
"name":"jobProperties",
"type": [
"null",
"string"
],
"doc": "The job properties GaaS sends to the job executor. This is a JSON string of the job properties"
},
{
"name": "issues",
"type": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1029,7 +1029,8 @@ private void submitJob(DagNode<JobExecutionPlan> dagNode) {
addSpecFuture.get();

jobMetadata.put(TimingEvent.METADATA_MESSAGE, producer.getExecutionLink(addSpecFuture, specExecutorUri));

// Add serialized job properties as part of the orchestrated job event metadata
jobMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, dagNode.getValue().toString());
if (jobOrchestrationTimer != null) {
jobOrchestrationTimer.stop(jobMetadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
@EqualsAndHashCode(exclude = {"executionStatus", "currentAttempts", "jobFuture", "flowStartTime"})
public class JobExecutionPlan {
public static final String JOB_MAX_ATTEMPTS = "job.maxAttempts";
public static final String JOB_PROPS_KEY = "job.props";
private static final int MAX_JOB_NAME_LENGTH = 255;

private final JobSpec jobSpec;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@
import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;


/**
Expand Down Expand Up @@ -134,7 +136,9 @@ private GaaSObservabilityEventExperimental createGaaSObservabilityEvent(final St
.setIssues(issueList)
.setJobStatus(status)
.setExecutionUserUrn(jobState.getProp(AzkabanProjectConfig.USER_TO_PROXY, null))
.setDatasetsWritten(datasetMetrics);
.setDatasetsWritten(datasetMetrics)
.setGaasId(this.state.getProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, null))
.setJobProperties(jobState.getProp(JobExecutionPlan.JOB_PROPS_KEY, null));
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@
import org.apache.gobblin.runtime.troubleshooter.TroubleshooterUtils;
import org.apache.gobblin.runtime.util.GsonUtils;
import org.apache.gobblin.service.ExecutionStatus;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.orchestration.AzkabanProjectConfig;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;


public class GaaSObservabilityProducerTest {
Expand All @@ -67,7 +69,9 @@ public void testCreateGaaSObservabilityEventWithFullMetadata() throws Exception
summaries.add(dataset1);
summaries.add(dataset2);

MockGaaSObservabilityEventProducer producer = new MockGaaSObservabilityEventProducer(new State(), this.issueRepository);
State state = new State();
state.setProp(ServiceConfigKeys.GOBBLIN_SERVICE_INSTANCE_NAME, "testCluster");
MockGaaSObservabilityEventProducer producer = new MockGaaSObservabilityEventProducer(state, this.issueRepository);
Map<String, String> gteEventMetadata = Maps.newHashMap();
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_GROUP_FIELD, flowGroup);
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_NAME_FIELD, flowName);
Expand All @@ -84,6 +88,7 @@ public void testCreateGaaSObservabilityEventWithFullMetadata() throws Exception
gteEventMetadata.put(TimingEvent.JOB_ORCHESTRATED_TIME, "1");
gteEventMetadata.put(TimingEvent.FlowEventConstants.FLOW_MODIFICATION_TIME_FIELD, "20");
gteEventMetadata.put(TimingEvent.DATASET_TASK_SUMMARIES, GsonUtils.GSON_WITH_DATE_HANDLING.toJson(summaries));
gteEventMetadata.put(JobExecutionPlan.JOB_PROPS_KEY, "{\"flow\":{\"executionId\":1681242538558},\"user\":{\"to\":{\"proxy\":\"newUser\"}}}");
Properties jobStatusProps = new Properties();
jobStatusProps.putAll(gteEventMetadata);
producer.emitObservabilityEvent(new State(jobStatusProps));
Expand Down Expand Up @@ -116,7 +121,8 @@ public void testCreateGaaSObservabilityEventWithFullMetadata() throws Exception
Assert.assertEquals(event.getDatasetsWritten().get(1).getEntitiesWritten(), Long.valueOf(dataset2.getRecordsWritten()));
Assert.assertEquals(event.getDatasetsWritten().get(1).getBytesWritten(), Long.valueOf(dataset2.getBytesWritten()));
Assert.assertEquals(event.getDatasetsWritten().get(1).getSuccessfullyCommitted(), Boolean.valueOf(dataset2.isSuccessfullyCommitted()));

Assert.assertEquals(event.getJobProperties(), "{\"flow\":{\"executionId\":1681242538558},\"user\":{\"to\":{\"proxy\":\"newUser\"}}}");
Assert.assertEquals(event.getGaasId(), "testCluster");
AvroSerializer<GaaSObservabilityEventExperimental> serializer = new AvroBinarySerializer<>(
GaaSObservabilityEventExperimental.SCHEMA$, new NoopSchemaVersionWriter()
);
Expand Down

0 comments on commit 9640ef0

Please sign in to comment.