Add documentation for bigquery batch retrieval (#428)
* Add documentation for bigquery batch retrieval

* Fix formatting for multiline comments
Chen Zhiling authored and feast-ci-bot committed Jan 18, 2020
1 parent 5fcc30f commit 913e7c9
Showing 4 changed files with 72 additions and 4 deletions.
BatchRetrievalQueryRunnable.java
@@ -52,6 +52,27 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * BatchRetrievalQueryRunnable is a Runnable that runs a Feast BigQuery batch retrieval job asynchronously.
*
* <p>It does the following, in sequence:
*
* <p>1. Retrieve the temporal bounds of the entity dataset provided. This will be used to filter
* the feature set tables when performing the feature retrieval.
*
* <p>2. For each of the feature sets requested, generate the subquery for doing a point-in-time
* correctness join of the features in the feature set to the entity table.
*
* <p>3. Run each of the subqueries in parallel and wait for them to complete. If any of the jobs
* are unsuccessful, the thread running the BatchRetrievalQueryRunnable catches the error and
* updates the job database.
*
* <p>4. When all the subquery jobs are complete, join the outputs of all the subqueries into a
* single table.
*
 * <p>5. Extract the output of the join to a remote file, write the location of the remote file
 * to the job database, and mark the retrieval job as successful.
*/
@AutoValue
public abstract class BatchRetrievalQueryRunnable implements Runnable {
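The five steps above map onto BigQuery client calls. As a minimal sketch of step 1 (an illustration, not the committed implementation; the exact bounds query is not shown in this diff):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.QueryJobConfiguration;

// Hypothetical sketch: scan the entity table for its MIN/MAX event_timestamp so
// that the later feature set subqueries can be filtered to this time range.
class TimestampLimitsSketch {
  static FieldValueList getTimestampLimits(BigQuery bigquery, String entityTableName)
      throws InterruptedException {
    String sql =
        String.format(
            "SELECT MIN(event_timestamp) AS min_ts, MAX(event_timestamp) AS max_ts FROM `%s`",
            entityTableName);
    // query() runs the job synchronously and returns its result rows.
    return bigquery.query(QueryJobConfiguration.of(sql)).iterateAll().iterator().next();
  }
}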

@@ -109,18 +130,22 @@ public abstract static class Builder {
@Override
public void run() {

// 1. Retrieve the temporal bounds of the entity dataset provided
FieldValueList timestampLimits = getTimestampLimits(entityTableName());

// 2. Generate the subqueries
List<String> featureSetQueries = generateQueries(timestampLimits);

QueryJobConfiguration queryConfig;

try {
// 3 & 4. Run the subqueries in parallel then collect the outputs
Job queryJob = runBatchQuery(featureSetQueries);
queryConfig = queryJob.getConfiguration();
String exportTableDestinationUri =
String.format("%s/%s/*.avro", jobStagingLocation(), feastJobId());

// 5. Export the table
// Hardcode the format to Avro for now
ExtractJobConfiguration extractConfig =
ExtractJobConfiguration.of(
@@ -141,6 +166,7 @@ public void run() {

List<String> fileUris = parseOutputFileURIs();

// 5. Update the job database
jobService()
.upsert(
ServingAPIProto.Job.newBuilder()
@@ -181,6 +207,8 @@ Job runBatchQuery(List<String> featureSetQueries)

List<FeatureSetInfo> featureSetInfos = new ArrayList<>();

// For each of the feature sets requested, start an async job joining the features in that
// feature set to the provided entity table
for (int i = 0; i < featureSetQueries.size(); i++) {
QueryJobConfiguration queryJobConfig =
QueryJobConfiguration.newBuilder(featureSetQueries.get(i))
@@ -197,6 +225,8 @@ Job runBatchQuery(List<String> featureSetQueries)

for (int i = 0; i < featureSetQueries.size(); i++) {
try {
// Try to retrieve the outputs of all the jobs. The timeout here is a formality;
// a stricter timeout is implemented in the actual SubqueryCallable.
FeatureSetInfo featureSetInfo =
executorCompletionService.take().get(SUBQUERY_TIMEOUT_SECS, TimeUnit.SECONDS);
featureSetInfos.add(featureSetInfo);
@@ -218,6 +248,8 @@ Job runBatchQuery(List<String> featureSetQueries)
}
}

// Generate and run a join query to collect the outputs of all the
// subqueries into a single table.
String joinQuery =
QueryTemplater.createJoinQuery(
featureSetInfos, entityTableColumnNames(), entityTableName());
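The fan-out/fan-in pattern described above can be sketched as follows (a simplified illustration: the String payload stands in for FeatureSetInfo, and the 900-second timeout is a placeholder for SUBQUERY_TIMEOUT_SECS):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Submit one callable per feature set subquery, then collect results in
// completion order rather than submission order.
class ParallelSubqueriesSketch {
  static List<String> runAll(List<Callable<String>> subqueries)
      throws InterruptedException, ExecutionException, TimeoutException {
    ExecutorService executor = Executors.newFixedThreadPool(subqueries.size());
    ExecutorCompletionService<String> completionService =
        new ExecutorCompletionService<>(executor);
    subqueries.forEach(completionService::submit);
    List<String> results = new ArrayList<>();
    for (int i = 0; i < subqueries.size(); i++) {
      // take() blocks until the next subquery finishes; the get() timeout is a
      // formality, as the comment in the diff above notes.
      results.add(completionService.take().get(900, TimeUnit.SECONDS));
    }
    executor.shutdown();
    return results;
  }
}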
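The ExtractJobConfiguration.of(...) call in run() is cut off at a hunk boundary above. A complete extract call (step 5) might look like this sketch; the table and URI values are placeholders:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.ExtractJobConfiguration;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.TableId;

// Export the joined output table to Avro files under the job staging location.
class ExportSketch {
  static void exportToAvro(BigQuery bigquery, TableId joinedTable, String stagingUri)
      throws InterruptedException {
    ExtractJobConfiguration extractConfig =
        ExtractJobConfiguration.of(joinedTable, stagingUri + "/*.avro", "AVRO");
    Job extractJob = bigquery.create(JobInfo.of(extractConfig)).waitFor();
    if (extractJob == null || extractJob.getStatus().getError() != null) {
      throw new IllegalStateException("extract job failed or no longer exists");
    }
  }
}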
SubqueryCallable.java
@@ -30,8 +30,8 @@
import java.util.concurrent.Callable;

/**
* Waits for a bigquery job to complete; when complete, it updates the feature set info with the
* output table name, as well as increments the completed jobs counter in the query job listener.
 * Waits for a point-in-time correctness join to complete. On completion, returns a FeatureSetInfo
 * updated with a reference to the table containing the results of the query.
*/
@AutoValue
public abstract class SubqueryCallable implements Callable<FeatureSetInfo> {
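A stripped-down callable in the spirit of SubqueryCallable (the real class also rewrites the FeatureSetInfo with the output table reference, which is elided here):

import com.google.cloud.bigquery.Job;
import java.util.concurrent.Callable;

// Block until a BigQuery job finishes, surfacing any error to the caller.
class WaitForJobSketch implements Callable<Job> {
  private final Job queryJob;

  WaitForJobSketch(Job queryJob) {
    this.queryJob = queryJob;
  }

  @Override
  public Job call() throws InterruptedException {
    Job completed = queryJob.waitFor();
    if (completed == null || completed.getStatus().getError() != null) {
      throw new IllegalStateException("subquery job failed or no longer exists");
    }
    return completed;
  }
}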
3 changes: 3 additions & 0 deletions serving/src/main/resources/templates/join_featuresets.sql
@@ -1,3 +1,6 @@
/*
Joins the outputs of multiple point-in-time-correctness joins into a single table.
*/
WITH joined as (
SELECT * FROM `{{ leftTableName }}`
{% for featureSet in featureSets %}
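In Java terms, the query this template renders has roughly the following shape (an assumption based on the visible lines; the real join keys and column lists come from QueryTemplater.createJoinQuery):

import java.util.List;

// Assemble a query that left-joins each subquery output table back onto the
// entity table. Joining on uuid is an assumption for illustration.
class JoinQuerySketch {
  static String createJoinQuery(String leftTableName, List<String> subqueryOutputTables) {
    StringBuilder sql =
        new StringBuilder("WITH joined AS (SELECT * FROM `" + leftTableName + "`");
    for (String table : subqueryOutputTables) {
      sql.append(" LEFT JOIN `").append(table).append("` USING (uuid)");
    }
    return sql.append(") SELECT * FROM joined").toString();
  }
}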
[point-in-time join query template; file name not shown]
@@ -1,9 +1,24 @@
WITH union_features AS (SELECT
/*
This query template performs the point-in-time correctness join of a single feature set table
to the provided entity table.
1. Concatenate the timestamp and entities from the feature set table with the entity dataset.
Feature values are joined to this table later for improved efficiency.
feature_timestamp is null in rows that come from the entity dataset.
*/
WITH union_features AS (
SELECT
-- uuid is a unique identifier for each row in the entity dataset. Generated by `QueryTemplater.createEntityTableUUIDQuery`
uuid,
-- event_timestamp contains the timestamps to join onto
event_timestamp,
-- the feature_timestamp, i.e. the latest occurrence of the requested feature relative to the entity_dataset timestamp
NULL as {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
-- created timestamp of the feature at the corresponding feature_timestamp
NULL as created_timestamp,
-- select only entities belonging to this feature set
{{ featureSet.entities | join(', ')}},
-- boolean for filtering the dataset later
true AS is_entity_table
FROM `{{leftTableName}}`
UNION ALL
@@ -15,7 +30,18 @@ SELECT
{{ featureSet.entities | join(', ')}},
false AS is_entity_table
FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}` WHERE event_timestamp <= '{{maxTimestamp}}' AND event_timestamp >= Timestamp_sub(TIMESTAMP '{{ minTimestamp }}', interval {{ featureSet.maxAge }} second)
), joined AS (
),
/*
2. Window the data in the unioned dataset, partitioning by entity and ordering by event_timestamp
as well as is_entity_table.
Within each window, back-fill the feature_timestamp; as a result, the null feature_timestamps in
the rows from the entity table now contain the latest timestamps relative to the row's
event_timestamp.
For rows where event_timestamp (the provided datetime) - feature_timestamp > max age, set the
feature_timestamp to null.
*/
joined AS (
SELECT
uuid,
event_timestamp,
@@ -34,6 +60,10 @@ SELECT
FROM union_features
WINDOW w AS (PARTITION BY {{ featureSet.entities | join(', ') }} ORDER BY event_timestamp DESC, is_entity_table DESC, created_timestamp DESC ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING)
)
/*
3. Select only the rows from the entity table, and join the features from the original feature set table
to the dataset using the entity values, feature_timestamp, and created_timestamps.
*/
LEFT JOIN (
SELECT
event_timestamp as {{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp,
@@ -46,6 +76,9 @@ FROM `{{projectId}}.{{datasetId}}.{{ featureSet.project }}_{{ featureSet.name }}
) USING ({{ featureSet.project }}_{{ featureSet.name }}_v{{ featureSet.version }}_feature_timestamp, created_timestamp, {{ featureSet.entities | join(', ')}})
WHERE is_entity_table
)
/*
4. Finally, deduplicate the rows by selecting the first occurrence of each entity table row UUID.
*/
SELECT
k.*
FROM (
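On the Java side, a rendered copy of this template is started as an asynchronous query job whose output lands in a destination table for the later join step. A sketch, assuming the standard BigQuery client (how Feast renders the template is not shown in this diff):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;

// Start (but do not wait for) a rendered point-in-time subquery, materializing
// its output into a destination table.
class RunRenderedQuerySketch {
  static Job startSubquery(BigQuery bigquery, String renderedSql, TableId destinationTable) {
    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(renderedSql)
            .setDestinationTable(destinationTable)
            .build();
    // create() returns immediately; completion is awaited elsewhere (see SubqueryCallable).
    return bigquery.create(JobInfo.of(queryConfig));
  }
}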
