Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support query cache option in BigQuery #12408

Merged
merged 2 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/src/main/sphinx/connector/bigquery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ Property Description
``bigquery.credentials-key`` The base64 encoded credentials key None. See the `requirements <#requirements>`_ section.
``bigquery.credentials-file`` The path to the JSON credentials file None. See the `requirements <#requirements>`_ section.
``bigquery.case-insensitive-name-matching`` Match dataset and table names case-insensitively ``false``
``bigquery.query-results-cache.enabled`` Enable `query results cache
<https://cloud.google.com/bigquery/docs/cached-results>`_ ``false``
===================================================== ============================================================== ======================================================

Data types
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.Table;
Expand Down Expand Up @@ -208,23 +209,29 @@ Job create(JobInfo jobInfo)
return bigQuery.create(jobInfo);
}

public TableResult query(String sql)
public TableResult query(String sql, boolean useQueryResultsCache, CreateDisposition createDisposition)
{
try {
return bigQuery.query(QueryJobConfiguration.of(sql));
return bigQuery.query(QueryJobConfiguration.newBuilder(sql)
.setUseQueryCache(useQueryResultsCache)
.setCreateDisposition(createDisposition)
.build());
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new BigQueryException(BaseHttpServiceException.UNKNOWN_CODE, format("Failed to run the query [%s]", sql), e);
}
}

