Support Limit pushdown in BigQuery connector
Limit pushdown is supported only when the query-based approach is applied (see the sketch after the list below), i.e. for:

1. external table
2. native query
3. view/materialized-view (with skip_view_materialization enabled)
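As a rough sketch of what the pushed-down limit turns into on the BigQuery side, the snippet below calls the buildNativeQuery helper added in this commit with hypothetical query text (it assumes the connector classes under io.trino.plugin.bigquery are on the classpath):

import java.util.Optional;
import java.util.OptionalLong;

import static io.trino.plugin.bigquery.BigQueryUtil.buildNativeQuery;

public class LimitPushdownSketch
{
    public static void main(String[] args)
    {
        // Native query (query table function) with LIMIT 10 pushed down and no filter:
        // the original statement is wrapped rather than rewritten.
        String sql = buildNativeQuery("SELECT orderkey, custkey FROM orders", Optional.empty(), OptionalLong.of(10));
        System.out.println(sql);
        // SELECT * FROM (SELECT orderkey, custkey FROM orders) LIMIT 10
    }
}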
krvikash authored and Praveen2112 committed Nov 26, 2024
1 parent cf4d6c5 commit 33cb3ad
Showing 9 changed files with 314 additions and 28 deletions.
@@ -55,6 +55,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
@@ -275,7 +276,7 @@ public Optional<TableInfo> getTable(TableId remoteTableId)

public TableInfo getCachedTable(Duration viewExpiration, TableInfo remoteTableId, List<BigQueryColumnHandle> requiredColumns, Optional<String> filter)
{
String query = selectSql(remoteTableId.getTableId(), requiredColumns, filter);
String query = selectSql(remoteTableId.getTableId(), requiredColumns, filter, OptionalLong.empty());
log.debug("query is %s", query);
return materializationCache.getCachedTable(this, query, viewExpiration, remoteTableId);
}
@@ -501,7 +502,7 @@ public TableId getDestinationTable(String sql)
return requireNonNull(((QueryJobConfiguration) jobConfiguration).getDestinationTable(), "Cannot determine destination table for query");
}

public static String selectSql(TableId table, List<BigQueryColumnHandle> requiredColumns, Optional<String> filter)
public static String selectSql(TableId table, List<BigQueryColumnHandle> requiredColumns, Optional<String> filter, OptionalLong limit)
{
return selectSql(table,
requiredColumns.stream()
@@ -513,17 +514,21 @@ public static String selectSql(TableId table, List<BigQueryColumnHandle> requiredColumns, Optional<String> filter)
.collect(toImmutableList()))
.build()))
.collect(joining(",")),
filter);
filter,
limit);
}

public static String selectSql(TableId table, String formattedColumns, Optional<String> filter)
public static String selectSql(TableId table, String formattedColumns, Optional<String> filter, OptionalLong limit)
{
String tableName = fullTableName(table);
String query = format("SELECT %s FROM `%s`", formattedColumns, tableName);
- if (filter.isEmpty()) {
-     return query;
+ if (filter.isPresent()) {
+     query = query + " WHERE " + filter.get();
  }
- return query + " WHERE " + filter.get();
+ if (limit.isPresent()) {
+     query = query + " LIMIT " + limit.getAsLong();
+ }
+ return query;
}

private static String fullTableName(TableId remoteTableId)
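A quick usage sketch of the extended selectSql overload, with a hypothetical project, dataset, table, and filter; the expected output assumes fullTableName renders the table as project.dataset.table:

import com.google.cloud.bigquery.TableId;

import java.util.Optional;
import java.util.OptionalLong;

import static io.trino.plugin.bigquery.BigQueryClient.selectSql;

public class SelectSqlSketch
{
    public static void main(String[] args)
    {
        String sql = selectSql(
                TableId.of("my-project", "my_dataset", "orders"), // hypothetical table
                "`orderkey`,`custkey`",                           // pre-formatted column list
                Optional.of("custkey > 100"),                     // pushed-down filter
                OptionalLong.of(10));                             // pushed-down limit
        System.out.println(sql);
        // SELECT `orderkey`,`custkey` FROM `my-project.my_dataset.orders` WHERE custkey > 100 LIMIT 10
    }
}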
@@ -67,6 +67,7 @@
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.ConstraintApplicationResult;
import io.trino.spi.connector.InMemoryRecordSet;
import io.trino.spi.connector.LimitApplicationResult;
import io.trino.spi.connector.ProjectionApplicationResult;
import io.trino.spi.connector.RecordCursor;
import io.trino.spi.connector.RelationCommentMetadata;
@@ -356,7 +357,8 @@ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTable
Optional.ofNullable(tableInfo.get().getDescription()),
useStorageApi),
TupleDomain.all(),
Optional.empty())
Optional.empty(),
OptionalLong.empty())
.withProjectedColumns(columns.build());
}

@@ -1084,6 +1086,29 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(
return Optional.of(new ConstraintApplicationResult<>(updatedHandle, remainingFilter, constraint.getExpression(), false));
}

@Override
public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(ConnectorSession session, ConnectorTableHandle handle, long limit)
{
BigQueryTableHandle table = (BigQueryTableHandle) handle;

if (table.limit().isPresent() && table.limit().getAsLong() <= limit) {
return Optional.empty();
}

if (!isLimitPushdownSupported(table)) {
return Optional.empty();
}

return Optional.of(new LimitApplicationResult<>(table.withLimit(limit), true, false));
}

private static boolean isLimitPushdownSupported(BigQueryTableHandle table)
{
return table.isQueryRelation()
// Storage API doesn't support limiting rows
|| (table.isNamedRelation() && !table.getRequiredNamedRelation().isUseStorageApi());
}

@Override
public Optional<TableFunctionApplicationResult<ConnectorTableHandle>> applyTableFunction(ConnectorSession session, ConnectorTableFunctionHandle handle)
{
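The first guard in applyLimit skips pushdown only when an equal or smaller limit is already present on the handle; the second guard restricts pushdown to query relations and named relations read without the Storage API. A standalone mirror of the first check, for illustration only (LimitPushdownGuard is a hypothetical helper, not part of the connector):

import java.util.OptionalLong;

final class LimitPushdownGuard
{
    // Pushing down is pointless if an equal or smaller limit was already applied.
    static boolean shouldApply(OptionalLong existingLimit, long newLimit)
    {
        return existingLimit.isEmpty() || existingLimit.getAsLong() > newLimit;
    }

    public static void main(String[] args)
    {
        System.out.println(shouldApply(OptionalLong.empty(), 10)); // true  -> apply LIMIT 10
        System.out.println(shouldApply(OptionalLong.of(5), 10));   // false -> LIMIT 5 already covers it
        System.out.println(shouldApply(OptionalLong.of(100), 10)); // true  -> tighten to LIMIT 10
    }
}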
@@ -54,6 +54,7 @@
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.bigquery.BigQueryClient.selectSql;
import static io.trino.plugin.bigquery.BigQueryTypeManager.toTrinoTimestamp;
import static io.trino.plugin.bigquery.BigQueryUtil.buildNativeQuery;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
@@ -125,13 +126,10 @@ private String buildSql(BigQueryTableHandle table, String projectId, List<BigQue
{
if (isQueryFunction) {
BigQueryQueryRelationHandle queryRelationHandle = (BigQueryQueryRelationHandle) table.relationHandle();
if (filter.isEmpty()) {
return queryRelationHandle.getQuery();
}
return "SELECT * FROM (" + queryRelationHandle.getQuery() + " ) WHERE " + filter.get();
return buildNativeQuery(queryRelationHandle.getQuery(), filter, table.limit());
}
TableId tableId = TableId.of(projectId, table.asPlainTable().getRemoteTableName().datasetName(), table.asPlainTable().getRemoteTableName().tableName());
return selectSql(tableId, ImmutableList.copyOf(columns), filter);
return selectSql(tableId, ImmutableList.copyOf(columns), filter, table.limit());
}

@Override
@@ -40,6 +40,7 @@
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;

import static com.google.cloud.bigquery.TableDefinition.Type.MATERIALIZED_VIEW;
@@ -50,6 +51,7 @@
import static io.trino.plugin.bigquery.BigQueryClient.TABLE_TYPES;
import static io.trino.plugin.bigquery.BigQueryClient.selectSql;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_FAILED_TO_EXECUTE_QUERY;
import static io.trino.plugin.bigquery.BigQueryUtil.buildNativeQuery;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.completedFuture;
@@ -134,6 +136,7 @@ private List<BigQuerySplit> getSplits(
{
TupleDomain<ColumnHandle> tableConstraint = bigQueryTableHandle.constraint();
Optional<String> filter = BigQueryFilterQueryBuilder.buildFilter(tableConstraint);
OptionalLong limit = bigQueryTableHandle.limit();

TableId remoteTableId;
TableDefinition.Type tableType;
@@ -144,16 +147,11 @@
List<BigQueryColumnHandle> columns = bigQueryTableHandle.projectedColumns().orElse(ImmutableList.of());
useStorageApi = bigQueryQueryRelationHandle.isUseStorageApi();

// projectedColumnsNames can not be used for generating select sql because the query fails if it does not
// include a column name. eg: query => 'SELECT 1'
String query = filter
.map(whereClause -> "SELECT * FROM (" + bigQueryQueryRelationHandle.getQuery() + " ) WHERE " + whereClause)
.orElseGet(bigQueryQueryRelationHandle::getQuery);

if (!useStorageApi) {
log.debug("Using Rest API for running query: %s", query);
log.debug("Using Rest API for running query: %s", bigQueryQueryRelationHandle.getQuery());
return List.of(BigQuerySplit.forViewStream(columns, filter));
}
String query = buildNativeQuery(bigQueryQueryRelationHandle.getQuery(), filter, limit);

TableId destinationTable = bigQueryQueryRelationHandle.getDestinationTableName().toTableId();
TableInfo tableInfo = new ViewMaterializationCache.DestinationTableBuilder(bigQueryClientFactory.create(session), viewExpiration, query, destinationTable).get();
@@ -170,7 +168,7 @@
}

return emptyProjectionIsRequired(bigQueryTableHandle.projectedColumns())
? createEmptyProjection(session, tableType, remoteTableId, filter)
? createEmptyProjection(session, tableType, remoteTableId, filter, limit)
: readFromBigQuery(session, tableType, remoteTableId, bigQueryTableHandle.projectedColumns(), tableConstraint, useStorageApi);
}

@@ -225,15 +223,16 @@ private static List<String> getProjectedColumnNames(List<BigQueryColumnHandle> columns)
return columns.stream().map(BigQueryColumnHandle::name).collect(toImmutableList());
}

private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, TableDefinition.Type tableType, TableId remoteTableId, Optional<String> filter)
private List<BigQuerySplit> createEmptyProjection(ConnectorSession session, TableDefinition.Type tableType, TableId remoteTableId, Optional<String> filter, OptionalLong limit)
{
if (!TABLE_TYPES.containsKey(tableType)) {
throw new TrinoException(NOT_SUPPORTED, "Unsupported table type: " + tableType);
}

// Note that we cannot use row count from TableInfo because for writes via insertAll/streaming API the number is incorrect until the streaming buffer is flushed
// (and there's no mechanism to trigger an on-demand flush). This can lead to incorrect results for queries with empty projections.
String sql = selectSql(remoteTableId, "COUNT(*)", filter);
String sql = "SELECT COUNT(*) FROM (%s)".formatted(selectSql(remoteTableId, "true", filter, limit));

return createEmptyProjection(session, sql);
}

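The empty-projection path now counts over the limited subquery instead of issuing a plain COUNT(*) with the filter, so a pushed-down limit also caps the row count. A sketch of the generated SQL with hypothetical names (again assuming project.dataset.table rendering):

import com.google.cloud.bigquery.TableId;

import java.util.Optional;
import java.util.OptionalLong;

import static io.trino.plugin.bigquery.BigQueryClient.selectSql;

public class EmptyProjectionCountSketch
{
    public static void main(String[] args)
    {
        String inner = selectSql(
                TableId.of("my-project", "my_dataset", "orders"), // hypothetical table
                "true",                                           // constant projection; no column names required
                Optional.of("status = 'F'"),                      // pushed-down filter
                OptionalLong.of(10));                             // pushed-down limit
        String sql = "SELECT COUNT(*) FROM (%s)".formatted(inner);
        System.out.println(sql);
        // SELECT COUNT(*) FROM (SELECT true FROM `my-project.my_dataset.orders` WHERE status = 'F' LIMIT 10)
    }
}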
@@ -24,6 +24,7 @@

import java.util.List;
import java.util.Optional;
import java.util.OptionalLong;

import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
@@ -32,14 +33,16 @@
public record BigQueryTableHandle(
BigQueryRelationHandle relationHandle,
TupleDomain<ColumnHandle> constraint,
Optional<List<BigQueryColumnHandle>> projectedColumns)
Optional<List<BigQueryColumnHandle>> projectedColumns,
OptionalLong limit)
implements ConnectorTableHandle
{
public BigQueryTableHandle
{
requireNonNull(relationHandle, "relationHandle is null");
requireNonNull(constraint, "constraint is null");
requireNonNull(projectedColumns, "projectedColumns is null");
requireNonNull(limit, "limit is null");
}

@JsonIgnore
@@ -89,6 +92,7 @@ else if (!constraint.isAll()) {
.collect(joining(", ", "[", "]")));
}
projectedColumns.ifPresent(columns -> builder.append(" columns=").append(columns));
limit.ifPresent(value -> builder.append(" limit=").append(value));
return builder.toString();
}

@@ -100,12 +104,17 @@ public BigQueryNamedRelationHandle asPlainTable()

BigQueryTableHandle withConstraint(TupleDomain<ColumnHandle> newConstraint)
{
return new BigQueryTableHandle(relationHandle, newConstraint, projectedColumns);
return new BigQueryTableHandle(relationHandle, newConstraint, projectedColumns, limit);
}

public BigQueryTableHandle withProjectedColumns(List<BigQueryColumnHandle> newProjectedColumns)
{
return new BigQueryTableHandle(relationHandle, constraint, Optional.of(newProjectedColumns));
return new BigQueryTableHandle(relationHandle, constraint, Optional.of(newProjectedColumns), limit);
}

public BigQueryTableHandle withLimit(long limit)
{
return new BigQueryTableHandle(relationHandle, constraint, projectedColumns, OptionalLong.of(limit));
}

public enum BigQueryPartitionType
@@ -22,6 +22,7 @@

import java.util.Arrays;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;

import static com.google.cloud.bigquery.TableDefinition.Type.TABLE;
@@ -57,6 +58,17 @@ private static boolean isRetryableInternalError(Throwable t)
return false;
}

public static String buildNativeQuery(String nativeQuery, Optional<String> filter, OptionalLong limit)
{
// projected column names can not be used for generating select sql because the query fails if it does not
// include a column name. eg: query => 'SELECT 1'
String queryString = filter.map(s -> "SELECT * FROM (" + nativeQuery + ") WHERE " + s).orElse(nativeQuery);
if (limit.isPresent()) {
return "SELECT * FROM (" + queryString + ") LIMIT " + limit.getAsLong();
}
return queryString;
}

public static BigQueryException convertToBigQueryException(BigQueryError error)
{
return new BigQueryException(UNKNOWN_CODE, error.getMessage(), error);
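When both a filter and a limit are pushed down, buildNativeQuery nests two wrappers around the original statement. A small usage sketch with hypothetical query text:

import java.util.Optional;
import java.util.OptionalLong;

import static io.trino.plugin.bigquery.BigQueryUtil.buildNativeQuery;

public class BuildNativeQuerySketch
{
    public static void main(String[] args)
    {
        String sql = buildNativeQuery("SELECT 1 AS x", Optional.of("x = 1"), OptionalLong.of(5));
        System.out.println(sql);
        // SELECT * FROM (SELECT * FROM (SELECT 1 AS x) WHERE x = 1) LIMIT 5
    }
}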
@@ -48,6 +48,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;

import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.bigquery.ViewMaterializationCache.TEMP_TABLE_PREFIX;
@@ -118,7 +119,7 @@ public TableFunctionAnalysis analyze(
Schema schema = client.getSchema(query);

BigQueryQueryRelationHandle queryRelationHandle = new BigQueryQueryRelationHandle(query, new RemoteTableName(destinationTable), useStorageApi);
BigQueryTableHandle tableHandle = new BigQueryTableHandle(queryRelationHandle, TupleDomain.all(), Optional.empty());
BigQueryTableHandle tableHandle = new BigQueryTableHandle(queryRelationHandle, TupleDomain.all(), Optional.empty(), OptionalLong.empty());

ImmutableList.Builder<BigQueryColumnHandle> columnsBuilder = ImmutableList.builderWithExpectedSize(schema.getFields().size());
for (com.google.cloud.bigquery.Field field : schema.getFields()) {