Skip to content

Commit

Permalink
Add ability to submit ingestion job using Flink (#62)
Browse files Browse the repository at this point in the history
* Restructure Job Management components

* Change JobMonitor.getJobStatus to accept JobInfo

* Fix compilation error on test

* Implement Flink job submission and status monitoring

* Add configuration for Flink

* Skip metrics if it's not for particular runner

* Fix logging for DataflowJobMonitor

* Add license

* Set job status in DirectRunner to COMPLETED after submission, since it runs in a blocking fashion
  • Loading branch information
pradithya authored and feast-ci-bot committed Jan 18, 2019
1 parent bb2f5ec commit c525845
Show file tree
Hide file tree
Showing 29 changed files with 1,185 additions and 331 deletions.
11 changes: 11 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,11 @@
<artifactId>google-cloud-bigquery</artifactId>
<version>1.48.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.5.5</version>
</dependency>

<!-- Jackson due to jinjava dependency problems -->
<dependency>
Expand Down Expand Up @@ -281,5 +286,11 @@
<version>2.23.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>3.11.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
171 changes: 112 additions & 59 deletions core/src/main/java/feast/core/config/JobConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,37 @@
import com.timgroup.statsd.StatsDClient;
import feast.core.job.JobManager;
import feast.core.job.JobMonitor;
import feast.core.job.NoopJobManager;
import feast.core.job.NoopJobMonitor;
import feast.core.job.Runner;
import feast.core.job.StatsdMetricPusher;
import feast.core.job.dataflow.DataflowJobConfig;
import feast.core.job.dataflow.DataflowJobManager;
import feast.core.job.dataflow.DataflowJobMonitor;
import feast.core.job.direct.DirectRunnerJobManager;
import feast.core.job.flink.FlinkJobConfig;
import feast.core.job.flink.FlinkJobManager;
import feast.core.job.flink.FlinkJobMonitor;
import feast.core.job.flink.FlinkRestApi;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.GlobalConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

/**
* Beans for job management
*/
/** Beans for job management */
@Slf4j
@Configuration
public class JobConfig {

/**
* Get configuration for dataflow connection
*
* @param projectId
* @param location
* @return DataflowJobConfig
Expand All @@ -59,89 +68,133 @@ public DataflowJobConfig getDataflowJobConfig(
return new DataflowJobConfig(projectId, location);
}

@Bean
public FlinkJobConfig getFlinkJobConfig(
@Value("${feast.jobs.flink.configDir}") String flinkConfigDir,
@Value("${feast.jobs.flink.masterUrl}") String flinkMasterUrl) {
return new FlinkJobConfig(flinkMasterUrl, flinkConfigDir);
}

/**
* Get a JobManager according to the runner type and dataflow configuration.
*
* @param runnerType runner type: one of [DataflowRunner, DirectRunner, FlinkRunner]
* @param dfConfig dataflow job configuration
* @return JobManager
*/
@Bean
public JobManager getJobManager(@Value("${feast.jobs.runner}") String runnerType,
DataflowJobConfig dfConfig) {
if ("DataflowRunner".equals(runnerType)) {
if (Strings.isNullOrEmpty(dfConfig.getLocation()) ||
Strings.isNullOrEmpty(dfConfig.getProjectId())) {
log.error("Project and location of the Dataflow runner is not configured");
throw new IllegalStateException(
"Project and location of Dataflow runner must be specified for jobs to be run on Dataflow runner.");
}
try {
GoogleCredential credential = GoogleCredential.getApplicationDefault()
.createScoped(DataflowScopes.all());
Dataflow dataflow = new Dataflow(GoogleNetHttpTransport.newTrustedTransport(),
JacksonFactory.getDefaultInstance(), credential);
public JobManager getJobManager(
@Value("${feast.jobs.runner}") String runnerType,
DataflowJobConfig dfConfig,
FlinkJobConfig flinkConfig,
ImportJobDefaults defaults)
throws Exception {

Runner runner = Runner.fromString(runnerType);

switch (runner) {
case DATAFLOW:
if (Strings.isNullOrEmpty(dfConfig.getLocation())
|| Strings.isNullOrEmpty(dfConfig.getProjectId())) {
log.error("Project and location of the Dataflow runner is not configured");
throw new IllegalStateException(
"Project and location of Dataflow runner must be specified for jobs to be run on Dataflow runner.");
}
try {
GoogleCredential credential =
GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all());
Dataflow dataflow =
new Dataflow(
GoogleNetHttpTransport.newTrustedTransport(),
JacksonFactory.getDefaultInstance(),
credential);

return new DataflowJobManager(dataflow,
dfConfig.getProjectId(),
dfConfig.getLocation());
} catch (IOException e) {
throw new IllegalStateException(
"Unable to find credential required for Dataflow monitoring API", e);
} catch (GeneralSecurityException e) {
throw new IllegalStateException("Security exception while connecting to Dataflow API", e);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize DataflowJobManager", e);
}
return new DataflowJobManager(
dataflow, dfConfig.getProjectId(), dfConfig.getLocation(), defaults);
} catch (IOException e) {
throw new IllegalStateException(
"Unable to find credential required for Dataflow monitoring API", e);
} catch (GeneralSecurityException e) {
throw new IllegalStateException("Security exception while connecting to Dataflow API", e);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize DataflowJobManager", e);
}
case FLINK:
org.apache.flink.configuration.Configuration configuration =
GlobalConfiguration.loadConfiguration(flinkConfig.getConfigDir());
List<CustomCommandLine<?>> customCommandLines =
CliFrontend.loadCustomCommandLines(configuration, flinkConfig.getConfigDir());
CliFrontend flinkCli = new CliFrontend(configuration, customCommandLines);
FlinkRestApi flinkRestApi =
new FlinkRestApi(new RestTemplate(), flinkConfig.getMasterUrl());
return new FlinkJobManager(flinkCli, flinkConfig, flinkRestApi, defaults);
case DIRECT:
return new DirectRunnerJobManager(defaults);
default:
throw new IllegalArgumentException("Unsupported runner: " + runnerType);
}
return new NoopJobManager();
}

/**
* Get a Job Monitor given the runner type and dataflow configuration.
*
* @param runnerType runner type: one of [DataflowRunner, DirectRunner, FlinkRunner]
* @param dfConfig dataflow job configuration
* @return JobMonitor
*/
@Bean
public JobMonitor getJobMonitor(@Value("${feast.jobs.runner}") String runnerType,
DataflowJobConfig dfConfig) {
if ("DataflowRunner".equals(runnerType)) {
if (Strings.isNullOrEmpty(dfConfig.getLocation()) ||
Strings.isNullOrEmpty(dfConfig.getProjectId())) {
log.warn(
"Project and location of the Dataflow runner is not configured, will not do job monitoring");
return new NoopJobMonitor();
}
try {
GoogleCredential credential = GoogleCredential.getApplicationDefault()
.createScoped(DataflowScopes.all());
Dataflow dataflow = new Dataflow(GoogleNetHttpTransport.newTrustedTransport(),
JacksonFactory.getDefaultInstance(), credential);
public JobMonitor getJobMonitor(
@Value("${feast.jobs.runner}") String runnerType,
DataflowJobConfig dfConfig,
FlinkJobConfig flinkJobConfig)
throws Exception {

return new DataflowJobMonitor(dataflow,
dfConfig.getProjectId(),
dfConfig.getLocation());
} catch (IOException e) {
log.error("Unable to find credential required for Dataflow monitoring API: {}",
e.getMessage());
} catch (GeneralSecurityException e) {
log.error("Security exception while ");
} catch (Exception e) {
log.error("Unable to initialize DataflowJobMonitor", e);
}
}
Runner runner = Runner.fromString(runnerType);

switch (runner) {
case DATAFLOW:
if (Strings.isNullOrEmpty(dfConfig.getLocation())
|| Strings.isNullOrEmpty(dfConfig.getProjectId())) {
log.warn(
"Project and location of the Dataflow runner is not configured, will not do job monitoring");
return new NoopJobMonitor();
}
try {
GoogleCredential credential =
GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all());
Dataflow dataflow =
new Dataflow(
GoogleNetHttpTransport.newTrustedTransport(),
JacksonFactory.getDefaultInstance(),
credential);

// Default to no monitoring
return new NoopJobMonitor();
return new DataflowJobMonitor(dataflow, dfConfig.getProjectId(), dfConfig.getLocation());
} catch (IOException e) {
log.error(
"Unable to find credential required for Dataflow monitoring API: {}", e.getMessage());
} catch (GeneralSecurityException e) {
log.error("Security exception while ");
} catch (Exception e) {
log.error("Unable to initialize DataflowJobMonitor", e);
}
case FLINK:
FlinkRestApi flinkRestApi =
new FlinkRestApi(new RestTemplate(), flinkJobConfig.getMasterUrl());
return new FlinkJobMonitor(flinkRestApi);
case DIRECT:
default:
return new NoopJobMonitor();
}
}

/**
* Get metrics pusher to statsd
*
* @param statsDClient
* @return StatsdMetricPusher
*/
@Bean
public StatsdMetricPusher getStatsdMetricPusher(StatsDClient statsDClient){
public StatsdMetricPusher getStatsdMetricPusher(StatsDClient statsDClient) {
return new StatsdMetricPusher(statsDClient);
}
}
22 changes: 15 additions & 7 deletions core/src/main/java/feast/core/grpc/JobServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,36 @@

import com.google.protobuf.Empty;
import feast.core.JobServiceGrpc;
import feast.core.JobServiceProto.JobServiceTypes.*;
import feast.core.JobServiceProto.JobServiceTypes.AbortJobRequest;
import feast.core.JobServiceProto.JobServiceTypes.AbortJobResponse;
import feast.core.JobServiceProto.JobServiceTypes.GetJobRequest;
import feast.core.JobServiceProto.JobServiceTypes.GetJobResponse;
import feast.core.JobServiceProto.JobServiceTypes.JobDetail;
import feast.core.JobServiceProto.JobServiceTypes.ListJobsResponse;
import feast.core.JobServiceProto.JobServiceTypes.SubmitImportJobRequest;
import feast.core.JobServiceProto.JobServiceTypes.SubmitImportJobResponse;
import feast.core.exception.JobExecutionException;
import feast.core.service.JobExecutionService;
import feast.core.service.JobManagementService;
import feast.core.validators.SpecValidator;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.lognet.springboot.grpc.GRpcService;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

/** Implementation of the feast job GRPC service. */
@Slf4j
@GRpcService
public class JobServiceImpl extends JobServiceGrpc.JobServiceImplBase {
@Autowired private JobExecutionService jobExecutionService;

@Autowired private JobManagementService jobManagementService;

@Autowired private SpecValidator validator;

/**
* submit a job to the runner by providing an import spec.
*
* @param request ImportJobRequest object containing an import spec
* @param responseObserver
*/
Expand All @@ -53,8 +57,9 @@ public void submitJob(
SubmitImportJobRequest request, StreamObserver<SubmitImportJobResponse> responseObserver) {
try {
validator.validateImportSpec(request.getImportSpec());
String jobID = jobManagementService.submitJob(request.getImportSpec(), request.getName());
SubmitImportJobResponse response =
jobExecutionService.submitJob(request.getImportSpec(), request.getName());
SubmitImportJobResponse.newBuilder().setJobId(jobID).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (IllegalArgumentException e) {
Expand All @@ -68,6 +73,7 @@ public void submitJob(

/**
* Abort a job given its feast-internal job id
*
* @param request AbortJobRequest object containing feast job id
* @param responseObserver
*/
Expand All @@ -86,6 +92,7 @@ public void abortJob(AbortJobRequest request, StreamObserver<AbortJobResponse> r

/**
* List all jobs previously submitted to the system.
*
* @param request Empty request
* @param responseObserver
*/
Expand All @@ -104,6 +111,7 @@ public void listJobs(Empty request, StreamObserver<ListJobsResponse> responseObs

/**
* Get a single job previously submitted to the system by id
*
* @param request GetJobRequest object containing a feast-internal job id
* @param responseObserver
*/
Expand Down
11 changes: 11 additions & 0 deletions core/src/main/java/feast/core/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,18 @@

package feast.core.job;

import feast.specs.ImportSpecProto.ImportSpec;

public interface JobManager {

/**
* Submit an ingestion job into runner
* @param importSpec import spec of the ingestion job to run
* @param jobName name of the job
* @return extId runner specific job ID.
*/
String submitJob(ImportSpec importSpec, String jobName);

/**
* abort a job given runner-specific job ID.
*
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/feast/core/job/JobMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ public interface JobMonitor {
/**
* Get status of a job given runner-specific job ID.
*
* @param runnerJobId runner specific job id.
* @param job job.
* @return job status.
*/
JobStatus getJobStatus(String runnerJobId);
JobStatus getJobStatus(JobInfo job);

/**
* Get metrics of a job.
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/feast/core/job/NoopJobMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
public class NoopJobMonitor implements JobMonitor {

@Override
public JobStatus getJobStatus(String runnerJobId) {
public JobStatus getJobStatus(JobInfo job) {
return JobStatus.UNKNOWN;
}

Expand Down
Loading

0 comments on commit c525845

Please sign in to comment.