From ed8e951f000a36e6c99ba1be537eed4e8221aeeb Mon Sep 17 00:00:00 2001 From: Zhu Zhan Yan Date: Fri, 27 Mar 2020 13:41:43 +0800 Subject: [PATCH] Fix runner to string inconsistency (#575) * Changed Runner.getName() to Runner.toString() when passing to Job.runner in DataflowJobManager This is necessary to standardise the use of Runner.toString() when passing to the Job.runner, so that code dependending on Job.runner would know what to expect. * Document how & when Runner.toString() or Runner.getName() should be used * Convert getName() to toString(). Use name() for Job.runner. Use toString() to render human readable strings while using the non overriding name() for code dependencies. Co-authored-by: Zhu Zhanyan --- .../java/feast/core/job/JobUpdateTask.java | 10 +++++----- core/src/main/java/feast/core/job/Runner.java | 10 ++++++++-- .../core/job/dataflow/DataflowJobManager.java | 6 +++--- core/src/main/java/feast/core/model/Job.java | 1 + .../java/feast/core/job/JobUpdateTaskTest.java | 18 +++++++++--------- .../job/dataflow/DataflowJobManagerTest.java | 4 ++-- .../job/direct/DirectRunnerJobManagerTest.java | 2 +- .../service/JobCoordinatorServiceTest.java | 12 ++++++------ 8 files changed, 35 insertions(+), 28 deletions(-) diff --git a/core/src/main/java/feast/core/job/JobUpdateTask.java b/core/src/main/java/feast/core/job/JobUpdateTask.java index 57b2dfee4f..1ade5364fa 100644 --- a/core/src/main/java/feast/core/job/JobUpdateTask.java +++ b/core/src/main/java/feast/core/job/JobUpdateTask.java @@ -144,7 +144,7 @@ private Job startJob( new Job( jobId, "", - jobManager.getRunnerType().toString(), + jobManager.getRunnerType().name(), Source.fromProto(source), Store.fromProto(sinkSpec), featureSets, @@ -155,7 +155,7 @@ private Job startJob( jobId, Action.SUBMIT, "Building graph and submitting to %s", - jobManager.getRunnerType().getName()); + jobManager.getRunnerType().toString()); job = jobManager.startJob(job); if (job.getExtId().isEmpty()) { @@ -168,7 +168,7 @@ private Job startJob( jobId, Action.STATUS_CHANGE, "Job submitted to runner %s with ext id %s.", - jobManager.getRunnerType().getName(), + jobManager.getRunnerType().toString(), job.getExtId()); return job; @@ -178,7 +178,7 @@ private Job startJob( jobId, Action.STATUS_CHANGE, "Job failed to be submitted to runner %s. Job status changed to ERROR.", - jobManager.getRunnerType().getName()); + jobManager.getRunnerType().toString()); job.setStatus(JobStatus.ERROR); return job; @@ -205,7 +205,7 @@ private Job updateJob( Action.UPDATE, "Updating job %s for runner %s", job.getId(), - jobManager.getRunnerType().getName()); + jobManager.getRunnerType().toString()); return jobManager.updateJob(job); } diff --git a/core/src/main/java/feast/core/job/Runner.java b/core/src/main/java/feast/core/job/Runner.java index 637621be35..4e2033fed6 100644 --- a/core/src/main/java/feast/core/job/Runner.java +++ b/core/src/main/java/feast/core/job/Runner.java @@ -27,13 +27,19 @@ public enum Runner { this.name = name; } - public String getName() { + /** + * Get the human readable name of this runner. Returns a human readable name of the runner that + * can be used for logging/config files/etc. + */ + @Override + public String toString() { return name; } + /** Parses a runner from its human readable name. */ public static Runner fromString(String runner) { for (Runner r : Runner.values()) { - if (r.getName().equals(runner)) { + if (r.toString().equals(runner)) { return r; } } diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 56c6f9de5f..1580a3ec85 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -154,7 +154,7 @@ public void abortJob(String dataflowJobId) { */ @Override public JobStatus getJobStatus(Job job) { - if (!Runner.DATAFLOW.getName().equals(job.getRunner())) { + if (!Runner.DATAFLOW.name().equals(job.getRunner())) { return job.getStatus(); } @@ -185,7 +185,7 @@ private Job submitDataflowJob( .map( fsp -> { FeatureSet featureSet = new FeatureSet(); - featureSet.setName(fsp.getSpec().getName()); + featureSet.setName(fsp.getSpec().toString()); featureSet.setVersion(fsp.getSpec().getVersion()); featureSet.setProject(new Project(fsp.getSpec().getProject())); return featureSet; @@ -195,7 +195,7 @@ private Job submitDataflowJob( return new Job( jobName, jobId, - getRunnerType().getName(), + getRunnerType().name(), Source.fromProto(source), Store.fromProto(sink), featureSets, diff --git a/core/src/main/java/feast/core/model/Job.java b/core/src/main/java/feast/core/model/Job.java index 2fd17a58d7..377f5f7095 100644 --- a/core/src/main/java/feast/core/model/Job.java +++ b/core/src/main/java/feast/core/model/Job.java @@ -39,6 +39,7 @@ public class Job extends AbstractTimestampEntity { private String extId; // Runner type + // Use Runner.name() when converting a Runner to string to assign to this property. @Column(name = "runner") private String runner; diff --git a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java index 19ce0858b2..2a1e80994a 100644 --- a/core/src/test/java/feast/core/job/JobUpdateTaskTest.java +++ b/core/src/test/java/feast/core/job/JobUpdateTaskTest.java @@ -102,7 +102,7 @@ public void shouldUpdateJobIfPresent() { new Job( "job", "old_ext", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -119,7 +119,7 @@ public void shouldUpdateJobIfPresent() { new Job( "job", "old_ext", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -129,7 +129,7 @@ public void shouldUpdateJobIfPresent() { new Job( "job", "new_ext", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), Source.fromProto(source), Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -163,7 +163,7 @@ public void shouldCreateJobIfNotPresent() { new Job( "job", "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -173,7 +173,7 @@ public void shouldCreateJobIfNotPresent() { new Job( "job", "ext", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -202,7 +202,7 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() { new Job( "job", "ext", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -216,7 +216,7 @@ public void shouldUpdateJobStatusIfNotCreateOrUpdate() { new Job( "job", "ext", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), Source.fromProto(source), Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -248,7 +248,7 @@ public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { new Job( "job", "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -258,7 +258,7 @@ public void shouldReturnJobWithErrorStatusIfFailedToSubmit() { new Job( "job", "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index 9f26c6919e..2d562d38df 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -145,7 +145,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { new Job( jobName, "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), Source.fromProto(source), Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), @@ -226,7 +226,7 @@ public void shouldThrowExceptionWhenJobStateTerminal() throws IOException { new Job( "job", "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), Source.fromProto(source), Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index 64412f4391..76530d9f40 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -144,7 +144,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { new Job( expectedJobId, "", - Runner.DIRECT.getName(), + Runner.DIRECT.name(), Source.fromProto(source), Store.fromProto(store), Lists.newArrayList(FeatureSet.fromProto(featureSet)), diff --git a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java index 67a87e9316..aa71f201dd 100644 --- a/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java +++ b/core/src/test/java/feast/core/service/JobCoordinatorServiceTest.java @@ -161,7 +161,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep new Job( "", "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -171,7 +171,7 @@ public void shouldGenerateAndSubmitJobsIfAny() throws InvalidProtocolBufferExcep new Job( "some_id", extId, - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1), FeatureSet.fromProto(featureSet2)), @@ -261,7 +261,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "name1", "", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source1), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -271,7 +271,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "name1", "extId1", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source1), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet1)), @@ -281,7 +281,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "", "extId2", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source2), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet2)), @@ -291,7 +291,7 @@ public void shouldGroupJobsBySource() throws InvalidProtocolBufferException { new Job( "name2", "extId2", - Runner.DATAFLOW.getName(), + Runner.DATAFLOW.name(), feast.core.model.Source.fromProto(source2), feast.core.model.Store.fromProto(store), Arrays.asList(FeatureSet.fromProto(featureSet2)),