diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 7115ee3f66..323eb35983 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -22,7 +22,6 @@ import com.google.common.base.Strings; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; -import com.google.protobuf.util.JsonFormat.Printer; import feast.core.FeatureSetProto; import feast.core.SourceProto; import feast.core.StoreProto; @@ -30,15 +29,13 @@ import feast.core.exception.JobExecutionException; import feast.core.job.JobManager; import feast.core.job.Runner; -import feast.core.model.FeatureSet; -import feast.core.model.Job; -import feast.core.model.JobStatus; -import feast.core.model.Project; -import feast.core.model.Source; -import feast.core.model.Store; +import feast.core.job.option.FeatureSetJsonByteConverter; +import feast.core.model.*; import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.OptionCompressor; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -93,7 +90,8 @@ public Job startJob(Job job) { } catch (InvalidProtocolBufferException e) { log.error(e.getMessage()); throw new IllegalArgumentException( - String.format("DataflowJobManager failed to START job with id '%s' because the job" + String.format( + "DataflowJobManager failed to START job with id '%s' because the job" + "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s", job.getId(), e.getMessage())); } @@ -112,12 +110,13 @@ public Job updateJob(Job job) { for (FeatureSet featureSet : job.getFeatureSets()) { featureSetProtos.add(featureSet.toProto()); } - return submitDataflowJob(job.getId(), featureSetProtos, job.getSource().toProto(), - job.getStore().toProto(), true); + return submitDataflowJob( + job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), true); } catch (InvalidProtocolBufferException e) { log.error(e.getMessage()); throw new IllegalArgumentException( - String.format("DataflowJobManager failed to UPDATE job with id '%s' because the job" + String.format( + "DataflowJobManager failed to UPDATE job with id '%s' because the job" + "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s", job.getId(), e.getMessage())); } @@ -221,13 +220,12 @@ private ImportOptions getPipelineOptions( throws IOException { String[] args = TypeConversion.convertMapToArgs(defaultOptions); ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class); - Printer printer = JsonFormat.printer(); - List featureSetsJson = new ArrayList<>(); - for (FeatureSetProto.FeatureSet featureSet : featureSets) { - featureSetsJson.add(printer.print(featureSet.getSpec())); - } - pipelineOptions.setFeatureSetJson(featureSetsJson); - pipelineOptions.setStoreJson(Collections.singletonList(printer.print(sink))); + + OptionCompressor> featureSetJsonCompressor = + new BZip2Compressor<>(new FeatureSetJsonByteConverter()); + + pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets)); + pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setProject(projectId); pipelineOptions.setUpdate(update); pipelineOptions.setRunner(DataflowRunner.class); diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index b01d37d892..08aeed1cc3 100644 --- a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -17,21 +17,22 @@ package feast.core.job.direct; import com.google.common.base.Strings; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; -import com.google.protobuf.util.JsonFormat.Printer; import feast.core.FeatureSetProto; import feast.core.StoreProto; import feast.core.config.FeastProperties.MetricsProperties; import feast.core.exception.JobExecutionException; import feast.core.job.JobManager; import feast.core.job.Runner; +import feast.core.job.option.FeatureSetJsonByteConverter; import feast.core.model.FeatureSet; import feast.core.model.Job; import feast.core.model.JobStatus; import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.OptionCompressor; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -92,17 +93,15 @@ public Job startJob(Job job) { } private ImportOptions getPipelineOptions( - List featureSets, StoreProto.Store sink) - throws InvalidProtocolBufferException { + List featureSets, StoreProto.Store sink) throws IOException { String[] args = TypeConversion.convertMapToArgs(defaultOptions); ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class); - Printer printer = JsonFormat.printer(); - List featureSetsJson = new ArrayList<>(); - for (FeatureSetProto.FeatureSet featureSet : featureSets) { - featureSetsJson.add(printer.print(featureSet.getSpec())); - } - pipelineOptions.setFeatureSetJson(featureSetsJson); - pipelineOptions.setStoreJson(Collections.singletonList(printer.print(sink))); + + OptionCompressor> featureSetJsonCompressor = + new BZip2Compressor<>(new FeatureSetJsonByteConverter()); + + pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets)); + pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setRunner(DirectRunner.class); pipelineOptions.setProject(""); // set to default value to satisfy validation if (metrics.isEnabled()) { diff --git a/core/src/main/java/feast/core/job/option/FeatureSetJsonByteConverter.java b/core/src/main/java/feast/core/job/option/FeatureSetJsonByteConverter.java new file mode 100644 index 0000000000..dbd04d668f --- /dev/null +++ b/core/src/main/java/feast/core/job/option/FeatureSetJsonByteConverter.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.job.option; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.util.JsonFormat; +import feast.core.FeatureSetProto; +import feast.ingestion.options.OptionByteConverter; +import java.util.ArrayList; +import java.util.List; + +public class FeatureSetJsonByteConverter + implements OptionByteConverter> { + + /** + * Convert list of feature sets to json strings joined by new line, represented as byte arrays + * + * @param featureSets List of feature set protobufs + * @return Byte array representation of the json strings + * @throws InvalidProtocolBufferException + */ + @Override + public byte[] toByte(List featureSets) + throws InvalidProtocolBufferException { + JsonFormat.Printer printer = + JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); + List featureSetsJson = new ArrayList<>(); + for (FeatureSetProto.FeatureSet featureSet : featureSets) { + featureSetsJson.add(printer.print(featureSet.getSpec())); + } + return String.join("\n", featureSetsJson).getBytes(); + } +} diff --git a/core/src/main/java/feast/core/model/FeatureSet.java b/core/src/main/java/feast/core/model/FeatureSet.java index cd6036fe5e..c593dcd701 100644 --- a/core/src/main/java/feast/core/model/FeatureSet.java +++ b/core/src/main/java/feast/core/model/FeatureSet.java @@ -264,8 +264,8 @@ private void setEntitySpecFields(EntitySpec.Builder entitySpecBuilder, Field ent if (entityField.getPresence() != null) { entitySpecBuilder.setPresence(FeaturePresence.parseFrom(entityField.getPresence())); } else if (entityField.getGroupPresence() != null) { - entitySpecBuilder - .setGroupPresence(FeaturePresenceWithinGroup.parseFrom(entityField.getGroupPresence())); + entitySpecBuilder.setGroupPresence( + FeaturePresenceWithinGroup.parseFrom(entityField.getGroupPresence())); } if (entityField.getShape() != null) { @@ -298,8 +298,8 @@ private void setEntitySpecFields(EntitySpec.Builder entitySpecBuilder, Field ent } else if (entityField.getTimeDomain() != null) { entitySpecBuilder.setTimeDomain(TimeDomain.parseFrom(entityField.getTimeDomain())); } else if (entityField.getTimeOfDayDomain() != null) { - entitySpecBuilder - .setTimeOfDayDomain(TimeOfDayDomain.parseFrom(entityField.getTimeOfDayDomain())); + entitySpecBuilder.setTimeOfDayDomain( + TimeOfDayDomain.parseFrom(entityField.getTimeOfDayDomain())); } } @@ -314,8 +314,8 @@ private void setFeatureSpecFields(FeatureSpec.Builder featureSpecBuilder, Field if (featureField.getPresence() != null) { featureSpecBuilder.setPresence(FeaturePresence.parseFrom(featureField.getPresence())); } else if (featureField.getGroupPresence() != null) { - featureSpecBuilder - .setGroupPresence(FeaturePresenceWithinGroup.parseFrom(featureField.getGroupPresence())); + featureSpecBuilder.setGroupPresence( + FeaturePresenceWithinGroup.parseFrom(featureField.getGroupPresence())); } if (featureField.getShape() != null) { @@ -348,8 +348,8 @@ private void setFeatureSpecFields(FeatureSpec.Builder featureSpecBuilder, Field } else if (featureField.getTimeDomain() != null) { featureSpecBuilder.setTimeDomain(TimeDomain.parseFrom(featureField.getTimeDomain())); } else if (featureField.getTimeOfDayDomain() != null) { - featureSpecBuilder - .setTimeOfDayDomain(TimeOfDayDomain.parseFrom(featureField.getTimeOfDayDomain())); + featureSpecBuilder.setTimeOfDayDomain( + TimeOfDayDomain.parseFrom(featureField.getTimeOfDayDomain())); } } diff --git a/core/src/main/java/feast/core/model/Field.java b/core/src/main/java/feast/core/model/Field.java index edb0a73acb..355b673fc8 100644 --- a/core/src/main/java/feast/core/model/Field.java +++ b/core/src/main/java/feast/core/model/Field.java @@ -71,8 +71,7 @@ public class Field { private byte[] timeDomain; private byte[] timeOfDayDomain; - public Field() { - } + public Field() {} public Field(String name, ValueType.Enum type) { this.name = name; diff --git a/core/src/main/java/feast/core/service/SpecService.java b/core/src/main/java/feast/core/service/SpecService.java index 9016b692d1..5b98d06597 100644 --- a/core/src/main/java/feast/core/service/SpecService.java +++ b/core/src/main/java/feast/core/service/SpecService.java @@ -167,8 +167,7 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter fil checkValidCharactersAllowAsterisk(name, "featureSetName"); checkValidCharactersAllowAsterisk(project, "projectName"); - List featureSets = new ArrayList() { - }; + List featureSets = new ArrayList() {}; if (project.equals("*")) { // Matching all projects @@ -277,9 +276,9 @@ public ListStoresResponse listStores(ListStoresRequest.Filter filter) { * Creates or updates a feature set in the repository. If there is a change in the feature set * schema, then the feature set version will be incremented. * - *

