BQ Sink is failing when Feature consists of only null values (#853)
* fix null values in bq writer

* fix only some null values
Oleksii Moskalenko committed Jul 1, 2020
1 parent fdbc376 commit 9a037dc
Showing 2 changed files with 64 additions and 9 deletions.
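
For context on the failure: a "null" Feature here is a Field whose Value oneof was never set, so getValCase() returns VAL_NOT_SET and no Beam schema type can be looked up for it. Below is a minimal, hypothetical sketch of such a field (not part of this commit; it assumes Feast's generated FieldProto/ValueProto classes live in feast.proto.types, which this diff does not show):

import feast.proto.types.FieldProto;
import feast.proto.types.ValueProto;

public class NullFeatureSketch {
  public static void main(String[] args) {
    // A field with a name but no value: the Value oneof stays unset.
    FieldProto.Field nullField =
        FieldProto.Field.newBuilder().setName("null_value").build();

    // getValue() on an unset message field returns the default Value instance,
    // whose oneof case is VAL_NOT_SET rather than e.g. INT64_VAL.
    System.out.println(nullField.getValue().getValCase()); // VAL_NOT_SET

    // A map keyed by the concrete val-cases (like protoToSchemaTypes in the
    // first hunk below) has no entry for VAL_NOT_SET, so the lookup yields
    // null -- the case the sink previously did not guard against.
  }
}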
@@ -92,6 +92,9 @@ private Schema inferCommonSchema(Iterable<FeatureRowProto.FeatureRow> featureRows)
             f -> {
               Schema.FieldType fieldType =
                   protoToSchemaTypes.get(f.getValue().getValCase());
+              if (fieldType == null) {
+                return;
+              }
               if (types.containsKey(f.getName())) {
                 if (!types.get(f.getName()).equals(fieldType)) {
                   throw new RuntimeException("schema cannot be inferred");
@@ -140,11 +143,14 @@ private void toColumnar(Iterable<FeatureRowProto.FeatureRow> featureRows) {
             idx -> {
               Schema.Field field = schema.getField(idx);
               if (rowValues.containsKey(field.getName())) {
-                ((List<Object>) values.get(idx))
-                    .add(protoValueToObject(rowValues.get(field.getName())));
-              } else {
-                ((List<Object>) values.get(idx)).add(defaultValues.get(field.getName()));
+                Object o = protoValueToObject(rowValues.get(field.getName()));
+                if (o != null) {
+                  ((List<Object>) values.get(idx)).add(o);
+                  return;
+                }
               }
+
+              ((List<Object>) values.get(idx)).add(defaultValues.get(field.getName()));
             });
         });
   }
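
The net writer behavior after the second hunk: a field whose proto value converts to null never reaches BigQuery; the column's default value is appended instead. A simplified, standalone model of that fallback (stand-in names and types, not the commit's code):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ColumnarFallbackSketch {
  // Stand-in for the writer's protoValueToObject: unset values convert to null.
  static Object protoValueToObject(Object raw) {
    return raw;
  }

  public static void main(String[] args) {
    Map<String, Object> rowValues = new HashMap<>();
    rowValues.put("feature_a", 42L);
    rowValues.put("null_value", null); // field present in the row, value unset

    Map<String, Object> defaultValues = new HashMap<>();
    defaultValues.put("feature_a", 0L);
    defaultValues.put("null_value", -1L);

    List<Object> column = new ArrayList<>();
    for (String name : List.of("feature_a", "null_value")) {
      if (rowValues.containsKey(name)) {
        Object o = protoValueToObject(rowValues.get(name));
        if (o != null) {
          column.add(o); // a real value wins
          continue;
        }
      }
      column.add(defaultValues.get(name)); // null or missing -> default
    }
    System.out.println(column); // [42, -1]
  }
}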
@@ -29,8 +29,10 @@
 import com.google.api.services.bigquery.model.TableFieldSchema;
 import com.google.api.services.bigquery.model.TableReference;
 import com.google.cloud.bigquery.*;
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
 import feast.common.models.FeatureSetReference;
 import feast.proto.core.FeatureSetProto.EntitySpec;
 import feast.proto.core.FeatureSetProto.FeatureSetSpec;
@@ -121,7 +123,8 @@ private FeatureRow generateRow(String featureSet) {
     FeatureRow.Builder row =
         FeatureRow.newBuilder()
             .setFeatureSet(featureSet)
-            .addFields(field("entity", rd.nextInt(), ValueProto.ValueType.Enum.INT64));
+            .addFields(field("entity", rd.nextInt(), ValueProto.ValueType.Enum.INT64))
+            .addFields(FieldProto.Field.newBuilder().setName("null_value").build());
 
     for (ValueProto.ValueType.Enum type : ValueProto.ValueType.Enum.values()) {
       if (type == ValueProto.ValueType.Enum.INVALID
@@ -204,7 +207,9 @@ public void simpleInsert() {
                 FeatureSetReference.of(spec.getProject(), spec.getName(), 1), spec))));
     PCollection<FeatureRow> successfulInserts =
         p.apply(featureRowTestStream).apply(sink.writer()).getSuccessfulInserts();
-    PAssert.that(successfulInserts).containsInAnyOrder(row1, row2);
+
+    List<FeatureRow> inputWithoutNulls = dropNullFeature(ImmutableList.of(row1, row2));
+    PAssert.that(successfulInserts).containsInAnyOrder(inputWithoutNulls);
     p.run();
 
     assert jobService.getAllJobs().size() == 1;
@@ -288,7 +293,7 @@ public void expectingJobResult() {
     PCollection<FeatureRow> inserts =
         p.apply(featureRowTestStream).apply(writer).getSuccessfulInserts();
 
-    PAssert.that(inserts).containsInAnyOrder(featureRow);
+    PAssert.that(inserts).containsInAnyOrder(dropNullFeature(ImmutableList.of(featureRow)));
 
     p.run();
   }
@@ -446,16 +451,60 @@ public void featureRowCompressShouldPackAndUnpackSuccessfully() {
 
     List<FeatureRow> input = Stream.concat(stream1, stream2).collect(Collectors.toList());
 
+    FeatureRow rowWithNull =
+        FeatureRow.newBuilder()
+            .setFeatureSet("project/fs")
+            .addAllFields(copyFieldsWithout(generateRow(""), "entity"))
+            .addFields(FieldProto.Field.newBuilder().setName("entity").build())
+            .build();
+
+    List<FeatureRow> inputWithNulls = Lists.newArrayList(input);
+    inputWithNulls.add(rowWithNull);
+
     PCollection<FeatureRow> result =
-        p.apply(Create.of(input))
+        p.apply(Create.of(inputWithNulls))
             .apply("KV", ParDo.of(new ExtractKV()))
            .apply(new CompactFeatureRows(1000))
             .apply("Flat", ParDo.of(new FlatMap()));
 
-    PAssert.that(result).containsInAnyOrder(input);
+    List<FeatureRow> inputWithoutNulls = dropNullFeature(input);
+
+    inputWithoutNulls.add(
+        FeatureRow.newBuilder()
+            .setFeatureSet("project/fs")
+            .addFields(
+                FieldProto.Field.newBuilder()
+                    .setName("entity")
+                    .setValue(ValueProto.Value.newBuilder().setInt64Val(0L).build())
+                    .build())
+            .addAllFields(copyFieldsWithout(rowWithNull, "entity", "null_value"))
+            .build());
+
+    PAssert.that(result).containsInAnyOrder(inputWithoutNulls);
     p.run();
   }
 
+  private List<FeatureRow> dropNullFeature(List<FeatureRow> input) {
+    return input.stream()
+        .map(
+            r ->
+                FeatureRow.newBuilder()
+                    .setFeatureSet(r.getFeatureSet())
+                    .addAllFields(
+                        r.getFieldsList().stream()
+                            .filter(f -> !f.getName().equals("null_value"))
+                            .collect(Collectors.toList()))
+                    .build())
+        .collect(Collectors.toList());
+  }
+
+  private List<FieldProto.Field> copyFieldsWithout(FeatureRow row, String... except) {
+    ArrayList<String> exclude = Lists.newArrayList(except);
+    return row.getFieldsList().stream()
+        .filter(f -> !exclude.contains(f.getName()))
+        .collect(Collectors.toList());
+  }
+
   public static class TableAnswer implements Answer<Table>, Serializable {
     TableId tableId;
     TableDefinition tableDefinition;
