From 9a037dc7bfe81c61a76976d17f3b23517d227498 Mon Sep 17 00:00:00 2001
From: Oleksii Moskalenko
Date: Wed, 1 Jul 2020 15:36:01 +0300
Subject: [PATCH] BQ Sink is failing when Feature consists of only null values
 (#853)

* fix null values in bq writer

* fix only some null values
---
 .../compression/FeatureRowsBatch.java         | 14 +++--
 .../bigquery/writer/BigQuerySinkTest.java     | 59 +++++++++++++++++--
 2 files changed, 64 insertions(+), 9 deletions(-)

diff --git a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/compression/FeatureRowsBatch.java b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/compression/FeatureRowsBatch.java
index 67b48a4cd1..ee5b1d67bc 100644
--- a/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/compression/FeatureRowsBatch.java
+++ b/storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/compression/FeatureRowsBatch.java
@@ -92,6 +92,9 @@ private Schema inferCommonSchema(Iterable<FeatureRow> featureRow
             f -> {
               Schema.FieldType fieldType = protoToSchemaTypes.get(f.getValue().getValCase());
+              if (fieldType == null) {
+                return;
+              }
               if (types.containsKey(f.getName())) {
                 if (!types.get(f.getName()).equals(fieldType)) {
                   throw new RuntimeException("schema cannot be inferred");
                 }
@@ -140,11 +143,14 @@ private void toColumnar(Iterable<FeatureRow> featureRows) {
           idx -> {
             Schema.Field field = schema.getField(idx);
             if (rowValues.containsKey(field.getName())) {
-              ((List) values.get(idx))
-                  .add(protoValueToObject(rowValues.get(field.getName())));
-            } else {
-              ((List) values.get(idx)).add(defaultValues.get(field.getName()));
+              Object o = protoValueToObject(rowValues.get(field.getName()));
+              if (o != null) {
+                ((List) values.get(idx)).add(o);
+                return;
+              }
             }
+
+            ((List) values.get(idx)).add(defaultValues.get(field.getName()));
           });
         });
   }
diff --git a/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java b/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java
index 6c95c6b0c0..18c453ed67 100644
--- a/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java
+++ b/storage/connectors/bigquery/src/test/java/feast/storage/connectors/bigquery/writer/BigQuerySinkTest.java
@@ -29,8 +29,10 @@
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.cloud.bigquery.*;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import feast.common.models.FeatureSetReference;
 import feast.proto.core.FeatureSetProto.EntitySpec;
 import feast.proto.core.FeatureSetProto.FeatureSetSpec;
@@ -121,7 +123,8 @@ private FeatureRow generateRow(String featureSet) {
     FeatureRow.Builder row =
         FeatureRow.newBuilder()
             .setFeatureSet(featureSet)
-            .addFields(field("entity", rd.nextInt(), ValueProto.ValueType.Enum.INT64));
+            .addFields(field("entity", rd.nextInt(), ValueProto.ValueType.Enum.INT64))
+            .addFields(FieldProto.Field.newBuilder().setName("null_value").build());

     for (ValueProto.ValueType.Enum type : ValueProto.ValueType.Enum.values()) {
       if (type == ValueProto.ValueType.Enum.INVALID
@@ -204,7 +207,9 @@ public void simpleInsert() {
                         FeatureSetReference.of(spec.getProject(), spec.getName(), 1), spec))));
     PCollection<FeatureRow> successfulInserts =
         p.apply(featureRowTestStream).apply(sink.writer()).getSuccessfulInserts();
-    PAssert.that(successfulInserts).containsInAnyOrder(row1, row2);
+
+    List<FeatureRow> inputWithoutNulls = dropNullFeature(ImmutableList.of(row1, row2));
+    PAssert.that(successfulInserts).containsInAnyOrder(inputWithoutNulls);
     p.run();

     assert jobService.getAllJobs().size() == 1;
@@ -288,7 +293,7 @@ public void expectingJobResult() {
     PCollection<FeatureRow> inserts =
         p.apply(featureRowTestStream).apply(writer).getSuccessfulInserts();

-    PAssert.that(inserts).containsInAnyOrder(featureRow);
+    PAssert.that(inserts).containsInAnyOrder(dropNullFeature(ImmutableList.of(featureRow)));
     p.run();
   }

@@ -446,16 +451,60 @@ public void featureRowCompressShouldPackAndUnpackSuccessfully() {

     List<FeatureRow> input = Stream.concat(stream1, stream2).collect(Collectors.toList());

+    FeatureRow rowWithNull =
+        FeatureRow.newBuilder()
+            .setFeatureSet("project/fs")
+            .addAllFields(copyFieldsWithout(generateRow(""), "entity"))
+            .addFields(FieldProto.Field.newBuilder().setName("entity").build())
+            .build();
+
+    List<FeatureRow> inputWithNulls = Lists.newArrayList(input);
+    inputWithNulls.add(rowWithNull);
+
     PCollection<FeatureRow> result =
-        p.apply(Create.of(input))
+        p.apply(Create.of(inputWithNulls))
             .apply("KV", ParDo.of(new ExtractKV()))
             .apply(new CompactFeatureRows(1000))
             .apply("Flat", ParDo.of(new FlatMap()));

-    PAssert.that(result).containsInAnyOrder(input);
+    List<FeatureRow> inputWithoutNulls = dropNullFeature(input);
+
+    inputWithoutNulls.add(
+        FeatureRow.newBuilder()
+            .setFeatureSet("project/fs")
+            .addFields(
+                FieldProto.Field.newBuilder()
+                    .setName("entity")
+                    .setValue(ValueProto.Value.newBuilder().setInt64Val(0L).build())
+                    .build())
+            .addAllFields(copyFieldsWithout(rowWithNull, "entity", "null_value"))
+            .build());
+
+    PAssert.that(result).containsInAnyOrder(inputWithoutNulls);
     p.run();
   }

+  private List<FeatureRow> dropNullFeature(List<FeatureRow> input) {
+    return input.stream()
+        .map(
+            r ->
+                FeatureRow.newBuilder()
+                    .setFeatureSet(r.getFeatureSet())
+                    .addAllFields(
+                        r.getFieldsList().stream()
+                            .filter(f -> !f.getName().equals("null_value"))
+                            .collect(Collectors.toList()))
+                    .build())
+        .collect(Collectors.toList());
+  }
+
+  private List<FieldProto.Field> copyFieldsWithout(FeatureRow row, String... except) {
+    ArrayList<String> exclude = Lists.newArrayList(except);
+    return row.getFieldsList().stream()
+        .filter(f -> !exclude.contains(f.getName()))
+        .collect(Collectors.toList());
+  }
+
   public static class TableAnswer implements Answer<Table>, Serializable {
     TableId tableId;
     TableDefinition tableDefinition;
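
Note (not part of the patch): a minimal sketch of the failure mode this change targets. A Field built without setValue() carries a Value whose oneof case is VAL_NOT_SET, which has no entry in protoToSchemaTypes, so schema inference previously produced a null Schema.FieldType; the fix skips such fields and falls back to the column default in toColumnar(). The import paths assume the feast.proto.types package layout used elsewhere in this repository; the class name is illustrative.

import feast.proto.types.FeatureRowProto.FeatureRow;
import feast.proto.types.FieldProto.Field;
import feast.proto.types.ValueProto.Value;

public class NullValueFieldSketch {
  public static void main(String[] args) {
    // A field with a name but no value: its oneof case is VAL_NOT_SET.
    Field nullField = Field.newBuilder().setName("null_value").build();

    FeatureRow row =
        FeatureRow.newBuilder()
            .setFeatureSet("project/fs")
            .addFields(
                Field.newBuilder()
                    .setName("entity")
                    .setValue(Value.newBuilder().setInt64Val(1L).build())
                    .build())
            .addFields(nullField)
            .build();

    // Prints VAL_NOT_SET: the case the patched code now skips during schema
    // inference instead of mapping it to a null FieldType.
    System.out.println(row.getFields(1).getValue().getValCase());
  }
}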