This function is idempotent. If no changes are detected in the incoming featureSet's - * schema, this method will update the incoming featureSet spec with the latest version stored in - * the repository, and return that. + *

This function is idempotent. If no changes are detected in the incoming featureSet's schema, + * this method will update the incoming featureSet spec with the latest version stored in the + * repository, and return that. * * @param newFeatureSet Feature set that will be created or updated. */ diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index c263515ed0..9f26c6919e 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -19,11 +19,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; import com.google.api.services.dataflow.Dataflow; @@ -44,14 +40,15 @@ import feast.core.config.FeastProperties.MetricsProperties; import feast.core.exception.JobExecutionException; import feast.core.job.Runner; -import feast.core.model.FeatureSet; -import feast.core.model.Job; -import feast.core.model.JobStatus; -import feast.core.model.Source; -import feast.core.model.Store; +import feast.core.job.option.FeatureSetJsonByteConverter; +import feast.core.model.*; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.OptionCompressor; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -131,8 +128,11 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { expectedPipelineOptions.setAppName("DataflowJobManager"); expectedPipelineOptions.setJobName(jobName); expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); + + OptionCompressor> featureSetJsonCompressor = + new BZip2Compressor<>(new FeatureSetJsonByteConverter()); expectedPipelineOptions.setFeatureSetJson( - Lists.newArrayList(printer.print(featureSet.getSpec()))); + featureSetJsonCompressor.compress(Collections.singletonList(featureSet))); ArgumentCaptor captor = ArgumentCaptor.forClass(ImportOptions.class); @@ -170,7 +170,19 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { // Assume the files that are staged are correct expectedPipelineOptions.setFilesToStage(actualPipelineOptions.getFilesToStage()); - assertThat(actualPipelineOptions.toString(), equalTo(expectedPipelineOptions.toString())); + assertThat( + actualPipelineOptions.getFeatureSetJson(), + equalTo(expectedPipelineOptions.getFeatureSetJson())); + assertThat( + actualPipelineOptions.getDeadLetterTableSpec(), + equalTo(expectedPipelineOptions.getDeadLetterTableSpec())); + assertThat( + actualPipelineOptions.getStatsdHost(), equalTo(expectedPipelineOptions.getStatsdHost())); + assertThat( + actualPipelineOptions.getMetricsExporterType(), + equalTo(expectedPipelineOptions.getMetricsExporterType())); + assertThat( + actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson())); assertThat(actual.getExtId(), equalTo(expectedExtJobId)); } diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index 2dd87cfc6e..64412f4391 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -40,14 +40,19 @@ import feast.core.StoreProto.Store.Subscription; import feast.core.config.FeastProperties.MetricsProperties; import feast.core.job.Runner; +import feast.core.job.option.FeatureSetJsonByteConverter; import feast.core.model.FeatureSet; import feast.core.model.Job; import feast.core.model.JobStatus; import feast.core.model.Source; import feast.core.model.Store; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.OptionCompressor; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.PipelineResult; @@ -121,8 +126,11 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { expectedPipelineOptions.setProject(""); expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); expectedPipelineOptions.setProject(""); + + OptionCompressor> featureSetJsonCompressor = + new BZip2Compressor<>(new FeatureSetJsonByteConverter()); expectedPipelineOptions.setFeatureSetJson( - Lists.newArrayList(printer.print(featureSet.getSpec()))); + featureSetJsonCompressor.compress(Collections.singletonList(featureSet))); String expectedJobId = "feast-job-0"; ArgumentCaptor pipelineOptionsCaptor = @@ -150,7 +158,20 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { expectedPipelineOptions.setOptionsId( actualPipelineOptions.getOptionsId()); // avoid comparing this value - assertThat(actualPipelineOptions.toString(), equalTo(expectedPipelineOptions.toString())); + assertThat( + actualPipelineOptions.getFeatureSetJson(), + equalTo(expectedPipelineOptions.getFeatureSetJson())); + assertThat( + actualPipelineOptions.getDeadLetterTableSpec(), + equalTo(expectedPipelineOptions.getDeadLetterTableSpec())); + assertThat( + actualPipelineOptions.getStatsdHost(), equalTo(expectedPipelineOptions.getStatsdHost())); + assertThat( + actualPipelineOptions.getMetricsExporterType(), + equalTo(expectedPipelineOptions.getMetricsExporterType())); + assertThat( + actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson())); + assertThat(jobStarted.getPipelineResult(), equalTo(mockPipelineResult)); assertThat(jobStarted.getJobId(), equalTo(expectedJobId)); assertThat(actual.getExtId(), equalTo(expectedJobId)); diff --git a/core/src/test/java/feast/core/job/option/FeatureSetJsonByteConverterTest.java b/core/src/test/java/feast/core/job/option/FeatureSetJsonByteConverterTest.java new file mode 100644 index 0000000000..2dfeef1d96 --- /dev/null +++ b/core/src/test/java/feast/core/job/option/FeatureSetJsonByteConverterTest.java @@ -0,0 +1,83 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.core.job.option; + +import static org.junit.Assert.*; + +import com.google.protobuf.InvalidProtocolBufferException; +import feast.core.FeatureSetProto; +import feast.core.SourceProto; +import feast.types.ValueProto; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.Test; + +public class FeatureSetJsonByteConverterTest { + + private FeatureSetProto.FeatureSet newFeatureSet(Integer version, Integer numberOfFeatures) { + List features = + IntStream.range(1, numberOfFeatures + 1) + .mapToObj( + i -> + FeatureSetProto.FeatureSpec.newBuilder() + .setValueType(ValueProto.ValueType.Enum.FLOAT) + .setName("feature".concat(Integer.toString(i))) + .build()) + .collect(Collectors.toList()); + + return FeatureSetProto.FeatureSet.newBuilder() + .setSpec( + FeatureSetProto.FeatureSetSpec.newBuilder() + .setSource( + SourceProto.Source.newBuilder() + .setType(SourceProto.SourceType.KAFKA) + .setKafkaSourceConfig( + SourceProto.KafkaSourceConfig.newBuilder() + .setBootstrapServers("somebrokers:9092") + .setTopic("sometopic"))) + .addAllFeatures(features) + .setVersion(version) + .addEntities( + FeatureSetProto.EntitySpec.newBuilder() + .setName("entity") + .setValueType(ValueProto.ValueType.Enum.STRING))) + .build(); + } + + @Test + public void shouldConvertFeatureSetsAsJsonStringBytes() throws InvalidProtocolBufferException { + int nrOfFeatureSet = 1; + int nrOfFeatures = 1; + List featureSets = + IntStream.range(1, nrOfFeatureSet + 1) + .mapToObj(i -> newFeatureSet(i, nrOfFeatures)) + .collect(Collectors.toList()); + + String expectedOutputString = + "{\"version\":1," + + "\"entities\":[{\"name\":\"entity\",\"valueType\":2}]," + + "\"features\":[{\"name\":\"feature1\",\"valueType\":6}]," + + "\"source\":{" + + "\"type\":1," + + "\"kafkaSourceConfig\":{" + + "\"bootstrapServers\":\"somebrokers:9092\"," + + "\"topic\":\"sometopic\"}}}"; + FeatureSetJsonByteConverter byteConverter = new FeatureSetJsonByteConverter(); + assertEquals(expectedOutputString, new String(byteConverter.toByte(featureSets))); + } +} diff --git a/core/src/test/java/feast/core/service/SpecServiceTest.java b/core/src/test/java/feast/core/service/SpecServiceTest.java index c533f593e3..38f7475636 100644 --- a/core/src/test/java/feast/core/service/SpecServiceTest.java +++ b/core/src/test/java/feast/core/service/SpecServiceTest.java @@ -84,17 +84,13 @@ public class SpecServiceTest { - @Mock - private FeatureSetRepository featureSetRepository; + @Mock private FeatureSetRepository featureSetRepository; - @Mock - private StoreRepository storeRepository; + @Mock private StoreRepository storeRepository; - @Mock - private ProjectRepository projectRepository; + @Mock private ProjectRepository projectRepository; - @Rule - public final ExpectedException expectedException = ExpectedException.none(); + @Rule public final ExpectedException expectedException = ExpectedException.none(); private SpecService specService; private List featureSets; @@ -140,25 +136,25 @@ public void setUp() { when(featureSetRepository.findFeatureSetByNameAndProject_NameAndVersion("f1", "project1", 1)) .thenReturn(featureSets.get(0)); when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "f1", "project1")) + "f1", "project1")) .thenReturn(featureSets.subList(0, 3)); when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "f3", "project1")) + "f3", "project1")) .thenReturn(featureSets.subList(4, 5)); when(featureSetRepository.findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc( - "f1", "project1")) + "f1", "project1")) .thenReturn(featureSet1v3); when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "f1", "project1")) + "f1", "project1")) .thenReturn(featureSets.subList(0, 3)); when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "asd", "project1")) + "asd", "project1")) .thenReturn(Lists.newArrayList()); when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "f%", "project1")) + "f%", "project1")) .thenReturn(featureSets); when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc( - "%", "%")) + "%", "%")) .thenReturn(featureSets); when(projectRepository.findAllByArchivedIsFalse()) @@ -403,7 +399,7 @@ public void applyFeatureSetShouldReturnFeatureSetWithLatestVersionIfFeatureSetHa public void applyFeatureSetShouldApplyFeatureSetWithInitVersionIfNotExists() throws InvalidProtocolBufferException { when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "f2", "project1")) + "f2", "project1")) .thenReturn(Lists.newArrayList()); FeatureSetProto.FeatureSet incomingFeatureSet = @@ -485,14 +481,14 @@ public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered() Field f3e1 = new Field("f3e1", Enum.STRING); FeatureSetProto.FeatureSet incomingFeatureSet = (new FeatureSet( - "f3", - "project1", - 5, - 100L, - Arrays.asList(f3e1), - Arrays.asList(f3f2, f3f1), - defaultSource, - FeatureSetStatus.STATUS_READY)) + "f3", + "project1", + 5, + 100L, + Arrays.asList(f3e1), + Arrays.asList(f3f2, f3f1), + defaultSource, + FeatureSetStatus.STATUS_READY)) .toProto(); ApplyFeatureSetResponse applyFeatureSetResponse = @@ -513,78 +509,98 @@ public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered() public void applyFeatureSetShouldAcceptPresenceShapeAndDomainConstraints() throws InvalidProtocolBufferException { List entitySpecs = new ArrayList<>(); - entitySpecs.add(EntitySpec.newBuilder().setName("entity1") - .setValueType(Enum.INT64) - .setPresence(FeaturePresence.getDefaultInstance()) - .setShape(FixedShape.getDefaultInstance()) - .setDomain("mydomain") - .build()); - entitySpecs.add(EntitySpec.newBuilder().setName("entity2") - .setValueType(Enum.INT64) - .setGroupPresence(FeaturePresenceWithinGroup.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setIntDomain(IntDomain.getDefaultInstance()) - .build()); - entitySpecs.add(EntitySpec.newBuilder().setName("entity3") - .setValueType(Enum.FLOAT) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setFloatDomain(FloatDomain.getDefaultInstance()) - .build()); - entitySpecs.add(EntitySpec.newBuilder().setName("entity4") - .setValueType(Enum.STRING) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setStringDomain(StringDomain.getDefaultInstance()) - .build()); - entitySpecs.add(EntitySpec.newBuilder().setName("entity5") - .setValueType(Enum.BOOL) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setBoolDomain(BoolDomain.getDefaultInstance()) - .build()); + entitySpecs.add( + EntitySpec.newBuilder() + .setName("entity1") + .setValueType(Enum.INT64) + .setPresence(FeaturePresence.getDefaultInstance()) + .setShape(FixedShape.getDefaultInstance()) + .setDomain("mydomain") + .build()); + entitySpecs.add( + EntitySpec.newBuilder() + .setName("entity2") + .setValueType(Enum.INT64) + .setGroupPresence(FeaturePresenceWithinGroup.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setIntDomain(IntDomain.getDefaultInstance()) + .build()); + entitySpecs.add( + EntitySpec.newBuilder() + .setName("entity3") + .setValueType(Enum.FLOAT) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setFloatDomain(FloatDomain.getDefaultInstance()) + .build()); + entitySpecs.add( + EntitySpec.newBuilder() + .setName("entity4") + .setValueType(Enum.STRING) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setStringDomain(StringDomain.getDefaultInstance()) + .build()); + entitySpecs.add( + EntitySpec.newBuilder() + .setName("entity5") + .setValueType(Enum.BOOL) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setBoolDomain(BoolDomain.getDefaultInstance()) + .build()); List featureSpecs = new ArrayList<>(); - featureSpecs.add(FeatureSpec.newBuilder().setName("feature1") - .setValueType(Enum.INT64) - .setPresence(FeaturePresence.getDefaultInstance()) - .setShape(FixedShape.getDefaultInstance()) - .setDomain("mydomain") - .build()); - featureSpecs.add(FeatureSpec.newBuilder().setName("feature2") - .setValueType(Enum.INT64) - .setGroupPresence(FeaturePresenceWithinGroup.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setIntDomain(IntDomain.getDefaultInstance()) - .build()); - featureSpecs.add(FeatureSpec.newBuilder().setName("feature3") - .setValueType(Enum.FLOAT) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setFloatDomain(FloatDomain.getDefaultInstance()) - .build()); - featureSpecs.add(FeatureSpec.newBuilder().setName("feature4") - .setValueType(Enum.STRING) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setStringDomain(StringDomain.getDefaultInstance()) - .build()); - featureSpecs.add(FeatureSpec.newBuilder().setName("feature5") - .setValueType(Enum.BOOL) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setBoolDomain(BoolDomain.getDefaultInstance()) - .build()); - - FeatureSetSpec featureSetSpec = FeatureSetSpec.newBuilder() - .setProject("project1") - .setName("featureSetWithConstraints") - .addAllEntities(entitySpecs) - .addAllFeatures(featureSpecs) - .build(); - FeatureSetProto.FeatureSet featureSet = FeatureSetProto.FeatureSet.newBuilder() - .setSpec(featureSetSpec) - .build(); + featureSpecs.add( + FeatureSpec.newBuilder() + .setName("feature1") + .setValueType(Enum.INT64) + .setPresence(FeaturePresence.getDefaultInstance()) + .setShape(FixedShape.getDefaultInstance()) + .setDomain("mydomain") + .build()); + featureSpecs.add( + FeatureSpec.newBuilder() + .setName("feature2") + .setValueType(Enum.INT64) + .setGroupPresence(FeaturePresenceWithinGroup.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setIntDomain(IntDomain.getDefaultInstance()) + .build()); + featureSpecs.add( + FeatureSpec.newBuilder() + .setName("feature3") + .setValueType(Enum.FLOAT) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setFloatDomain(FloatDomain.getDefaultInstance()) + .build()); + featureSpecs.add( + FeatureSpec.newBuilder() + .setName("feature4") + .setValueType(Enum.STRING) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setStringDomain(StringDomain.getDefaultInstance()) + .build()); + featureSpecs.add( + FeatureSpec.newBuilder() + .setName("feature5") + .setValueType(Enum.BOOL) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setBoolDomain(BoolDomain.getDefaultInstance()) + .build()); + + FeatureSetSpec featureSetSpec = + FeatureSetSpec.newBuilder() + .setProject("project1") + .setName("featureSetWithConstraints") + .addAllEntities(entitySpecs) + .addAllFeatures(featureSpecs) + .build(); + FeatureSetProto.FeatureSet featureSet = + FeatureSetProto.FeatureSet.newBuilder().setSpec(featureSetSpec).build(); ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet(featureSet); FeatureSetSpec appliedFeatureSetSpec = applyFeatureSetResponse.getFeatureSet().getSpec(); @@ -596,7 +612,8 @@ public void applyFeatureSetShouldAcceptPresenceShapeAndDomainConstraints() // appliedFeatureSpecs needs to be sorted because the list returned by specService may not // follow the order in the request - List appliedFeatureSpecs = new ArrayList<>(appliedFeatureSetSpec.getFeaturesList()); + List appliedFeatureSpecs = + new ArrayList<>(appliedFeatureSetSpec.getFeaturesList()); appliedFeatureSpecs.sort(Comparator.comparing(FeatureSpec::getName)); assertEquals(appliedEntitySpecs.size(), entitySpecs.size()); @@ -684,6 +701,4 @@ private Store newDummyStore(String name) { store.setConfig(RedisConfig.newBuilder().setPort(6379).build().toByteArray()); return store; } - - } diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 41af5f9bb4..c4973ce3ca 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -22,7 +22,9 @@ import feast.core.FeatureSetProto.FeatureSet; import feast.core.SourceProto.Source; import feast.core.StoreProto.Store; +import feast.ingestion.options.BZip2Decompressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.StringListStreamConverter; import feast.ingestion.transform.ReadFromSource; import feast.ingestion.transform.ValidateFeatureRows; import feast.ingestion.transform.WriteFailedElementToBigQuery; @@ -33,6 +35,7 @@ import feast.ingestion.utils.StoreUtil; import feast.ingestion.values.FailedElement; import feast.types.FeatureRowProto.FeatureRow; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -57,15 +60,14 @@ public class ImportJob { * @param args arguments to be passed to Beam pipeline * @throws InvalidProtocolBufferException if options passed to the pipeline are invalid */ - public static void main(String[] args) throws InvalidProtocolBufferException { + public static void main(String[] args) throws IOException { ImportOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(ImportOptions.class); runPipeline(options); } @SuppressWarnings("UnusedReturnValue") - public static PipelineResult runPipeline(ImportOptions options) - throws InvalidProtocolBufferException { + public static PipelineResult runPipeline(ImportOptions options) throws IOException { /* * Steps: * 1. Read messages from Feast Source as FeatureRow @@ -80,8 +82,10 @@ public static PipelineResult runPipeline(ImportOptions options) log.info("Starting import job with settings: \n{}", options.toString()); - List featureSets = - SpecUtil.parseFeatureSetSpecJsonList(options.getFeatureSetJson()); + BZip2Decompressor> decompressor = + new BZip2Decompressor<>(new StringListStreamConverter()); + List featureSetJson = decompressor.decompress(options.getFeatureSetJson()); + List featureSets = SpecUtil.parseFeatureSetSpecJsonList(featureSetJson); List stores = SpecUtil.parseStoreJsonList(options.getStoreJson()); for (Store store : stores) { diff --git a/ingestion/src/main/java/feast/ingestion/options/BZip2Compressor.java b/ingestion/src/main/java/feast/ingestion/options/BZip2Compressor.java new file mode 100644 index 0000000000..b7e4e6ee0a --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/BZip2Compressor.java @@ -0,0 +1,47 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; + +public class BZip2Compressor implements OptionCompressor { + + private final OptionByteConverter byteConverter; + + public BZip2Compressor(OptionByteConverter byteConverter) { + this.byteConverter = byteConverter; + } + /** + * Compress pipeline option using BZip2 + * + * @param option Pipeline option value + * @return BZip2 compressed option value + * @throws IOException + */ + @Override + public byte[] compress(T option) throws IOException { + ByteArrayOutputStream compressedStream = new ByteArrayOutputStream(); + try (BZip2CompressorOutputStream bzip2Output = + new BZip2CompressorOutputStream(compressedStream)) { + bzip2Output.write(byteConverter.toByte(option)); + } + + return compressedStream.toByteArray(); + } +} diff --git a/ingestion/src/main/java/feast/ingestion/options/BZip2Decompressor.java b/ingestion/src/main/java/feast/ingestion/options/BZip2Decompressor.java new file mode 100644 index 0000000000..ce49c1be6e --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/BZip2Decompressor.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; + +public class BZip2Decompressor implements OptionDecompressor { + + private final InputStreamConverter inputStreamConverter; + + public BZip2Decompressor(InputStreamConverter inputStreamConverter) { + this.inputStreamConverter = inputStreamConverter; + } + + @Override + public T decompress(byte[] compressed) throws IOException { + try (ByteArrayInputStream inputStream = new ByteArrayInputStream(compressed); + BZip2CompressorInputStream bzip2Input = new BZip2CompressorInputStream(inputStream)) { + return inputStreamConverter.readStream(bzip2Input); + } + } +} diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java index b299bb47e5..6afdd80dd7 100644 --- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java +++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java @@ -28,16 +28,16 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions { @Required @Description( - "JSON string representation of the FeatureSet that the import job will process." + "JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format." + "FeatureSet follows the format in feast.core.FeatureSet proto." + "Mutliple FeatureSetSpec can be passed by specifying '--featureSet={...}' multiple times" + "The conversion of Proto message to JSON should follow this mapping:" + "https://developers.google.com/protocol-buffers/docs/proto3#json" + "Please minify and remove all insignificant whitespace such as newline in the JSON string" + "to prevent error when parsing the options") - List getFeatureSetJson(); + byte[] getFeatureSetJson(); - void setFeatureSetJson(List featureSetJson); + void setFeatureSetJson(byte[] featureSetJson); @Required @Description( diff --git a/ingestion/src/main/java/feast/ingestion/options/InputStreamConverter.java b/ingestion/src/main/java/feast/ingestion/options/InputStreamConverter.java new file mode 100644 index 0000000000..e2fef73236 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/InputStreamConverter.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.IOException; +import java.io.InputStream; + +public interface InputStreamConverter { + + /** + * Used in conjunction with {@link OptionDecompressor} to decompress the pipeline option + * + * @param inputStream Input byte stream in compressed format + * @return Decompressed pipeline option value + */ + T readStream(InputStream inputStream) throws IOException; +} diff --git a/ingestion/src/main/java/feast/ingestion/options/OptionByteConverter.java b/ingestion/src/main/java/feast/ingestion/options/OptionByteConverter.java new file mode 100644 index 0000000000..ff5a41a627 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/OptionByteConverter.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.IOException; + +public interface OptionByteConverter { + + /** + * Used in conjunction with {@link OptionCompressor} to compress the pipeline option + * + * @param option Pipeline option value + * @return byte representation of the pipeline option value, without compression. + */ + byte[] toByte(T option) throws IOException; +} diff --git a/ingestion/src/main/java/feast/ingestion/options/OptionCompressor.java b/ingestion/src/main/java/feast/ingestion/options/OptionCompressor.java new file mode 100644 index 0000000000..b2345fc3eb --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/OptionCompressor.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.IOException; + +public interface OptionCompressor { + + /** + * Compress pipeline option into bytes format. This is necessary as some Beam runner has + * limitation in terms of pipeline option size. + * + * @param option Pipeline option value + * @return Compressed values of the option, as byte array + */ + byte[] compress(T option) throws IOException; +} diff --git a/ingestion/src/main/java/feast/ingestion/options/OptionDecompressor.java b/ingestion/src/main/java/feast/ingestion/options/OptionDecompressor.java new file mode 100644 index 0000000000..affeafdaa0 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/OptionDecompressor.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.IOException; + +public interface OptionDecompressor { + + /** + * Decompress pipeline option from byte array. + * + * @param compressed Compressed pipeline option value + * @return Decompressed pipeline option + */ + T decompress(byte[] compressed) throws IOException; +} diff --git a/ingestion/src/main/java/feast/ingestion/options/StringListStreamConverter.java b/ingestion/src/main/java/feast/ingestion/options/StringListStreamConverter.java new file mode 100644 index 0000000000..d7277f3c7d --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/StringListStreamConverter.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.List; +import java.util.stream.Collectors; + +public class StringListStreamConverter implements InputStreamConverter> { + + /** + * Convert Input byte stream to newline separated strings + * + * @param inputStream Input byte stream + * @return List of string + */ + @Override + public List readStream(InputStream inputStream) throws IOException { + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + List stringList = reader.lines().collect(Collectors.toList()); + reader.close(); + return stringList; + } +} diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 290b38dabe..58ecae8f04 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -30,13 +30,16 @@ import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; import feast.core.StoreProto.Store.Subscription; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.OptionByteConverter; import feast.storage.RedisProto.RedisKey; import feast.test.TestUtil; import feast.test.TestUtil.LocalKafka; import feast.test.TestUtil.LocalRedis; import feast.types.FeatureRowProto.FeatureRow; import feast.types.ValueProto.ValueType.Enum; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -48,6 +51,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.joda.time.Duration; import org.junit.AfterClass; @@ -162,12 +166,13 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() .build(); ImportOptions options = PipelineOptionsFactory.create().as(ImportOptions.class); - options.setFeatureSetJson( - Collections.singletonList( - JsonFormat.printer().omittingInsignificantWhitespace().print(featureSet.getSpec()))); - options.setStoreJson( - Collections.singletonList( - JsonFormat.printer().omittingInsignificantWhitespace().print(redis))); + BZip2Compressor compressor = new BZip2Compressor<>(option -> { + JsonFormat.Printer printer = + JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); + return printer.print(option).getBytes(); + }); + options.setFeatureSetJson(compressor.compress(spec)); + options.setStoreJson(Collections.singletonList(JsonFormat.printer().print(redis))); options.setProject(""); options.setBlockOnRun(false); diff --git a/ingestion/src/test/java/feast/ingestion/options/BZip2CompressorTest.java b/ingestion/src/test/java/feast/ingestion/options/BZip2CompressorTest.java new file mode 100644 index 0000000000..cd03b18c79 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/options/BZip2CompressorTest.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.junit.Assert; +import org.junit.Test; + +public class BZip2CompressorTest { + + @Test + public void shouldHavBZip2CompatibleOutput() throws IOException { + BZip2Compressor compressor = new BZip2Compressor<>(String::getBytes); + String origString = "somestring"; + try (ByteArrayInputStream inputStream = + new ByteArrayInputStream(compressor.compress(origString)); + BZip2CompressorInputStream bzip2Input = new BZip2CompressorInputStream(inputStream); + BufferedReader reader = new BufferedReader(new InputStreamReader(bzip2Input))) { + Assert.assertEquals(origString, reader.readLine()); + } + } +} diff --git a/ingestion/src/test/java/feast/ingestion/options/BZip2DecompressorTest.java b/ingestion/src/test/java/feast/ingestion/options/BZip2DecompressorTest.java new file mode 100644 index 0000000000..fe7cc789d8 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/options/BZip2DecompressorTest.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import static org.junit.Assert.*; + +import java.io.*; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.junit.Test; + +public class BZip2DecompressorTest { + + @Test + public void shouldDecompressBZip2Stream() throws IOException { + BZip2Decompressor decompressor = + new BZip2Decompressor<>( + inputStream -> { + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + String output = reader.readLine(); + reader.close(); + return output; + }); + + String originalString = "abc"; + ByteArrayOutputStream compressedStream = new ByteArrayOutputStream(); + try (BZip2CompressorOutputStream bzip2Output = + new BZip2CompressorOutputStream(compressedStream)) { + bzip2Output.write(originalString.getBytes()); + } + + String decompressedString = decompressor.decompress(compressedStream.toByteArray()); + assertEquals(originalString, decompressedString); + } +} diff --git a/ingestion/src/test/java/feast/ingestion/options/StringListStreamConverterTest.java b/ingestion/src/test/java/feast/ingestion/options/StringListStreamConverterTest.java new file mode 100644 index 0000000000..5ce9f054bc --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/options/StringListStreamConverterTest.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import static org.junit.Assert.*; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import org.junit.Test; + +public class StringListStreamConverterTest { + + @Test + public void shouldReadStreamAsNewlineSeparatedStrings() throws IOException { + StringListStreamConverter converter = new StringListStreamConverter(); + String originalString = "abc\ndef"; + InputStream stringStream = new ByteArrayInputStream(originalString.getBytes()); + assertEquals(Arrays.asList("abc", "def"), converter.readStream(stringStream)); + } +} diff --git a/ingestion/src/test/java/feast/ingestion/util/DateUtilTest.java b/ingestion/src/test/java/feast/ingestion/utils/DateUtilTest.java similarity index 92% rename from ingestion/src/test/java/feast/ingestion/util/DateUtilTest.java rename to ingestion/src/test/java/feast/ingestion/utils/DateUtilTest.java index 71d4e67bea..151d501a59 100644 --- a/ingestion/src/test/java/feast/ingestion/util/DateUtilTest.java +++ b/ingestion/src/test/java/feast/ingestion/utils/DateUtilTest.java @@ -14,15 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.ingestion.util; +package feast.ingestion.utils; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.*; import com.google.protobuf.Timestamp; -import feast.ingestion.utils.DateUtil; import junit.framework.TestCase; import org.joda.time.DateTime; diff --git a/ingestion/src/test/java/feast/ingestion/util/JsonUtilTest.java b/ingestion/src/test/java/feast/ingestion/utils/JsonUtilTest.java similarity index 95% rename from ingestion/src/test/java/feast/ingestion/util/JsonUtilTest.java rename to ingestion/src/test/java/feast/ingestion/utils/JsonUtilTest.java index 02af4d819f..62c74dfc34 100644 --- a/ingestion/src/test/java/feast/ingestion/util/JsonUtilTest.java +++ b/ingestion/src/test/java/feast/ingestion/utils/JsonUtilTest.java @@ -14,12 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.ingestion.util; +package feast.ingestion.utils; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; -import feast.ingestion.utils.JsonUtil; import java.util.Collections; import java.util.HashMap; import java.util.Map; diff --git a/ingestion/src/test/java/feast/ingestion/util/StoreUtilTest.java b/ingestion/src/test/java/feast/ingestion/utils/StoreUtilTest.java similarity index 91% rename from ingestion/src/test/java/feast/ingestion/util/StoreUtilTest.java rename to ingestion/src/test/java/feast/ingestion/utils/StoreUtilTest.java index 4e2297e405..82988121bc 100644 --- a/ingestion/src/test/java/feast/ingestion/util/StoreUtilTest.java +++ b/ingestion/src/test/java/feast/ingestion/utils/StoreUtilTest.java @@ -14,22 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.ingestion.util; +package feast.ingestion.utils; -import static feast.types.ValueProto.ValueType.Enum.BOOL; -import static feast.types.ValueProto.ValueType.Enum.BOOL_LIST; -import static feast.types.ValueProto.ValueType.Enum.BYTES; -import static feast.types.ValueProto.ValueType.Enum.BYTES_LIST; -import static feast.types.ValueProto.ValueType.Enum.DOUBLE; -import static feast.types.ValueProto.ValueType.Enum.DOUBLE_LIST; -import static feast.types.ValueProto.ValueType.Enum.FLOAT; -import static feast.types.ValueProto.ValueType.Enum.FLOAT_LIST; -import static feast.types.ValueProto.ValueType.Enum.INT32; -import static feast.types.ValueProto.ValueType.Enum.INT32_LIST; -import static feast.types.ValueProto.ValueType.Enum.INT64; -import static feast.types.ValueProto.ValueType.Enum.INT64_LIST; -import static feast.types.ValueProto.ValueType.Enum.STRING; -import static feast.types.ValueProto.ValueType.Enum.STRING_LIST; +import static feast.types.ValueProto.ValueType.Enum.*; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.Field; @@ -40,7 +27,6 @@ import feast.core.FeatureSetProto.FeatureSet; import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.FeatureSetProto.FeatureSpec; -import feast.ingestion.utils.StoreUtil; import java.util.Arrays; import org.junit.Assert; import org.junit.Test;