Skip to content

Commit

Permalink
Set BigQuery table timepartition inside get table function (#333)
Browse files Browse the repository at this point in the history
* Fix BigQuery write setting the time partitioning outside of the table reference

* Give core a bit more time to start up

* Rename GetTableReference to correctly reflect return type

* Increase kafka sleep time
  • Loading branch information
Chen Zhiling authored and feast-ci-bot committed Nov 27, 2019
1 parent 61d4c2d commit 57b574e
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 15 deletions.
4 changes: 2 additions & 2 deletions .prow/scripts/test-end-to-end.sh
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ nohup /tmp/kafka/bin/zookeeper-server-start.sh /tmp/kafka/config/zookeeper.prope
sleep 5
tail -n10 /var/log/zookeeper.log
nohup /tmp/kafka/bin/kafka-server-start.sh /tmp/kafka/config/server.properties &> /var/log/kafka.log 2>&1 &
sleep 5
sleep 10
tail -n10 /var/log/kafka.log

echo "
Expand Down Expand Up @@ -124,7 +124,7 @@ EOF
nohup java -jar core/target/feast-core-0.3.2-SNAPSHOT.jar \
--spring.config.location=file:///tmp/core.application.yml \
&> /var/log/feast-core.log &
sleep 20
sleep 30
tail -n10 /var/log/feast-core.log

echo "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import feast.ingestion.utils.ResourceUtil;
import feast.ingestion.values.FailedElement;
import feast.store.serving.bigquery.FeatureRowToTableRow;
import feast.store.serving.bigquery.GetTableDestination;
import feast.store.serving.redis.FeatureRowToRedisMutationDoFn;
import feast.store.serving.redis.RedisCustomIO;
import feast.types.FeatureRowProto.FeatureRow;
Expand All @@ -24,16 +25,12 @@
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.codehaus.jackson.JsonParser.Feature;
import org.slf4j.Logger;

@AutoValue
Expand Down Expand Up @@ -88,15 +85,8 @@ public PDone expand(PCollection<FeatureRow> input) {
.apply(
"WriteTableRowToBigQuery",
BigQueryIO.<FeatureRow>write()
.to((SerializableFunction<ValueInSingleWindow<FeatureRow>, TableDestination>) element -> {
String[] split = element.getValue().getFeatureSet().split(":");
return new TableDestination(String.format(
"%s:%s.%s_v%s",
bigqueryConfig.getProjectId(),
bigqueryConfig.getDatasetId(),
split[0],
split[1]), null);
})
.to(new GetTableDestination(bigqueryConfig.getProjectId(),
bigqueryConfig.getDatasetId()))
.withFormatFunction(new FeatureRowToTableRow(options.getJobName()))
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package feast.store.serving.bigquery;

import com.google.api.services.bigquery.model.TimePartitioning;
import feast.types.FeatureRowProto.FeatureRow;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;

/**
 * Resolves the BigQuery {@link TableDestination} for each {@link FeatureRow} being written.
 *
 * <p>The row's feature set reference is split on {@code ":"}; the first two parts are used as
 * the feature set name and version, and the row is routed to the table
 * {@code <projectId>:<datasetId>.<name>_v<version>}. The returned destination also carries a
 * {@code DAY} time partitioning on the column named by
 * {@link FeatureRowToTableRow#getEventTimestampColumn()}.
 */
public class GetTableDestination implements
    SerializableFunction<ValueInSingleWindow<FeatureRow>, TableDestination> {

  // Set once in the constructor and never reassigned.
  private final String projectId;
  private final String datasetId;

  /**
   * @param projectId project component of the generated table spec
   * @param datasetId dataset component of the generated table spec
   */
  public GetTableDestination(String projectId, String datasetId) {
    this.projectId = projectId;
    this.datasetId = datasetId;
  }

  /**
   * Builds the destination table for a single windowed feature row.
   *
   * @param input the windowed value wrapping the feature row to route
   * @return destination with table spec, a human-readable description and day partitioning
   * @throws IllegalArgumentException if the row's feature set reference does not split into at
   *     least a name and a version on {@code ":"}
   */
  @Override
  public TableDestination apply(ValueInSingleWindow<FeatureRow> input) {
    String featureSetRef = input.getValue().getFeatureSet();
    String[] split = featureSetRef.split(":");

    // String.split drops trailing empty parts, so both "name" and "name:" yield one element.
    // Fail with a message naming the offending reference instead of a bare index error.
    if (split.length < 2) {
      throw new IllegalArgumentException(
          String.format(
              "Feature set reference '%s' must have the form <name>:<version>", featureSetRef));
    }

    TimePartitioning timePartitioning =
        new TimePartitioning()
            .setType("DAY")
            .setField(FeatureRowToTableRow.getEventTimestampColumn());

    String tableSpec = String.format("%s:%s.%s_v%s", projectId, datasetId, split[0], split[1]);
    String tableDescription = String.format("Feast table for %s", featureSetRef);

    return new TableDestination(tableSpec, tableDescription, timePartitioning);
  }
}

0 comments on commit 57b574e

Please sign in to comment.