diff --git a/core/src/main/java/feast/core/job/JobUpdateTask.java b/core/src/main/java/feast/core/job/JobUpdateTask.java index 57b2dfee4f..1ade5364fa 100644 --- a/core/src/main/java/feast/core/job/JobUpdateTask.java +++ b/core/src/main/java/feast/core/job/JobUpdateTask.java @@ -144,7 +144,7 @@ private Job startJob( new Job( jobId, "", - jobManager.getRunnerType().toString(), + jobManager.getRunnerType().name(), Source.fromProto(source), Store.fromProto(sinkSpec), featureSets, @@ -155,7 +155,7 @@ private Job startJob( jobId, Action.SUBMIT, "Building graph and submitting to %s", - jobManager.getRunnerType().getName()); + jobManager.getRunnerType().toString()); job = jobManager.startJob(job); if (job.getExtId().isEmpty()) { @@ -168,7 +168,7 @@ private Job startJob( jobId, Action.STATUS_CHANGE, "Job submitted to runner %s with ext id %s.", - jobManager.getRunnerType().getName(), + jobManager.getRunnerType().toString(), job.getExtId()); return job; @@ -178,7 +178,7 @@ private Job startJob( jobId, Action.STATUS_CHANGE, "Job failed to be submitted to runner %s. Job status changed to ERROR.", - jobManager.getRunnerType().getName()); + jobManager.getRunnerType().toString()); job.setStatus(JobStatus.ERROR); return job; @@ -205,7 +205,7 @@ private Job updateJob( Action.UPDATE, "Updating job %s for runner %s", job.getId(), - jobManager.getRunnerType().getName()); + jobManager.getRunnerType().toString()); return jobManager.updateJob(job); } diff --git a/core/src/main/java/feast/core/job/Runner.java b/core/src/main/java/feast/core/job/Runner.java index 637621be35..4e2033fed6 100644 --- a/core/src/main/java/feast/core/job/Runner.java +++ b/core/src/main/java/feast/core/job/Runner.java @@ -27,13 +27,19 @@ public enum Runner { this.name = name; } - public String getName() { + /** + * Get the human readable name of this runner. Returns a human readable name of the runner that + * can be used for logging/config files/etc. + */ + @Override + public String toString() { return name; } + /** Parses a runner from its human readable name. */ public static Runner fromString(String runner) { for (Runner r : Runner.values()) { - if (r.getName().equals(runner)) { + if (r.toString().equals(runner)) { return r; } } diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 56c6f9de5f..1580a3ec85 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -154,7 +154,7 @@ public void abortJob(String dataflowJobId) { */ @Override public JobStatus getJobStatus(Job job) { - if (!Runner.DATAFLOW.getName().equals(job.getRunner())) { + if (!Runner.DATAFLOW.name().equals(job.getRunner())) { return job.getStatus(); } @@ -185,7 +185,7 @@ private Job submitDataflowJob( .map( fsp -> { FeatureSet featureSet = new FeatureSet(); - featureSet.setName(fsp.getSpec().getName()); + featureSet.setName(fsp.getSpec().toString()); featureSet.setVersion(fsp.getSpec().getVersion()); featureSet.setProject(new Project(fsp.getSpec().getProject())); return featureSet; @@ -195,7 +195,7 @@ private Job submitDataflowJob( return new Job( jobName, jobId, - getRunnerType().getName(), + getRunnerType().name(), Source.fromProto(source), Store.fromProto(sink), featureSets, diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 2fd17a58d7..377f5f7095 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -39,6 +39,7 @@ public class Job extends AbstractTimestampEntity { private String extId; // Runner type + // Use Runner.name() when converting a Runner to string to assign to this property. @Column(name = "runner") private String runner; diff --git a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java index 19ce0858b2..2a1e80994a 100644 --- a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java +++ b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java @@ -102,7 +102,7 @@ public void shouldUpdateJobIfPresent() { new Job( "job", "old_ext", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -119,7 +119,7 @@ public void shouldUpdateJobIfPresent() { new Job( "job", "old_ext", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -129,7 +129,7 @@ public void shouldUpdateJobIfPresent() { new Job( "job", "new_ext", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), Source.fromProto(source), Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -163,7 +163,7 @@ public void shouldCreateJobIfNotPresent() { new Job( "job", "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -173,7 +173,7 @@ public void shouldCreateJobIfNotPresent() { new Job( "job", "ext", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -202,7 +202,7 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() { new Job( "job", "ext", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -216,7 +216,7 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() { new Job( "job", "ext", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), Source.fromProto(source), Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -248,7 +248,7 @@ public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { new Job( "job", "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -258,7 +258,7 @@ public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { new Job( "job", "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index 9f26c6919e..2d562d38df 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -145,7 +145,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { new Job( jobName, "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), Source.fromProto(source), Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), @@ -226,7 +226,7 @@ public void shouldThrowExceptionWhenJobStateTerminal() throws IOException { new Job( "job", "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), Source.fromProto(source), Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index 64412f4391..76530d9f40 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -144,7 +144,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { new Job( expectedJobId, "", - Runner.DIRECT.getName(), + Runner.DIRECT.name(), Source.fromProto(source), Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index 67a87e9316..aa71f201dd 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -161,7 +161,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep new Job( "", "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -171,7 +171,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep new Job( "some_id", extId, - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -261,7 +261,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "name1", "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source1), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -271,7 +271,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "name1", "extId1", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source1), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -281,7 +281,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "", "extId2", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source2), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet2)), @@ -291,7 +291,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "name2", "extId2", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source2), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet2)),