Skip to content

Commit

Permalink
Support partition pseudo-columns in BigQuery
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Apr 12, 2022
1 parent a9b6477 commit 5881a66
Show file tree
Hide file tree
Showing 9 changed files with 263 additions and 11 deletions.
29 changes: 29 additions & 0 deletions docs/src/main/sphinx/connector/bigquery.rst
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,35 @@ For each Trino table which maps to BigQuery view there exists a system table whi
Given a BigQuery view ``customer_view`` you can send query
``SELECT * customer_view$view_definition`` to see the SQL which defines view in BigQuery.

.. _bigquery_special_columns:

Special columns
---------------

In addition to the defined columns, the BigQuery connector exposes
partition information in a number of hidden columns:

* ``$partition_date``: Equivalent to ``_PARTITIONDATE`` pseudo-column in BigQuery

* ``$partition_time``: Equivalent to ``_PARTITIONTIME`` pseudo-column in BigQuery

You can use these columns in your SQL statements like any other column. They
can be selected directly, or used in conditional statements. For example, you
can inspect the partition date and time for each record::

SELECT *, "$partition_date", "$partition_time"
FROM bigquery.web.page_views;

Retrieve all records stored in the partition ``_PARTITIONDATE = '2022-04-07'``::

SELECT *
FROM bigquery.web.page_views
WHERE "$partition_date" = date '2022-04-07';

.. note::

Two special partitions ``__NULL__`` and ``__UNPARTITIONED__`` are not supported.

.. _bigquery-sql-support:

SQL support
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.Iterables.getOnlyElement;
import static io.trino.plugin.bigquery.BigQueryUtil.toBigQueryColumnName;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;

Expand Down Expand Up @@ -72,7 +73,7 @@ private List<String> toConjuncts(List<BigQueryColumnHandle> columns)
for (BigQueryColumnHandle column : columns) {
Domain domain = tupleDomain.getDomains().get().get(column);
if (domain != null) {
toPredicate(column.getName(), domain, column).ifPresent(clauses::add);
toPredicate(toBigQueryColumnName(column.getName()), domain, column).ifPresent(clauses::add);
}
}
return clauses.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.trino.plugin.bigquery.BigQueryErrorCode.BIGQUERY_LISTING_DATASET_ERROR;
import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_DATE;
import static io.trino.plugin.bigquery.BigQueryPseudoColumn.PARTITION_TIME;
import static io.trino.plugin.bigquery.BigQueryTableHandle.BigQueryPartitionType.INGESTION;
import static io.trino.plugin.bigquery.BigQueryType.toField;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -247,6 +250,10 @@ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, Connect
for (BigQueryColumnHandle column : client.getColumns(handle)) {
columnMetadata.add(column.getColumnMetadata());
}
if (handle.getPartitionType().isPresent() && handle.getPartitionType().get() == INGESTION) {
columnMetadata.add(PARTITION_DATE.getColumnMetadata());
columnMetadata.add(PARTITION_TIME.getColumnMetadata());
}
return new ConnectorTableMetadata(handle.getSchemaTableName(), columnMetadata.build());
}

