Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Optional#get() and string comparison bugs in JobService #804

Merged
merged 2 commits into from
Jun 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,13 @@ format-java:
mvn spotless:apply

lint-java:
mvn spotless:check
mvn --no-transfer-progress spotless:check

test-java:
mvn test
mvn --no-transfer-progress test

test-java-with-coverage:
mvn test jacoco:report-aggregate
mvn --no-transfer-progress test jacoco:report-aggregate

build-java:
mvn clean verify
Expand Down
75 changes: 37 additions & 38 deletions core/src/main/java/feast/core/service/JobService.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public JobService(
}
}

/* Job Service API */
// region Job Service API

/**
* List Ingestion Jobs in Feast matching the given request. See CoreService protobuf documentation
* for more detailed documentation.
Expand All @@ -88,24 +89,24 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request)

// check that filter specified and not empty
if (request.hasFilter()
&& !(request.getFilter().getId() == ""
&& request.getFilter().getStoreName() == ""
&& request.getFilter().hasFeatureSetReference() == false)) {
&& !(request.getFilter().getId().isEmpty()
&& request.getFilter().getStoreName().isEmpty()
&& !request.getFilter().hasFeatureSetReference())) {
// filter jobs based on request filter
ListIngestionJobsRequest.Filter filter = request.getFilter();

// for proto3, default value for missing values:
// - numeric values (ie int) is zero
// - strings is empty string
if (filter.getId() != "") {
if (!filter.getId().isEmpty()) {
// get by id: no more filters required: found job
Optional<Job> job = this.jobRepository.findById(filter.getId());
if (job.isPresent()) {
matchingJobIds.add(filter.getId());
}
} else {
// multiple filters can apply together in an 'and' operation
if (filter.getStoreName() != "") {
if (!filter.getStoreName().isEmpty()) {
// find jobs by name
List<Job> jobs = this.jobRepository.findByStoreName(filter.getStoreName());
Set<String> jobIds = jobs.stream().map(Job::getId).collect(Collectors.toSet());
Expand Down Expand Up @@ -140,7 +141,7 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request)
// convert matching job models to ingestion job protos
List<IngestionJobProto.IngestionJob> ingestJobs = new ArrayList<>();
for (String jobId : matchingJobIds) {
Job job = this.jobRepository.findById(jobId).get();
Job job = this.jobRepository.findById(jobId).orElseThrow();
// job that failed on start won't be converted toProto successfully
// and they're irrelevant here
if (job.getStatus() == JobStatus.ERROR) {
Expand All @@ -160,21 +161,20 @@ public ListIngestionJobsResponse listJobs(ListIngestionJobsRequest request)
* @param request restart ingestion job request specifying which job to stop
* @throws NoSuchElementException when restart job request requests to restart a nonexistent job.
* @throws UnsupportedOperationException when job to be restarted is in an unsupported status
* @throws InvalidProtocolBufferException on error when constructing response protobuf
*/
@Transactional
public RestartIngestionJobResponse restartJob(RestartIngestionJobRequest request)
throws InvalidProtocolBufferException {
// check job exists
Optional<Job> getJob = this.jobRepository.findById(request.getId());
if (getJob.isEmpty()) {
// FIXME: if getJob.isEmpty then constructing this error message will always throw an error...
throw new NoSuchElementException(
"Attempted to stop nonexistent job with id: " + getJob.get().getId());
}
public RestartIngestionJobResponse restartJob(RestartIngestionJobRequest request) {
String jobId = request.getId();

Job job =
this.jobRepository
.findById(jobId)
.orElseThrow(
() ->
new NoSuchElementException(
"Attempted to restart nonexistent job with id: " + jobId));

// check job status is valid for restarting
Job job = getJob.get();
JobStatus status = job.getStatus();
if (status.isTransitional() || status.isTerminal() || status == JobStatus.UNKNOWN) {
throw new UnsupportedOperationException(
Expand Down Expand Up @@ -202,20 +202,20 @@ public RestartIngestionJobResponse restartJob(RestartIngestionJobRequest request
* @param request stop ingestion job request specifying which job to stop
* @throws NoSuchElementException when stop job request requests to stop a nonexistent job.
* @throws UnsupportedOperationException when job to be stopped is in an unsupported status
* @throws InvalidProtocolBufferException on error when constructing response protobuf
*/
@Transactional
public StopIngestionJobResponse stopJob(StopIngestionJobRequest request)
throws InvalidProtocolBufferException {
// check job exists
Optional<Job> getJob = this.jobRepository.findById(request.getId());
if (getJob.isEmpty()) {
throw new NoSuchElementException(
"Attempted to stop nonexistent job with id: " + getJob.get().getId());
}
public StopIngestionJobResponse stopJob(StopIngestionJobRequest request) {
String jobId = request.getId();

Job job =
this.jobRepository
.findById(jobId)
.orElseThrow(
() ->
new NoSuchElementException(
"Attempted to stop nonexistent job with id: " + jobId));

// check job status is valid for stopping
Job job = getJob.get();
JobStatus status = job.getStatus();
if (status.isTerminal()) {
// do nothing - job is already stopped
Expand All @@ -240,7 +240,9 @@ public StopIngestionJobResponse stopJob(StopIngestionJobRequest request)
return StopIngestionJobResponse.newBuilder().build();
}

/* Private Utility Methods */
// endregion
// region Private Utility Methods

private <T> Set<T> mergeResults(Set<T> results, Collection<T> newResults) {
if (results.size() <= 0) {
// no existing results: copy over new results
Expand All @@ -252,7 +254,7 @@ private <T> Set<T> mergeResults(Set<T> results, Collection<T> newResults) {
return results;
}

// converts feature set reference to a list feature set filter
/** converts feature set reference to a list feature set filter */
private ListFeatureSetsRequest.Filter toListFeatureSetFilter(FeatureSetReference fsReference) {
// match featuresets using contents of featureset reference
String fsName = fsReference.getName();
Expand All @@ -262,16 +264,13 @@ private ListFeatureSetsRequest.Filter toListFeatureSetFilter(FeatureSetReference
// for proto3, default value for missing values:
// - numeric values (ie int) is zero
// - strings is empty string
ListFeatureSetsRequest.Filter filter =
ListFeatureSetsRequest.Filter.newBuilder()
.setFeatureSetName((fsName != "") ? fsName : "*")
.setProject((fsProject != "") ? fsProject : "*")
.build();

return filter;
return ListFeatureSetsRequest.Filter.newBuilder()
.setFeatureSetName(fsName.isEmpty() ? "*" : fsName)
.setProject(fsProject.isEmpty() ? "*" : fsProject)
.build();
}

// sync job status using job manager
/** sync job status using job manager */
private Job syncJobStatus(JobManager jobManager, Job job) {
JobStatus newStatus = jobManager.getJobStatus(job);
// log job status transition
Expand Down
28 changes: 14 additions & 14 deletions core/src/test/java/feast/core/service/JobServiceTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,7 @@
import feast.proto.core.StoreProto.Store.StoreType;
import feast.proto.types.ValueProto.ValueType.Enum;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.*;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
Expand Down Expand Up @@ -87,7 +83,7 @@ public void setup() {
"*:*:*");

// fake featureset & job
this.featureSet = this.newDummyFeatureSet("food", 2, "hunger");
this.featureSet = this.newDummyFeatureSet("food", "hunger");
this.job = this.newDummyJob("kafka-to-redis", "job-1111", JobStatus.PENDING);
try {
this.ingestionJob = this.job.toProto();
Expand Down Expand Up @@ -140,7 +136,7 @@ public void setupJobManager() {
.thenReturn(this.newDummyJob(this.job.getId(), this.job.getExtId(), JobStatus.PENDING));
}

private FeatureSet newDummyFeatureSet(String name, int version, String project) {
private FeatureSet newDummyFeatureSet(String name, String project) {
Feature feature = TestObjectFactory.CreateFeature(name + "_feature", Enum.INT64);
Entity entity = TestObjectFactory.CreateEntity(name + "_entity", Enum.STRING);

Expand Down Expand Up @@ -242,7 +238,7 @@ public void testListJobsByStoreName() {

@Test
public void testListIngestionJobByFeatureSetReference() {
// list job by feature set reference: name and version and project
// list job by feature set reference: name and project
ListIngestionJobsRequest.Filter filter =
ListIngestionJobsRequest.Filter.newBuilder()
.setFeatureSetReference(this.fsReferences.get(0))
Expand Down Expand Up @@ -281,7 +277,7 @@ private StopIngestionJobResponse tryStopJob(
fail("Expected exception, but none was thrown");
}
} catch (Exception e) {
if (expectError != true) {
if (!expectError) {
// unexpected exception
e.printStackTrace();
fail("Caught Unexpected exception trying to restart job");
Expand Down Expand Up @@ -330,8 +326,7 @@ public void testStopUnsupportedError() {
// check for UnsupportedOperationException when trying to stop jobs are
// in an in unknown or in a transitional state
JobStatus prevStatus = this.job.getStatus();
List<JobStatus> unsupportedStatuses = new ArrayList<>();
unsupportedStatuses.addAll(JobStatus.getTransitionalStates());
List<JobStatus> unsupportedStatuses = new ArrayList<>(JobStatus.getTransitionalStates());
unsupportedStatuses.add(JobStatus.UNKNOWN);

for (JobStatus status : unsupportedStatuses) {
Expand All @@ -345,6 +340,12 @@ public void testStopUnsupportedError() {
this.job.setStatus(prevStatus);
}

@Test(expected = NoSuchElementException.class)
public void testStopJobForUnknownId() {
var request = StopIngestionJobRequest.newBuilder().setId("bogusJobId").build();
jobService.stopJob(request);
Comment on lines +343 to +346
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This didn't fail before the fix since it was the same exception type anyway. I don't think there's an easy way to assert on a thrown exception's message in JUnit 4.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are potentially adding JUnit 5 dependency to our fork, if you would like to do it now and use assertThrows to validate the message.

JUnit 5 does work well with 4, so no need to upgrade all tests when adding it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think Gojek folks tried/wanted to before—the Java SDK already uses it—but there was some reason it could not or should not be done? IIRC either @davidheryanto or @zhilingc mentioned it to me somewhere awhile back.

}

// restart jobs
private RestartIngestionJobResponse tryRestartJob(
RestartIngestionJobRequest request, boolean expectError) {
Expand All @@ -356,7 +357,7 @@ private RestartIngestionJobResponse tryRestartJob(
fail("Expected exception, but none was thrown");
}
} catch (Exception e) {
if (expectError != true) {
if (!expectError) {
// unexpected exception
e.printStackTrace();
fail("Caught Unexpected exception trying to stop job");
Expand Down Expand Up @@ -392,8 +393,7 @@ public void testRestartUnsupportedError() {
// check for UnsupportedOperationException when trying to restart jobs are
// in an in unknown or in a transitional state
JobStatus prevStatus = this.job.getStatus();
List<JobStatus> unsupportedStatuses = new ArrayList<>();
unsupportedStatuses.addAll(JobStatus.getTransitionalStates());
List<JobStatus> unsupportedStatuses = new ArrayList<>(JobStatus.getTransitionalStates());
unsupportedStatuses.add(JobStatus.UNKNOWN);

for (JobStatus status : unsupportedStatuses) {
Expand Down