From 7128a0ad6ae63e571eab6cc4ac1a62af0957f125 Mon Sep 17 00:00:00 2001 From: Shu Heng Date: Fri, 7 Feb 2020 17:42:32 +0800 Subject: [PATCH 1/3] Use bzip2 compressed feature set json as pipeline option --- .../core/job/dataflow/DataflowJobManager.java | 23 +- .../job/direct/DirectRunnerJobManager.java | 16 +- .../java/feast/core/model/FeatureSet.java | 16 +- .../src/main/java/feast/core/model/Field.java | 3 +- .../java/feast/core/service/SpecService.java | 9 +- .../main/java/feast/core/util/ProtoUtil.java | 36 +++ .../job/dataflow/DataflowJobManagerTest.java | 31 ++- .../direct/DirectRunnerJobManagerTest.java | 20 +- .../feast/core/service/SpecServiceTest.java | 209 ++++++++++-------- .../java/feast/core/util/ProtoUtilTest.java | 82 +++++++ .../main/java/feast/ingestion/ImportJob.java | 12 +- .../ingestion/options/ImportOptions.java | 6 +- .../ingestion/utils/CompressionUtil.java | 45 ++++ .../java/feast/ingestion/ImportJobTest.java | 11 +- .../ingestion/utils/CompressionUtilTest.java | 36 +++ .../{util => utils}/DateUtilTest.java | 7 +- .../{util => utils}/JsonUtilTest.java | 3 +- .../{util => utils}/StoreUtilTest.java | 18 +- 18 files changed, 397 insertions(+), 186 deletions(-) create mode 100644 core/src/main/java/feast/core/util/ProtoUtil.java create mode 100644 core/src/test/java/feast/core/util/ProtoUtilTest.java create mode 100644 ingestion/src/main/java/feast/ingestion/utils/CompressionUtil.java create mode 100644 ingestion/src/test/java/feast/ingestion/utils/CompressionUtilTest.java rename ingestion/src/test/java/feast/ingestion/{util => utils}/DateUtilTest.java (92%) rename ingestion/src/test/java/feast/ingestion/{util => utils}/JsonUtilTest.java (95%) rename ingestion/src/test/java/feast/ingestion/{util => utils}/StoreUtilTest.java (91%) diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 7115ee3f66..054bffc11b 100644 --- 
a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -22,7 +22,6 @@ import com.google.common.base.Strings; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; -import com.google.protobuf.util.JsonFormat.Printer; import feast.core.FeatureSetProto; import feast.core.SourceProto; import feast.core.StoreProto; @@ -36,9 +35,11 @@ import feast.core.model.Project; import feast.core.model.Source; import feast.core.model.Store; +import feast.core.util.ProtoUtil; import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; import feast.ingestion.options.ImportOptions; +import feast.ingestion.utils.CompressionUtil; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -93,7 +94,8 @@ public Job startJob(Job job) { } catch (InvalidProtocolBufferException e) { log.error(e.getMessage()); throw new IllegalArgumentException( - String.format("DataflowJobManager failed to START job with id '%s' because the job" + String.format( + "DataflowJobManager failed to START job with id '%s' because the job" + "has an invalid spec. Please check the FeatureSet, Source and Store specs. 
Actual error message: %s", job.getId(), e.getMessage())); } @@ -112,12 +114,13 @@ public Job updateJob(Job job) { for (FeatureSet featureSet : job.getFeatureSets()) { featureSetProtos.add(featureSet.toProto()); } - return submitDataflowJob(job.getId(), featureSetProtos, job.getSource().toProto(), - job.getStore().toProto(), true); + return submitDataflowJob( + job.getId(), featureSetProtos, job.getSource().toProto(), job.getStore().toProto(), true); } catch (InvalidProtocolBufferException e) { log.error(e.getMessage()); throw new IllegalArgumentException( - String.format("DataflowJobManager failed to UPDATE job with id '%s' because the job" + String.format( + "DataflowJobManager failed to UPDATE job with id '%s' because the job" + "has an invalid spec. Please check the FeatureSet, Source and Store specs. Actual error message: %s", job.getId(), e.getMessage())); } @@ -221,13 +224,9 @@ private ImportOptions getPipelineOptions( throws IOException { String[] args = TypeConversion.convertMapToArgs(defaultOptions); ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class); - Printer printer = JsonFormat.printer(); - List featureSetsJson = new ArrayList<>(); - for (FeatureSetProto.FeatureSet featureSet : featureSets) { - featureSetsJson.add(printer.print(featureSet.getSpec())); - } - pipelineOptions.setFeatureSetJson(featureSetsJson); - pipelineOptions.setStoreJson(Collections.singletonList(printer.print(sink))); + + pipelineOptions.setFeatureSetJson(CompressionUtil.compress(ProtoUtil.toJson(featureSets))); + pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setProject(projectId); pipelineOptions.setUpdate(update); pipelineOptions.setRunner(DataflowRunner.class); diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index b01d37d892..4e2d08a690 100644 --- 
a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -17,9 +17,7 @@ package feast.core.job.direct; import com.google.common.base.Strings; -import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; -import com.google.protobuf.util.JsonFormat.Printer; import feast.core.FeatureSetProto; import feast.core.StoreProto; import feast.core.config.FeastProperties.MetricsProperties; @@ -29,9 +27,11 @@ import feast.core.model.FeatureSet; import feast.core.model.Job; import feast.core.model.JobStatus; +import feast.core.util.ProtoUtil; import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; import feast.ingestion.options.ImportOptions; +import feast.ingestion.utils.CompressionUtil; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -92,17 +92,11 @@ public Job startJob(Job job) { } private ImportOptions getPipelineOptions( - List featureSets, StoreProto.Store sink) - throws InvalidProtocolBufferException { + List featureSets, StoreProto.Store sink) throws IOException { String[] args = TypeConversion.convertMapToArgs(defaultOptions); ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class); - Printer printer = JsonFormat.printer(); - List featureSetsJson = new ArrayList<>(); - for (FeatureSetProto.FeatureSet featureSet : featureSets) { - featureSetsJson.add(printer.print(featureSet.getSpec())); - } - pipelineOptions.setFeatureSetJson(featureSetsJson); - pipelineOptions.setStoreJson(Collections.singletonList(printer.print(sink))); + pipelineOptions.setFeatureSetJson(CompressionUtil.compress(ProtoUtil.toJson(featureSets))); + pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setRunner(DirectRunner.class); pipelineOptions.setProject(""); // set to default value to satisfy validation if 
(metrics.isEnabled()) { diff --git a/core/src/main/java/feast/core/model/FeatureSet.java b/core/src/main/java/feast/core/model/FeatureSet.java index cd6036fe5e..c593dcd701 100644 --- a/core/src/main/java/feast/core/model/FeatureSet.java +++ b/core/src/main/java/feast/core/model/FeatureSet.java @@ -264,8 +264,8 @@ private void setEntitySpecFields(EntitySpec.Builder entitySpecBuilder, Field ent if (entityField.getPresence() != null) { entitySpecBuilder.setPresence(FeaturePresence.parseFrom(entityField.getPresence())); } else if (entityField.getGroupPresence() != null) { - entitySpecBuilder - .setGroupPresence(FeaturePresenceWithinGroup.parseFrom(entityField.getGroupPresence())); + entitySpecBuilder.setGroupPresence( + FeaturePresenceWithinGroup.parseFrom(entityField.getGroupPresence())); } if (entityField.getShape() != null) { @@ -298,8 +298,8 @@ private void setEntitySpecFields(EntitySpec.Builder entitySpecBuilder, Field ent } else if (entityField.getTimeDomain() != null) { entitySpecBuilder.setTimeDomain(TimeDomain.parseFrom(entityField.getTimeDomain())); } else if (entityField.getTimeOfDayDomain() != null) { - entitySpecBuilder - .setTimeOfDayDomain(TimeOfDayDomain.parseFrom(entityField.getTimeOfDayDomain())); + entitySpecBuilder.setTimeOfDayDomain( + TimeOfDayDomain.parseFrom(entityField.getTimeOfDayDomain())); } } @@ -314,8 +314,8 @@ private void setFeatureSpecFields(FeatureSpec.Builder featureSpecBuilder, Field if (featureField.getPresence() != null) { featureSpecBuilder.setPresence(FeaturePresence.parseFrom(featureField.getPresence())); } else if (featureField.getGroupPresence() != null) { - featureSpecBuilder - .setGroupPresence(FeaturePresenceWithinGroup.parseFrom(featureField.getGroupPresence())); + featureSpecBuilder.setGroupPresence( + FeaturePresenceWithinGroup.parseFrom(featureField.getGroupPresence())); } if (featureField.getShape() != null) { @@ -348,8 +348,8 @@ private void setFeatureSpecFields(FeatureSpec.Builder featureSpecBuilder, Field } else if 
(featureField.getTimeDomain() != null) { featureSpecBuilder.setTimeDomain(TimeDomain.parseFrom(featureField.getTimeDomain())); } else if (featureField.getTimeOfDayDomain() != null) { - featureSpecBuilder - .setTimeOfDayDomain(TimeOfDayDomain.parseFrom(featureField.getTimeOfDayDomain())); + featureSpecBuilder.setTimeOfDayDomain( + TimeOfDayDomain.parseFrom(featureField.getTimeOfDayDomain())); } } diff --git a/core/src/main/java/feast/core/model/Field.java b/core/src/main/java/feast/core/model/Field.java index edb0a73acb..355b673fc8 100644 --- a/core/src/main/java/feast/core/model/Field.java +++ b/core/src/main/java/feast/core/model/Field.java @@ -71,8 +71,7 @@ public class Field { private byte[] timeDomain; private byte[] timeOfDayDomain; - public Field() { - } + public Field() {} public Field(String name, ValueType.Enum type) { this.name = name; diff --git a/core/src/main/java/feast/core/service/SpecService.java b/core/src/main/java/feast/core/service/SpecService.java index 9016b692d1..5b98d06597 100644 --- a/core/src/main/java/feast/core/service/SpecService.java +++ b/core/src/main/java/feast/core/service/SpecService.java @@ -167,8 +167,7 @@ public ListFeatureSetsResponse listFeatureSets(ListFeatureSetsRequest.Filter fil checkValidCharactersAllowAsterisk(name, "featureSetName"); checkValidCharactersAllowAsterisk(project, "projectName"); - List featureSets = new ArrayList() { - }; + List featureSets = new ArrayList() {}; if (project.equals("*")) { // Matching all projects @@ -277,9 +276,9 @@ public ListStoresResponse listStores(ListStoresRequest.Filter filter) { * Creates or updates a feature set in the repository. If there is a change in the feature set * schema, then the feature set version will be incremented. * - *

This function is idempotent. If no changes are detected in the incoming featureSet's - * schema, this method will update the incoming featureSet spec with the latest version stored in - * the repository, and return that. + *

This function is idempotent. If no changes are detected in the incoming featureSet's schema, + * this method will update the incoming featureSet spec with the latest version stored in the + * repository, and return that. * * @param newFeatureSet Feature set that will be created or updated. */ diff --git a/core/src/main/java/feast/core/util/ProtoUtil.java b/core/src/main/java/feast/core/util/ProtoUtil.java new file mode 100644 index 0000000000..76fdd84e8b --- /dev/null +++ b/core/src/main/java/feast/core/util/ProtoUtil.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.core.util; + +import com.google.protobuf.util.JsonFormat; +import feast.core.FeatureSetProto; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +public class ProtoUtil { + + public static String toJson(List featureSets) throws IOException { + JsonFormat.Printer printer = + JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); + List featureSetsJson = new ArrayList<>(); + for (FeatureSetProto.FeatureSet featureSet : featureSets) { + featureSetsJson.add(printer.print(featureSet.getSpec())); + } + return String.join("\n", featureSetsJson); + } +} diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index c263515ed0..fca17ff656 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -19,11 +19,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.initMocks; import com.google.api.services.dataflow.Dataflow; @@ -44,13 +40,12 @@ import feast.core.config.FeastProperties.MetricsProperties; import feast.core.exception.JobExecutionException; import feast.core.job.Runner; -import feast.core.model.FeatureSet; -import feast.core.model.Job; -import feast.core.model.JobStatus; -import feast.core.model.Source; -import feast.core.model.Store; +import feast.core.model.*; +import feast.core.util.ProtoUtil; import feast.ingestion.options.ImportOptions; +import feast.ingestion.utils.CompressionUtil; import 
java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.dataflow.DataflowPipelineJob; @@ -132,7 +127,7 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { expectedPipelineOptions.setJobName(jobName); expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); expectedPipelineOptions.setFeatureSetJson( - Lists.newArrayList(printer.print(featureSet.getSpec()))); + CompressionUtil.compress(ProtoUtil.toJson(Collections.singletonList(featureSet)))); ArgumentCaptor captor = ArgumentCaptor.forClass(ImportOptions.class); @@ -170,7 +165,19 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { // Assume the files that are staged are correct expectedPipelineOptions.setFilesToStage(actualPipelineOptions.getFilesToStage()); - assertThat(actualPipelineOptions.toString(), equalTo(expectedPipelineOptions.toString())); + assertThat( + actualPipelineOptions.getFeatureSetJson(), + equalTo(expectedPipelineOptions.getFeatureSetJson())); + assertThat( + actualPipelineOptions.getDeadLetterTableSpec(), + equalTo(expectedPipelineOptions.getDeadLetterTableSpec())); + assertThat( + actualPipelineOptions.getStatsdHost(), equalTo(expectedPipelineOptions.getStatsdHost())); + assertThat( + actualPipelineOptions.getMetricsExporterType(), + equalTo(expectedPipelineOptions.getMetricsExporterType())); + assertThat( + actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson())); assertThat(actual.getExtId(), equalTo(expectedExtJobId)); } diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index 2dd87cfc6e..1d23ee1cbc 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -45,8 +45,11 @@ import 
feast.core.model.JobStatus; import feast.core.model.Source; import feast.core.model.Store; +import feast.core.util.ProtoUtil; import feast.ingestion.options.ImportOptions; +import feast.ingestion.utils.CompressionUtil; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import org.apache.beam.runners.direct.DirectRunner; @@ -122,7 +125,7 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); expectedPipelineOptions.setProject(""); expectedPipelineOptions.setFeatureSetJson( - Lists.newArrayList(printer.print(featureSet.getSpec()))); + CompressionUtil.compress(ProtoUtil.toJson(Collections.singletonList(featureSet)))); String expectedJobId = "feast-job-0"; ArgumentCaptor pipelineOptionsCaptor = @@ -150,7 +153,20 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { expectedPipelineOptions.setOptionsId( actualPipelineOptions.getOptionsId()); // avoid comparing this value - assertThat(actualPipelineOptions.toString(), equalTo(expectedPipelineOptions.toString())); + assertThat( + actualPipelineOptions.getFeatureSetJson(), + equalTo(expectedPipelineOptions.getFeatureSetJson())); + assertThat( + actualPipelineOptions.getDeadLetterTableSpec(), + equalTo(expectedPipelineOptions.getDeadLetterTableSpec())); + assertThat( + actualPipelineOptions.getStatsdHost(), equalTo(expectedPipelineOptions.getStatsdHost())); + assertThat( + actualPipelineOptions.getMetricsExporterType(), + equalTo(expectedPipelineOptions.getMetricsExporterType())); + assertThat( + actualPipelineOptions.getStoreJson(), equalTo(expectedPipelineOptions.getStoreJson())); + assertThat(jobStarted.getPipelineResult(), equalTo(mockPipelineResult)); assertThat(jobStarted.getJobId(), equalTo(expectedJobId)); assertThat(actual.getExtId(), equalTo(expectedJobId)); diff --git 
a/core/src/test/java/feast/core/service/SpecServiceTest.java b/core/src/test/java/feast/core/service/SpecServiceTest.java index c533f593e3..38f7475636 100644 --- a/core/src/test/java/feast/core/service/SpecServiceTest.java +++ b/core/src/test/java/feast/core/service/SpecServiceTest.java @@ -84,17 +84,13 @@ public class SpecServiceTest { - @Mock - private FeatureSetRepository featureSetRepository; + @Mock private FeatureSetRepository featureSetRepository; - @Mock - private StoreRepository storeRepository; + @Mock private StoreRepository storeRepository; - @Mock - private ProjectRepository projectRepository; + @Mock private ProjectRepository projectRepository; - @Rule - public final ExpectedException expectedException = ExpectedException.none(); + @Rule public final ExpectedException expectedException = ExpectedException.none(); private SpecService specService; private List featureSets; @@ -140,25 +136,25 @@ public void setUp() { when(featureSetRepository.findFeatureSetByNameAndProject_NameAndVersion("f1", "project1", 1)) .thenReturn(featureSets.get(0)); when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "f1", "project1")) + "f1", "project1")) .thenReturn(featureSets.subList(0, 3)); when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "f3", "project1")) + "f3", "project1")) .thenReturn(featureSets.subList(4, 5)); when(featureSetRepository.findFirstFeatureSetByNameLikeAndProject_NameOrderByVersionDesc( - "f1", "project1")) + "f1", "project1")) .thenReturn(featureSet1v3); when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "f1", "project1")) + "f1", "project1")) .thenReturn(featureSets.subList(0, 3)); when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "asd", "project1")) + "asd", "project1")) .thenReturn(Lists.newArrayList()); when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "f%", "project1")) + 
"f%", "project1")) .thenReturn(featureSets); when(featureSetRepository.findAllByNameLikeAndProject_NameLikeOrderByNameAscVersionAsc( - "%", "%")) + "%", "%")) .thenReturn(featureSets); when(projectRepository.findAllByArchivedIsFalse()) @@ -403,7 +399,7 @@ public void applyFeatureSetShouldReturnFeatureSetWithLatestVersionIfFeatureSetHa public void applyFeatureSetShouldApplyFeatureSetWithInitVersionIfNotExists() throws InvalidProtocolBufferException { when(featureSetRepository.findAllByNameLikeAndProject_NameOrderByNameAscVersionAsc( - "f2", "project1")) + "f2", "project1")) .thenReturn(Lists.newArrayList()); FeatureSetProto.FeatureSet incomingFeatureSet = @@ -485,14 +481,14 @@ public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered() Field f3e1 = new Field("f3e1", Enum.STRING); FeatureSetProto.FeatureSet incomingFeatureSet = (new FeatureSet( - "f3", - "project1", - 5, - 100L, - Arrays.asList(f3e1), - Arrays.asList(f3f2, f3f1), - defaultSource, - FeatureSetStatus.STATUS_READY)) + "f3", + "project1", + 5, + 100L, + Arrays.asList(f3e1), + Arrays.asList(f3f2, f3f1), + defaultSource, + FeatureSetStatus.STATUS_READY)) .toProto(); ApplyFeatureSetResponse applyFeatureSetResponse = @@ -513,78 +509,98 @@ public void applyFeatureSetShouldNotCreateFeatureSetIfFieldsUnordered() public void applyFeatureSetShouldAcceptPresenceShapeAndDomainConstraints() throws InvalidProtocolBufferException { List entitySpecs = new ArrayList<>(); - entitySpecs.add(EntitySpec.newBuilder().setName("entity1") - .setValueType(Enum.INT64) - .setPresence(FeaturePresence.getDefaultInstance()) - .setShape(FixedShape.getDefaultInstance()) - .setDomain("mydomain") - .build()); - entitySpecs.add(EntitySpec.newBuilder().setName("entity2") - .setValueType(Enum.INT64) - .setGroupPresence(FeaturePresenceWithinGroup.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setIntDomain(IntDomain.getDefaultInstance()) - .build()); - 
entitySpecs.add(EntitySpec.newBuilder().setName("entity3") - .setValueType(Enum.FLOAT) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setFloatDomain(FloatDomain.getDefaultInstance()) - .build()); - entitySpecs.add(EntitySpec.newBuilder().setName("entity4") - .setValueType(Enum.STRING) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setStringDomain(StringDomain.getDefaultInstance()) - .build()); - entitySpecs.add(EntitySpec.newBuilder().setName("entity5") - .setValueType(Enum.BOOL) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setBoolDomain(BoolDomain.getDefaultInstance()) - .build()); + entitySpecs.add( + EntitySpec.newBuilder() + .setName("entity1") + .setValueType(Enum.INT64) + .setPresence(FeaturePresence.getDefaultInstance()) + .setShape(FixedShape.getDefaultInstance()) + .setDomain("mydomain") + .build()); + entitySpecs.add( + EntitySpec.newBuilder() + .setName("entity2") + .setValueType(Enum.INT64) + .setGroupPresence(FeaturePresenceWithinGroup.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setIntDomain(IntDomain.getDefaultInstance()) + .build()); + entitySpecs.add( + EntitySpec.newBuilder() + .setName("entity3") + .setValueType(Enum.FLOAT) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setFloatDomain(FloatDomain.getDefaultInstance()) + .build()); + entitySpecs.add( + EntitySpec.newBuilder() + .setName("entity4") + .setValueType(Enum.STRING) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setStringDomain(StringDomain.getDefaultInstance()) + .build()); + entitySpecs.add( + EntitySpec.newBuilder() + .setName("entity5") + .setValueType(Enum.BOOL) + .setPresence(FeaturePresence.getDefaultInstance()) + 
.setValueCount(ValueCount.getDefaultInstance()) + .setBoolDomain(BoolDomain.getDefaultInstance()) + .build()); List featureSpecs = new ArrayList<>(); - featureSpecs.add(FeatureSpec.newBuilder().setName("feature1") - .setValueType(Enum.INT64) - .setPresence(FeaturePresence.getDefaultInstance()) - .setShape(FixedShape.getDefaultInstance()) - .setDomain("mydomain") - .build()); - featureSpecs.add(FeatureSpec.newBuilder().setName("feature2") - .setValueType(Enum.INT64) - .setGroupPresence(FeaturePresenceWithinGroup.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setIntDomain(IntDomain.getDefaultInstance()) - .build()); - featureSpecs.add(FeatureSpec.newBuilder().setName("feature3") - .setValueType(Enum.FLOAT) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setFloatDomain(FloatDomain.getDefaultInstance()) - .build()); - featureSpecs.add(FeatureSpec.newBuilder().setName("feature4") - .setValueType(Enum.STRING) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setStringDomain(StringDomain.getDefaultInstance()) - .build()); - featureSpecs.add(FeatureSpec.newBuilder().setName("feature5") - .setValueType(Enum.BOOL) - .setPresence(FeaturePresence.getDefaultInstance()) - .setValueCount(ValueCount.getDefaultInstance()) - .setBoolDomain(BoolDomain.getDefaultInstance()) - .build()); - - FeatureSetSpec featureSetSpec = FeatureSetSpec.newBuilder() - .setProject("project1") - .setName("featureSetWithConstraints") - .addAllEntities(entitySpecs) - .addAllFeatures(featureSpecs) - .build(); - FeatureSetProto.FeatureSet featureSet = FeatureSetProto.FeatureSet.newBuilder() - .setSpec(featureSetSpec) - .build(); + featureSpecs.add( + FeatureSpec.newBuilder() + .setName("feature1") + .setValueType(Enum.INT64) + .setPresence(FeaturePresence.getDefaultInstance()) + .setShape(FixedShape.getDefaultInstance()) + .setDomain("mydomain") + .build()); + 
featureSpecs.add( + FeatureSpec.newBuilder() + .setName("feature2") + .setValueType(Enum.INT64) + .setGroupPresence(FeaturePresenceWithinGroup.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setIntDomain(IntDomain.getDefaultInstance()) + .build()); + featureSpecs.add( + FeatureSpec.newBuilder() + .setName("feature3") + .setValueType(Enum.FLOAT) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setFloatDomain(FloatDomain.getDefaultInstance()) + .build()); + featureSpecs.add( + FeatureSpec.newBuilder() + .setName("feature4") + .setValueType(Enum.STRING) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setStringDomain(StringDomain.getDefaultInstance()) + .build()); + featureSpecs.add( + FeatureSpec.newBuilder() + .setName("feature5") + .setValueType(Enum.BOOL) + .setPresence(FeaturePresence.getDefaultInstance()) + .setValueCount(ValueCount.getDefaultInstance()) + .setBoolDomain(BoolDomain.getDefaultInstance()) + .build()); + + FeatureSetSpec featureSetSpec = + FeatureSetSpec.newBuilder() + .setProject("project1") + .setName("featureSetWithConstraints") + .addAllEntities(entitySpecs) + .addAllFeatures(featureSpecs) + .build(); + FeatureSetProto.FeatureSet featureSet = + FeatureSetProto.FeatureSet.newBuilder().setSpec(featureSetSpec).build(); ApplyFeatureSetResponse applyFeatureSetResponse = specService.applyFeatureSet(featureSet); FeatureSetSpec appliedFeatureSetSpec = applyFeatureSetResponse.getFeatureSet().getSpec(); @@ -596,7 +612,8 @@ public void applyFeatureSetShouldAcceptPresenceShapeAndDomainConstraints() // appliedFeatureSpecs needs to be sorted because the list returned by specService may not // follow the order in the request - List appliedFeatureSpecs = new ArrayList<>(appliedFeatureSetSpec.getFeaturesList()); + List appliedFeatureSpecs = + new ArrayList<>(appliedFeatureSetSpec.getFeaturesList()); 
appliedFeatureSpecs.sort(Comparator.comparing(FeatureSpec::getName)); assertEquals(appliedEntitySpecs.size(), entitySpecs.size()); @@ -684,6 +701,4 @@ private Store newDummyStore(String name) { store.setConfig(RedisConfig.newBuilder().setPort(6379).build().toByteArray()); return store; } - - } diff --git a/core/src/test/java/feast/core/util/ProtoUtilTest.java b/core/src/test/java/feast/core/util/ProtoUtilTest.java new file mode 100644 index 0000000000..beab9a6fbd --- /dev/null +++ b/core/src/test/java/feast/core/util/ProtoUtilTest.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.core.util; + +import static org.junit.Assert.assertEquals; + +import feast.core.FeatureSetProto; +import feast.core.SourceProto; +import feast.types.ValueProto; +import java.io.IOException; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.junit.Test; + +public class ProtoUtilTest { + + private FeatureSetProto.FeatureSet newFeatureSet(Integer version, Integer numberOfFeatures) { + List features = + IntStream.range(1, numberOfFeatures + 1) + .mapToObj( + i -> + FeatureSetProto.FeatureSpec.newBuilder() + .setValueType(ValueProto.ValueType.Enum.FLOAT) + .setName("feature".concat(Integer.toString(i))) + .build()) + .collect(Collectors.toList()); + + return FeatureSetProto.FeatureSet.newBuilder() + .setSpec( + FeatureSetProto.FeatureSetSpec.newBuilder() + .setSource( + SourceProto.Source.newBuilder() + .setType(SourceProto.SourceType.KAFKA) + .setKafkaSourceConfig( + SourceProto.KafkaSourceConfig.newBuilder() + .setBootstrapServers("somebrokers:9092") + .setTopic("sometopic"))) + .addAllFeatures(features) + .setVersion(version) + .addEntities( + FeatureSetProto.EntitySpec.newBuilder() + .setName("entity") + .setValueType(ValueProto.ValueType.Enum.STRING))) + .build(); + } + + @Test + public void testConversionToJson() throws IOException { + int nrOfFeatureSet = 1; + int nrOfFeatures = 1; + List featureSets = + IntStream.range(1, nrOfFeatureSet + 1) + .mapToObj(i -> newFeatureSet(i, nrOfFeatures)) + .collect(Collectors.toList()); + + String expectedOutput = + "{\"version\":1," + + "\"entities\":[{\"name\":\"entity\",\"valueType\":2}]," + + "\"features\":[{\"name\":\"feature1\",\"valueType\":6}]," + + "\"source\":{" + + "\"type\":1," + + "\"kafkaSourceConfig\":{" + + "\"bootstrapServers\":\"somebrokers:9092\"," + + "\"topic\":\"sometopic\"}}}"; + assertEquals(expectedOutput, ProtoUtil.toJson(featureSets)); + } +} diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java 
b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 41af5f9bb4..11619e66d4 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -28,11 +28,13 @@ import feast.ingestion.transform.WriteFailedElementToBigQuery; import feast.ingestion.transform.WriteToStore; import feast.ingestion.transform.metrics.WriteMetricsTransform; +import feast.ingestion.utils.CompressionUtil; import feast.ingestion.utils.ResourceUtil; import feast.ingestion.utils.SpecUtil; import feast.ingestion.utils.StoreUtil; import feast.ingestion.values.FailedElement; import feast.types.FeatureRowProto.FeatureRow; +import java.io.IOException; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -57,15 +59,14 @@ public class ImportJob { * @param args arguments to be passed to Beam pipeline * @throws InvalidProtocolBufferException if options passed to the pipeline are invalid */ - public static void main(String[] args) throws InvalidProtocolBufferException { + public static void main(String[] args) throws IOException { ImportOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create().as(ImportOptions.class); runPipeline(options); } @SuppressWarnings("UnusedReturnValue") - public static PipelineResult runPipeline(ImportOptions options) - throws InvalidProtocolBufferException { + public static PipelineResult runPipeline(ImportOptions options) throws IOException { /* * Steps: * 1. 
Read messages from Feast Source as FeatureRow @@ -80,8 +81,9 @@ public static PipelineResult runPipeline(ImportOptions options) log.info("Starting import job with settings: \n{}", options.toString()); - List featureSets = - SpecUtil.parseFeatureSetSpecJsonList(options.getFeatureSetJson()); + List featureSetJson = + CompressionUtil.decompressAsListOfString(options.getFeatureSetJson()); + List featureSets = SpecUtil.parseFeatureSetSpecJsonList(featureSetJson); List stores = SpecUtil.parseStoreJsonList(options.getStoreJson()); for (Store store : stores) { diff --git a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java index b299bb47e5..6afdd80dd7 100644 --- a/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java +++ b/ingestion/src/main/java/feast/ingestion/options/ImportOptions.java @@ -28,16 +28,16 @@ public interface ImportOptions extends PipelineOptions, DataflowPipelineOptions, DirectOptions { @Required @Description( - "JSON string representation of the FeatureSet that the import job will process." + "JSON string representation of the FeatureSet that the import job will process, in BZip2 binary format." + "FeatureSet follows the format in feast.core.FeatureSet proto." 
+ "Mutliple FeatureSetSpec can be passed by specifying '--featureSet={...}' multiple times" + "The conversion of Proto message to JSON should follow this mapping:" + "https://developers.google.com/protocol-buffers/docs/proto3#json" + "Please minify and remove all insignificant whitespace such as newline in the JSON string" + "to prevent error when parsing the options") - List getFeatureSetJson(); + byte[] getFeatureSetJson(); - void setFeatureSetJson(List featureSetJson); + void setFeatureSetJson(byte[] featureSetJson); @Required @Description( diff --git a/ingestion/src/main/java/feast/ingestion/utils/CompressionUtil.java b/ingestion/src/main/java/feast/ingestion/utils/CompressionUtil.java new file mode 100644 index 0000000000..12b81f9d87 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/utils/CompressionUtil.java @@ -0,0 +1,45 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion.utils; + +import java.io.*; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; + +public class CompressionUtil { + + public static List decompressAsListOfString(byte[] compressedFeatureSets) + throws IOException { + try (ByteArrayInputStream inputStream = new ByteArrayInputStream(compressedFeatureSets); + BZip2CompressorInputStream bzip2Input = new BZip2CompressorInputStream(inputStream); + BufferedReader reader = new BufferedReader(new InputStreamReader(bzip2Input)); ) { + return reader.lines().collect(Collectors.toList()); + } + } + + public static byte[] compress(String origStr) throws IOException { + ByteArrayOutputStream compressedStream = new ByteArrayOutputStream(); + try (BZip2CompressorOutputStream bzip2Output = + new BZip2CompressorOutputStream(compressedStream)) { + bzip2Output.write(origStr.getBytes()); + } + + return compressedStream.toByteArray(); + } +} diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 290b38dabe..08cfc79ea5 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -31,6 +31,7 @@ import feast.core.StoreProto.Store.StoreType; import feast.core.StoreProto.Store.Subscription; import feast.ingestion.options.ImportOptions; +import feast.ingestion.utils.CompressionUtil; import feast.storage.RedisProto.RedisKey; import feast.test.TestUtil; import feast.test.TestUtil.LocalKafka; @@ -162,12 +163,10 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() .build(); ImportOptions options = PipelineOptionsFactory.create().as(ImportOptions.class); - options.setFeatureSetJson( - Collections.singletonList( - 
JsonFormat.printer().omittingInsignificantWhitespace().print(featureSet.getSpec()))); - options.setStoreJson( - Collections.singletonList( - JsonFormat.printer().omittingInsignificantWhitespace().print(redis))); + JsonFormat.Printer printer = + JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); + options.setFeatureSetJson(CompressionUtil.compress(printer.print(spec))); + options.setStoreJson(Collections.singletonList(JsonFormat.printer().print(redis))); options.setProject(""); options.setBlockOnRun(false); diff --git a/ingestion/src/test/java/feast/ingestion/utils/CompressionUtilTest.java b/ingestion/src/test/java/feast/ingestion/utils/CompressionUtilTest.java new file mode 100644 index 0000000000..63b964209a --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/utils/CompressionUtilTest.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion.utils; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; + +public class CompressionUtilTest { + + @Test + public void testCompressAndDecompression() throws IOException { + String origString = "abc\ndef"; + byte[] compressedString = CompressionUtil.compress(origString); + List decompressedFeatureSets = + CompressionUtil.decompressAsListOfString(compressedString); + assertEquals(Arrays.asList("abc", "def"), decompressedFeatureSets); + } +} diff --git a/ingestion/src/test/java/feast/ingestion/util/DateUtilTest.java b/ingestion/src/test/java/feast/ingestion/utils/DateUtilTest.java similarity index 92% rename from ingestion/src/test/java/feast/ingestion/util/DateUtilTest.java rename to ingestion/src/test/java/feast/ingestion/utils/DateUtilTest.java index 71d4e67bea..151d501a59 100644 --- a/ingestion/src/test/java/feast/ingestion/util/DateUtilTest.java +++ b/ingestion/src/test/java/feast/ingestion/utils/DateUtilTest.java @@ -14,15 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package feast.ingestion.util; +package feast.ingestion.utils; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.*; import com.google.protobuf.Timestamp; -import feast.ingestion.utils.DateUtil; import junit.framework.TestCase; import org.joda.time.DateTime; diff --git a/ingestion/src/test/java/feast/ingestion/util/JsonUtilTest.java b/ingestion/src/test/java/feast/ingestion/utils/JsonUtilTest.java similarity index 95% rename from ingestion/src/test/java/feast/ingestion/util/JsonUtilTest.java rename to ingestion/src/test/java/feast/ingestion/utils/JsonUtilTest.java index 02af4d819f..62c74dfc34 100644 --- a/ingestion/src/test/java/feast/ingestion/util/JsonUtilTest.java +++ b/ingestion/src/test/java/feast/ingestion/utils/JsonUtilTest.java @@ -14,12 +14,11 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.ingestion.util; +package feast.ingestion.utils; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; -import feast.ingestion.utils.JsonUtil; import java.util.Collections; import java.util.HashMap; import java.util.Map; diff --git a/ingestion/src/test/java/feast/ingestion/util/StoreUtilTest.java b/ingestion/src/test/java/feast/ingestion/utils/StoreUtilTest.java similarity index 91% rename from ingestion/src/test/java/feast/ingestion/util/StoreUtilTest.java rename to ingestion/src/test/java/feast/ingestion/utils/StoreUtilTest.java index 4e2297e405..82988121bc 100644 --- a/ingestion/src/test/java/feast/ingestion/util/StoreUtilTest.java +++ b/ingestion/src/test/java/feast/ingestion/utils/StoreUtilTest.java @@ -14,22 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package feast.ingestion.util; +package feast.ingestion.utils; -import static feast.types.ValueProto.ValueType.Enum.BOOL; -import static feast.types.ValueProto.ValueType.Enum.BOOL_LIST; -import static feast.types.ValueProto.ValueType.Enum.BYTES; -import static feast.types.ValueProto.ValueType.Enum.BYTES_LIST; -import static feast.types.ValueProto.ValueType.Enum.DOUBLE; -import static feast.types.ValueProto.ValueType.Enum.DOUBLE_LIST; -import static feast.types.ValueProto.ValueType.Enum.FLOAT; -import static feast.types.ValueProto.ValueType.Enum.FLOAT_LIST; -import static feast.types.ValueProto.ValueType.Enum.INT32; -import static feast.types.ValueProto.ValueType.Enum.INT32_LIST; -import static feast.types.ValueProto.ValueType.Enum.INT64; -import static feast.types.ValueProto.ValueType.Enum.INT64_LIST; -import static feast.types.ValueProto.ValueType.Enum.STRING; -import static feast.types.ValueProto.ValueType.Enum.STRING_LIST; +import static feast.types.ValueProto.ValueType.Enum.*; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.Field; @@ -40,7 +27,6 @@ import feast.core.FeatureSetProto.FeatureSet; import feast.core.FeatureSetProto.FeatureSetSpec; import feast.core.FeatureSetProto.FeatureSpec; -import feast.ingestion.utils.StoreUtil; import java.util.Arrays; import org.junit.Assert; import org.junit.Test; From 0fe5654a695629747d7654e1979c5e18a7d5841c Mon Sep 17 00:00:00 2001 From: Shu Heng Date: Wed, 12 Feb 2020 11:07:52 +0800 Subject: [PATCH 2/3] Make decompressor and compressor more generic and extensible --- .../core/job/dataflow/DataflowJobManager.java | 17 ++++--- .../job/direct/DirectRunnerJobManager.java | 11 +++-- .../option/FeatureSetJsonByteConverter.java} | 21 ++++++-- .../job/dataflow/DataflowJobManagerTest.java | 11 +++-- .../direct/DirectRunnerJobManagerTest.java | 11 +++-- .../FeatureSetJsonByteConverterTest.java} | 15 +++--- .../main/java/feast/ingestion/ImportJob.java | 8 ++-- .../BZip2Compressor.java} | 34 
++++++------- .../ingestion/options/BZip2Decompressor.java | 38 +++++++++++++++ .../options/InputStreamConverter.java | 31 ++++++++++++ .../options/OptionByteConverter.java | 30 ++++++++++++ .../ingestion/options/OptionCompressor.java | 31 ++++++++++++ .../ingestion/options/OptionDecompressor.java | 30 ++++++++++++ .../options/StringListStreamConverter.java | 41 ++++++++++++++++ .../java/feast/ingestion/ImportJobTest.java | 10 +++- .../options/BZip2CompressorTest.java | 40 ++++++++++++++++ .../options/BZip2DecompressorTest.java | 48 +++++++++++++++++++ .../StringListStreamConverterTest.java} | 20 ++++---- 18 files changed, 386 insertions(+), 61 deletions(-) rename core/src/main/java/feast/core/{util/ProtoUtil.java => job/option/FeatureSetJsonByteConverter.java} (60%) rename core/src/test/java/feast/core/{util/ProtoUtilTest.java => job/option/FeatureSetJsonByteConverterTest.java} (85%) rename ingestion/src/main/java/feast/ingestion/{utils/CompressionUtil.java => options/BZip2Compressor.java} (55%) create mode 100644 ingestion/src/main/java/feast/ingestion/options/BZip2Decompressor.java create mode 100644 ingestion/src/main/java/feast/ingestion/options/InputStreamConverter.java create mode 100644 ingestion/src/main/java/feast/ingestion/options/OptionByteConverter.java create mode 100644 ingestion/src/main/java/feast/ingestion/options/OptionCompressor.java create mode 100644 ingestion/src/main/java/feast/ingestion/options/OptionDecompressor.java create mode 100644 ingestion/src/main/java/feast/ingestion/options/StringListStreamConverter.java create mode 100644 ingestion/src/test/java/feast/ingestion/options/BZip2CompressorTest.java create mode 100644 ingestion/src/test/java/feast/ingestion/options/BZip2DecompressorTest.java rename ingestion/src/test/java/feast/ingestion/{utils/CompressionUtilTest.java => options/StringListStreamConverterTest.java} (57%) diff --git a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java 
b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java index 054bffc11b..323eb35983 100644 --- a/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java +++ b/core/src/main/java/feast/core/job/dataflow/DataflowJobManager.java @@ -29,17 +29,13 @@ import feast.core.exception.JobExecutionException; import feast.core.job.JobManager; import feast.core.job.Runner; -import feast.core.model.FeatureSet; -import feast.core.model.Job; -import feast.core.model.JobStatus; -import feast.core.model.Project; -import feast.core.model.Source; -import feast.core.model.Store; -import feast.core.util.ProtoUtil; +import feast.core.job.option.FeatureSetJsonByteConverter; +import feast.core.model.*; import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; -import feast.ingestion.utils.CompressionUtil; +import feast.ingestion.options.OptionCompressor; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -225,7 +221,10 @@ private ImportOptions getPipelineOptions( String[] args = TypeConversion.convertMapToArgs(defaultOptions); ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class); - pipelineOptions.setFeatureSetJson(CompressionUtil.compress(ProtoUtil.toJson(featureSets))); + OptionCompressor> featureSetJsonCompressor = + new BZip2Compressor<>(new FeatureSetJsonByteConverter()); + + pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets)); pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setProject(projectId); pipelineOptions.setUpdate(update); diff --git a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java index 4e2d08a690..08aeed1cc3 100644 --- 
a/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java +++ b/core/src/main/java/feast/core/job/direct/DirectRunnerJobManager.java @@ -24,14 +24,15 @@ import feast.core.exception.JobExecutionException; import feast.core.job.JobManager; import feast.core.job.Runner; +import feast.core.job.option.FeatureSetJsonByteConverter; import feast.core.model.FeatureSet; import feast.core.model.Job; import feast.core.model.JobStatus; -import feast.core.util.ProtoUtil; import feast.core.util.TypeConversion; import feast.ingestion.ImportJob; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; -import feast.ingestion.utils.CompressionUtil; +import feast.ingestion.options.OptionCompressor; import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -95,7 +96,11 @@ private ImportOptions getPipelineOptions( List featureSets, StoreProto.Store sink) throws IOException { String[] args = TypeConversion.convertMapToArgs(defaultOptions); ImportOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).as(ImportOptions.class); - pipelineOptions.setFeatureSetJson(CompressionUtil.compress(ProtoUtil.toJson(featureSets))); + + OptionCompressor> featureSetJsonCompressor = + new BZip2Compressor<>(new FeatureSetJsonByteConverter()); + + pipelineOptions.setFeatureSetJson(featureSetJsonCompressor.compress(featureSets)); pipelineOptions.setStoreJson(Collections.singletonList(JsonFormat.printer().print(sink))); pipelineOptions.setRunner(DirectRunner.class); pipelineOptions.setProject(""); // set to default value to satisfy validation diff --git a/core/src/main/java/feast/core/util/ProtoUtil.java b/core/src/main/java/feast/core/job/option/FeatureSetJsonByteConverter.java similarity index 60% rename from core/src/main/java/feast/core/util/ProtoUtil.java rename to core/src/main/java/feast/core/job/option/FeatureSetJsonByteConverter.java index 76fdd84e8b..dbd04d668f 100644 --- 
a/core/src/main/java/feast/core/util/ProtoUtil.java +++ b/core/src/main/java/feast/core/job/option/FeatureSetJsonByteConverter.java @@ -14,23 +14,34 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package feast.core.util; +package feast.core.job.option; +import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.util.JsonFormat; import feast.core.FeatureSetProto; -import java.io.IOException; +import feast.ingestion.options.OptionByteConverter; import java.util.ArrayList; import java.util.List; -public class ProtoUtil { +public class FeatureSetJsonByteConverter + implements OptionByteConverter> { - public static String toJson(List featureSets) throws IOException { + /** + * Convert list of feature sets to json strings joined by new line, represented as byte arrays + * + * @param featureSets List of feature set protobufs + * @return Byte array representation of the json strings + * @throws InvalidProtocolBufferException + */ + @Override + public byte[] toByte(List featureSets) + throws InvalidProtocolBufferException { JsonFormat.Printer printer = JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); List featureSetsJson = new ArrayList<>(); for (FeatureSetProto.FeatureSet featureSet : featureSets) { featureSetsJson.add(printer.print(featureSet.getSpec())); } - return String.join("\n", featureSetsJson); + return String.join("\n", featureSetsJson).getBytes(); } } diff --git a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java index fca17ff656..9f26c6919e 100644 --- a/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java +++ b/core/src/test/java/feast/core/job/dataflow/DataflowJobManagerTest.java @@ -40,13 +40,15 @@ import feast.core.config.FeastProperties.MetricsProperties; import feast.core.exception.JobExecutionException; import 
feast.core.job.Runner; +import feast.core.job.option.FeatureSetJsonByteConverter; import feast.core.model.*; -import feast.core.util.ProtoUtil; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; -import feast.ingestion.utils.CompressionUtil; +import feast.ingestion.options.OptionCompressor; import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.beam.runners.dataflow.DataflowPipelineJob; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -126,8 +128,11 @@ public void shouldStartJobWithCorrectPipelineOptions() throws IOException { expectedPipelineOptions.setAppName("DataflowJobManager"); expectedPipelineOptions.setJobName(jobName); expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); + + OptionCompressor> featureSetJsonCompressor = + new BZip2Compressor<>(new FeatureSetJsonByteConverter()); expectedPipelineOptions.setFeatureSetJson( - CompressionUtil.compress(ProtoUtil.toJson(Collections.singletonList(featureSet)))); + featureSetJsonCompressor.compress(Collections.singletonList(featureSet))); ArgumentCaptor captor = ArgumentCaptor.forClass(ImportOptions.class); diff --git a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java index 1d23ee1cbc..64412f4391 100644 --- a/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java +++ b/core/src/test/java/feast/core/job/direct/DirectRunnerJobManagerTest.java @@ -40,17 +40,19 @@ import feast.core.StoreProto.Store.Subscription; import feast.core.config.FeastProperties.MetricsProperties; import feast.core.job.Runner; +import feast.core.job.option.FeatureSetJsonByteConverter; import feast.core.model.FeatureSet; import feast.core.model.Job; import feast.core.model.JobStatus; import feast.core.model.Source; import feast.core.model.Store; -import 
feast.core.util.ProtoUtil; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; -import feast.ingestion.utils.CompressionUtil; +import feast.ingestion.options.OptionCompressor; import java.io.IOException; import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.beam.runners.direct.DirectRunner; import org.apache.beam.sdk.PipelineResult; @@ -124,8 +126,11 @@ public void shouldStartDirectJobAndRegisterPipelineResult() throws IOException { expectedPipelineOptions.setProject(""); expectedPipelineOptions.setStoreJson(Lists.newArrayList(printer.print(store))); expectedPipelineOptions.setProject(""); + + OptionCompressor> featureSetJsonCompressor = + new BZip2Compressor<>(new FeatureSetJsonByteConverter()); expectedPipelineOptions.setFeatureSetJson( - CompressionUtil.compress(ProtoUtil.toJson(Collections.singletonList(featureSet)))); + featureSetJsonCompressor.compress(Collections.singletonList(featureSet))); String expectedJobId = "feast-job-0"; ArgumentCaptor pipelineOptionsCaptor = diff --git a/core/src/test/java/feast/core/util/ProtoUtilTest.java b/core/src/test/java/feast/core/job/option/FeatureSetJsonByteConverterTest.java similarity index 85% rename from core/src/test/java/feast/core/util/ProtoUtilTest.java rename to core/src/test/java/feast/core/job/option/FeatureSetJsonByteConverterTest.java index beab9a6fbd..2dfeef1d96 100644 --- a/core/src/test/java/feast/core/util/ProtoUtilTest.java +++ b/core/src/test/java/feast/core/job/option/FeatureSetJsonByteConverterTest.java @@ -14,20 +14,20 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package feast.core.util; +package feast.core.job.option; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; +import com.google.protobuf.InvalidProtocolBufferException; import feast.core.FeatureSetProto; import feast.core.SourceProto; import feast.types.ValueProto; -import java.io.IOException; import java.util.List; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.junit.Test; -public class ProtoUtilTest { +public class FeatureSetJsonByteConverterTest { private FeatureSetProto.FeatureSet newFeatureSet(Integer version, Integer numberOfFeatures) { List features = @@ -60,7 +60,7 @@ private FeatureSetProto.FeatureSet newFeatureSet(Integer version, Integer number } @Test - public void testConversionToJson() throws IOException { + public void shouldConvertFeatureSetsAsJsonStringBytes() throws InvalidProtocolBufferException { int nrOfFeatureSet = 1; int nrOfFeatures = 1; List featureSets = @@ -68,7 +68,7 @@ public void testConversionToJson() throws IOException { .mapToObj(i -> newFeatureSet(i, nrOfFeatures)) .collect(Collectors.toList()); - String expectedOutput = + String expectedOutputString = "{\"version\":1," + "\"entities\":[{\"name\":\"entity\",\"valueType\":2}]," + "\"features\":[{\"name\":\"feature1\",\"valueType\":6}]," @@ -77,6 +77,7 @@ public void testConversionToJson() throws IOException { + "\"kafkaSourceConfig\":{" + "\"bootstrapServers\":\"somebrokers:9092\"," + "\"topic\":\"sometopic\"}}}"; - assertEquals(expectedOutput, ProtoUtil.toJson(featureSets)); + FeatureSetJsonByteConverter byteConverter = new FeatureSetJsonByteConverter(); + assertEquals(expectedOutputString, new String(byteConverter.toByte(featureSets))); } } diff --git a/ingestion/src/main/java/feast/ingestion/ImportJob.java b/ingestion/src/main/java/feast/ingestion/ImportJob.java index 11619e66d4..c4973ce3ca 100644 --- a/ingestion/src/main/java/feast/ingestion/ImportJob.java +++ 
b/ingestion/src/main/java/feast/ingestion/ImportJob.java @@ -22,13 +22,14 @@ import feast.core.FeatureSetProto.FeatureSet; import feast.core.SourceProto.Source; import feast.core.StoreProto.Store; +import feast.ingestion.options.BZip2Decompressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.StringListStreamConverter; import feast.ingestion.transform.ReadFromSource; import feast.ingestion.transform.ValidateFeatureRows; import feast.ingestion.transform.WriteFailedElementToBigQuery; import feast.ingestion.transform.WriteToStore; import feast.ingestion.transform.metrics.WriteMetricsTransform; -import feast.ingestion.utils.CompressionUtil; import feast.ingestion.utils.ResourceUtil; import feast.ingestion.utils.SpecUtil; import feast.ingestion.utils.StoreUtil; @@ -81,8 +82,9 @@ public static PipelineResult runPipeline(ImportOptions options) throws IOExcepti log.info("Starting import job with settings: \n{}", options.toString()); - List featureSetJson = - CompressionUtil.decompressAsListOfString(options.getFeatureSetJson()); + BZip2Decompressor> decompressor = + new BZip2Decompressor<>(new StringListStreamConverter()); + List featureSetJson = decompressor.decompress(options.getFeatureSetJson()); List featureSets = SpecUtil.parseFeatureSetSpecJsonList(featureSetJson); List stores = SpecUtil.parseStoreJsonList(options.getStoreJson()); diff --git a/ingestion/src/main/java/feast/ingestion/utils/CompressionUtil.java b/ingestion/src/main/java/feast/ingestion/options/BZip2Compressor.java similarity index 55% rename from ingestion/src/main/java/feast/ingestion/utils/CompressionUtil.java rename to ingestion/src/main/java/feast/ingestion/options/BZip2Compressor.java index 12b81f9d87..b7e4e6ee0a 100644 --- a/ingestion/src/main/java/feast/ingestion/utils/CompressionUtil.java +++ b/ingestion/src/main/java/feast/ingestion/options/BZip2Compressor.java @@ -14,30 +14,32 @@ * See the License for the specific language governing permissions and * limitations 
under the License. */ -package feast.ingestion.utils; +package feast.ingestion.options; -import java.io.*; -import java.util.List; -import java.util.stream.Collectors; -import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; -public class CompressionUtil { +public class BZip2Compressor implements OptionCompressor { - public static List decompressAsListOfString(byte[] compressedFeatureSets) - throws IOException { - try (ByteArrayInputStream inputStream = new ByteArrayInputStream(compressedFeatureSets); - BZip2CompressorInputStream bzip2Input = new BZip2CompressorInputStream(inputStream); - BufferedReader reader = new BufferedReader(new InputStreamReader(bzip2Input)); ) { - return reader.lines().collect(Collectors.toList()); - } - } + private final OptionByteConverter byteConverter; - public static byte[] compress(String origStr) throws IOException { + public BZip2Compressor(OptionByteConverter byteConverter) { + this.byteConverter = byteConverter; + } + /** + * Compress pipeline option using BZip2 + * + * @param option Pipeline option value + * @return BZip2 compressed option value + * @throws IOException + */ + @Override + public byte[] compress(T option) throws IOException { ByteArrayOutputStream compressedStream = new ByteArrayOutputStream(); try (BZip2CompressorOutputStream bzip2Output = new BZip2CompressorOutputStream(compressedStream)) { - bzip2Output.write(origStr.getBytes()); + bzip2Output.write(byteConverter.toByte(option)); } return compressedStream.toByteArray(); diff --git a/ingestion/src/main/java/feast/ingestion/options/BZip2Decompressor.java b/ingestion/src/main/java/feast/ingestion/options/BZip2Decompressor.java new file mode 100644 index 0000000000..ce49c1be6e --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/BZip2Decompressor.java @@ -0,0 +1,38 @@ +/* + * 
SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; + +public class BZip2Decompressor implements OptionDecompressor { + + private final InputStreamConverter inputStreamConverter; + + public BZip2Decompressor(InputStreamConverter inputStreamConverter) { + this.inputStreamConverter = inputStreamConverter; + } + + @Override + public T decompress(byte[] compressed) throws IOException { + try (ByteArrayInputStream inputStream = new ByteArrayInputStream(compressed); + BZip2CompressorInputStream bzip2Input = new BZip2CompressorInputStream(inputStream)) { + return inputStreamConverter.readStream(bzip2Input); + } + } +} diff --git a/ingestion/src/main/java/feast/ingestion/options/InputStreamConverter.java b/ingestion/src/main/java/feast/ingestion/options/InputStreamConverter.java new file mode 100644 index 0000000000..e2fef73236 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/InputStreamConverter.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.IOException; +import java.io.InputStream; + +public interface InputStreamConverter { + + /** + * Used in conjunction with {@link OptionDecompressor} to decompress the pipeline option + * + * @param inputStream Input byte stream in compressed format + * @return Decompressed pipeline option value + */ + T readStream(InputStream inputStream) throws IOException; +} diff --git a/ingestion/src/main/java/feast/ingestion/options/OptionByteConverter.java b/ingestion/src/main/java/feast/ingestion/options/OptionByteConverter.java new file mode 100644 index 0000000000..ff5a41a627 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/OptionByteConverter.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion.options; + +import java.io.IOException; + +public interface OptionByteConverter<T> { + + /** + * Used in conjunction with {@link OptionCompressor} to compress the pipeline option + * + * @param option Pipeline option value + * @return byte representation of the pipeline option value, without compression. + */ + byte[] toByte(T option) throws IOException; +} diff --git a/ingestion/src/main/java/feast/ingestion/options/OptionCompressor.java b/ingestion/src/main/java/feast/ingestion/options/OptionCompressor.java new file mode 100644 index 0000000000..b2345fc3eb --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/OptionCompressor.java @@ -0,0 +1,31 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.IOException; + +public interface OptionCompressor<T> { + + /** + * Compress pipeline option into bytes format. This is necessary as some Beam runners have + * limitations on pipeline option size. 
+ * + * @param option Pipeline option value + * @return Compressed values of the option, as byte array + */ + byte[] compress(T option) throws IOException; +} diff --git a/ingestion/src/main/java/feast/ingestion/options/OptionDecompressor.java b/ingestion/src/main/java/feast/ingestion/options/OptionDecompressor.java new file mode 100644 index 0000000000..affeafdaa0 --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/OptionDecompressor.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package feast.ingestion.options; + +import java.io.IOException; + +public interface OptionDecompressor<T> { + + /** + * Decompress pipeline option from byte array. 
+ * + * @param compressed Compressed pipeline option value + * @return Decompressed pipeline option + */ + T decompress(byte[] compressed) throws IOException; +} diff --git a/ingestion/src/main/java/feast/ingestion/options/StringListStreamConverter.java b/ingestion/src/main/java/feast/ingestion/options/StringListStreamConverter.java new file mode 100644 index 0000000000..d7277f3c7d --- /dev/null +++ b/ingestion/src/main/java/feast/ingestion/options/StringListStreamConverter.java @@ -0,0 +1,41 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion.options; + +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.List; +import java.util.stream.Collectors; + +public class StringListStreamConverter implements InputStreamConverter<List<String>> { + + /** + * Convert Input byte stream to newline separated strings + * + * @param inputStream Input byte stream + * @return List of strings + */ + @Override + public List<String> readStream(InputStream inputStream) throws IOException { + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + List<String> stringList = reader.lines().collect(Collectors.toList()); + reader.close(); + return stringList; + } +} diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 08cfc79ea5..8b0c0cbb34 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -31,13 +31,13 @@ import feast.core.StoreProto.Store.StoreType; import feast.core.StoreProto.Store.Subscription; import feast.ingestion.options.ImportOptions; -import feast.ingestion.utils.CompressionUtil; import feast.storage.RedisProto.RedisKey; import feast.test.TestUtil; import feast.test.TestUtil.LocalKafka; import feast.test.TestUtil.LocalRedis; import feast.types.FeatureRowProto.FeatureRow; import feast.types.ValueProto.ValueType.Enum; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; @@ -49,6 +49,7 @@ import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.PipelineResult.State; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.joda.time.Duration; import org.junit.AfterClass; @@ -165,7 
+166,12 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() ImportOptions options = PipelineOptionsFactory.create().as(ImportOptions.class); JsonFormat.Printer printer = JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); - options.setFeatureSetJson(CompressionUtil.compress(printer.print(spec))); + ByteArrayOutputStream compressedStream = new ByteArrayOutputStream(); + try (BZip2CompressorOutputStream bzip2Output = + new BZip2CompressorOutputStream(compressedStream)) { + bzip2Output.write(printer.print(spec).getBytes()); + } + options.setFeatureSetJson(compressedStream.toByteArray()); options.setStoreJson(Collections.singletonList(JsonFormat.printer().print(redis))); options.setProject(""); options.setBlockOnRun(false); diff --git a/ingestion/src/test/java/feast/ingestion/options/BZip2CompressorTest.java b/ingestion/src/test/java/feast/ingestion/options/BZip2CompressorTest.java new file mode 100644 index 0000000000..cd03b18c79 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/options/BZip2CompressorTest.java @@ -0,0 +1,40 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion.options; + +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStreamReader; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream; +import org.junit.Assert; +import org.junit.Test; + +public class BZip2CompressorTest { + + @Test + public void shouldHaveBZip2CompatibleOutput() throws IOException { + BZip2Compressor<String> compressor = new BZip2Compressor<>(String::getBytes); + String origString = "somestring"; + try (ByteArrayInputStream inputStream = + new ByteArrayInputStream(compressor.compress(origString)); + BZip2CompressorInputStream bzip2Input = new BZip2CompressorInputStream(inputStream); + BufferedReader reader = new BufferedReader(new InputStreamReader(bzip2Input))) { + Assert.assertEquals(origString, reader.readLine()); + } + } +} diff --git a/ingestion/src/test/java/feast/ingestion/options/BZip2DecompressorTest.java b/ingestion/src/test/java/feast/ingestion/options/BZip2DecompressorTest.java new file mode 100644 index 0000000000..fe7cc789d8 --- /dev/null +++ b/ingestion/src/test/java/feast/ingestion/options/BZip2DecompressorTest.java @@ -0,0 +1,48 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright 2018-2020 The Feast Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package feast.ingestion.options; + +import static org.junit.Assert.*; + +import java.io.*; +import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream; +import org.junit.Test; + +public class BZip2DecompressorTest { + + @Test + public void shouldDecompressBZip2Stream() throws IOException { + BZip2Decompressor<String> decompressor = + new BZip2Decompressor<>( + inputStream -> { + BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream)); + String output = reader.readLine(); + reader.close(); + return output; + }); + + String originalString = "abc"; + ByteArrayOutputStream compressedStream = new ByteArrayOutputStream(); + try (BZip2CompressorOutputStream bzip2Output = + new BZip2CompressorOutputStream(compressedStream)) { + bzip2Output.write(originalString.getBytes()); + } + + String decompressedString = decompressor.decompress(compressedStream.toByteArray()); + assertEquals(originalString, decompressedString); + } +} diff --git a/ingestion/src/test/java/feast/ingestion/utils/CompressionUtilTest.java b/ingestion/src/test/java/feast/ingestion/options/StringListStreamConverterTest.java similarity index 57% rename from ingestion/src/test/java/feast/ingestion/utils/CompressionUtilTest.java rename to ingestion/src/test/java/feast/ingestion/options/StringListStreamConverterTest.java index 63b964209a..5ce9f054bc 100644 --- a/ingestion/src/test/java/feast/ingestion/utils/CompressionUtilTest.java +++ b/ingestion/src/test/java/feast/ingestion/options/StringListStreamConverterTest.java @@ -14,23 +14,23 @@ * See the License for the specific language governing permissions and * limitations under the License. 
 */ -package feast.ingestion.utils; +package feast.ingestion.options; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; +import java.io.ByteArrayInputStream; import java.io.IOException; +import java.io.InputStream; import java.util.Arrays; -import java.util.List; import org.junit.Test; -public class CompressionUtilTest { +public class StringListStreamConverterTest { @Test - public void testCompressAndDecompression() throws IOException { - String origString = "abc\ndef"; - byte[] compressedString = CompressionUtil.compress(origString); - List<String> decompressedFeatureSets = - CompressionUtil.decompressAsListOfString(compressedString); - assertEquals(Arrays.asList("abc", "def"), decompressedFeatureSets); + public void shouldReadStreamAsNewlineSeparatedStrings() throws IOException { + StringListStreamConverter converter = new StringListStreamConverter(); + String originalString = "abc\ndef"; + InputStream stringStream = new ByteArrayInputStream(originalString.getBytes()); + assertEquals(Arrays.asList("abc", "def"), converter.readStream(stringStream)); } } From 813b4a738ac19b09a6b38cb5f047e9eb77ed1ced Mon Sep 17 00:00:00 2001 From: Shu Heng Date: Wed, 12 Feb 2020 11:23:13 +0800 Subject: [PATCH 3/3] Avoid code duplication in test --- .../test/java/feast/ingestion/ImportJobTest.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java index 8b0c0cbb34..58ecae8f04 100644 --- a/ingestion/src/test/java/feast/ingestion/ImportJobTest.java +++ b/ingestion/src/test/java/feast/ingestion/ImportJobTest.java @@ -30,7 +30,9 @@ import feast.core.StoreProto.Store.RedisConfig; import feast.core.StoreProto.Store.StoreType; import feast.core.StoreProto.Store.Subscription; +import feast.ingestion.options.BZip2Compressor; import feast.ingestion.options.ImportOptions; +import feast.ingestion.options.OptionByteConverter; 
import feast.storage.RedisProto.RedisKey; import feast.test.TestUtil; import feast.test.TestUtil.LocalKafka; @@ -164,14 +166,12 @@ public void runPipeline_ShouldWriteToRedisCorrectlyGivenValidSpecAndFeatureRow() .build(); ImportOptions options = PipelineOptionsFactory.create().as(ImportOptions.class); - JsonFormat.Printer printer = - JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); - ByteArrayOutputStream compressedStream = new ByteArrayOutputStream(); - try (BZip2CompressorOutputStream bzip2Output = - new BZip2CompressorOutputStream(compressedStream)) { - bzip2Output.write(printer.print(spec).getBytes()); - } - options.setFeatureSetJson(compressedStream.toByteArray()); + BZip2Compressor<FeatureSetSpec> compressor = new BZip2Compressor<>(option -> { + JsonFormat.Printer printer = + JsonFormat.printer().omittingInsignificantWhitespace().printingEnumsAsInts(); + return printer.print(option).getBytes(); + }); + options.setFeatureSetJson(compressor.compress(spec)); options.setStoreJson(Collections.singletonList(JsonFormat.printer().print(redis))); options.setProject(""); options.setBlockOnRun(false);