Add ability to submit ingestion job using Flink #62

Merged · 9 commits · Jan 18, 2019
Changes from 7 commits
11 changes: 11 additions & 0 deletions core/pom.xml
@@ -189,6 +189,11 @@
<artifactId>google-cloud-bigquery</artifactId>
<version>1.48.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.5.5</version>
</dependency>

<!--compile 'com.github.spullara.mustache.java:compiler:0.9.5'-->
<dependency>
@@ -249,5 +254,11 @@
<version>2.23.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>mockwebserver</artifactId>
<version>3.11.0</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
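
The flink-clients_2.11 dependency provides the CliFrontend class used below to submit jobs to a Flink cluster, and mockwebserver is a test-scoped stub HTTP server. As a rough sketch of how the new FlinkRestApi could be tested against a stubbed Flink REST endpoint — the getJobsOverview() call and the JSON shape here are assumptions, not part of this diff:

import okhttp3.mockwebserver.MockResponse;
import okhttp3.mockwebserver.MockWebServer;

public class FlinkRestApiStubSketch {
  public static void main(String[] args) throws Exception {
    MockWebServer server = new MockWebServer();
    // Stub the Flink master's jobs-overview endpoint with one running job.
    server.enqueue(
        new MockResponse()
            .setHeader("Content-Type", "application/json")
            .setBody("{\"jobs\":[{\"jid\":\"abc123\",\"name\":\"feast-import\",\"state\":\"RUNNING\"}]}"));
    server.start();

    // A test would point FlinkRestApi at the stub instead of a real master, e.g.:
    // new FlinkRestApi(new RestTemplate(), server.url("/").toString()).getJobsOverview();
    System.out.println("Stub Flink master at " + server.url("/"));
    server.shutdown();
  }
}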
171 changes: 112 additions & 59 deletions core/src/main/java/feast/core/config/JobConfig.java
@@ -26,28 +26,37 @@
import com.timgroup.statsd.StatsDClient;
import feast.core.job.JobManager;
import feast.core.job.JobMonitor;
import feast.core.job.NoopJobManager;
import feast.core.job.NoopJobMonitor;
import feast.core.job.Runner;
import feast.core.job.StatsdMetricPusher;
import feast.core.job.dataflow.DataflowJobConfig;
import feast.core.job.dataflow.DataflowJobManager;
import feast.core.job.dataflow.DataflowJobMonitor;
import feast.core.job.direct.DirectRunnerJobManager;
import feast.core.job.flink.FlinkJobConfig;
import feast.core.job.flink.FlinkJobManager;
import feast.core.job.flink.FlinkJobMonitor;
import feast.core.job.flink.FlinkRestApi;
import java.io.IOException;
import java.security.GeneralSecurityException;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.configuration.GlobalConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

/**
* Beans for job management
*/
/** Beans for job management */
@Slf4j
@Configuration
public class JobConfig {

/**
* Get configuration for dataflow connection
*
   * @param projectId GCP project id to run Dataflow jobs in
   * @param location GCP location where Dataflow jobs run
* @return DataflowJobConfig
@@ -59,89 +68,133 @@ public DataflowJobConfig getDataflowJobConfig(
return new DataflowJobConfig(projectId, location);
}

@Bean
public FlinkJobConfig getFlinkJobConfig(
@Value("${feast.jobs.flink.configDir}") String flinkConfigDir,
@Value("${feast.jobs.flink.masterUrl}") String flinkMasterUrl) {
return new FlinkJobConfig(flinkMasterUrl, flinkConfigDir);
}
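
These beans bind the feast.jobs.* properties referenced above; a sketch of the corresponding application.properties entries, with placeholder values:

# Runner selection: DataflowRunner, FlinkRunner, or DirectRunner
feast.jobs.runner=FlinkRunner
# Directory containing flink-conf.yaml for the Flink CLI client
feast.jobs.flink.configDir=/etc/flink/conf
# URL of the Flink master's REST endpoint
feast.jobs.flink.masterUrl=http://localhost:8081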

/**
* Get a JobManager according to the runner type and dataflow configuration.
*
* @param runnerType runner type: one of [DataflowRunner, DirectRunner, FlinkRunner]
   * @param dfConfig dataflow job configuration
   * @param flinkConfig flink job configuration
   * @param defaults import job defaults handed to the job manager
* @return JobManager
*/
@Bean
public JobManager getJobManager(@Value("${feast.jobs.runner}") String runnerType,
DataflowJobConfig dfConfig) {
if ("DataflowRunner".equals(runnerType)) {
if (Strings.isNullOrEmpty(dfConfig.getLocation()) ||
Strings.isNullOrEmpty(dfConfig.getProjectId())) {
log.error("Project and location of the Dataflow runner is not configured");
throw new IllegalStateException(
"Project and location of Dataflow runner must be specified for jobs to be run on Dataflow runner.");
}
try {
GoogleCredential credential = GoogleCredential.getApplicationDefault()
.createScoped(DataflowScopes.all());
Dataflow dataflow = new Dataflow(GoogleNetHttpTransport.newTrustedTransport(),
JacksonFactory.getDefaultInstance(), credential);
public JobManager getJobManager(
@Value("${feast.jobs.runner}") String runnerType,
DataflowJobConfig dfConfig,
FlinkJobConfig flinkConfig,
ImportJobDefaults defaults)
throws Exception {

Runner runner = Runner.fromString(runnerType);

switch (runner) {
case DATAFLOW:
if (Strings.isNullOrEmpty(dfConfig.getLocation())
|| Strings.isNullOrEmpty(dfConfig.getProjectId())) {
log.error("Project and location of the Dataflow runner is not configured");
throw new IllegalStateException(
"Project and location of Dataflow runner must be specified for jobs to be run on Dataflow runner.");
}
try {
GoogleCredential credential =
GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all());
Dataflow dataflow =
new Dataflow(
GoogleNetHttpTransport.newTrustedTransport(),
JacksonFactory.getDefaultInstance(),
credential);

return new DataflowJobManager(dataflow,
dfConfig.getProjectId(),
dfConfig.getLocation());
} catch (IOException e) {
throw new IllegalStateException(
"Unable to find credential required for Dataflow monitoring API", e);
} catch (GeneralSecurityException e) {
throw new IllegalStateException("Security exception while connecting to Dataflow API", e);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize DataflowJobManager", e);
}
return new DataflowJobManager(
dataflow, dfConfig.getProjectId(), dfConfig.getLocation(), defaults);
} catch (IOException e) {
throw new IllegalStateException(
"Unable to find credential required for Dataflow monitoring API", e);
} catch (GeneralSecurityException e) {
throw new IllegalStateException("Security exception while connecting to Dataflow API", e);
} catch (Exception e) {
throw new IllegalStateException("Unable to initialize DataflowJobManager", e);
}
case FLINK:
org.apache.flink.configuration.Configuration configuration =
          [Review comment · Contributor] Minor suggestion: I think this construction code should be moved to a factory-style method of the FlinkJobManager. Similarly for the DataflowJobManager.
GlobalConfiguration.loadConfiguration(flinkConfig.getConfigDir());
List<CustomCommandLine<?>> customCommandLines =
CliFrontend.loadCustomCommandLines(configuration, flinkConfig.getConfigDir());
CliFrontend flinkCli = new CliFrontend(configuration, customCommandLines);
FlinkRestApi flinkRestApi =
new FlinkRestApi(new RestTemplate(), flinkConfig.getMasterUrl());
return new FlinkJobManager(flinkCli, flinkConfig, flinkRestApi, defaults);
case DIRECT:
return new DirectRunnerJobManager(defaults);
default:
throw new IllegalArgumentException("Unsupported runner: " + runnerType);
}
return new NoopJobManager();
}

/**
* Get a Job Monitor given the runner type and dataflow configuration.
*
* @param runnerType runner type: one of [DataflowRunner, DirectRunner, FlinkRunner]
   * @param dfConfig dataflow job configuration
   * @param flinkJobConfig flink job configuration
* @return JobMonitor
*/
@Bean
public JobMonitor getJobMonitor(@Value("${feast.jobs.runner}") String runnerType,
DataflowJobConfig dfConfig) {
if ("DataflowRunner".equals(runnerType)) {
if (Strings.isNullOrEmpty(dfConfig.getLocation()) ||
Strings.isNullOrEmpty(dfConfig.getProjectId())) {
log.warn(
"Project and location of the Dataflow runner is not configured, will not do job monitoring");
return new NoopJobMonitor();
}
try {
GoogleCredential credential = GoogleCredential.getApplicationDefault()
.createScoped(DataflowScopes.all());
Dataflow dataflow = new Dataflow(GoogleNetHttpTransport.newTrustedTransport(),
JacksonFactory.getDefaultInstance(), credential);
public JobMonitor getJobMonitor(
@Value("${feast.jobs.runner}") String runnerType,
DataflowJobConfig dfConfig,
FlinkJobConfig flinkJobConfig)
throws Exception {

return new DataflowJobMonitor(dataflow,
dfConfig.getProjectId(),
dfConfig.getLocation());
} catch (IOException e) {
log.error("Unable to find credential required for Dataflow monitoring API: {}",
e.getMessage());
} catch (GeneralSecurityException e) {
log.error("Security exception while ");
} catch (Exception e) {
log.error("Unable to initialize DataflowJobMonitor", e);
}
}
Runner runner = Runner.fromString(runnerType);

switch (runner) {
case DATAFLOW:
if (Strings.isNullOrEmpty(dfConfig.getLocation())
|| Strings.isNullOrEmpty(dfConfig.getProjectId())) {
log.warn(
"Project and location of the Dataflow runner is not configured, will not do job monitoring");
return new NoopJobMonitor();
}
try {
GoogleCredential credential =
GoogleCredential.getApplicationDefault().createScoped(DataflowScopes.all());
Dataflow dataflow =
new Dataflow(
GoogleNetHttpTransport.newTrustedTransport(),
JacksonFactory.getDefaultInstance(),
credential);

// Default to no monitoring
return new NoopJobMonitor();
return new DataflowJobMonitor(dataflow, dfConfig.getProjectId(), dfConfig.getLocation());
} catch (IOException e) {
log.error(
"Unable to find credential required for Dataflow monitoring API: {}", e.getMessage());
} catch (GeneralSecurityException e) {
log.error("Security exception while ");
} catch (Exception e) {
log.error("Unable to initialize DataflowJobMonitor", e);
}
case FLINK:
FlinkRestApi flinkRestApi =
new FlinkRestApi(new RestTemplate(), flinkJobConfig.getMasterUrl());
return new FlinkJobMonitor(flinkRestApi);
case DIRECT:
default:
return new NoopJobMonitor();
}
}

/**
* Get metrics pusher to statsd
*
   * @param statsDClient statsd client used to push metrics
* @return StatsdMetricPusher
*/
@Bean
public StatsdMetricPusher getStatsdMetricPusher(StatsDClient statsDClient){
public StatsdMetricPusher getStatsdMetricPusher(StatsDClient statsDClient) {
return new StatsdMetricPusher(statsDClient);
}
}
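
The dispatch in getJobManager and getJobMonitor relies on a Runner enum (feast.core.job.Runner) whose definition is outside the hunks shown here. A minimal sketch consistent with its usage, assuming the string constants match the runner names listed in the javadoc above:

package feast.core.job;

// Sketch only; the real enum is not part of the hunks shown in this diff.
public enum Runner {
  DATAFLOW("DataflowRunner"),
  FLINK("FlinkRunner"),
  DIRECT("DirectRunner");

  private final String name;

  Runner(String name) {
    this.name = name;
  }

  /** Resolve a feast.jobs.runner property value to a Runner constant. */
  public static Runner fromString(String name) {
    for (Runner runner : values()) {
      if (runner.name.equals(name)) {
        return runner;
      }
    }
    throw new IllegalArgumentException("Unknown runner: " + name);
  }
}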
22 changes: 15 additions & 7 deletions core/src/main/java/feast/core/grpc/JobServiceImpl.java
@@ -19,32 +19,36 @@

import com.google.protobuf.Empty;
import feast.core.JobServiceGrpc;
import feast.core.JobServiceProto.JobServiceTypes.*;
import feast.core.JobServiceProto.JobServiceTypes.AbortJobRequest;
import feast.core.JobServiceProto.JobServiceTypes.AbortJobResponse;
import feast.core.JobServiceProto.JobServiceTypes.GetJobRequest;
import feast.core.JobServiceProto.JobServiceTypes.GetJobResponse;
import feast.core.JobServiceProto.JobServiceTypes.JobDetail;
import feast.core.JobServiceProto.JobServiceTypes.ListJobsResponse;
import feast.core.JobServiceProto.JobServiceTypes.SubmitImportJobRequest;
import feast.core.JobServiceProto.JobServiceTypes.SubmitImportJobResponse;
import feast.core.exception.JobExecutionException;
import feast.core.service.JobExecutionService;
import feast.core.service.JobManagementService;
import feast.core.validators.SpecValidator;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.lognet.springboot.grpc.GRpcService;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

/** Implementation of the feast job GRPC service. */
@Slf4j
@GRpcService
public class JobServiceImpl extends JobServiceGrpc.JobServiceImplBase {
@Autowired private JobExecutionService jobExecutionService;

@Autowired private JobManagementService jobManagementService;

@Autowired private SpecValidator validator;

/**
   * Submit a job to the runner by providing an import spec.
   *
   * @param request ImportJobRequest object containing an import spec
   * @param responseObserver observer to return the SubmitImportJobResponse through
*/
@@ -53,8 +57,9 @@ public void submitJob(
SubmitImportJobRequest request, StreamObserver<SubmitImportJobResponse> responseObserver) {
try {
validator.validateImportSpec(request.getImportSpec());
String jobID = jobManagementService.submitJob(request.getImportSpec(), request.getName());
SubmitImportJobResponse response =
jobExecutionService.submitJob(request.getImportSpec(), request.getName());
SubmitImportJobResponse.newBuilder().setJobId(jobID).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (IllegalArgumentException e) {
@@ -68,6 +73,7 @@ public void submitJob(

/**
* Abort a job given its feast-internal job id
*
* @param request AbortJobRequest object containing feast job id
   * @param responseObserver observer to return the AbortJobResponse through
*/
@@ -86,6 +92,7 @@ public void abortJob(AbortJobRequest request, StreamObserver<AbortJobResponse> r

/**
* List all jobs previously submitted to the system.
*
* @param request Empty request
   * @param responseObserver observer to return the ListJobsResponse through
*/
@@ -104,6 +111,7 @@ public void listJobs(Empty request, StreamObserver<ListJobsResponse> responseObs

/**
* Get a single job previously submitted to the system by id
*
* @param request GetJobRequest object containing a feast-internal job id
   * @param responseObserver observer to return the GetJobResponse through
*/
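
For context, a hedged sketch of how a client might drive the reworked submit path end to end; the channel target and blocking-stub pattern follow standard grpc-java codegen and are assumptions, not shown in this PR:

import feast.core.JobServiceGrpc;
import feast.core.JobServiceProto.JobServiceTypes.SubmitImportJobRequest;
import feast.core.JobServiceProto.JobServiceTypes.SubmitImportJobResponse;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class SubmitImportJobClientSketch {
  public static void main(String[] args) {
    // Hypothetical core address; the real host/port depends on deployment config.
    ManagedChannel channel =
        ManagedChannelBuilder.forAddress("localhost", 6565).usePlaintext().build();
    JobServiceGrpc.JobServiceBlockingStub stub = JobServiceGrpc.newBlockingStub(channel);

    SubmitImportJobRequest request =
        SubmitImportJobRequest.newBuilder()
            .setName("my-ingestion-job") // import spec omitted for brevity
            .build();
    SubmitImportJobResponse response = stub.submitJob(request);
    System.out.println("Feast job id: " + response.getJobId());
    channel.shutdown();
  }
}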
11 changes: 11 additions & 0 deletions core/src/main/java/feast/core/job/JobManager.java
@@ -17,7 +17,18 @@

package feast.core.job;

import feast.specs.ImportSpecProto.ImportSpec;

public interface JobManager {

/**
   * Submit an ingestion job to the runner.
   *
   * @param importSpec import spec of the ingestion job to run
   * @param jobName name of the job
   * @return runner-specific job ID (extId)
*/
String submitJob(ImportSpec importSpec, String jobName);

/**
   * Abort a job given its runner-specific job ID.
*
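
From a caller's perspective, the extended contract looks roughly like this illustrative sketch; the concrete JobManager would be whichever runner-backed bean JobConfig wires up at startup:

import feast.core.job.JobManager;
import feast.specs.ImportSpecProto.ImportSpec;

// Illustrative only: shows the new submitJob contract from the caller's side.
public class SubmitSketch {
  static String submit(JobManager manager, ImportSpec importSpec, String jobName) {
    // The returned extId is the runner's own job id, which core can later use
    // to monitor or abort the job.
    return manager.submitJob(importSpec, jobName);
  }
}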
4 changes: 2 additions & 2 deletions core/src/main/java/feast/core/job/JobMonitor.java
@@ -27,10 +27,10 @@ public interface JobMonitor {
/**
* Get status of a job given runner-specific job ID.
*
* @param runnerJobId runner specific job id.
   * @param job job info of the job to look up.
* @return job status.
*/
JobStatus getJobStatus(String runnerJobId);
JobStatus getJobStatus(JobInfo job);

/**
* Get metrics of a job.
27 changes: 0 additions & 27 deletions core/src/main/java/feast/core/job/NoopJobManager.java

This file was deleted.
