From 913e7c91ac6c2c26168098788638c5c68a0223fe Mon Sep 17 00:00:00 2001 From: Chen Zhiling Date: Sat, 18 Jan 2020 17:07:43 +0800 Subject: [PATCH] Add documentation for bigquery batch retrieval (#428) * Add documentation for bigquery batch retrieval * Fix formatting for multiline comments --- .../bigquery/BatchRetrievalQueryRunnable.java | 32 ++++++++++++++++ .../store/bigquery/SubqueryCallable.java | 4 +- .../resources/templates/join_featuresets.sql | 3 ++ .../templates/single_featureset_pit_join.sql | 37 ++++++++++++++++++- 4 files changed, 72 insertions(+), 4 deletions(-) diff --git a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java index d437294dfc..e875de35a8 100644 --- a/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java +++ b/serving/src/main/java/feast/serving/store/bigquery/BatchRetrievalQueryRunnable.java @@ -52,6 +52,27 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +/** + * BatchRetrievalQueryRunnable is a Runnable for running a BigQuery Feast batch retrieval job async. + * + *

It does the following, in sequence: + * + *

1. Retrieve the temporal bounds of the entity dataset provided. This will be used to filter + * the feature set tables when performing the feature retrieval. + * + *

2. For each of the feature sets requested, generate the subquery for doing a point-in-time + * correctness join of the features in the feature set to the entity table. + * + *

3. Run each of the subqueries in parallel and wait for them to complete. If any of the jobs + * are unsuccessful, the thread running the BatchRetrievalQueryRunnable catches the error and + * updates the job database. + * + *

4. When all the subquery jobs are complete, join the outputs of all the subqueries into a + * single table. + * + *

5. Extract the output of the join to a remote file, and write the location of the remote file + * to the job database, and mark the retrieval job as successful. + */ @AutoValue public abstract class BatchRetrievalQueryRunnable implements Runnable { @@ -109,18 +130,22 @@ public abstract static class Builder { @Override public void run() { + // 1. Retrieve the temporal bounds of the entity dataset provided FieldValueList timestampLimits = getTimestampLimits(entityTableName()); + // 2. Generate the subqueries List featureSetQueries = generateQueries(timestampLimits); QueryJobConfiguration queryConfig; try { + // 3 & 4. Run the subqueries in parallel then collect the outputs Job queryJob = runBatchQuery(featureSetQueries); queryConfig = queryJob.getConfiguration(); String exportTableDestinationUri = String.format("%s/%s/*.avro", jobStagingLocation(), feastJobId()); + // 5. Export the table // Hardcode the format to Avro for now ExtractJobConfiguration extractConfig = ExtractJobConfiguration.of( @@ -141,6 +166,7 @@ public void run() { List fileUris = parseOutputFileURIs(); + // 5. Update the job database jobService() .upsert( ServingAPIProto.Job.newBuilder() @@ -181,6 +207,8 @@ Job runBatchQuery(List featureSetQueries) List featureSetInfos = new ArrayList<>(); + // For each of the feature sets requested, start an async job joining the features in that + // feature set to the provided entity table for (int i = 0; i < featureSetQueries.size(); i++) { QueryJobConfiguration queryJobConfig = QueryJobConfiguration.newBuilder(featureSetQueries.get(i)) @@ -197,6 +225,8 @@ Job runBatchQuery(List featureSetQueries) for (int i = 0; i < featureSetQueries.size(); i++) { try { + // Try to retrieve the outputs of all the jobs. The timeout here is a formality; + // a stricter timeout is implemented in the actual SubqueryCallable. FeatureSetInfo featureSetInfo = executorCompletionService.take().get(SUBQUERY_TIMEOUT_SECS, TimeUnit.SECONDS); featureSetInfos.add(featureSetInfo); @@ -218,6 +248,8 @@ Job runBatchQuery(List featureSetQueries) } } + // Generate and run a join query to collect the outputs of all the + // subqueries into a single table. String joinQuery = QueryTemplater.createJoinQuery( featureSetInfos, entityTableColumnNames(), entityTableName()); diff --git a/serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java b/serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java index e0b8f45798..14026030b4 100644 --- a/serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java +++ b/serving/src/main/java/feast/serving/store/bigquery/SubqueryCallable.java @@ -30,8 +30,8 @@ import java.util.concurrent.Callable; /** - * Waits for a bigquery job to complete; when complete, it updates the feature set info with the - * output table name, as well as increments the completed jobs counter in the query job listener. + * Waits for a point-in-time correctness join to complete. On completion, returns a featureSetInfo + * updated with the reference to the table containing the results of the query. */ @AutoValue public abstract class SubqueryCallable implements Callable { diff --git a/serving/src/main/resources/templates/join_featuresets.sql b/serving/src/main/resources/templates/join_featuresets.sql index e57b0c1031..60b7c7d7a1 100644 --- a/serving/src/main/resources/templates/join_featuresets.sql +++ b/serving/src/main/resources/templates/join_featuresets.sql @@ -1,3 +1,6 @@ +/* + Joins the outputs of multiple point-in-time-correctness joins to a single table. + */ WITH joined as ( SELECT * FROM `{{ leftTableName }}` {% for featureSet in featureSets %} diff --git a/serving/src/main/resources/templates/single_featureset_pit_join.sql b/serving/src/main/resources/templates/single_featureset_pit_join.sql index f667842185..1f4612b350 100644 --- a/serving/src/main/resources/templates/single_featureset_pit_join.sql +++ b/serving/src/main/resources/templates/single_featureset_pit_join.sql @@ -1,9 +1,24 @@ -WITH union_features AS (SELECT +/* + This query template performs the point-in-time correctness join for a single feature set table + to the provided entity table. + + 1. Concatenate the timestamp and entities from the feature set table with the entity dataset. + Feature values are joined to this table later for improved efficiency. + featureset_timestamp is equal to null in rows from the entity dataset. + */ +WITH union_features AS ( +SELECT + -- uuid is a unique identifier for each row in the entity dataset. Generated by `QueryTemplater.createEntityTableUUIDQuery` uuid, + -- event_timestamp contains the timestamps to join onto event_timestamp, + -- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp NULL as {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, + -- created timestamp of the feature at the corresponding feature_timestamp NULL as created_timestamp, + -- select only entities belonging to this feature set {{ featureSet.entities | join(', ')}}, + -- boolean for filtering the dataset later true AS is_entity_table FROM `{{leftTableName}}` UNION ALL @@ -15,7 +30,18 @@ SELECT {{ featureSet.entities | join(', ')}}, false AS is_entity_table FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second) -), joined AS ( +), +/* + 2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp, as + well as is_entity_table. + Within each window, back-fill the feature_timestamp - as a result of this, the null feature_timestamps + in the rows from the entity table should now contain the latest timestamps relative to the row's + event_timestamp. + + For rows where event_timestamp(provided datetime) - feature_timestamp > max age, set the + feature_timestamp to null. + */ +joined AS ( SELECT uuid, event_timestamp, @@ -34,6 +60,10 @@ SELECT FROM union_features WINDOW w AS (PARTITION BY {{ featureSet.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC, created_timestamp DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) ) +/* + 3. Select only the rows from the entity table, and join the features from the original feature set table + to the dataset using the entity values, feature_timestamp, and created_timestamps. + */ LEFT JOIN ( SELECT event_timestamp as {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, @@ -46,6 +76,9 @@ FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }} ) USING ({{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, created_timestamp, {{ featureSet.entities | join(', ')}}) WHERE is_entity_table ) +/* + 4. Finally, deduplicate the rows by selecting the first occurrence of each entity table row UUID. + */ SELECT k.* FROM (