From b3172854b42804022073b8e7a906c073d110e61c Mon Sep 17 00:00:00 2001 From: Ches Martin Date: Sun, 26 Apr 2020 14:19:32 +0700 Subject: [PATCH] core: Use Runner enum type instead of string for Job model #575 sought to clear up inconsistencies between uses of `Runner#name()` (the standard final method of `java.lang.Enum` that returns the value's enum constant name) and the riskily-named `Runner#getName()` defined in Feast for human-readable Beam Runner names. The latter is used as runner name users can set in config. The former is used for values of the runner column of the jobs table in SQL (as it should be). But it relied on careful coding to use the right one when constructing `Job` instances. This is error prone, as #578 demonstrates. There is a more robust way: use the enum instead of stringly-typed programming. It's one of the reasons we have enums :-) This also renames the internal identifier in the Runner definition to `humanName`, to distinguish it further from `Enum#name()`. --- .../java/feast/core/job/JobUpdateTask.java | 2 +- core/src/main/java/feast/core/job/Runner.java | 26 +++++++----- .../core/job/dataflow/DataflowJobManager.java | 4 +- core/src/main/java/feast/core/model/Job.java | 7 ++-- .../java/feast/core/service/JobService.java | 13 +++--- .../feast/core/job/JobUpdateTaskTest.java | 18 ++++---- .../test/java/feast/core/job/RunnerTest.java | 42 +++++++++++++++++++ .../job/dataflow/DataflowJobManagerTest.java | 4 +- .../direct/DirectRunnerJobManagerTest.java | 2 +- .../service/JobCoordinatorServiceTest.java | 12 +++--- .../feast/core/service/JobServiceTest.java | 2 +- 11 files changed, 90 insertions(+), 42 deletions(-) create mode 100644 core/src/test/java/feast/core/job/RunnerTest.java diff --git a/core/src/main/java/feast/core/job/JobUpdateTask.java b/core/src/main/java/feast/core/job/JobUpdateTask.java index f3afe84df7..04aab0cff6 100644 --- a/core/src/main/java/feast/core/job/JobUpdateTask.java +++ b/core/src/main/java/feast/core/job/JobUpdateTask.java @@ -144,7 +144,7 @@ private Job startJob( new Job( jobId, "", - jobManager.getRunnerType().name(), + jobManager.getRunnerType(), Source.fromProto(source), Store.fromProto(sinkSpec), featureSets, diff --git a/core/src/main/java/feast/core/job/Runner.java b/core/src/main/java/feast/core/job/Runner.java index 4e2033fed6..acccb70c8b 100644 --- a/core/src/main/java/feast/core/job/Runner.java +++ b/core/src/main/java/feast/core/job/Runner.java @@ -16,33 +16,37 @@ */ package feast.core.job; +import java.util.NoSuchElementException; + +/** + * An Apache Beam Runner, for which Feast Core supports managing ingestion jobs. + * + * @see Beam Runners + */ public enum Runner { DATAFLOW("DataflowRunner"), FLINK("FlinkRunner"), DIRECT("DirectRunner"); - private final String name; + private final String humanName; - Runner(String name) { - this.name = name; + Runner(String humanName) { + this.humanName = humanName; } - /** - * Get the human readable name of this runner. Returns a human readable name of the runner that - * can be used for logging/config files/etc. - */ + /** Returns the human readable name of this runner, usable in logging, config files, etc. */ @Override public String toString() { - return name; + return humanName; } /** Parses a runner from its human readable name. */ - public static Runner fromString(String runner) { + public static Runner fromString(String humanName) { for (Runner r : Runner.values()) { - if (r.toString().equals(runner)) { + if (r.toString().equals(humanName)) { return r; } } - throw new IllegalArgumentException("Unknown value: " + runner); + throw new NoSuchElementException("Unknown Runner value: " + humanName); } } diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 6002133e82..880dd6c146 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -220,7 +220,7 @@ public Job restartJob(Job job) { */ @Override public JobStatus getJobStatus(Job job) { - if (!Runner.DATAFLOW.name().equals(job.getRunner())) { + if (job.getRunner() != RUNNER_TYPE) { return job.getStatus(); } @@ -252,7 +252,7 @@ private Job submitDataflowJob( return new Job( jobName, jobId, - getRunnerType().name(), + getRunnerType(), Source.fromProto(source), Store.fromProto(sink), featureSets, diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 060edefc3b..95bcd79e6c 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -19,6 +19,7 @@ import com.google.protobuf.InvalidProtocolBufferException; import feast.core.FeatureSetProto; import feast.core.IngestionJobProto; +import feast.core.job.Runner; import java.util.ArrayList; import java.util.List; import javax.persistence.CascadeType; @@ -55,9 +56,9 @@ public class Job extends AbstractTimestampEntity { private String extId; // Runner type - // Use Runner.name() when converting a Runner to string to assign to this property. + @Enumerated(EnumType.STRING) @Column(name = "runner") - private String runner; + private Runner runner; // Source id @ManyToOne @@ -96,7 +97,7 @@ public Job() { public Job( String id, String extId, - String runner, + Runner runner, Source source, Store sink, List featureSets, diff --git a/core/src/main/java/feast/core/service/JobService.java b/core/src/main/java/feast/core/service/JobService.java index bf74b90e80..c8fc5caf5e 100644 --- a/core/src/main/java/feast/core/service/JobService.java +++ b/core/src/main/java/feast/core/service/JobService.java @@ -29,6 +29,7 @@ import feast.core.IngestionJobProto; import feast.core.dao.JobRepository; import feast.core.job.JobManager; +import feast.core.job.Runner; import feast.core.log.Action; import feast.core.log.AuditLogger; import feast.core.log.Resource; @@ -50,13 +51,13 @@ import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; -/** Defines a Job Managemenent Service that allows users to manage feast ingestion jobs. */ +/** A Job Management Service that allows users to manage Feast ingestion jobs. */ @Slf4j @Service public class JobService { - private JobRepository jobRepository; - private SpecService specService; - private Map jobManagers; + private final JobRepository jobRepository; + private final SpecService specService; + private final Map jobManagers; @Autowired public JobService( @@ -66,13 +67,13 @@ public JobService( this.jobManagers = new HashMap<>(); for (JobManager manager : jobManagerList) { - this.jobManagers.put(manager.getRunnerType().name(), manager); + this.jobManagers.put(manager.getRunnerType(), manager); } } /* Job Service API */ /** - * List Ingestion Jobs in feast matching the given request. See CoreService protobuf documentation + * List Ingestion Jobs in Feast matching the given request. See CoreService protobuf documentation * for more detailed documentation. * * @param request list ingestion jobs request specifying which jobs to include diff --git a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java index 2a1e80994a..5faf446a94 100644 --- a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java +++ b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java @@ -102,7 +102,7 @@ public void shouldUpdateJobIfPresent() { new Job( "job", "old_ext", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -119,7 +119,7 @@ public void shouldUpdateJobIfPresent() { new Job( "job", "old_ext", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -129,7 +129,7 @@ public void shouldUpdateJobIfPresent() { new Job( "job", "new_ext", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, Source.fromProto(source), Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -163,7 +163,7 @@ public void shouldCreateJobIfNotPresent() { new Job( "job", "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -173,7 +173,7 @@ public void shouldCreateJobIfNotPresent() { new Job( "job", "ext", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -202,7 +202,7 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() { new Job( "job", "ext", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -216,7 +216,7 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() { new Job( "job", "ext", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, Source.fromProto(source), Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -248,7 +248,7 @@ public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { new Job( "job", "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -258,7 +258,7 @@ public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { new Job( "job", "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), diff --git a/core/src/test/java/feast/core/job/RunnerTest.java b/core/src/test/java/feast/core/job/RunnerTest.java new file mode 100644 index 0000000000..ce1700acbe --- /dev/null +++ b/core/src/test/java/feast/core/job/RunnerTest.java @@ -0,0 +1,42 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.job; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +import java.util.NoSuchElementException; +import org.junit.Test; + +public class RunnerTest { + + @Test + public void toStringReturnsHumanReadableName() { + assertThat(Runner.DATAFLOW.toString(), is("DataflowRunner")); + } + + @Test + public void fromStringLoadsValueFromHumanReadableName() { + var humanName = Runner.DATAFLOW.toString(); + assertThat(Runner.fromString(humanName), is(Runner.DATAFLOW)); + } + + @Test(expected = NoSuchElementException.class) + public void fromStringThrowsNoSuchElementExceptionForUnknownValue() { + Runner.fromString("this is not a valid Runner"); + } +} diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index e610f39373..72b921ef69 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -158,7 +158,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { new Job( jobName, "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, Source.fromProto(source), Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), @@ -239,7 +239,7 @@ public void shouldThrowExceptionWhenJobStateTerminal() throws IOException { new Job( "job", "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, Source.fromProto(source), Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index 76530d9f40..6980450ca4 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -144,7 +144,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { new Job( expectedJobId, "", - Runner.DIRECT.name(), + Runner.DIRECT, Source.fromProto(source), Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index 52e838c3d9..59fdc32b20 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -164,7 +164,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep new Job( "", "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -174,7 +174,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep new Job( "some_id", extId, - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -264,7 +264,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "name1", "", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source1), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -274,7 +274,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "name1", "extId1", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source1), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -284,7 +284,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "", "extId2", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source2), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet2)), @@ -294,7 +294,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "name2", "extId2", - Runner.DATAFLOW.name(), + Runner.DATAFLOW, feast.core.model.Source.fromProto(source2), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet2)), diff --git a/core/src/test/java/feast/core/service/JobServiceTest.java b/core/src/test/java/feast/core/service/JobServiceTest.java index c0e90ca43f..6f34205bbf 100644 --- a/core/src/test/java/feast/core/service/JobServiceTest.java +++ b/core/src/test/java/feast/core/service/JobServiceTest.java @@ -179,7 +179,7 @@ private Job newDummyJob(String id, String extId, JobStatus status) { return new Job( id, extId, - Runner.DATAFLOW.name(), + Runner.DATAFLOW, this.dataSource, this.dataStore, Arrays.asList(this.featureSet),