Use bzip2 compressed feature set json as pipeline option #466

Merged
merged 3 commits into from Feb 14, 2020
Changes from 1 commit
23 changes: 11 additions & 12 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
@@ -22,7 +22,6 @@
 import com.google.common.base.Strings;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.util.JsonFormat;
-import com.google.protobuf.util.JsonFormat.Printer;
 import feast.core.FeatureSetProto;
 import feast.core.SourceProto;
 import feast.core.StoreProto;
@@ -36,9 +35,11 @@
 import feast.core.model.Project;
 import feast.core.model.Source;
 import feast.core.model.Store;
+import feast.core.util.ProtoUtil;
 import feast.core.util.TypeConversion;
 import feast.ingestion.ImportJob;
 import feast.ingestion.options.ImportOptions;
+import feast.ingestion.utils.CompressionUtil;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -93,7 +94,8 @@ public Job startJob(Job job) {
     } catch (InvalidProtocolBufferException e) {
       log.error(e.getMessage());
       throw new IllegalArgumentException(
-          String.format("DataflowJobManager failed to START job with id '%s' because the job"
+          String.format(
+              "DataflowJobManager failed to START job with id '%s' because the job"
                   + "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
               job.getId(), e.getMessage()));
     }
Expand All @@ -112,12 +114,13 @@ public Job updateJob(Job job) {
for (FeatureSet featureSet : job.getFeatureSets()) {
featureSetProtos.add(featureSet.toProto());
}
return submitDataflowJob(job.getId(), featureSetProtos, job.getSource().toProto(),
job.getStore().toProto(), true);
return submitDataflowJob(
job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), true);
} catch (InvalidProtocolBufferException e) {
log.error(e.getMessage());
throw new IllegalArgumentException(
String.format("DataflowJobManager failed to UPDATE job with id '%s' because the job"
String.format(
"DataflowJobManager failed to UPDATE job with id '%s' because the job"
+ "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
job.getId(), e.getMessage()));
}
@@ -221,13 +224,9 @@ private ImportOptions getPipelineOptions(
       throws IOException {
     String[] args = TypeConversion.convertMapToArgs(defaultOptions);
     ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
-    Printer printer = JsonFormat.printer();
-    List<String> featureSetsJson = new ArrayList<>();
-    for (FeatureSetProto.FeatureSet featureSet : featureSets) {
-      featureSetsJson.add(printer.print(featureSet.getSpec()));
-    }
-    pipelineOptions.setFeatureSetJson(featureSetsJson);
-    pipelineOptions.setStoreJson(Collections.singletonList(printer.print(sink)));
+
+    pipelineOptions.setFeatureSetJson(CompressionUtil.compress(ProtoUtil.toJson(featureSets)));
+    pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
     pipelineOptions.setProject(projectId);
     pipelineOptions.setUpdate(update);
     pipelineOptions.setRunner(DataflowRunner.class);
core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java
@@ -17,9 +17,7 @@
 package feast.core.job.direct;

 import com.google.common.base.Strings;
-import com.google.protobuf.InvalidProtocolBufferException;
 import com.google.protobuf.util.JsonFormat;
-import com.google.protobuf.util.JsonFormat.Printer;
 import feast.core.FeatureSetProto;
 import feast.core.StoreProto;
 import feast.core.config.FeastProperties.MetricsProperties;
@@ -29,9 +27,11 @@
 import feast.core.model.FeatureSet;
 import feast.core.model.Job;
 import feast.core.model.JobStatus;
+import feast.core.util.ProtoUtil;
 import feast.core.util.TypeConversion;
 import feast.ingestion.ImportJob;
 import feast.ingestion.options.ImportOptions;
+import feast.ingestion.utils.CompressionUtil;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -92,17 +92,11 @@ public Job startJob(Job job) {
   }

   private ImportOptions getPipelineOptions(
-      List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink)
-      throws InvalidProtocolBufferException {
+      List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink) throws IOException {
     String[] args = TypeConversion.convertMapToArgs(defaultOptions);
     ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
-    Printer printer = JsonFormat.printer();
-    List<String> featureSetsJson = new ArrayList<>();
-    for (FeatureSetProto.FeatureSet featureSet : featureSets) {
-      featureSetsJson.add(printer.print(featureSet.getSpec()));
-    }
-    pipelineOptions.setFeatureSetJson(featureSetsJson);
-    pipelineOptions.setStoreJson(Collections.singletonList(printer.print(sink)));
+    pipelineOptions.setFeatureSetJson(CompressionUtil.compress(ProtoUtil.toJson(featureSets)));
+    pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
     pipelineOptions.setRunner(DirectRunner.class);
     pipelineOptions.setProject(""); // set to default value to satisfy validation
     if (metrics.isEnabled()) {
16 changes: 8 additions & 8 deletions core/src/main/java/feast/core/model/FeatureSet.java
@@ -264,8 +264,8 @@ private void setEntitySpecFields(EntitySpec.Builder entitySpecBuilder, Field entityField)
     if (entityField.getPresence() != null) {
       entitySpecBuilder.setPresence(FeaturePresence.parseFrom(entityField.getPresence()));
     } else if (entityField.getGroupPresence() != null) {
-      entitySpecBuilder
-          .setGroupPresence(FeaturePresenceWithinGroup.parseFrom(entityField.getGroupPresence()));
+      entitySpecBuilder.setGroupPresence(
+          FeaturePresenceWithinGroup.parseFrom(entityField.getGroupPresence()));
     }

     if (entityField.getShape() != null) {
@@ -298,8 +298,8 @@ private void setEntitySpecFields(EntitySpec.Builder entitySpecBuilder, Field entityField)
     } else if (entityField.getTimeDomain() != null) {
       entitySpecBuilder.setTimeDomain(TimeDomain.parseFrom(entityField.getTimeDomain()));
     } else if (entityField.getTimeOfDayDomain() != null) {
-      entitySpecBuilder
-          .setTimeOfDayDomain(TimeOfDayDomain.parseFrom(entityField.getTimeOfDayDomain()));
+      entitySpecBuilder.setTimeOfDayDomain(
+          TimeOfDayDomain.parseFrom(entityField.getTimeOfDayDomain()));
     }
   }

@@ -314,8 +314,8 @@ private void setFeatureSpecFields(FeatureSpec.Builder featureSpecBuilder, Field featureField)
     if (featureField.getPresence() != null) {
       featureSpecBuilder.setPresence(FeaturePresence.parseFrom(featureField.getPresence()));
     } else if (featureField.getGroupPresence() != null) {
-      featureSpecBuilder
-          .setGroupPresence(FeaturePresenceWithinGroup.parseFrom(featureField.getGroupPresence()));
+      featureSpecBuilder.setGroupPresence(
+          FeaturePresenceWithinGroup.parseFrom(featureField.getGroupPresence()));
     }

     if (featureField.getShape() != null) {
@@ -348,8 +348,8 @@ private void setFeatureSpecFields(FeatureSpec.Builder featureSpecBuilder, Field featureField)
     } else if (featureField.getTimeDomain() != null) {
       featureSpecBuilder.setTimeDomain(TimeDomain.parseFrom(featureField.getTimeDomain()));
     } else if (featureField.getTimeOfDayDomain() != null) {
-      featureSpecBuilder
-          .setTimeOfDayDomain(TimeOfDayDomain.parseFrom(featureField.getTimeOfDayDomain()));
+      featureSpecBuilder.setTimeOfDayDomain(
+          TimeOfDayDomain.parseFrom(featureField.getTimeOfDayDomain()));
     }
   }

3 changes: 1 addition & 2 deletions core/src/main/java/feast/core/model/Field.java
@@ -71,8 +71,7 @@ public class Field {
   private byte[] timeDomain;
   private byte[] timeOfDayDomain;

-  public Field() {
-  }
+  public Field() {}

   public Field(String name, ValueType.Enum type) {
     this.name = name;
9 changes: 4 additions & 5 deletions core/src/main/java/feast/core/service/SpecService.java
@@ -167,8 +167,7 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter filter)
     checkValidCharactersAllowAsterisk(name, "featureSetName");
     checkValidCharactersAllowAsterisk(project, "projectName");

-    List<FeatureSet> featureSets = new ArrayList<FeatureSet>() {
-    };
+    List<FeatureSet> featureSets = new ArrayList<FeatureSet>() {};

     if (project.equals("*")) {
       // Matching all projects
@@ -277,9 +276,9 @@ public ListStoresResponse listStores(ListStoresRequest.Filter filter) {
    * Creates or updates a feature set in the repository. If there is a change in the feature set
    * schema, then the feature set version will be incremented.
    *
-   * <p>This function is idempotent. If no changes are detected in the incoming featureSet's
-   * schema, this method will update the incoming featureSet spec with the latest version stored in
-   * the repository, and return that.
+   * <p>This function is idempotent. If no changes are detected in the incoming featureSet's schema,
+   * this method will update the incoming featureSet spec with the latest version stored in the
+   * repository, and return that.
    *
    * @param newFeatureSet Feature set that will be created or updated.
    */
36 changes: 36 additions & 0 deletions core/src/main/java/feast/core/util/ProtoUtil.java
@@ -0,0 +1,36 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ * Copyright 2018-2020 The Feast Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package feast.core.util;
+
+import com.google.protobuf.util.JsonFormat;
+import feast.core.FeatureSetProto;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ProtoUtil {
+
+  public static String toJson(List<FeatureSetProto.FeatureSet> featureSets) throws IOException {

[Review comment on this line]
Member: Can we avoid creating non-generic utility methods? ProtoUtil and toJson seem like a generic class and method, but the implementation is specific to FeatureSetProtos. Either we need to rename this to be more specific and generalize later, or move this functionality out.

Collaborator (author): A rename seems more of a band-aid solution, so I refactored my commits such that this logic is no longer under util and can be extended for other compression strategies (see the sketch after this file).

+    JsonFormat.Printer printer =
+        JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts();
+    List<String> featureSetsJson = new ArrayList<>();
+    for (FeatureSetProto.FeatureSet featureSet : featureSets) {
+      featureSetsJson.add(printer.print(featureSet.getSpec()));
+    }
+    return String.join("\n", featureSetsJson);
+  }
+}
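
CompressionUtil.compress is imported and called throughout this diff, but its implementation is not part of the changes shown in this commit view. Below is a minimal, non-authoritative sketch of what such a helper could look like, assuming Apache Commons Compress for the bzip2 streams and Base64 encoding so the compressed bytes can travel as a plain string pipeline option; the class location, signatures, and encoding choice are all assumptions, not this PR's actual API.

// Hypothetical sketch only; not part of this PR's diff. Assumes
// org.apache.commons.compress is on the classpath and that the option
// value is carried as a Base64-encoded string.
package feast.ingestion.utils;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;

public class CompressionUtil {

  // Compresses the newline-joined feature set JSON with bzip2, then encodes
  // the result as Base64 so it survives being passed as a string option.
  public static String compress(String input) throws IOException {
    ByteArrayOutputStream compressed = new ByteArrayOutputStream();
    // try-with-resources closes the compressor, flushing its final block
    try (BZip2CompressorOutputStream bzip2 = new BZip2CompressorOutputStream(compressed)) {
      bzip2.write(input.getBytes(StandardCharsets.UTF_8));
    }
    return Base64.getEncoder().encodeToString(compressed.toByteArray());
  }

  // Reverses compress(): Base64-decode, then bzip2-decompress back to the
  // original JSON. The ingestion side would split the result on '\n' to
  // recover the individual specs produced by ProtoUtil.toJson.
  public static String decompress(String compressed) throws IOException {
    byte[] raw = Base64.getDecoder().decode(compressed);
    try (BZip2CompressorInputStream bzip2 =
        new BZip2CompressorInputStream(new ByteArrayInputStream(raw))) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      byte[] buffer = new byte[4096];
      int read;
      while ((read = bzip2.read(buffer)) != -1) {
        out.write(buffer, 0, read);
      }
      return new String(out.toByteArray(), StandardCharsets.UTF_8);
    }
  }
}

As for the review thread's "other compression strategies" point, one assumed shape for that extension point is a small interface, again purely a sketch rather than this PR's API:

// Hypothetical extension point; implementations could wrap bzip2, gzip,
// or no-op compression behind the same call site.
public interface OptionCompressor<T> {
  String compress(T option) throws IOException;
}
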
core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java
@@ -19,11 +19,7 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 import static org.mockito.MockitoAnnotations.initMocks;

 import com.google.api.services.dataflow.Dataflow;
@@ -44,13 +40,12 @@
 import feast.core.config.FeastProperties.MetricsProperties;
 import feast.core.exception.JobExecutionException;
 import feast.core.job.Runner;
-import feast.core.model.FeatureSet;
-import feast.core.model.Job;
-import feast.core.model.JobStatus;
-import feast.core.model.Source;
-import feast.core.model.Store;
+import feast.core.model.*;
+import feast.core.util.ProtoUtil;
 import feast.ingestion.options.ImportOptions;
+import feast.ingestion.utils.CompressionUtil;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.dataflow.DataflowPipelineJob;
@@ -132,7 +127,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException {
     expectedPipelineOptions.setJobName(jobName);
     expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store)));
     expectedPipelineOptions.setFeatureSetJson(
-        Lists.newArrayList(printer.print(featureSet.getSpec())));
+        CompressionUtil.compress(ProtoUtil.toJson(Collections.singletonList(featureSet))));

     ArgumentCaptor<ImportOptions> captor = ArgumentCaptor.forClass(ImportOptions.class);

@@ -170,7 +165,19 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException {
     // Assume the files that are staged are correct
     expectedPipelineOptions.setFilesToStage(actualPipelineOptions.getFilesToStage());

-    assertThat(actualPipelineOptions.toString(), equalTo(expectedPipelineOptions.toString()));
+    assertThat(
+        actualPipelineOptions.getFeatureSetJson(),
+        equalTo(expectedPipelineOptions.getFeatureSetJson()));
+    assertThat(
+        actualPipelineOptions.getDeadLetterTableSpec(),
+        equalTo(expectedPipelineOptions.getDeadLetterTableSpec()));
+    assertThat(
+        actualPipelineOptions.getStatsdHost(), equalTo(expectedPipelineOptions.getStatsdHost()));
+    assertThat(
+        actualPipelineOptions.getMetricsExporterType(),
+        equalTo(expectedPipelineOptions.getMetricsExporterType()));
+    assertThat(
+        actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson()));
     assertThat(actual.getExtId(), equalTo(expectedExtJobId));
   }

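Because the feature set option is now an opaque compressed blob rather than readable JSON, the per-option assertions above compare compressed values byte for byte. A round-trip check could make the test intent more direct; the sketch below is hypothetical, assuming the compress/decompress pair sketched earlier and the FeatureSetProto.FeatureSet fixture named featureSet used in these tests.

// Hypothetical test, not part of this PR. Assumes CompressionUtil.decompress
// from the earlier sketch and a featureSet fixture as in the surrounding tests.
@Test
public void featureSetJsonShouldSurviveCompressionRoundTrip() throws IOException {
  String json = ProtoUtil.toJson(Collections.singletonList(featureSet));
  String compressed = CompressionUtil.compress(json);

  // Decompressing must yield the exact JSON handed to the pipeline option.
  assertThat(CompressionUtil.decompress(compressed), equalTo(json));

  // ProtoUtil.toJson joins specs with '\n', so one feature set means one line.
  assertThat(CompressionUtil.decompress(compressed).split("\n").length, equalTo(1));
}
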
core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java
@@ -45,8 +45,11 @@
 import feast.core.model.JobStatus;
 import feast.core.model.Source;
 import feast.core.model.Store;
+import feast.core.util.ProtoUtil;
 import feast.ingestion.options.ImportOptions;
+import feast.ingestion.utils.CompressionUtil;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.runners.direct.DirectRunner;
@@ -122,7 +125,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
     expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store)));
     expectedPipelineOptions.setProject("");
     expectedPipelineOptions.setFeatureSetJson(
-        Lists.newArrayList(printer.print(featureSet.getSpec())));
+        CompressionUtil.compress(ProtoUtil.toJson(Collections.singletonList(featureSet))));

     String expectedJobId = "feast-job-0";
     ArgumentCaptor<ImportOptions> pipelineOptionsCaptor =

expectedPipelineOptions.setOptionsId(
actualPipelineOptions.getOptionsId()); // avoid comparing this value

assertThat(actualPipelineOptions.toString(), equalTo(expectedPipelineOptions.toString()));
assertThat(
actualPipelineOptions.getFeatureSetJson(),
equalTo(expectedPipelineOptions.getFeatureSetJson()));
assertThat(
actualPipelineOptions.getDeadLetterTableSpec(),
equalTo(expectedPipelineOptions.getDeadLetterTableSpec()));
assertThat(
actualPipelineOptions.getStatsdHost(), equalTo(expectedPipelineOptions.getStatsdHost()));
assertThat(
actualPipelineOptions.getMetricsExporterType(),
equalTo(expectedPipelineOptions.getMetricsExporterType()));
assertThat(
actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson()));

assertThat(jobStarted.getPipelineResult(), equalTo(mockPipelineResult));
assertThat(jobStarted.getJobId(), equalTo(expectedJobId));
assertThat(actual.getExtId(), equalTo(expectedJobId));
Expand Down