Skip to content

Commit

Permalink
Use bzip2 compressed feature set json as pipeline option (feast-dev#466)
Browse files Browse the repository at this point in the history
* Use bzip2 compressed feature set json as pipeline option

* Make decompressor and compressor more generic and extensible

* Avoid code duplication in test
  • Loading branch information
khorshuheng authored and Shu Heng committed Feb 14, 2020
1 parent 85f398e commit f12f55c
Show file tree
Hide file tree
Showing 22 changed files with 609 additions and 80 deletions.
40 changes: 22 additions & 18 deletions core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,20 @@
import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import feast.core.FeatureSetProto;
import feast.core.SourceProto;
import feast.core.StoreProto;
import feast.core.config.FeastProperties.MetricsProperties;
import feast.core.exception.JobExecutionException;
import feast.core.job.JobManager;
import feast.core.job.Runner;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Project;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.*;
import feast.core.util.TypeConversion;
import feast.ingestion.ImportJob;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -88,7 +85,12 @@ public Job startJob(Job job) {
job.getStore().toProto(),
false);
} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(String.format("Unable to start job %s", job.getId()), e);
log.error(e.getMessage());
throw new IllegalArgumentException(
String.format(
"DataflowJobManager failed to START job with id '%s' because the job"
+ "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
job.getId(), e.getMessage()));
}
}

Expand All @@ -103,12 +105,15 @@ public Job updateJob(Job job) {
try {
List<FeatureSetProto.FeatureSet> featureSetProtos =
job.getFeatureSets().stream().map(FeatureSet::toProto).collect(Collectors.toList());

return submitDataflowJob(
job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), true);

} catch (InvalidProtocolBufferException e) {
throw new RuntimeException(String.format("Unable to update job %s", job.getId()), e);
log.error(e.getMessage());
throw new IllegalArgumentException(
String.format(
"DataflowJobManager failed to UPDATE job with id '%s' because the job"
+ "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s",
job.getId(), e.getMessage()));
}
}

Expand Down Expand Up @@ -210,13 +215,12 @@ private ImportOptions getPipelineOptions(
throws IOException {
String[] args = TypeConversion.convertMapToArgs(defaultOptions);
ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
Printer printer = JsonFormat.printer();
List<String> featureSetsJson = new ArrayList<>();
for (FeatureSetProto.FeatureSet featureSet : featureSets) {
featureSetsJson.add(printer.print(featureSet.getSpec()));
}
pipelineOptions.setFeatureSetJson(featureSetsJson);
pipelineOptions.setStoreJson(Collections.singletonList(printer.print(sink)));

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());

pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets));
pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
pipelineOptions.setProject(projectId);
pipelineOptions.setUpdate(update);
pipelineOptions.setRunner(DataflowRunner.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,23 @@
package feast.core.job.direct;

import com.google.common.base.Strings;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.Printer;
import feast.core.FeatureSetProto;
import feast.core.FeatureSetProto.FeatureSetSpec;
import feast.core.StoreProto;
import feast.core.config.FeastProperties.MetricsProperties;
import feast.core.exception.JobExecutionException;
import feast.core.job.JobManager;
import feast.core.job.Runner;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.util.TypeConversion;
import feast.ingestion.ImportJob;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -92,17 +93,15 @@ public Job startJob(Job job) {
}

private ImportOptions getPipelineOptions(
List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink)
throws InvalidProtocolBufferException {
List<FeatureSetProto.FeatureSet> featureSets, StoreProto.Store sink) throws IOException {
String[] args = TypeConversion.convertMapToArgs(defaultOptions);
ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class);
Printer printer = JsonFormat.printer();
List<String> featureSetsJson = new ArrayList<>();
for (FeatureSetProto.FeatureSet featureSet : featureSets) {
featureSetsJson.add(printer.print(featureSet.getSpec()));
}
pipelineOptions.setFeatureSetJson(featureSetsJson);
pipelineOptions.setStoreJson(Collections.singletonList(printer.print(sink)));

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());

pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets));
pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink)));
pipelineOptions.setRunner(DirectRunner.class);
pipelineOptions.setProject(""); // set to default value to satisfy validation
if (metrics.isEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job.option;

import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import feast.core.FeatureSetProto;
import feast.ingestion.options.OptionByteConverter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Converts a list of feature sets into a single byte array holding one compact JSON document per
 * feature set spec, separated by new lines.
 */
public class FeatureSetJsonByteConverter
    implements OptionByteConverter<List<FeatureSetProto.FeatureSet>> {

  /**
   * Convert list of feature sets to json strings joined by new line, represented as byte arrays.
   *
   * <p>Only the spec of each feature set is serialized. The JSON is printed without insignificant
   * whitespace and with enum values written as integers. The joined text is always encoded as
   * UTF-8 so that the produced bytes do not depend on the platform default charset of the JVM
   * that runs the conversion.
   *
   * @param featureSets List of feature set protobufs
   * @return UTF-8 encoded byte array of the newline-joined json strings; empty when the list is
   *     empty
   * @throws InvalidProtocolBufferException if a feature set spec cannot be printed as json
   */
  @Override
  public byte[] toByte(List<FeatureSetProto.FeatureSet> featureSets)
      throws InvalidProtocolBufferException {
    JsonFormat.Printer printer =
        JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts();
    List<String> featureSetsJson = new ArrayList<>(featureSets.size());
    // A plain loop is kept on purpose: printer.print throws a checked exception, which would
    // have to be wrapped inside a stream lambda.
    for (FeatureSetProto.FeatureSet featureSet : featureSets) {
      featureSetsJson.add(printer.print(featureSet.getSpec()));
    }
    // Explicit charset: the no-arg getBytes() would use the platform default encoding.
    return String.join("\n", featureSetsJson).getBytes(StandardCharsets.UTF_8);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.services.dataflow.Dataflow;
Expand All @@ -44,14 +40,15 @@
import feast.core.config.FeastProperties.MetricsProperties;
import feast.core.exception.JobExecutionException;
import feast.core.job.Runner;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.*;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.dataflow.DataflowPipelineJob;
import org.apache.beam.runners.dataflow.DataflowRunner;
Expand Down Expand Up @@ -131,8 +128,11 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException {
expectedPipelineOptions.setAppName("DataflowJobManager");
expectedPipelineOptions.setJobName(jobName);
expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store)));

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());
expectedPipelineOptions.setFeatureSetJson(
Lists.newArrayList(printer.print(featureSet.getSpec())));
featureSetJsonCompressor.compress(Collections.singletonList(featureSet)));

ArgumentCaptor<ImportOptions> captor = ArgumentCaptor.forClass(ImportOptions.class);

Expand Down Expand Up @@ -170,7 +170,19 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException {
// Assume the files that are staged are correct
expectedPipelineOptions.setFilesToStage(actualPipelineOptions.getFilesToStage());

assertThat(actualPipelineOptions.toString(), equalTo(expectedPipelineOptions.toString()));
assertThat(
actualPipelineOptions.getFeatureSetJson(),
equalTo(expectedPipelineOptions.getFeatureSetJson()));
assertThat(
actualPipelineOptions.getDeadLetterTableSpec(),
equalTo(expectedPipelineOptions.getDeadLetterTableSpec()));
assertThat(
actualPipelineOptions.getStatsdHost(), equalTo(expectedPipelineOptions.getStatsdHost()));
assertThat(
actualPipelineOptions.getMetricsExporterType(),
equalTo(expectedPipelineOptions.getMetricsExporterType()));
assertThat(
actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson()));
assertThat(actual.getExtId(), equalTo(expectedExtJobId));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,19 @@
import feast.core.StoreProto.Store.Subscription;
import feast.core.config.FeastProperties.MetricsProperties;
import feast.core.job.Runner;
import feast.core.job.option.FeatureSetJsonByteConverter;
import feast.core.model.FeatureSet;
import feast.core.model.Job;
import feast.core.model.JobStatus;
import feast.core.model.Source;
import feast.core.model.Store;
import feast.ingestion.options.BZip2Compressor;
import feast.ingestion.options.ImportOptions;
import feast.ingestion.options.OptionCompressor;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.PipelineResult;
Expand Down Expand Up @@ -121,8 +126,11 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
expectedPipelineOptions.setProject("");
expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store)));
expectedPipelineOptions.setProject("");

