diff --git a/.prow/scripts/test-end-to-end.sh b/.prow/scripts/test-end-to-end.sh index 592a3a59cd..88917bf3be 100755 --- a/.prow/scripts/test-end-to-end.sh +++ b/.prow/scripts/test-end-to-end.sh @@ -59,7 +59,7 @@ nohup /tmp/kafka/bin/zookeeper-server-start.sh /tmp/kafka/config/zookeeper.prope sleep 5 tail -n10 /var/log/zookeeper.log nohup /tmp/kafka/bin/kafka-server-start.sh /tmp/kafka/config/server.properties &> /var/log/kafka.log 2>&1 & -sleep 5 +sleep 10 tail -n10 /var/log/kafka.log echo " @@ -124,7 +124,7 @@ EOF nohup java -jar core/target/feast-core-0.3.2-SNAPSHOT.jar \ --spring.config.location=file:///tmp/core.application.yml \ &> /var/log/feast-core.log & -sleep 20 +sleep 30 tail -n10 /var/log/feast-core.log echo " diff --git a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java index b23d3d4046..45a26bbed7 100644 --- a/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java +++ b/ingestion/src/main/java/feast/ingestion/transform/WriteToStore.java @@ -13,6 +13,7 @@ import feast.ingestion.utils.ResourceUtil; import feast.ingestion.values.FailedElement; import feast.store.serving.bigquery.FeatureRowToTableRow; +import feast.store.serving.bigquery.GetTableDestination; import feast.store.serving.redis.FeatureRowToRedisMutationDoFn; import feast.store.serving.redis.RedisCustomIO; import feast.types.FeatureRowProto.FeatureRow; @@ -24,16 +25,12 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError; import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy; -import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; import org.apache.beam.sdk.io.gcp.bigquery.WriteResult; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.ValueInSingleWindow; -import org.codehaus.jackson.JsonParser.Feature; import org.slf4j.Logger; @AutoValue @@ -88,15 +85,8 @@ public PDone expand(PCollection input) { .apply( "WriteTableRowToBigQuery", BigQueryIO.write() - .to((SerializableFunction, TableDestination>) element -> { - String[] split = element.getValue().getFeatureSet().split(":"); - return new TableDestination(String.format( - "%s:%s.%s_v%s", - bigqueryConfig.getProjectId(), - bigqueryConfig.getDatasetId(), - split[0], - split[1]), null); - }) + .to(new GetTableDestination(bigqueryConfig.getProjectId(), + bigqueryConfig.getDatasetId())) .withFormatFunction(new FeatureRowToTableRow(options.getJobName())) .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withWriteDisposition(WriteDisposition.WRITE_APPEND) diff --git a/ingestion/src/main/java/feast/store/serving/bigquery/GetTableDestination.java b/ingestion/src/main/java/feast/store/serving/bigquery/GetTableDestination.java new file mode 100644 index 0000000000..14517ffd81 --- /dev/null +++ b/ingestion/src/main/java/feast/store/serving/bigquery/GetTableDestination.java @@ -0,0 +1,35 @@ +package feast.store.serving.bigquery; + +import com.google.api.services.bigquery.model.TimePartitioning; +import feast.types.FeatureRowProto.FeatureRow; +import org.apache.beam.sdk.io.gcp.bigquery.TableDestination; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.ValueInSingleWindow; + +public class GetTableDestination implements + SerializableFunction, TableDestination> { + + private String projectId; + private String datasetId; + + public GetTableDestination(String projectId, String datasetId) { + this.projectId = projectId; + this.datasetId = datasetId; + } + + @Override + public TableDestination apply(ValueInSingleWindow input) { + String[] split = input.getValue().getFeatureSet().split(":"); + + TimePartitioning timePartitioning = + new TimePartitioning() + .setType("DAY") + .setField(FeatureRowToTableRow.getEventTimestampColumn()); + + return new TableDestination( + String.format("%s:%s.%s_v%s", projectId, datasetId, split[0], split[1]), + String + .format("Feast table for %s", input.getValue().getFeatureSet()), + timePartitioning); + } +} \ No newline at end of file