public TableResult query(TableId table, List<String> requiredColumns, Optional<String> filter)
public TableResult query(TableId table, List<String> requiredColumns, Optional<String> filter, boolean useQueryResultsCache, CreateDisposition createDisposition)
{
String sql = selectSql(table, requiredColumns, filter);
log.debug("Execute query: %s", sql);
try {
return bigQuery.query(QueryJobConfiguration.of(sql));
return bigQuery.query(QueryJobConfiguration.newBuilder(sql)
.setUseQueryCache(useQueryResultsCache)
.setCreateDisposition(createDisposition)
.build());
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class BigQueryConfig
private boolean caseInsensitiveNameMatching;
private Duration viewsCacheTtl = new Duration(15, MINUTES);
private Duration serviceCacheTtl = new Duration(3, MINUTES);
private boolean queryResultsCacheEnabled;

public Optional<String> getProjectId()
{
Expand Down Expand Up @@ -212,6 +213,18 @@ public BigQueryConfig setServiceCacheTtl(Duration serviceCacheTtl)
return this;
}

/**
 * Returns whether BigQuery's query results cache is enabled by default.
 * Defaults to {@code false}; see https://cloud.google.com/bigquery/docs/cached-results.
 */
public boolean isQueryResultsCacheEnabled()
{
return queryResultsCacheEnabled;
}

/**
 * Sets the default for using BigQuery's query results cache.
 * Bound to the {@code bigquery.query-results-cache.enabled} catalog property.
 */
@Config("bigquery.query-results-cache.enabled")
public BigQueryConfig setQueryResultsCacheEnabled(boolean queryResultsCacheEnabled)
{
this.queryResultsCacheEnabled = queryResultsCacheEnabled;
return this;
}

@PostConstruct
public void validate()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,22 @@
package io.trino.plugin.bigquery;

import io.airlift.log.Logger;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.Connector;
import io.trino.spi.connector.ConnectorMetadata;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.transaction.IsolationLevel;

import javax.inject.Inject;

import java.util.List;
import java.util.Set;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.spi.transaction.IsolationLevel.READ_COMMITTED;
import static io.trino.spi.transaction.IsolationLevel.checkConnectorSupports;
import static java.util.Objects.requireNonNull;
Expand All @@ -36,16 +42,21 @@ public class BigQueryConnector
private final BigQueryMetadata metadata;
private final BigQuerySplitManager splitManager;
private final BigQueryPageSourceProvider pageSourceProvider;
private final List<PropertyMetadata<?>> sessionProperties;

/**
 * Creates the connector facade wiring together metadata, split, and page source
 * services, and flattens all injected session property providers into a single
 * immutable list exposed via {@code getSessionProperties()}.
 *
 * @throws NullPointerException if any argument is null
 */
@Inject
public BigQueryConnector(
        BigQueryMetadata metadata,
        BigQuerySplitManager splitManager,
        BigQueryPageSourceProvider pageSourceProvider,
        Set<SessionPropertiesProvider> sessionPropertiesProviders)
{
    this.metadata = requireNonNull(metadata, "metadata is null");
    this.splitManager = requireNonNull(splitManager, "splitManager is null");
    this.pageSourceProvider = requireNonNull(pageSourceProvider, "pageSourceProvider is null");
    // Collect properties from every bound provider into one immutable snapshot
    this.sessionProperties = requireNonNull(sessionPropertiesProviders, "sessionPropertiesProviders is null").stream()
            .flatMap(sessionPropertiesProvider -> sessionPropertiesProvider.getSessionProperties().stream())
            .collect(toImmutableList());
}

@Override
Expand Down Expand Up @@ -73,4 +84,10 @@ public ConnectorPageSourceProvider getPageSourceProvider()
{
return pageSourceProvider;
}

/**
 * Returns the immutable list of session properties assembled in the constructor
 * from all bound {@code SessionPropertiesProvider} instances.
 */
@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.google.inject.Scopes;
import com.google.inject.Singleton;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.NodeManager;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;

Expand Down Expand Up @@ -52,6 +54,7 @@ protected void setup(Binder binder)
binder.bind(BigQueryPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(ViewMaterializationCache.class).in(Scopes.SINGLETON);
configBinder(binder).bindConfig(BigQueryConfig.class);
newSetBinder(binder, SessionPropertiesProvider.class).addBinding().to(BigQuerySessionProperties.class).in(Scopes.SINGLETON);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled;
import static java.util.Objects.requireNonNull;

public class BigQueryPageSourceProvider
Expand Down Expand Up @@ -104,6 +106,8 @@ private ConnectorPageSource createQueryPageSource(ConnectorSession session, BigQ
table,
columnHandles.stream().map(BigQueryColumnHandle::getName).collect(toImmutableList()),
columnHandles.stream().map(BigQueryColumnHandle::getTrinoType).collect(toImmutableList()),
filter);
filter,
isQueryResultsCacheEnabled(session),
createDisposition(session));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.cloud.bigquery.FieldValue;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -83,7 +84,9 @@ public BigQueryQueryPageSource(
BigQueryTableHandle table,
List<String> columnNames,
List<Type> columnTypes,
Optional<String> filter)
Optional<String> filter,
boolean useQueryResultsCache,
CreateDisposition createDisposition)
{
requireNonNull(client, "client is null");
requireNonNull(table, "table is null");
Expand All @@ -94,7 +97,7 @@ public BigQueryQueryPageSource(
this.columnTypes = ImmutableList.copyOf(columnTypes);
this.pageBuilder = new PageBuilder(columnTypes);
TableId tableId = TableId.of(client.getProjectId(), table.getRemoteTableName().getDatasetName(), table.getRemoteTableName().getTableName());
this.tableResult = client.query(tableId, ImmutableList.copyOf(columnNames), filter);
this.tableResult = client.query(tableId, ImmutableList.copyOf(columnNames), filter, useQueryResultsCache, createDisposition);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.bigquery;

import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.common.collect.ImmutableList;
import io.trino.plugin.base.session.SessionPropertiesProvider;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;

import javax.inject.Inject;

import java.util.List;

import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.enumProperty;

public final class BigQuerySessionProperties
implements SessionPropertiesProvider
{
private static final String SKIP_VIEW_MATERIALIZATION = "skip_view_materialization";
private static final String QUERY_RESULTS_CACHE_ENABLED = "query_results_cache_enabled";
private static final String CREATE_DISPOSITION_TYPE = "create_disposition_type";

private final List<PropertyMetadata<?>> sessionProperties;

@Inject
public BigQuerySessionProperties(BigQueryConfig config)
{
sessionProperties = ImmutableList.<PropertyMetadata<?>>builder()
.add(booleanProperty(
SKIP_VIEW_MATERIALIZATION,
"Skip materializing views",
config.isSkipViewMaterialization(),
false))
.add(booleanProperty(
QUERY_RESULTS_CACHE_ENABLED,
"Enable query results cache",
config.isQueryResultsCacheEnabled(),
false))
.add(enumProperty(
CREATE_DISPOSITION_TYPE,
"Create disposition type",
ebyhr marked this conversation as resolved.
Show resolved Hide resolved
CreateDisposition.class,
CreateDisposition.CREATE_IF_NEEDED, // https://cloud.google.com/bigquery/docs/cached-results
true))
.build();
}

@Override
public List<PropertyMetadata<?>> getSessionProperties()
{
return sessionProperties;
}

public static boolean isSkipViewMaterialization(ConnectorSession session)
{
return session.getProperty(SKIP_VIEW_MATERIALIZATION, Boolean.class);
}

public static boolean isQueryResultsCacheEnabled(ConnectorSession session)
{
return session.getProperty(QUERY_RESULTS_CACHE_ENABLED, Boolean.class);
}

public static CreateDisposition createDisposition(ConnectorSession session)
{
return session.getProperty(CREATE_DISPOSITION_TYPE, CreateDisposition.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import static com.google.cloud.bigquery.TableDefinition.Type.VIEW;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.createDisposition;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isQueryResultsCacheEnabled;
import static io.trino.plugin.bigquery.BigQuerySessionProperties.isSkipViewMaterialization;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
Expand All @@ -61,7 +64,6 @@ public class BigQuerySplitManager
private final Optional<Integer> parallelism;
private final boolean viewEnabled;
private final Duration viewExpiration;
private final boolean skipViewMaterialization;
private final NodeManager nodeManager;

@Inject
Expand All @@ -78,7 +80,6 @@ public BigQuerySplitManager(
this.parallelism = config.getParallelism();
this.viewEnabled = config.isViewsEnabled();
this.viewExpiration = config.getViewExpireDuration();
this.skipViewMaterialization = config.isSkipViewMaterialization();
this.nodeManager = requireNonNull(nodeManager, "nodeManager cannot be null");
}

Expand Down Expand Up @@ -120,7 +121,7 @@ private List<BigQuerySplit> readFromBigQuery(ConnectorSession session, TableDefi
// Storage API doesn't support reading materialized views
return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
}
if (skipViewMaterialization && type == VIEW) {
if (isSkipViewMaterialization(session) && type == VIEW) {
return ImmutableList.of(BigQuerySplit.forViewStream(columns, filter));
}
ReadSession readSession = new ReadSessionCreator(bigQueryClientFactory, bigQueryReadClientFactory, viewEnabled, viewExpiration)
Expand All @@ -140,7 +141,7 @@ private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, Tabl
if (filter.isPresent()) {
// count the rows based on the filter
String sql = client.selectSql(remoteTableId, "COUNT(*)");
TableResult result = client.query(sql);
TableResult result = client.query(sql, isQueryResultsCacheEnabled(session), createDisposition(session));
numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue();
}
else {
Expand All @@ -152,7 +153,7 @@ private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, Tabl
}
else if (tableInfo.getDefinition().getType() == VIEW) {
String sql = client.selectSql(remoteTableId, "COUNT(*)");
TableResult result = client.query(sql);
TableResult result = client.query(sql, isQueryResultsCacheEnabled(session), createDisposition(session));
numberOfRows = result.iterateAll().iterator().next().get(0).getLongValue();
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ public void testDefaults()
.setCaseInsensitiveNameMatching(false)
.setViewsCacheTtl(new Duration(15, MINUTES))
.setServiceCacheTtl(new Duration(3, MINUTES))
.setViewsEnabled(false));
.setViewsEnabled(false)
.setQueryResultsCacheEnabled(false));
}

@Test
Expand All @@ -63,6 +64,7 @@ public void testExplicitPropertyMappingsWithCredentialsKey()
.put("bigquery.case-insensitive-name-matching", "true")
.put("bigquery.views-cache-ttl", "1m")
.put("bigquery.service-cache-ttl", "10d")
.put("bigquery.query-results-cache.enabled", "true")
.buildOrThrow();

BigQueryConfig expected = new BigQueryConfig()
Expand All @@ -77,7 +79,8 @@ public void testExplicitPropertyMappingsWithCredentialsKey()
.setMaxReadRowsRetries(10)
.setCaseInsensitiveNameMatching(true)
.setViewsCacheTtl(new Duration(1, MINUTES))
.setServiceCacheTtl(new Duration(10, DAYS));
.setServiceCacheTtl(new Duration(10, DAYS))
.setQueryResultsCacheEnabled(true);

assertFullMapping(properties, expected);
}
Expand Down
Loading