[GOBBLIN-1831] Use flowexecutionid in kafka monitor and jobnames #3694

Merged
Changes from 2 commits
HelixJobsMapping.java
@@ -36,6 +36,7 @@
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.JobLauncherUtils;
import org.apache.gobblin.util.PropertiesUtils;


/**
@@ -96,12 +97,14 @@ public HelixJobsMapping(Config sysConfig, URI fsUri, String rootDir) {

public static String createPlanningJobId (Properties jobPlanningProps) {
return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.PLANNING_JOB_NAME_PREFIX
- + JobState.getJobNameFromProps(jobPlanningProps));
+ + JobState.getJobNameFromProps(jobPlanningProps),
+ PropertiesUtils.getPropAsLong(jobPlanningProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
}

public static String createActualJobId (Properties jobProps) {
- return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
- + JobState.getJobNameFromProps(jobProps));
+ return JobLauncherUtils.newJobId(GobblinClusterConfigurationKeys.ACTUAL_JOB_NAME_PREFIX
+ + JobState.getJobNameFromProps(jobProps),
+ PropertiesUtils.getPropAsLong(jobProps, ConfigurationKeys.FLOW_EXECUTION_ID_KEY, System.currentTimeMillis()));
}

@Nullable
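These two helpers are the naming half of this change: when flow.executionId is present in the job properties, it replaces System.currentTimeMillis() as the numeric component of the generated job ID, so planning and actual job names become reproducible for a given flow execution and can later be matched by cancellation requests. A minimal sketch of the effect, assuming JobLauncherUtils.newJobId(name, sequence) embeds the sequence number in the returned ID (the exact ID format is owned by JobLauncherUtils and is not part of this diff):

    // Sketch only: illustrates the determinism, not the exact ID format.
    Properties jobProps = new Properties();
    jobProps.setProperty(ConfigurationKeys.JOB_NAME_KEY, "myJob");
    jobProps.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "12345");

    // Both calls now derive the numeric component from the flow execution ID,
    // so repeated invocations yield the same IDs for the same execution.
    String planningId = HelixJobsMapping.createPlanningJobId(jobProps);
    String actualId = HelixJobsMapping.createActualJobId(jobProps);

    // Without flow.executionId, the code falls back to System.currentTimeMillis(),
    // which produces a different ID on every call.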
StreamingKafkaSpecExecutorTest.java
@@ -29,25 +29,26 @@
import org.apache.commons.lang3.tuple.Pair;
import org.testng.Assert;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.Test;

import com.google.common.io.Closer;
import com.typesafe.config.Config;

+ import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.KafkaTestBase;
import org.apache.gobblin.kafka.client.Kafka09ConsumerClient;
import org.apache.gobblin.kafka.writer.KafkaWriterConfigurationKeys;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
+ import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog;
import org.apache.gobblin.runtime.job_monitor.KafkaJobMonitor;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.WriteResponse;
- import org.apache.gobblin.runtime.api.SpecExecutor;
-
- import lombok.extern.slf4j.Slf4j;


@Slf4j
@@ -63,9 +64,12 @@ public class StreamingKafkaSpecExecutorTest extends KafkaTestBase {
private String _kafkaBrokers;
private static final String _TEST_DIR_PATH = "/tmp/StreamingKafkaSpecExecutorTest";
private static final String _JOBS_DIR_PATH = _TEST_DIR_PATH + "/jobs";
String flowSpecUriString = "/flowgroup/flowname/spec";
Spec flowSpec = initJobSpecWithFlowExecutionId(flowSpecUriString, "12345");
String specUriString = "/foo/bar/spec";
Spec spec = initJobSpec(specUriString);


@BeforeSuite
public void beforeSuite() {
log.info("Process id = " + ManagementFactory.getRuntimeMXBean().getName());
@@ -92,9 +96,8 @@ private void cleanupTestDir() {
}
}
}

- @Test
- public void testAddSpec() throws Exception {
+ @BeforeClass
+ public void setup() throws Exception {
_closer = Closer.create();
_properties = new Properties();

@@ -116,16 +119,20 @@ public void testAddSpec() throws Exception {
// SEI Producer
_seip = _closer.register(new SimpleKafkaSpecProducer(config));

- WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
- log.info("WriteResponse: " + writeResponse);

_jobCatalog = new NonObservingFSJobCatalog(config.getConfig("gobblin.cluster"));
_jobCatalog.startAsync().awaitRunning();

// SEI Consumer
_seic = _closer.register(new StreamingKafkaSpecConsumer(config, _jobCatalog));
_seic.startAsync().awaitRunning();

}

+ @Test
+ public void testAddSpec() throws Exception {
+ WriteResponse writeResponse = (WriteResponse) _seip.addSpec(spec).get();
+ log.info("WriteResponse: " + writeResponse);

List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");

@@ -165,6 +172,66 @@ public void testDeleteSpec() throws Exception {
Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
}

@Test(dependsOnMethods = "testDeleteSpec")
public void testCancelSpec() throws Exception {
// Cancel an existing spec that was added
_seip.addSpec(spec).get();
WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(specUriString), new Properties()).get();
log.info("WriteResponse: " + writeResponse);

// Wait for the cancellation to be processed
Thread.sleep(5000);
List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
Assert.assertTrue(consumedEvent.size() == 3, "Consumption did not match production");

Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(2);
log.info(consumedSpecAction.getKey().toString());
Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
Assert.assertTrue(consumedEvent.get(1).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(specUriString), "Expected URI did not match");
Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
}

@Test(dependsOnMethods = "testCancelSpec")
public void testCancelSpecNoop() throws Exception {
_seip.addSpec(flowSpec).get();
Properties props = new Properties();
props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "54321"); // Does not match with added jobspec, so should not cancel job
WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(flowSpecUriString), props).get();
log.info("WriteResponse: " + writeResponse);
// Wait for the cancellation to be processed, but it should ignore the spec as flow execution IDs do not match
Thread.sleep(5000);
List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
Assert.assertTrue(consumedEvent.size() == 1, "Consumption did not match production");

Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(0);
Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.ADD), "Verb did not match");
Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
}

@Test(dependsOnMethods = "testCancelSpecNoop")
public void testCancelSpecWithFlowExecutionId() throws Exception {
Properties props = new Properties();
props.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "12345");
WriteResponse writeResponse = (WriteResponse) _seip.cancelJob(new URI(flowSpecUriString), props).get();
log.info("WriteResponse: " + writeResponse);

// Wait for the cancellation to be processed
Thread.sleep(5000);
List<Pair<SpecExecutor.Verb, Spec>> consumedEvent = _seic.changedSpecs().get();
Assert.assertTrue(consumedEvent.size() == 2, "Consumption did not match production");

Map.Entry<SpecExecutor.Verb, Spec> consumedSpecAction = consumedEvent.get(1);
log.info(consumedSpecAction.getKey().toString());
Assert.assertTrue(consumedEvent.get(0).getKey().equals(SpecExecutor.Verb.DELETE), "Verb did not match");
Assert.assertTrue(consumedSpecAction.getKey().equals(SpecExecutor.Verb.CANCEL), "Verb did not match");
Assert.assertTrue(consumedSpecAction.getValue().getUri().toString().equals(flowSpecUriString), "Expected URI did not match");
Assert.assertTrue(consumedSpecAction.getValue() instanceof JobSpec, "Expected JobSpec");
}


private static JobSpec initJobSpec(String specUri) {
Properties properties = new Properties();
return JobSpec.builder(specUri)
@@ -174,6 +241,16 @@ private static JobSpec initJobSpec(String specUri) {
.build();
}

private static JobSpec initJobSpecWithFlowExecutionId(String specUri, String flowExecutionId) {
Properties properties = new Properties();
properties.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, flowExecutionId);
return JobSpec.builder(specUri)
.withConfig(ConfigUtils.propertiesToConfig(properties))
.withVersion("1")
.withDescription("Spec Description")
.build();
}
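For orientation, the only difference between the two fixtures is where the execution ID lives, and the three cancel tests above pivot on that value. Restated compactly (a summary of the tests in this file, not new behavior):

    Spec spec = initJobSpec("/foo/bar/spec");  // carries no execution ID
    Spec flowSpec = initJobSpecWithFlowExecutionId("/flowgroup/flowname/spec", "12345");

    // testCancelSpec:                    cancel spec with empty Properties   -> cancelled
    // testCancelSpecNoop:                cancel flowSpec with ID "54321"     -> ignored (mismatch)
    // testCancelSpecWithFlowExecutionId: cancel flowSpec with ID "12345"     -> cancelled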

@AfterSuite
public void after() {
try {
SimpleKafkaSpecProducer.java
@@ -17,27 +17,30 @@

package org.apache.gobblin.service;

- import com.google.common.base.Joiner;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
- import java.util.concurrent.Future;
import java.util.Properties;
+ import java.util.concurrent.Future;

import org.apache.commons.lang3.reflect.ConstructorUtils;
- import org.apache.gobblin.configuration.ConfigurationKeys;
import org.slf4j.Logger;

import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
+ import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;

+ import javax.annotation.concurrent.NotThreadSafe;
+ import lombok.extern.slf4j.Slf4j;

+ import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
@@ -54,9 +57,6 @@
import org.apache.gobblin.writer.AsyncDataWriter;
import org.apache.gobblin.writer.WriteCallback;

- import javax.annotation.concurrent.NotThreadSafe;
- import lombok.extern.slf4j.Slf4j;

@Slf4j
@NotThreadSafe
public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
@@ -105,19 +105,6 @@ private Meter createMeter(String suffix) {
return this.metricContext.meter(MetricRegistry.name(ServiceMetricNames.GOBBLIN_SERVICE_PREFIX, getClass().getSimpleName(), suffix));
}

- private Spec addExecutionIdToJobSpecUri(Spec spec) {
- JobSpec newSpec = (JobSpec)spec;
- if (newSpec.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
- try {
- newSpec.setUri(new URI(Joiner.on("/").
- join(spec.getUri().toString(), newSpec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY))));
- } catch (URISyntaxException e) {
- log.error("Cannot create job uri to cancel job", e);
- }
- }
- return newSpec;
- }

private URI getURIWithExecutionId(URI originalURI, Properties props) {
URI result = originalURI;
if (props.containsKey(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
@@ -133,48 +120,43 @@ private URI getURIWithExecutionId(URI originalURI, Properties props) {

@Override
public Future<?> addSpec(Spec addedSpec) {
- Spec spec = addExecutionIdToJobSpecUri(addedSpec);
- AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.ADD);
+ AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);

- log.info("Adding Spec: " + spec + " using Kafka.");
+ log.info("Adding Spec: " + addedSpec + " using Kafka.");
this.addSpecMeter.mark();

return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
}

@Override
public Future<?> updateSpec(Spec updatedSpec) {
- Spec spec = addExecutionIdToJobSpecUri(updatedSpec);
- AvroJobSpec avroJobSpec = convertToAvroJobSpec(spec, SpecExecutor.Verb.UPDATE);
+ AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);

- log.info("Updating Spec: " + spec + " using Kafka.");
+ log.info("Updating Spec: " + updatedSpec + " using Kafka.");
this.updateSpecMeter.mark();

return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
}

@Override
public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
- URI finalDeletedSpecURI = getURIWithExecutionId(deletedSpecURI, headers);
-
- AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(finalDeletedSpecURI.toString())
+ AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
.setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.DELETE.name()))
.setProperties(Maps.fromProperties(headers)).build();

log.info("Deleting Spec: " + finalDeletedSpecURI + " using Kafka.");
log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
this.deleteSpecMeter.mark();

return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
}

@Override
public Future<?> cancelJob(URI deletedSpecURI, Properties properties) {
- URI finalDeletedSpecURI = getURIWithExecutionId(deletedSpecURI, properties);
- AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(finalDeletedSpecURI.toString())
+ AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
.setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.CANCEL.name()))
.setProperties(Maps.fromProperties(properties)).build();

log.info("Cancelling job: " + finalDeletedSpecURI + " using Kafka.");
log.info("Cancelling job: " + deletedSpecURI + " using Kafka.");
this.cancelSpecMeter.mark();

return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
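The net effect in this producer is that a spec's URI now stays stable across executions: the execution ID is no longer appended to the URI but travels in the request properties, where the consumer can validate it against the stored spec (see the KafkaJobMonitor change below). A caller targeting one specific execution would look roughly like this (illustrative sketch; producer construction and error handling elided, and "specProducer" is a hypothetical already-configured SimpleKafkaSpecProducer):

    Properties headers = new Properties();
    headers.setProperty(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, "12345");

    // The URI identifies the flow; the property pins which execution to cancel.
    specProducer.cancelJob(new URI("/flowgroup/flowname/spec"), headers).get();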
KafkaJobMonitor.java
@@ -28,11 +28,13 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metastore.DatasetStateStore;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.JobSpecMonitor;
import org.apache.gobblin.runtime.api.JobSpecNotFoundException;
import org.apache.gobblin.runtime.api.MutableJobCatalog;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.kafka.HighLevelConsumer;
@@ -136,14 +138,30 @@ protected void processMessage(DecodeableKafkaRecord<byte[],byte[]> message) {
break;
case DELETE:
this.removedSpecs.mark();
- URI jobSpecUri = parsedMessage.getUri();
- this.jobCatalog.remove(jobSpecUri);
+ this.jobCatalog.remove(parsedMessage.getUri());
// Delete the job state if it is a delete spec request
- deleteStateStore(jobSpecUri);
+ deleteStateStore(parsedMessage.getUri());
break;
case CANCEL:
- this.cancelledSpecs.mark();
- this.jobCatalog.remove(parsedMessage.getUri(), true);
// Validate that the flow execution ID of the running flow matches the one in the incoming job spec
URI specUri = parsedMessage.getUri();
if (!parsedMessage.getConfig().hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
this.cancelledSpecs.mark();
this.jobCatalog.remove(specUri, true);
} else {
try {
JobSpec spec = this.jobCatalog.getJobSpec(specUri);
String flowIdToCancel = parsedMessage.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
if (spec.getConfig().getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY).equals(flowIdToCancel)) {
this.cancelledSpecs.mark();
this.jobCatalog.remove(specUri, true);
} else {
log.warn("Could not find job spec {} with flow execution ID {} to cancel", specUri, flowIdToCancel);
}
} catch (JobSpecNotFoundException e) {
log.warn("Could not find job spec {} to cancel in job catalog", specUri);
}
}
break;
default:
log.error("Cannot process spec {} with verb {}", parsedMessage.getUri(), verb);
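The new CANCEL branch boils down to one rule: cancel when the request carries no flow execution ID (preserving the old cancel-by-URI behavior), or when the requested ID matches the one recorded in the stored spec. A condensed restatement for readability (a sketch, not code from this PR; the hasPath guard on the stored spec is an extra defensive assumption, since the code above calls getString on it directly):

    // Sketch of the matching rule implemented in the CANCEL case above.
    private boolean shouldCancel(JobSpec storedSpec, Config cancelRequest) {
      if (!cancelRequest.hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)) {
        return true; // no ID in the request: cancel by URI alone, as before this change
      }
      String requestedId = cancelRequest.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY);
      Config stored = storedSpec.getConfig();
      return stored.hasPath(ConfigurationKeys.FLOW_EXECUTION_ID_KEY)
          && stored.getString(ConfigurationKeys.FLOW_EXECUTION_ID_KEY).equals(requestedId);
    }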