Always set destination table in BigQuery query config in Feast Batch Serving so it can handle large results #392

Merged
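Background for the change: when no destination table is set, BigQuery writes query results to an anonymous result table that is subject to a maximum response size (the exact limits differ between legacy and standard SQL), so sufficiently large retrievals can fail. Giving every query job an explicit destination table sidesteps that ceiling, which is what this PR does for each query Feast Batch Serving issues. A minimal sketch of the pattern, assuming application-default credentials; the project, dataset, and SQL below are illustrative, not taken from Feast:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableId;
import java.util.UUID;

public class DestinationTableExample {
  public static void main(String[] args) throws InterruptedException {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    // Write results to our own temporary table instead of the anonymous
    // result table, so the job is not bound by response-size limits.
    TableId destination =
        TableId.of(
            "my-project", "my_dataset", "_" + UUID.randomUUID().toString().replace("-", ""));
    QueryJobConfiguration config =
        QueryJobConfiguration.newBuilder("SELECT name FROM `my_dataset.my_table`")
            .setDestinationTable(destination)
            .build();
    Job job = bigquery.create(JobInfo.of(config));
    job = job.waitFor(); // block until the query finishes
    if (job != null && job.getStatus().getError() == null) {
      System.out.println("Results written to " + destination.getTable());
    }
  }
}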
@@ -32,6 +32,7 @@
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.Table;
 import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.storage.Storage;
 import feast.core.FeatureSetProto.FeatureSetSpec;
 import feast.serving.ServingAPIProto;
@@ -56,10 +57,13 @@
 import java.util.Optional;
 import java.util.UUID;
 import java.util.stream.Collectors;
+import org.joda.time.Duration;
 import org.slf4j.Logger;
 
 public class BigQueryServingService implements ServingService {
 
+  // Default number of milliseconds for which a temporary table should exist before it is deleted in BigQuery.
+  public static final long TEMP_TABLE_EXPIRY_DURATION_MS = Duration.standardDays(1).getMillis();
   private static final Logger log = org.slf4j.LoggerFactory.getLogger(BigQueryServingService.class);
 
   private final BigQuery bigquery;
@@ -182,22 +186,29 @@ private Table loadEntities(DatasetSource datasetSource) {
     switch (datasetSource.getDatasetSourceCase()) {
       case FILE_SOURCE:
         try {
-          String tableName = generateTemporaryTableName();
-          log.info("Loading entity dataset to table {}.{}.{}", projectId, datasetId, tableName);
-          TableId tableId = TableId.of(projectId, datasetId, tableName);
-          // Currently only avro supported
+          // Currently only AVRO format is supported
           if (datasetSource.getFileSource().getDataFormat() != DataFormat.DATA_FORMAT_AVRO) {
             throw Status.INVALID_ARGUMENT
-                .withDescription("Invalid file format, only avro supported")
+                .withDescription("Invalid file format, only AVRO is supported.")
                 .asRuntimeException();
           }
+
+          TableId tableId = TableId.of(projectId, datasetId, createTempTableName());
+          log.info("Loading entity rows to: {}.{}.{}", projectId, datasetId, tableId.getTable());
           LoadJobConfiguration loadJobConfiguration =
               LoadJobConfiguration.of(
                   tableId, datasetSource.getFileSource().getFileUrisList(), FormatOptions.avro());
           loadJobConfiguration =
               loadJobConfiguration.toBuilder().setUseAvroLogicalTypes(true).build();
           Job job = bigquery.create(JobInfo.of(loadJobConfiguration));
           job.waitFor();
+          TableInfo expiry =
+              bigquery
+                  .getTable(tableId)
+                  .toBuilder()
+                  .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+                  .build();
+          bigquery.update(expiry);
           loadedEntityTable = bigquery.getTable(tableId);
           if (!loadedEntityTable.exists()) {
             throw new RuntimeException(
@@ -207,7 +218,7 @@ private Table loadEntities(DatasetSource datasetSource) {
         } catch (Exception e) {
           log.error("Exception has occurred in loadEntities method: ", e);
           throw Status.INTERNAL
-              .withDescription("Failed to load entity dataset into store")
+              .withDescription("Failed to load entity dataset into store: " + e.toString())
               .withCause(e)
               .asRuntimeException();
         }
@@ -219,20 +230,23 @@ private Table loadEntities(DatasetSource datasetSource) {
     }
   }
 
-  private String generateTemporaryTableName() {
-    String source = String.format("feastserving%d", System.currentTimeMillis());
-    String guid = UUID.nameUUIDFromBytes(source.getBytes()).toString();
-    String suffix = guid.substring(0, Math.min(guid.length(), 10)).replaceAll("-", "");
-    return String.format("temp_%s", suffix);
-  }
-
   private TableId generateUUIDs(Table loadedEntityTable) {
     try {
       String uuidQuery =
           createEntityTableUUIDQuery(generateFullTableName(loadedEntityTable.getTableId()));
-      QueryJobConfiguration queryJobConfig = QueryJobConfiguration.newBuilder(uuidQuery).build();
+      QueryJobConfiguration queryJobConfig =
+          QueryJobConfiguration.newBuilder(uuidQuery)
+              .setDestinationTable(TableId.of(projectId, datasetId, createTempTableName()))
+              .build();
       Job queryJob = bigquery.create(JobInfo.of(queryJobConfig));
       queryJob.waitFor();
+      TableInfo expiry =
+          bigquery
+              .getTable(queryJobConfig.getDestinationTable())
+              .toBuilder()
+              .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+              .build();
+      bigquery.update(expiry);
       queryJobConfig = queryJob.getConfiguration();
       return queryJobConfig.getDestinationTable();
     } catch (InterruptedException | BigQueryException e) {
@@ -242,4 +256,8 @@ private TableId generateUUIDs(Table loadedEntityTable) {
           .asRuntimeException();
     }
   }
+
+  public static String createTempTableName() {
+    return "_" + UUID.randomUUID().toString().replace("-", "");
+  }
 }
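Two helpers recur throughout this change: createTempTableName() above derives a collision-resistant name from UUID.randomUUID() (the removed generateTemporaryTableName() hashed the current millisecond, so two requests in the same millisecond could collide), and every destination table is stamped with an expiration time so BigQuery garbage-collects it after a day. A self-contained sketch of the expiry step, assuming default credentials and an existing table; the project, dataset, and table names are illustrative:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;

public class TempTableExpiry {
  // One day, matching TEMP_TABLE_EXPIRY_DURATION_MS above.
  private static final long EXPIRY_MS = 24L * 60 * 60 * 1000;

  // Stamp an existing table with an expiration time; BigQuery deletes the
  // table automatically once that time passes.
  static void expireInOneDay(BigQuery bigquery, TableId tableId) {
    TableInfo withExpiry =
        bigquery.getTable(tableId).toBuilder()
            .setExpirationTime(System.currentTimeMillis() + EXPIRY_MS)
            .build();
    bigquery.update(withExpiry);
  }

  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
    expireInOneDay(bigquery, TableId.of("my-project", "feast_temp", "_abc123"));
  }
}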
@@ -16,6 +16,8 @@
  */
 package feast.serving.store.bigquery;
 
+import static feast.serving.service.BigQueryServingService.TEMP_TABLE_EXPIRY_DURATION_MS;
+import static feast.serving.service.BigQueryServingService.createTempTableName;
 import static feast.serving.store.bigquery.QueryTemplater.createTimestampLimitQuery;
 
 import com.google.auto.value.AutoValue;
@@ -27,6 +29,8 @@
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.JobInfo;
 import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
 import com.google.cloud.bigquery.TableResult;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.Storage;
@@ -175,23 +179,26 @@ Job runBatchQuery(List<String> featureSetQueries)
     ExecutorCompletionService<FeatureSetInfo> executorCompletionService =
         new ExecutorCompletionService<>(executorService);
 
-
     List<FeatureSetInfo> featureSetInfos = new ArrayList<>();
 
     for (int i = 0; i < featureSetQueries.size(); i++) {
       QueryJobConfiguration queryJobConfig =
-          QueryJobConfiguration.newBuilder(featureSetQueries.get(i)).build();
+          QueryJobConfiguration.newBuilder(featureSetQueries.get(i))
+              .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
+              .build();
       Job subqueryJob = bigquery().create(JobInfo.of(queryJobConfig));
       executorCompletionService.submit(
           SubqueryCallable.builder()
+              .setBigquery(bigquery())
              .setFeatureSetInfo(featureSetInfos().get(i))
              .setSubqueryJob(subqueryJob)
              .build());
     }
 
     for (int i = 0; i < featureSetQueries.size(); i++) {
       try {
-        FeatureSetInfo featureSetInfo = executorCompletionService.take().get(SUBQUERY_TIMEOUT_SECS, TimeUnit.SECONDS);
+        FeatureSetInfo featureSetInfo =
+            executorCompletionService.take().get(SUBQUERY_TIMEOUT_SECS, TimeUnit.SECONDS);
         featureSetInfos.add(featureSetInfo);
       } catch (InterruptedException | ExecutionException | TimeoutException e) {
         jobService()
@@ -214,9 +221,20 @@ Job runBatchQuery(List<String> featureSetQueries)
     String joinQuery =
         QueryTemplater.createJoinQuery(
             featureSetInfos, entityTableColumnNames(), entityTableName());
-    QueryJobConfiguration queryJobConfig = QueryJobConfiguration.newBuilder(joinQuery).build();
+    QueryJobConfiguration queryJobConfig =
+        QueryJobConfiguration.newBuilder(joinQuery)
+            .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
+            .build();
     queryJob = bigquery().create(JobInfo.of(queryJobConfig));
     queryJob.waitFor();
+    TableInfo expiry =
+        bigquery()
+            .getTable(queryJobConfig.getDestinationTable())
+            .toBuilder()
+            .setExpirationTime(
+                System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+            .build();
+    bigquery().update(expiry);
 
     return queryJob;
   }
@@ -248,10 +266,19 @@ private FieldValueList getTimestampLimits(String entityTableName) {
     QueryJobConfiguration getTimestampLimitsQuery =
         QueryJobConfiguration.newBuilder(createTimestampLimitQuery(entityTableName))
             .setDefaultDataset(DatasetId.of(projectId(), datasetId()))
+            .setDestinationTable(TableId.of(projectId(), datasetId(), createTempTableName()))
             .build();
     try {
       Job job = bigquery().create(JobInfo.of(getTimestampLimitsQuery));
       TableResult getTimestampLimitsQueryResult = job.waitFor().getQueryResults();
+      TableInfo expiry =
+          bigquery()
+              .getTable(getTimestampLimitsQuery.getDestinationTable())
+              .toBuilder()
+              .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+              .build();
+      bigquery().update(expiry);
+
       FieldValueList result = null;
       for (FieldValueList fields : getTimestampLimitsQueryResult.getValues()) {
         result = fields;
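The runBatchQuery change above keeps the existing fan-out structure: one BigQuery job per feature-set subquery, submitted through an ExecutorCompletionService so results are collected in completion order rather than submission order. A stripped-down sketch of that pattern; runSubquery and the query strings are placeholders, not Feast code:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SubqueryFanOut {
  public static void main(String[] args) throws Exception {
    List<String> queries = Arrays.asList("subquery-1", "subquery-2", "subquery-3");
    ExecutorService pool = Executors.newFixedThreadPool(queries.size());
    ExecutorCompletionService<String> completionService =
        new ExecutorCompletionService<>(pool);

    // Submit every subquery up front; they run concurrently.
    for (String query : queries) {
      completionService.submit(() -> runSubquery(query));
    }

    // take() blocks until whichever subquery finishes next, then hands back
    // its completed Future; get() retrieves the result or rethrows the error.
    List<String> results = new ArrayList<>();
    for (int i = 0; i < queries.size(); i++) {
      results.add(completionService.take().get(600, TimeUnit.SECONDS));
    }
    pool.shutdown();
    System.out.println(results);
  }

  // Placeholder standing in for a BigQuery job submission plus waitFor().
  private static String runSubquery(String query) {
    return query + " done";
  }
}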
@@ -16,13 +16,16 @@
  */
 package feast.serving.store.bigquery;
 
+import static feast.serving.service.BigQueryServingService.TEMP_TABLE_EXPIRY_DURATION_MS;
 import static feast.serving.store.bigquery.QueryTemplater.generateFullTableName;
 
 import com.google.auto.value.AutoValue;
+import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryException;
 import com.google.cloud.bigquery.Job;
 import com.google.cloud.bigquery.QueryJobConfiguration;
 import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableInfo;
 import feast.serving.store.bigquery.model.FeatureSetInfo;
 import java.util.concurrent.Callable;
 
@@ -33,6 +36,8 @@
 @AutoValue
 public abstract class SubqueryCallable implements Callable<FeatureSetInfo> {
 
+  public abstract BigQuery bigquery();
+
   public abstract FeatureSetInfo featureSetInfo();
 
   public abstract Job subqueryJob();
@@ -44,6 +49,8 @@ public static Builder builder() {
   @AutoValue.Builder
   public abstract static class Builder {
 
+    public abstract Builder setBigquery(BigQuery bigquery);
+
     public abstract Builder setFeatureSetInfo(FeatureSetInfo featureSetInfo);
 
     public abstract Builder setSubqueryJob(Job subqueryJob);
@@ -57,6 +64,13 @@ public FeatureSetInfo call() throws BigQueryException, InterruptedException {
     subqueryJob().waitFor();
     subqueryConfig = subqueryJob().getConfiguration();
     TableId destinationTable = subqueryConfig.getDestinationTable();
+    TableInfo expiry =
+        bigquery()
+            .getTable(destinationTable)
+            .toBuilder()
+            .setExpirationTime(System.currentTimeMillis() + TEMP_TABLE_EXPIRY_DURATION_MS)
+            .build();
+    bigquery().update(expiry);
     String fullTablePath = generateFullTableName(destinationTable);
 
     return new FeatureSetInfo(featureSetInfo(), fullTablePath);
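Because SubqueryCallable is an @AutoValue class, adding the abstract bigquery() accessor and the setBigquery(...) builder method is all the wiring needed; the annotation processor generates the fields, constructor, and builder. For readers unfamiliar with AutoValue, here is a hand-written sketch of roughly what gets generated; it is simplified (the result type is reduced to String) and is not the actual generated source:

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.Job;
import java.util.concurrent.Callable;

// Hand-written stand-in for the code AutoValue generates from the abstract
// class above: an immutable value type plus a fluent builder.
final class SubqueryCallableSketch implements Callable<String> {
  private final BigQuery bigquery;
  private final Job subqueryJob;

  private SubqueryCallableSketch(BigQuery bigquery, Job subqueryJob) {
    this.bigquery = bigquery;
    this.subqueryJob = subqueryJob;
  }

  static Builder builder() {
    return new Builder();
  }

  @Override
  public String call() throws InterruptedException {
    // The real call() waits for the job, stamps the destination table with an
    // expiration time using the injected client, and returns the table path.
    subqueryJob.waitFor();
    return subqueryJob.getConfiguration().toString();
  }

  static final class Builder {
    private BigQuery bigquery;
    private Job subqueryJob;

    Builder setBigquery(BigQuery bigquery) {
      this.bigquery = bigquery;
      return this;
    }

    Builder setSubqueryJob(Job subqueryJob) {
      this.subqueryJob = subqueryJob;
      return this;
    }

    SubqueryCallableSketch build() {
      return new SubqueryCallableSketch(bigquery, subqueryJob);
    }
  }
}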
tests/e2e/bq-batch-retrieval.py (5 changes: 3 additions & 2 deletions)
@@ -14,6 +14,7 @@
 from feast.type_map import ValueType
 from google.protobuf.duration_pb2 import Duration
 
+pd.set_option('display.max_columns', None)
 
 @pytest.fixture(scope="module")
 def core_url(pytestconfig):
@@ -112,8 +113,8 @@ def test_additional_columns_in_entity_table(client):
     feature_retrieval_job = client.get_batch_features(
         entity_rows=entity_df, feature_ids=["additional_columns:1:feature_value"]
     )
-    output = feature_retrieval_job.to_dataframe()
-    print(output.head())
+    output = feature_retrieval_job.to_dataframe().sort_values(by=["entity_id"])
+    print(output.head(10))
 
     assert np.allclose(output["additional_float_col"], entity_df["additional_float_col"])
     assert output["additional_string_col"].to_list() == entity_df["additional_string_col"].to_list()