Support query cache option in BigQuery
ebyhr committed May 16, 2022
1 parent 52f966b commit 4b3d24a
Showing 9 changed files with 100 additions and 11 deletions.
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/bigquery.rst
@@ -137,6 +137,8 @@ Property Description
``bigquery.credentials-key`` The base64 encoded credentials key None. See the `requirements <#requirements>`_ section.
``bigquery.credentials-file`` The path to the JSON credentials file None. See the `requirements <#requirements>`_ section.
``bigquery.case-insensitive-name-matching`` Match dataset and table names case-insensitively ``false``
``bigquery.query-cache.enabled`` Enable `query cache
<https://cloud.google.com/bigquery/docs/cached-results>`_ ``false``
===================================================== ============================================================== ======================================================

Data types
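The new property is passed through to the query cache flag on the BigQuery Java client, which the connector code changes below wire up. For orientation, here is a minimal standalone sketch of the same client calls (the QueryCacheSketch class name and the SELECT 1 query are illustrative; application-default credentials are assumed):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;

public class QueryCacheSketch
{
    public static void main(String[] args)
            throws Exception
    {
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();

        // bigquery.query-cache.enabled=true corresponds to setUseQueryCache(true)
        // on the QueryJobConfiguration that the connector builds.
        QueryJobConfiguration configuration = QueryJobConfiguration.newBuilder("SELECT 1")
                .setUseQueryCache(true)
                .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .build();

        TableResult result = bigQuery.query(configuration);
        result.iterateAll().forEach(row -> System.out.println(row.get(0).getLongValue()));
    }
}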
@@ -20,6 +20,7 @@
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
@@ -208,23 +209,29 @@ Job create(JobInfo jobInfo)
return bigQuery.create(jobInfo);
}

public TableResult query(String sql)
public TableResult query(String sql, boolean useQueryCache, CreateDisposition createDisposition)
{
try {
return bigQuery.query(QueryJobConfiguration.of(sql));
return bigQuery.query(QueryJobConfiguration.newBuilder(sql)
.setUseQueryCache(useQueryCache)
.setCreateDisposition(createDisposition)
.build());
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", sql), e);
}
}

public TableResult query(TableId table, List<String> requiredColumns, Optional<String> filter)
public TableResult query(TableId table, List<String> requiredColumns, Optional<String> filter, boolean useQueryCache, CreateDisposition createDisposition)
{
String sql = selectSql(table, requiredColumns, filter);
log.debug("Execute query: %s", sql);
try {
return bigQuery.query(QueryJobConfiguration.of(sql));
return bigQuery.query(QueryJobConfiguration.newBuilder(sql)
.setUseQueryCache(useQueryCache)
.setCreateDisposition(createDisposition)
.build());
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -48,6 +48,7 @@ public class BigQueryConfig
private boolean caseInsensitiveNameMatching;
private Duration viewsCacheTtl = new Duration(15, MINUTES);
private Duration serviceCacheTtl = new Duration(3, MINUTES);
private boolean queryCacheEnabled;

public Optional<String> getProjectId()
{
@@ -212,6 +213,18 @@ public BigQueryConfig setServiceCacheTtl(Duration serviceCacheTtl)
return this;
}

public boolean isQueryCacheEnabled()
{
return queryCacheEnabled;
}

@Config("bigquery.query-cache.enabled")
public BigQueryConfig setQueryCacheEnabled(boolean queryCacheEnabled)
{
this.queryCacheEnabled = queryCacheEnabled;
return this;
}

@PostConstruct
public void validate()
{
@@ -30,6 +30,8 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryCacheEnabled;
import static java.util.Objects.requireNonNull;

public class BigQueryPageSourceProvider
@@ -104,6 +106,8 @@ private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQ
table,
columnHandles.stream().map(BigQueryColumnHandle::getName).collect(toImmutableList()),
columnHandles.stream().map(BigQueryColumnHandle::getTrinoType).collect(toImmutableList()),
filter);
filter,
isQueryCacheEnabled(session),
createDisposition(session));
}
}
@@ -15,6 +15,7 @@

import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableList;
@@ -83,7 +84,9 @@ public BigQueryQueryPageSource(
BigQueryTableHandle table,
List<String> columnNames,
List<Type> columnTypes,
Optional<String> filter)
Optional<String> filter,
boolean useQueryCache,
CreateDisposition createDisposition)
{
requireNonNull(client, "client is null");
requireNonNull(table, "table is null");
@@ -94,7 +97,7 @@ public BigQueryQueryPageSource(
this.columnTypes = ImmutableList.copyOf(columnTypes);
this.pageBuilder = new PageBuilder(columnTypes);
TableId tableId = TableId.of(client.getProjectId(), table.getRemoteTableName().getDatasetName(), table.getRemoteTableName().getTableName());
this.tableResult = client.query(tableId, ImmutableList.copyOf(columnNames), filter);
this.tableResult = client.query(tableId, ImmutableList.copyOf(columnNames), filter, useQueryCache, createDisposition);
}

@Override
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.bigquery;

import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.common.collect.ImmutableList;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.ConnectorSession;
@@ -23,11 +24,14 @@
import java.util.List;

import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;

public final class BigQuerySessionProperties
implements SessionPropertiesProvider
{
public static final String SKIP_VIEW_MATERIALIZATION = "skip_view_materialization";
public static final String QUERY_CACHE_ENABLED = "query_cache_enabled";
public static final String CREATE_DISPOSITION_TYPE = "create_disposition_type";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -40,6 +44,17 @@ public BigQuerySessionProperties(BigQueryConfig config)
"Skip materializing views",
config.isSkipViewMaterialization(),
false))
.add(booleanProperty(
QUERY_CACHE_ENABLED,
"Enable query cache",
config.isQueryCacheEnabled(),
false))
.add(enumProperty(
CREATE_DISPOSITION_TYPE,
"Create disposition type",
CreateDisposition.class,
CreateDisposition.CREATE_IF_NEEDED,
true))
.build();
}

@@ -53,4 +68,14 @@ public static boolean isSkipViewMaterialization(ConnectorSession session)
{
return session.getProperty(SKIP_VIEW_MATERIALIZATION, Boolean.class);
}

public static boolean isQueryCacheEnabled(ConnectorSession session)
{
return session.getProperty(QUERY_CACHE_ENABLED, Boolean.class);
}

public static CreateDisposition createDisposition(ConnectorSession session)
{
return session.getProperty(CREATE_DISPOSITION_TYPE, CreateDisposition.class);
}
}
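The create_disposition_type session property maps to BigQuery's create disposition for the query job: CREATE_IF_NEEDED (the default) lets BigQuery materialize results as needed, while CREATE_NEVER requires the results to already be in the query cache and fails otherwise, which is what the new connector test further down relies on ("Not found"). A sketch of that behavior against the raw client, assuming a tpch.region table exists in the target project (the CreateDispositionSketch class name and the SQL text are illustrative; the exact exception type depends on the client version):

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.QueryJobConfiguration;

public class CreateDispositionSketch
{
    public static void main(String[] args)
            throws Exception
    {
        BigQuery bigQuery = BigQueryOptions.getDefaultInstance().getService();
        String sql = "SELECT count(*) FROM tpch.region"; // illustrative query

        // CREATE_NEVER only succeeds when a cached result already exists.
        QueryJobConfiguration createNever = QueryJobConfiguration.newBuilder(sql)
                .setUseQueryCache(true)
                .setCreateDisposition(CreateDisposition.CREATE_NEVER)
                .build();
        try {
            bigQuery.query(createNever);
        }
        catch (Exception e) {
            // With an empty cache the job is rejected; the connector test expects
            // a message containing "Not found" in this situation.
            System.out.println("No cached result yet: " + e.getMessage());
        }

        // CREATE_IF_NEEDED runs the query and populates the cache...
        bigQuery.query(QueryJobConfiguration.newBuilder(sql)
                .setUseQueryCache(true)
                .setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                .build());

        // ...so a subsequent CREATE_NEVER query can be served from the cache.
        bigQuery.query(createNever);
    }
}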
@@ -46,6 +46,8 @@
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryCacheEnabled;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
@@ -139,7 +141,7 @@ private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, Tabl
if (filter.isPresent()) {
// count the rows based on the filter
String sql = client.selectSql(remoteTableId, "COUNT(*)");
TableResult result = client.query(sql);
TableResult result = client.query(sql, isQueryCacheEnabled(session), createDisposition(session));
numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue();
}
else {
@@ -151,7 +153,7 @@
}
else if (tableInfo.getDefinition().getType() == VIEW) {
String sql = client.selectSql(remoteTableId, "COUNT(*)");
TableResult result = client.query(sql);
TableResult result = client.query(sql, isQueryCacheEnabled(session), createDisposition(session));
numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue();
}
else {
@@ -44,7 +44,8 @@ public void testDefaults()
.setCaseInsensitiveNameMatching(false)
.setViewsCacheTtl(new Duration(15, MINUTES))
.setServiceCacheTtl(new Duration(3, MINUTES))
.setViewsEnabled(false));
.setViewsEnabled(false)
.setQueryCacheEnabled(false));
}

@Test
@@ -63,6 +64,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey()
.put("bigquery.case-insensitive-name-matching", "true")
.put("bigquery.views-cache-ttl", "1m")
.put("bigquery.service-cache-ttl", "10d")
.put("bigquery.query-cache.enabled", "true")
.buildOrThrow();

BigQueryConfig expected = new BigQueryConfig()
@@ -77,7 +79,8 @@
.setMaxReadRowsRetries(10)
.setCaseInsensitiveNameMatching(true)
.setViewsCacheTtl(new Duration(1, MINUTES))
.setServiceCacheTtl(new Duration(10, DAYS));
.setServiceCacheTtl(new Duration(10, DAYS))
.setQueryCacheEnabled(true);

assertFullMapping(properties, expected);
}
@@ -15,6 +15,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.Session;
import io.trino.testing.BaseConnectorTest;
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
@@ -621,6 +622,35 @@ public void testBigQuerySnapshotTable()
}
}

@Test
public void testQueryCache()
{
Session queryCacheSession = Session.builder(getSession())
.setCatalogSessionProperty("bigquery", "query_cache_enabled", "true")
.build();
Session createNeverDisposition = Session.builder(getSession())
.setCatalogSessionProperty("bigquery", "query_cache_enabled", "true")
.setCatalogSessionProperty("bigquery", "create_disposition_type", "create_never")
.build();

String materializedView = "test_materialized_view" + randomTableSuffix();
try {
onBigQuery("CREATE MATERIALIZED VIEW test." + materializedView + " AS SELECT count(1) AS cnt FROM tpch.region");

// Verify query cache is empty
assertThatThrownBy(() -> query(createNeverDisposition, "SELECT * FROM test." + materializedView))
.hasMessageContaining("Not found");
// Populate cache and verify it
assertQuery(queryCacheSession, "SELECT * FROM test." + materializedView, "VALUES 5");
assertQuery(createNeverDisposition, "SELECT * FROM test." + materializedView, "VALUES 5");

assertUpdate("DROP TABLE test." + materializedView);
}
finally {
onBigQuery("DROP MATERIALIZED VIEW IF EXISTS test." + materializedView);
}
}

private void onBigQuery(@Language("SQL") String sql)
{
bigQuerySqlExecutor.execute(sql);