Expand Down Expand Up @@ -290,7 +297,15 @@ public Map<String, ColumnHandle> getColumnHandles(ConnectorSession session, Conn
{
BigQueryClient client = bigQueryClientFactory.create(session);
log.debug("getColumnHandles(session=%s, tableHandle=%s)", session, tableHandle);
return client.getColumns((BigQueryTableHandle) tableHandle).stream()

BigQueryTableHandle table = (BigQueryTableHandle) tableHandle;
ImmutableList.Builder<BigQueryColumnHandle> columns = ImmutableList.builder();
columns.addAll(client.getColumns(table));
if (table.getPartitionType().isPresent() && table.getPartitionType().get() == INGESTION) {
columns.add(PARTITION_DATE.getColumnHandle());
columns.add(PARTITION_TIME.getColumnHandle());
}
return columns.build().stream()
.collect(toImmutableMap(columnHandle -> columnHandle.getColumnMetadata().getName(), identity()));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.bigquery;

import com.google.cloud.bigquery.Field;
import com.google.common.collect.ImmutableList;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.type.Type;

import static io.trino.spi.type.DateType.DATE;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MICROS;

public enum BigQueryPseudoColumn
{
PARTITION_DATE("$partition_date", "_PARTITIONDATE", DATE, BigQueryType.DATE),
PARTITION_TIME("$partition_time", "_PARTITIONTIME", TIMESTAMP_TZ_MICROS, BigQueryType.TIMESTAMP),
/**/;

private final String trinoColumnName;
private final String bigqueryColumnName;
private final Type trinoType;
private final BigQueryType bigqueryType;

BigQueryPseudoColumn(String trinoColumnName, String bigqueryColumnName, Type type, BigQueryType bigqueryType)
{
this.trinoColumnName = trinoColumnName;
this.bigqueryColumnName = bigqueryColumnName;
this.trinoType = type;
this.bigqueryType = bigqueryType;
}

public String getTrinoColumnName()
{
return trinoColumnName;
}

public String getBigqueryColumnName()
{
return bigqueryColumnName;
}

public BigQueryColumnHandle getColumnHandle()
{
return new BigQueryColumnHandle(
trinoColumnName,
bigqueryType,
Field.Mode.REQUIRED,
null,
null,
ImmutableList.of(),
null,
true);
}

public ColumnMetadata getColumnMetadata()
{
return ColumnMetadata.builder()
.setName(trinoColumnName)
.setType(trinoType)
.setHidden(true)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static io.airlift.slice.Slices.utf8Slice;
import static io.trino.plugin.bigquery.BigQueryType.toTrinoTimestamp;
import static io.trino.plugin.bigquery.BigQueryUtil.toBigQueryColumnName;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.DateType.DATE;
Expand Down Expand Up @@ -141,7 +142,7 @@ public Page getNextPage()
pageBuilder.declarePosition();
for (int column = 0; column < columnTypes.size(); column++) {
BlockBuilder output = pageBuilder.getBlockBuilder(column);
appendTo(columnTypes.get(column), record.get(columnNames.get(column)), output);
appendTo(columnTypes.get(column), record.get(toBigQueryColumnName(columnNames.get(column))), output);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.cloud.bigquery.RangePartitioning;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableDefinition;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;
Expand All @@ -34,6 +38,7 @@ public class BigQueryTableHandle
private final SchemaTableName schemaTableName;
private final RemoteTableName remoteTableName;
private final String type;
private final Optional<BigQueryPartitionType> partitionType;
private final TupleDomain<ColumnHandle> constraint;
private final Optional<List<ColumnHandle>> projectedColumns;

Expand All @@ -42,12 +47,14 @@ public BigQueryTableHandle(
@JsonProperty("schemaTableName") SchemaTableName schemaTableName,
@JsonProperty("remoteTableName") RemoteTableName remoteTableName,
@JsonProperty("type") String type,
@JsonProperty("partitionType") Optional<BigQueryPartitionType> partitionType,
@JsonProperty("constraint") TupleDomain<ColumnHandle> constraint,
@JsonProperty("projectedColumns") Optional<List<ColumnHandle>> projectedColumns)
{
this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
this.remoteTableName = requireNonNull(remoteTableName, "remoteTableName is null");
this.type = requireNonNull(type, "type is null");
this.partitionType = requireNonNull(partitionType, "partitionType is null");
this.constraint = requireNonNull(constraint, "constraint is null");
this.projectedColumns = requireNonNull(projectedColumns, "projectedColumns is null");
}
Expand All @@ -58,6 +65,7 @@ public BigQueryTableHandle(SchemaTableName schemaTableName, RemoteTableName remo
schemaTableName,
remoteTableName,
tableInfo.getDefinition().getType().toString(),
getPartitionType(tableInfo.getDefinition()),
TupleDomain.all(),
Optional.empty());
}
Expand All @@ -80,6 +88,12 @@ public String getType()
return type;
}

@JsonProperty
public Optional<BigQueryPartitionType> getPartitionType()
{
return partitionType;
}

@JsonProperty
public TupleDomain<ColumnHandle> getConstraint()
{
Expand All @@ -106,14 +120,15 @@ public boolean equals(Object o)
// TODO: Add tests for this (see TestJdbcTableHandle#testEquivalence for reference)
return Objects.equals(schemaTableName, that.schemaTableName) &&
Objects.equals(type, that.type) &&
Objects.equals(partitionType, that.partitionType) &&
Objects.equals(constraint, that.constraint) &&
Objects.equals(projectedColumns, that.projectedColumns);
}

@Override
public int hashCode()
{
return Objects.hash(schemaTableName, type, constraint, projectedColumns);
return Objects.hash(schemaTableName, type, partitionType, constraint, projectedColumns);
}

@Override
Expand All @@ -123,18 +138,47 @@ public String toString()
.add("remoteTableName", remoteTableName)
.add("schemaTableName", schemaTableName)
.add("type", type)
.add("partitionType", partitionType)
.add("constraint", constraint)
.add("projectedColumns", projectedColumns)
.toString();
}

BigQueryTableHandle withConstraint(TupleDomain<ColumnHandle> newConstraint)
{
return new BigQueryTableHandle(schemaTableName, remoteTableName, type, newConstraint, projectedColumns);
return new BigQueryTableHandle(schemaTableName, remoteTableName, type, partitionType, newConstraint, projectedColumns);
}

BigQueryTableHandle withProjectedColumns(List<ColumnHandle> newProjectedColumns)
{
return new BigQueryTableHandle(schemaTableName, remoteTableName, type, constraint, Optional.of(newProjectedColumns));
return new BigQueryTableHandle(schemaTableName, remoteTableName, type, partitionType, constraint, Optional.of(newProjectedColumns));
}

public enum BigQueryPartitionType
{
TIME,
INGESTION,
RANGE,
/**/
}

private static Optional<BigQueryPartitionType> getPartitionType(TableDefinition definition)
{
if (definition instanceof StandardTableDefinition) {
StandardTableDefinition standardTableDefinition = (StandardTableDefinition) definition;
RangePartitioning rangePartition = standardTableDefinition.getRangePartitioning();
if (rangePartition != null) {
return Optional.of(BigQueryPartitionType.RANGE);
}

TimePartitioning timePartition = standardTableDefinition.getTimePartitioning();
if (timePartition != null) {
if (timePartition.getField() != null) {
return Optional.of(BigQueryPartitionType.TIME);
}
return Optional.of(BigQueryPartitionType.INGESTION);
}
}
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import io.grpc.Status;
import io.grpc.StatusRuntimeException;

import java.util.Arrays;
import java.util.Optional;
import java.util.Set;

import static com.google.cloud.http.BaseHttpServiceException.UNKNOWN_CODE;
Expand All @@ -31,8 +33,6 @@ public final class BigQueryUtil
"Connection closed with unknown cause",
"Received unexpected EOS on DATA frame from server");

private static final Set<String> INVALID_COLUMN_NAMES = ImmutableSet.of("_partitiondate", "_PARTITIONDATE", "_partitiontime", "_PARTITIONTIME");

private BigQueryUtil() {}

public static boolean isRetryable(Throwable cause)
Expand All @@ -56,8 +56,14 @@ public static BigQueryException convertToBigQueryException(BigQueryError error)
return new BigQueryException(UNKNOWN_CODE, error.getMessage(), error);
}

public static boolean validColumnName(String columnName)
public static String toBigQueryColumnName(String columnName)
{
return !INVALID_COLUMN_NAMES.contains(columnName);
Optional<BigQueryPseudoColumn> pseudoColumn = Arrays.stream(BigQueryPseudoColumn.values())
.filter(column -> column.getTrinoColumnName().equals(columnName))
.findFirst();
if (pseudoColumn.isPresent()) {
return pseudoColumn.get().getBigqueryColumnName();
}
return columnName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public ReadSession create(ConnectorSession session, TableId remoteTable, List<St
TableInfo actualTable = getActualTable(client, tableDetails, selectedFields);

List<String> filteredSelectedFields = selectedFields.stream()
.filter(BigQueryUtil::validColumnName)
.map(BigQueryUtil::toBigQueryColumnName)
.collect(toList());

try (BigQueryReadClient bigQueryReadClient = bigQueryReadClientFactory.create(session)) {
Expand Down
Loading

0 comments on commit 5881a66

Please sign in to comment.