OptionCompressor<List<FeatureSetProto.FeatureSet>> featureSetJsonCompressor =
new BZip2Compressor<>(new FeatureSetJsonByteConverter());
expectedPipelineOptions.setFeatureSetJson(
Lists.newArrayList(printer.print(featureSet.getSpec())));
featureSetJsonCompressor.compress(Collections.singletonList(featureSet)));

String expectedJobId = "feast-job-0";
ArgumentCaptor<ImportOptions> pipelineOptionsCaptor =
Expand Down Expand Up @@ -150,7 +158,20 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException {
expectedPipelineOptions.setOptionsId(
actualPipelineOptions.getOptionsId()); // avoid comparing this value

assertThat(actualPipelineOptions.toString(), equalTo(expectedPipelineOptions.toString()));
assertThat(
actualPipelineOptions.getFeatureSetJson(),
equalTo(expectedPipelineOptions.getFeatureSetJson()));
assertThat(
actualPipelineOptions.getDeadLetterTableSpec(),
equalTo(expectedPipelineOptions.getDeadLetterTableSpec()));
assertThat(
actualPipelineOptions.getStatsdHost(), equalTo(expectedPipelineOptions.getStatsdHost()));
assertThat(
actualPipelineOptions.getMetricsExporterType(),
equalTo(expectedPipelineOptions.getMetricsExporterType()));
assertThat(
actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson()));

assertThat(jobStarted.getPipelineResult(), equalTo(mockPipelineResult));
assertThat(jobStarted.getJobId(), equalTo(expectedJobId));
assertThat(actual.getExtId(), equalTo(expectedJobId));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright 2018-2020 The Feast Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package feast.core.job.option;

import static org.junit.Assert.*;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.core.FeatureSetProto;
import feast.core.SourceProto;
import feast.types.ValueProto;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;

public class FeatureSetJsonByteConverterTest {

private FeatureSetProto.FeatureSet newFeatureSet(Integer version, Integer numberOfFeatures) {
List<FeatureSetProto.FeatureSpec> features =
IntStream.range(1, numberOfFeatures + 1)
.mapToObj(
i ->
FeatureSetProto.FeatureSpec.newBuilder()
.setValueType(ValueProto.ValueType.Enum.FLOAT)
.setName("feature".concat(Integer.toString(i)))
.build())
.collect(Collectors.toList());

return FeatureSetProto.FeatureSet.newBuilder()
.setSpec(
FeatureSetProto.FeatureSetSpec.newBuilder()
.setSource(
SourceProto.Source.newBuilder()
.setType(SourceProto.SourceType.KAFKA)
.setKafkaSourceConfig(
SourceProto.KafkaSourceConfig.newBuilder()
.setBootstrapServers("somebrokers:9092")
.setTopic("sometopic")))
.addAllFeatures(features)
.setVersion(version)
.addEntities(
FeatureSetProto.EntitySpec.newBuilder()
.setName("entity")
.setValueType(ValueProto.ValueType.Enum.STRING)))
.build();
}

@Test
public void shouldConvertFeatureSetsAsJsonStringBytes() throws InvalidProtocolBufferException {
int nrOfFeatureSet = 1;
int nrOfFeatures = 1;
List<FeatureSetProto.FeatureSet> featureSets =
IntStream.range(1, nrOfFeatureSet + 1)
.mapToObj(i -> newFeatureSet(i, nrOfFeatures))
.collect(Collectors.toList());

String expectedOutputString =
"{\"version\":1,"
+ "\"entities\":[{\"name\":\"entity\",\"valueType\":2}],"
+ "\"features\":[{\"name\":\"feature1\",\"valueType\":6}],"
+ "\"source\":{"
+ "\"type\":1,"
+ "\"kafkaSourceConfig\":{"
+ "\"bootstrapServers\":\"somebrokers:9092\","
+ "\"topic\":\"sometopic\"}}}";
FeatureSetJsonByteConverter byteConverter = new FeatureSetJsonByteConverter();
assertEquals(expectedOutputString, new String(byteConverter.toByte(featureSets)));
}
}
Loading

0 comments on commit f12f55c

Please sign in to comment.