INSERT/REPLACE can omit clustering when catalog has default (#16260)
* * fix

* * fix

* * address review comments

* * fix

* * simplify tests

* * fix complex type nullability issue

* * implement and add tests

* * address review comments

* * address test review comments

* * fix checkstyle

* * fix dependencies

* * all tests passing

* * cleanup

* * remove unneeded code

* * remove unused dependency

* * fix checkstyle
zachjsh committed Apr 26, 2024
1 parent 64a6fc8 commit 365cd7e
Showing 2 changed files with 370 additions and 6 deletions.
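
A sketch of the behavior this change enables (datasource and column names below are hypothetical): when the catalog entry for the target datasource already declares cluster keys, an ingest query may omit CLUSTERED BY and the catalog default is applied. For example,

  INSERT INTO "events"
  SELECT * FROM "source_events"
  PARTITIONED BY DAY

now validates as if the catalog's clustering had been spelled out:

  INSERT INTO "events"
  SELECT * FROM "source_events"
  PARTITIONED BY DAY
  CLUSTERED BY country, city
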
@@ -40,10 +40,12 @@
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWindow;
import org.apache.calcite.sql.SqlWith;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.validate.IdentifierNamespace;
import org.apache.calcite.sql.validate.SelectNamespace;
import org.apache.calcite.sql.validate.SqlNonNullableAccessors;
import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.calcite.sql.validate.SqlValidatorNamespace;
@@ -53,6 +55,7 @@
import org.apache.calcite.util.Static;
import org.apache.calcite.util.Util;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.StringUtils;
@@ -68,9 +71,11 @@
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.table.RowSignatures;
import org.apache.druid.utils.CollectionUtils;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -281,6 +286,37 @@ public void validateInsert(final SqlInsert insert)
    }
  }

  @Override
  protected SelectNamespace createSelectNamespace(
      SqlSelect select,
      SqlNode enclosingNode)
  {
    if (enclosingNode instanceof DruidSqlIngest) {
      // The target is a new or existing datasource.
      // The target namespace is both the target table ID and the row type for that table.
      final SqlValidatorNamespace targetNamespace = Objects.requireNonNull(
          getNamespace(enclosingNode),
          () -> "namespace for " + enclosingNode
      );
      final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace;
      SqlIdentifier identifier = insertNs.getId();
      SqlValidatorTable catalogTable = getCatalogReader().getTable(identifier.names);
      if (catalogTable != null) {
        final DatasourceTable table = catalogTable.unwrap(DatasourceTable.class);

        // An existing datasource may have metadata.
        final DatasourceFacade tableMetadata = table == null
            ? null
            : table.effectiveMetadata().catalogMetadata();
        // Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
        final SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
        rewriteClusteringToOrderBy(select, (DruidSqlIngest) enclosingNode, catalogClustering);
        return new SelectNamespace(this, select, enclosingNode);
      }
    }
    return super.createSelectNamespace(select, enclosingNode);
  }

  /**
   * Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource,
   * or insert into an existing one. If the target exists, it must be a datasource. If it
@@ -349,6 +385,32 @@ private DatasourceTable validateInsertTarget(
    }
  }

  /**
   * Clustering is a kind of ordering. We push the CLUSTERED BY clause into the source query as
   * an ORDER BY clause. In an ideal world, clustering would be outside of SELECT: it is an operation
   * applied after the data is selected. For now, we push it into the SELECT, then MSQ pulls it back
   * out. This is unfortunate as errors will be attributed to ORDER BY, which the user did not
   * actually specify (it is an error to do so). However, with the current hybrid structure, it is
   * not possible to add the ORDER BY later: doing so requires access to the ORDER BY namespace,
   * which is not visible to subclasses.
   */
  private void rewriteClusteringToOrderBy(SqlNode source, DruidSqlIngest ingestNode, @Nullable SqlNodeList catalogClustering)
  {
    SqlNodeList clusteredBy = ingestNode.getClusteredBy();
    if (clusteredBy == null || clusteredBy.getList().isEmpty()) {
      if (catalogClustering == null || catalogClustering.getList().isEmpty()) {
        return;
      }
      clusteredBy = catalogClustering;
    }
    while (source instanceof SqlWith) {
      source = ((SqlWith) source).getOperandList().get(1);
    }
    final SqlSelect select = (SqlSelect) source;

    select.setOrderBy(clusteredBy);
  }
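
  /*
   * Illustration (hypothetical table and column names): for a statement such as
   *
   *   INSERT INTO "events"
   *   SELECT * FROM "source_events"
   *   PARTITIONED BY DAY
   *   CLUSTERED BY country, city
   *
   * or one that omits CLUSTERED BY but targets a catalog table declaring those cluster keys,
   * the rewrite above sets ORDER BY country, city on the source SELECT; MSQ later pulls that
   * ordering back out as the segment clustering.
   */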

  /**
   * Gets the effective PARTITIONED BY granularity. Resolves the granularity from the granularity specified on the
   * ingest node, and on the table metadata as stored in catalog, if any. Mismatches between the 2 granularities are
@@ -397,6 +459,34 @@ private Granularity getEffectiveGranularity(
    return effectiveGranularity;
  }

  @Nullable
  private SqlNodeList convertCatalogClustering(final DatasourceFacade tableMetadata)
  {
    if (tableMetadata == null) {
      return null;
    }
    final List<ClusterKeySpec> keyCols = tableMetadata.clusterKeys();
    if (CollectionUtils.isNullOrEmpty(keyCols)) {
      return null;
    }
    final SqlNodeList keyNodes = new SqlNodeList(SqlParserPos.ZERO);
    for (ClusterKeySpec keyCol : keyCols) {
      final SqlIdentifier colIdent = new SqlIdentifier(
          Collections.singletonList(keyCol.expr()),
          null, SqlParserPos.ZERO,
          Collections.singletonList(SqlParserPos.ZERO)
      );
      final SqlNode keyNode;
      if (keyCol.desc()) {
        keyNode = SqlStdOperatorTable.DESC.createCall(SqlParserPos.ZERO, colIdent);
      } else {
        keyNode = colIdent;
      }
      keyNodes.add(keyNode);
    }
    return keyNodes;
  }
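
  /*
   * Illustration (hypothetical keys): catalog cluster keys such as [country ASC, city DESC] become
   * the node list `country, city DESC`, which createSelectNamespace() passes to
   * rewriteClusteringToOrderBy() as the default ORDER BY when the query has no CLUSTERED BY clause.
   */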

  /**
   * Compute and validate the target type. In normal SQL, the engine would insert
   * a project operator after the SELECT before the write to cast columns from the
