Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Ingestion Job management API for Feast Core (#548) #624

Merged
merged 1 commit into from
Apr 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions core/src/main/java/feast/core/dao/JobRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package feast.core.dao;

import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import java.util.Collection;
Expand All @@ -29,4 +30,10 @@ public interface JobRepository extends JpaRepository<Job, String> {
// NOTE(review): no explicit @Query is visible on these methods, so the queries are presumably
// derived by Spring Data from the method names - confirm against the full file.

// find jobs whose status is not one of the given statuses
List<Job> findByStatusNotIn(Collection<JobStatus> statuses);

// find jobs by source id and store name, ordered by last updated (descending)
List<Job> findBySourceIdAndStoreNameOrderByLastUpdatedDesc(String sourceId, String storeName);

// find jobs by Feast store name
List<Job> findByStoreName(String storeName);

// find jobs by feature set: matches jobs related to any of the given feature sets
List<Job> findByFeatureSetsIn(List<FeatureSet> featureSets);
}
81 changes: 80 additions & 1 deletion core/src/main/java/feast/core/grpc/CoreServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package feast.core.grpc;

import com.google.api.gax.rpc.InvalidArgumentException;
import feast.core.CoreServiceGrpc.CoreServiceImplBase;
import feast.core.CoreServiceProto.ApplyFeatureSetRequest;
import feast.core.CoreServiceProto.ApplyFeatureSetResponse;
Expand All @@ -29,21 +30,29 @@
import feast.core.CoreServiceProto.GetFeatureSetResponse;
import feast.core.CoreServiceProto.ListFeatureSetsRequest;
import feast.core.CoreServiceProto.ListFeatureSetsResponse;
import feast.core.CoreServiceProto.ListIngestionJobsRequest;
import feast.core.CoreServiceProto.ListIngestionJobsResponse;
import feast.core.CoreServiceProto.ListProjectsRequest;
import feast.core.CoreServiceProto.ListProjectsResponse;
import feast.core.CoreServiceProto.ListStoresRequest;
import feast.core.CoreServiceProto.ListStoresResponse;
import feast.core.CoreServiceProto.RestartIngestionJobRequest;
import feast.core.CoreServiceProto.RestartIngestionJobResponse;
import feast.core.CoreServiceProto.StopIngestionJobRequest;
import feast.core.CoreServiceProto.StopIngestionJobResponse;
import feast.core.CoreServiceProto.UpdateStoreRequest;
import feast.core.CoreServiceProto.UpdateStoreResponse;
import feast.core.exception.RetrievalException;
import feast.core.grpc.interceptors.MonitoringInterceptor;
import feast.core.model.Project;
import feast.core.service.AccessManagementService;
import feast.core.service.JobService;
import feast.core.service.SpecService;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.stub.StreamObserver;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.lognet.springboot.grpc.GRpcService;
Expand All @@ -56,11 +65,16 @@ public class CoreServiceImpl extends CoreServiceImplBase {

// Collaborating services; all three are supplied through the constructor below.
private SpecService specService;
private AccessManagementService accessManagementService;
private JobService jobService;

/**
 * Creates the Core gRPC service and stores the services it delegates to.
 *
 * @param specService spec service kept in {@code this.specService}
 * @param accessManagementService access management service kept in {@code
 *     this.accessManagementService}
 * @param jobService job service used by the ingestion job list/restart/stop endpoints
 */
@Autowired
public CoreServiceImpl(
    SpecService specService,
    AccessManagementService accessManagementService,
    JobService jobService) {
  this.specService = specService;
  this.accessManagementService = accessManagementService;
  this.jobService = jobService;
}

@Override
Expand Down Expand Up @@ -191,4 +205,69 @@ public void listProjects(
Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
}
}

/**
 * Lists ingestion jobs for the given request.
 *
 * <p>Delegates to {@code jobService.listJobs(request)} and sends the result to the observer.
 * Failures are logged and reported through {@code responseObserver.onError}: an {@link
 * InvalidArgumentException} is reported as {@code INVALID_ARGUMENT}, any other exception as
 * {@code INTERNAL}.
 *
 * @param request the list request forwarded to the job service
 * @param responseObserver observer that receives the response or the error status
 */
@Override
public void listIngestionJobs(
    ListIngestionJobsRequest request,
    StreamObserver<ListIngestionJobsResponse> responseObserver) {
  try {
    ListIngestionJobsResponse response = this.jobService.listJobs(request);
    responseObserver.onNext(response);
    responseObserver.onCompleted();
  } catch (InvalidArgumentException e) {
    log.error("Received an invalid request on calling listIngestionJobs method:", e);
    // asRuntimeException() is used for every status in this method so the error style is uniform
    responseObserver.onError(
        Status.INVALID_ARGUMENT
            .withDescription(e.getMessage())
            .withCause(e)
            .asRuntimeException());
  } catch (Exception e) {
    log.error("Unexpected exception on calling listIngestionJobs method:", e);
    responseObserver.onError(
        Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
  }
}

/**
 * Restarts the ingestion job identified by the given request.
 *
 * <p>Delegates to {@code jobService.restartJob(request)} and sends the result to the observer.
 * Failures are logged and reported through {@code responseObserver.onError}: a {@link
 * NoSuchElementException} is reported as {@code NOT_FOUND}, an {@link
 * UnsupportedOperationException} as {@code FAILED_PRECONDITION}, and any other exception as
 * {@code INTERNAL}.
 *
 * @param request the restart request forwarded to the job service
 * @param responseObserver observer that receives the response or the error status
 */
@Override
public void restartIngestionJob(
    RestartIngestionJobRequest request,
    StreamObserver<RestartIngestionJobResponse> responseObserver) {
  try {
    RestartIngestionJobResponse response = this.jobService.restartJob(request);
    responseObserver.onNext(response);
    responseObserver.onCompleted();
  } catch (NoSuchElementException e) {
    log.error(
        "Attempted to restart a nonexistent job on calling restartIngestionJob method:", e);
    // asRuntimeException() is used for every status in this method so the error style is uniform
    responseObserver.onError(
        Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asRuntimeException());
  } catch (UnsupportedOperationException e) {
    log.error("Received an unsupported request on calling restartIngestionJob method:", e);
    responseObserver.onError(
        Status.FAILED_PRECONDITION
            .withDescription(e.getMessage())
            .withCause(e)
            .asRuntimeException());
  } catch (Exception e) {
    log.error("Unexpected exception on calling restartIngestionJob method:", e);
    responseObserver.onError(
        Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
  }
}

/**
 * Stops the ingestion job identified by the given request.
 *
 * <p>Delegates to {@code jobService.stopJob(request)} and sends the result to the observer.
 * Failures are logged and reported through {@code responseObserver.onError}: a {@link
 * NoSuchElementException} is reported as {@code NOT_FOUND}, an {@link
 * UnsupportedOperationException} as {@code FAILED_PRECONDITION}, and any other exception as
 * {@code INTERNAL}.
 *
 * @param request the stop request forwarded to the job service
 * @param responseObserver observer that receives the response or the error status
 */
@Override
public void stopIngestionJob(
    StopIngestionJobRequest request, StreamObserver<StopIngestionJobResponse> responseObserver) {
  try {
    StopIngestionJobResponse response = this.jobService.stopJob(request);
    responseObserver.onNext(response);
    responseObserver.onCompleted();
  } catch (NoSuchElementException e) {
    log.error("Attempted to stop a nonexistent job on calling stopIngestionJob method:", e);
    // asRuntimeException() is used for every status in this method so the error style is uniform
    responseObserver.onError(
        Status.NOT_FOUND.withDescription(e.getMessage()).withCause(e).asRuntimeException());
  } catch (UnsupportedOperationException e) {
    log.error("Received an unsupported request on calling stopIngestionJob method:", e);
    responseObserver.onError(
        Status.FAILED_PRECONDITION
            .withDescription(e.getMessage())
            .withCause(e)
            .asRuntimeException());
  } catch (Exception e) {
    log.error("Unexpected exception on calling stopIngestionJob method:", e);
    responseObserver.onError(
        Status.INTERNAL.withDescription(e.getMessage()).withCause(e).asRuntimeException());
  }
}
}
10 changes: 10 additions & 0 deletions core/src/main/java/feast/core/job/JobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,16 @@ public interface JobManager {
*/
void abortJob(String extId);

/**
* Restart a job. If the job is in a terminated state, it will simply be started. Restarting a
* running job might cause data to be lost in some implementations; refer to the docs of the
* specific implementation.
*
* @param job job to restart
* @return the restarted job
*/
Job restartJob(Job job);

/**
* Get status of a job given runner-specific job ID.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,26 @@ public void abortJob(String dataflowJobId) {
}
}

/**
 * Restarts a Dataflow job.
 *
 * <p>A job whose status is one of the terminal states is started with {@link #startJob(Job)};
 * any other job is handed to {@link #updateJob(Job)}. Dataflow should ensure continuity during
 * the restart, so no data should be lost by the restart operation.
 *
 * @param job job to restart
 * @return the restarted job
 */
@Override
public Job restartJob(Job job) {
  boolean notRunning = JobStatus.getTerminalState().contains(job.getStatus());
  // A job that is not running only needs to be started. For a running job, submitting an
  // update without changing the job has the effect of restarting it.
  return notRunning ? this.startJob(job) : this.updateJob(job);
}

/**
* Get status of a dataflow job with given id and try to map it into Feast's JobStatus.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,26 @@ public PipelineResult runPipeline(ImportOptions pipelineOptions) throws IOExcept
return ImportJob.runPipeline(pipelineOptions);
}

/**
 * Restarts a direct runner job.
 *
 * <p>A job whose status is one of the terminal states is started with {@link #startJob(Job)};
 * any other job is handed to {@link #updateJob(Job)}. Note that some data will be temporarily
 * lost when restarting a running direct runner job. See {@link #updateJob(Job)} for more info.
 *
 * @param job job to restart
 * @return the restarted job
 */
@Override
public Job restartJob(Job job) {
  boolean notRunning = JobStatus.getTerminalState().contains(job.getStatus());
  // A job that is not running only needs to be started. For a running job, submitting an
  // update without changing the job has the effect of restarting it.
  return notRunning ? this.startJob(job) : this.updateJob(job);
}

/**
* Gets the state of the direct runner job. Direct runner jobs only have 2 states: RUNNING and
* ABORTED.
Expand Down
45 changes: 44 additions & 1 deletion core/src/main/java/feast/core/model/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,24 @@
*/
package feast.core.model;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.FeatureSetProto;
import feast.core.IngestionJobProto;
import java.util.ArrayList;
import java.util.List;
import javax.persistence.*;
import javax.persistence.CascadeType;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.EnumType;
import javax.persistence.Enumerated;
import javax.persistence.Id;
import javax.persistence.Index;
import javax.persistence.JoinColumn;
import javax.persistence.JoinTable;
import javax.persistence.ManyToMany;
import javax.persistence.ManyToOne;
import javax.persistence.OneToMany;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -102,4 +118,31 @@ public void updateMetrics(List<Metrics> newMetrics) {
public String getSinkName() {
  // the sink name is the name of the store attached to this job
  final String sinkName = this.store.getName();
  return sinkName;
}

/**
 * Builds the ingestion job proto for this job.
 *
 * <p>The proto carries the job's id, external id, status, feature sets, source and store.
 *
 * @return ingestion job proto derived from this job
 * @throws InvalidProtocolBufferException propagated from the nested proto conversions
 *     (NOTE(review): which conversion declares it is not visible here - confirm)
 */
public IngestionJobProto.IngestionJob toProto() throws InvalidProtocolBufferException {
  IngestionJobProto.IngestionJob.Builder protoBuilder =
      IngestionJobProto.IngestionJob.newBuilder();

  // feature sets are converted before any other field is read, in iteration order
  for (FeatureSet jobFeatureSet : this.getFeatureSets()) {
    protoBuilder.addFeatureSets(jobFeatureSet.toProto());
  }

  // fill in the remaining job data
  return protoBuilder
      .setId(this.getId())
      .setExternalId(this.getExtId())
      .setStatus(this.getStatus().toProto())
      .setSource(this.getSource().toProto())
      .setStore(this.getStore().toProto())
      .build();
}
}
39 changes: 39 additions & 0 deletions core/src/main/java/feast/core/model/JobStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
*/
package feast.core.model;

import com.google.common.collect.ImmutableMap;
import feast.core.IngestionJobProto.IngestionJobStatus;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;

public enum JobStatus {
/** Job status is not known. */
Expand Down Expand Up @@ -64,4 +67,40 @@ public enum JobStatus {
public static Collection<JobStatus> getTerminalState() {
  // hands back the shared TERMINAL_STATE collection itself, not a copy
  return JobStatus.TERMINAL_STATE;
}

// Unmodifiable list of the statuses treated as transitional: PENDING, ABORTING, SUSPENDING.
private static final Collection<JobStatus> TRANSITIONAL_STATES =
    Collections.unmodifiableList(
        Arrays.asList(JobStatus.PENDING, JobStatus.ABORTING, JobStatus.SUSPENDING));

/**
 * Get the transitional Job Status states. Transitional states are assigned to jobs that are
 * transitioning to a more stable state (i.e. SUSPENDED, ABORTED etc.)
 *
 * @return unmodifiable collection of the transitional Job Status states.
 */
public static final Collection<JobStatus> getTransitionalStates() {
  return JobStatus.TRANSITIONAL_STATES;
}

// Lookup table from each job status to the ingestion job status proto value of the same name.
private static final Map<JobStatus, IngestionJobStatus> INGESTION_JOB_STATUS_MAP =
    ImmutableMap.<JobStatus, IngestionJobStatus>builder()
        .put(UNKNOWN, IngestionJobStatus.UNKNOWN)
        .put(PENDING, IngestionJobStatus.PENDING)
        .put(RUNNING, IngestionJobStatus.RUNNING)
        .put(COMPLETED, IngestionJobStatus.COMPLETED)
        .put(ABORTING, IngestionJobStatus.ABORTING)
        .put(ABORTED, IngestionJobStatus.ABORTED)
        .put(ERROR, IngestionJobStatus.ERROR)
        .put(SUSPENDING, IngestionJobStatus.SUSPENDING)
        .put(SUSPENDED, IngestionJobStatus.SUSPENDED)
        .build();

/**
 * Convert this Job Status to its Ingestion Job Status proto.
 *
 * @return IngestionJobStatus proto derived from this job status, as found in the lookup table
 */
public IngestionJobStatus toProto() {
  // look this status up in the job status to ingestion job status table
  final IngestionJobStatus protoStatus = INGESTION_JOB_STATUS_MAP.get(this);
  return protoStatus;
}
}
Loading