INSERT/REPLACE can omit clustering when catalog has default #16260

Merged
merged 26 commits into from
Apr 26, 2024
Changes from 11 commits
Commits
26 commits
fafcc76
* fix
zachjsh Mar 27, 2024
fe2c407
* fix
zachjsh Mar 28, 2024
80151fc
Merge remote-tracking branch 'apache/master' into fix-complex-types-s…
zachjsh Mar 28, 2024
357e6a7
* address review comments
zachjsh Mar 29, 2024
fd6cb24
* fix
zachjsh Apr 1, 2024
3012773
* simplify tests
zachjsh Apr 1, 2024
853ea76
* fix complex type nullability issue
zachjsh Apr 2, 2024
7b20b83
Merge remote-tracking branch 'apache/master' into validate-catalog-co…
zachjsh Apr 3, 2024
9890d91
Merge remote-tracking branch 'apache/master' into validate-catalog-co…
zachjsh Apr 8, 2024
758a414
* implement and add tests
zachjsh Apr 10, 2024
0401766
Merge remote-tracking branch 'apache/master' into use-catalog-cluster…
zachjsh Apr 11, 2024
b042bb6
Merge remote-tracking branch 'apache/master' into validate-catalog-co…
zachjsh Apr 16, 2024
fdf2140
* address review comments
zachjsh Apr 16, 2024
736a7c8
* address test review comments
zachjsh Apr 16, 2024
000e015
Merge remote-tracking branch 'origin/validate-catalog-complex-columns…
zachjsh Apr 16, 2024
7ad8289
* fix checkstyle
zachjsh Apr 16, 2024
c4bb77d
Merge remote-tracking branch 'origin/validate-catalog-complex-columns…
zachjsh Apr 16, 2024
6181eef
* fix dependencies
zachjsh Apr 16, 2024
87b4dd2
* all tests passing
zachjsh Apr 17, 2024
f9f6b7b
* cleanup
zachjsh Apr 17, 2024
009b684
Merge remote-tracking branch 'apache/master' into use-catalog-cluster…
zachjsh Apr 17, 2024
7cc749a
* remove unneeded code
zachjsh Apr 17, 2024
03838bf
* remove unused dependency
zachjsh Apr 18, 2024
3b9d78d
Merge remote-tracking branch 'apache/master' into use-catalog-cluster…
zachjsh Apr 25, 2024
659cac0
* fix checkstyle
zachjsh Apr 26, 2024
a0880bc
Merge remote-tracking branch 'apache/master' into use-catalog-cluster…
zachjsh Apr 26, 2024
@@ -69,7 +69,7 @@ public void finalizeTestFramework(SqlTestFramework sqlTestFramework)

public void buildDatasources()
{
resolvedTables.forEach((datasourceName, datasourceTable) -> {
RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> {
DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
@@ -92,14 +92,6 @@ public void buildDatasources()

createTableMetadata(tableBuilder.build());
});
DatasourceFacade catalogMetadata =
((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
columnFacade -> {
tableBuilder.column(columnFacade.spec().name(), columnFacade.spec().dataType());
}
);
}

private void createTableMetadata(TableMetadata table)
@@ -68,7 +68,7 @@ public void finalizeTestFramework(SqlTestFramework sqlTestFramework)

public void buildDatasources()
{
resolvedTables.forEach((datasourceName, datasourceTable) -> {
RESOLVED_TABLES.forEach((datasourceName, datasourceTable) -> {
DatasourceFacade catalogMetadata = ((DatasourceTable) datasourceTable).effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource(datasourceName, catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
@@ -92,7 +92,7 @@ public void buildDatasources()
createTableMetadata(tableBuilder.build());
});
DatasourceFacade catalogMetadata =
((DatasourceTable) resolvedTables.get("foo")).effectiveMetadata().catalogMetadata();
((DatasourceTable) RESOLVED_TABLES.get("foo")).effectiveMetadata().catalogMetadata();
TableBuilder tableBuilder = TableBuilder.datasource("foo", catalogMetadata.segmentGranularityString());
catalogMetadata.columnFacades().forEach(
columnFacade -> {
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

//CHECKSTYLE.OFF: PackageName - Must be in Calcite

package org.apache.calcite.sql.validate;

import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlSelect;

public class ValidatorShim
{
/**
* Provide the ability to create an OrderBy scope to Druid-specific code.
* @param parent
* @param orderList
* @param select
* @return
*/
public static OrderByScope newOrderByScope(
SqlValidatorScope parent,
SqlNodeList orderList,
SqlSelect select)
{
return new OrderByScope(parent, orderList, select);
}
}
@@ -40,36 +40,49 @@
import org.apache.calcite.sql.SqlUtil;
import org.apache.calcite.sql.SqlWindow;
import org.apache.calcite.sql.SqlWith;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.sql.type.SqlTypeUtil;
import org.apache.calcite.sql.util.IdPair;
import org.apache.calcite.sql.validate.IdentifierNamespace;
import org.apache.calcite.sql.validate.OrderByScope;
import org.apache.calcite.sql.validate.SqlNonNullableAccessors;
import org.apache.calcite.sql.validate.SqlValidatorException;
import org.apache.calcite.sql.validate.SqlValidatorImpl;
import org.apache.calcite.sql.validate.SqlValidatorNamespace;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.sql.validate.SqlValidatorTable;
import org.apache.calcite.sql.validate.ValidatorShim;
import org.apache.calcite.util.Pair;
import org.apache.calcite.util.Static;
import org.apache.calcite.util.Util;
import org.apache.commons.lang.reflect.FieldUtils;
import org.apache.druid.catalog.model.facade.DatasourceFacade;
import org.apache.druid.common.config.NullHandling;
import org.apache.druid.catalog.model.table.ClusterKeySpec;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.error.InvalidSqlInput;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.Types;
import org.apache.druid.segment.column.ValueType;
import org.apache.druid.sql.calcite.parser.DruidSqlIngest;
import org.apache.druid.sql.calcite.parser.DruidSqlInsert;
import org.apache.druid.sql.calcite.parser.DruidSqlParserUtils;
import org.apache.druid.sql.calcite.parser.ExternalDestinationSqlIdentifier;
import org.apache.druid.sql.calcite.run.EngineFeature;
import org.apache.druid.sql.calcite.table.DatasourceTable;
import org.apache.druid.sql.calcite.table.RowSignatures;
import org.apache.druid.utils.CollectionUtils;
import org.checkerframework.checker.nullness.qual.Nullable;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -88,6 +101,39 @@ public class DruidSqlValidator extends BaseDruidSqlValidator

private final PlannerContext plannerContext;

// This part is a bit sad. By the time we get here, the validator will have created
// the ORDER BY namespace if we had a real ORDER BY. We have to "catch up" and do the
// work that registerQuery() should have done. That's kind of OK. But, the orderScopes
// variable is private, so we have to play dirty tricks to get at it.
//
// Warning: this may no longer work if Java forbids access to private fields in a
// future release.
private static final Field CLAUSE_SCOPES_FIELD;
private static final Object ORDER_CLAUSE;

static {
try {
CLAUSE_SCOPES_FIELD = FieldUtils.getDeclaredField(
SqlValidatorImpl.class,
"clauseScopes",
true
);
}
catch (RuntimeException e) {
throw new ISE(e, "SqlValidatorImpl.clauseScopes is not accessible");
}

try {
Class<?> innerClass = Class.forName("org.apache.calcite.sql.validate.SqlValidatorImpl$Clause");
Method method = innerClass.getMethod("valueOf", Class.class, String.class);
method.setAccessible(true);
ORDER_CLAUSE = method.invoke(null, innerClass, "ORDER");
}
catch (Exception e) {
throw new ISE(e, "Could not construct ORDER clause instance.");
}
}

protected DruidSqlValidator(
SqlOperatorTable opTab,
CalciteCatalogReader catalogReader,
@@ -230,6 +276,10 @@ public void validateInsert(final SqlInsert insert)
// The source must be a SELECT
final SqlNode source = insert.getSource();

// Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause
final SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata);
rewriteClusteringToOrderBy(source, ingestNode, catalogClustering);

// Validate the source statement.
// Because of the non-standard Druid semantics, we can't define the target type: we don't know
// the target columns yet, and we can't infer types when they must come from the SELECT.
@@ -348,6 +398,42 @@ private DatasourceTable validateInsertTarget(
}
}

/**
* Clustering is a kind of ordering. We push the CLUSTERED BY clause into the source query as
* an ORDER BY clause. In an ideal world, clustering would be outside of SELECT: it is an operation
* applied after the data is selected. For now, we push it into the SELECT, then MSQ pulls it back
* out. This is unfortunate as errors will be attributed to ORDER BY, which the user did not
* actually specify (it is an error to do so.) However, with the current hybrid structure, it is
* not possible to add the ORDER by later: doing so requires access to the order by namespace
* which is not visible to subclasses.
*/
private void rewriteClusteringToOrderBy(SqlNode source, DruidSqlIngest ingestNode, @Nullable SqlNodeList catalogClustering)
{
SqlNodeList clusteredBy = ingestNode.getClusteredBy();
if (clusteredBy == null || clusteredBy.getList().isEmpty()) {
if (catalogClustering == null || catalogClustering.getList().isEmpty()) {
return;
}
clusteredBy = catalogClustering;
}
while (source instanceof SqlWith) {
source = ((SqlWith) source).getOperandList().get(1);
}
final SqlSelect select = (SqlSelect) source;

select.setOrderBy(clusteredBy);
final OrderByScope orderScope = ValidatorShim.newOrderByScope(scopes.get(select), clusteredBy, select);
try {
@SuppressWarnings("unchecked")
final Map<IdPair<SqlSelect, Object>, Object> orderScopes =
(Map<IdPair<SqlSelect, Object>, Object>) CLAUSE_SCOPES_FIELD.get(this);
orderScopes.put(IdPair.of(select, ORDER_CLAUSE), orderScope);
}
catch (Exception e) {
throw new ISE(e, "scopes is not accessible");
}
}
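
(An illustrative sketch, not part of the diff: the user-visible effect of convertCatalogClustering() plus rewriteClusteringToOrderBy(). The datasource, column, and source-table names below are hypothetical. When the catalog entry for the target table declares cluster keys, an ingest query may omit CLUSTERED BY and still validates as if the catalog keys had been written explicitly.)

-- Suppose the catalog entry for "tbl" declares cluster keys [country, city].
-- The user may now write:
INSERT INTO tbl
SELECT __time, country, city, added
FROM src
PARTITIONED BY DAY

-- The validator treats the source query as though the user had written:
INSERT INTO tbl
SELECT __time, country, city, added
FROM src
PARTITIONED BY DAY
CLUSTERED BY country, city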

/**
* Gets the effective PARTITIONED BY granularity. Resolves the granularity from the granularity specified on the
* ingest node, and on the table metadata as stored in catalog, if any. Mismatches between the 2 granularities are
@@ -396,6 +482,39 @@ private Granularity getEffectiveGranularity(
return effectiveGranularity;
}

private String statementArticle(final String operationName)
{
return "INSERT".equals(operationName) ? "an" : "a";
}

@Nullable
private SqlNodeList convertCatalogClustering(final DatasourceFacade tableMetadata)
{
if (tableMetadata == null) {
return null;
}
final List<ClusterKeySpec> keyCols = tableMetadata.clusterKeys();
if (CollectionUtils.isNullOrEmpty(keyCols)) {
return null;
}
final SqlNodeList keyNodes = new SqlNodeList(SqlParserPos.ZERO);
for (ClusterKeySpec keyCol : keyCols) {
final SqlIdentifier colIdent = new SqlIdentifier(
Collections.singletonList(keyCol.expr()),
null, SqlParserPos.ZERO,
Collections.singletonList(SqlParserPos.ZERO)
);
Comment on lines +474 to +478
Member

I was wondering what will happen in the following case:

  • say column c is a clusterKey
  • we are selecting from a join which has column c on both sides

but it seems like the column in the select list will take precedence.

one more thing I was wondering about: do we have a check that all keyCols are present in the selected column list?

Contributor Author
@zachjsh zachjsh Apr 25, 2024

About whether there is a check that all keyCols are present in the selected column list, see the following tests:

testInsertTableWithClusteringWithClusteringOnNewColumnFromQuery
testInsertTableWithClusteringWithClusteringOnBadColumn

Do these cover the cases you are talking about?

About the join issue, do you have a concrete example query, just to clarify?
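
(An illustrative query for the join case raised above, not part of the original thread; tbl, t1, t2, and the column names are hypothetical. Both join inputs carry the cluster-key column c; per the reviewer's observation, the aliased c in the select list is what the injected ordering would bind to.)

INSERT INTO tbl
SELECT t1.__time AS __time, t1.c AS c, t2.added
FROM t1 JOIN t2 ON t1.c = t2.c
PARTITIONED BY DAY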

final SqlNode keyNode;
if (keyCol.desc()) {
keyNode = SqlStdOperatorTable.DESC.createCall(SqlParserPos.ZERO, colIdent);
} else {
keyNode = colIdent;
}
keyNodes.add(keyNode);
}
return keyNodes;
}

/**
* Compute and validate the target type. In normal SQL, the engine would insert
* a project operator after the SELECT before the write to cast columns from the
@@ -475,18 +594,27 @@ private RelDataType validateTargetType(
continue;
}
SqlTypeName sqlTypeName = SqlTypeName.get(definedCol.sqlStorageType());
RelDataType relType = typeFactory.createSqlType(sqlTypeName);
if (NullHandling.replaceWithDefault() && !SqlTypeFamily.STRING.contains(relType)) {
fields.add(Pair.of(
colName,
relType
));
RelDataType relType;
if (sqlTypeName != null) {
relType = typeFactory.createSqlType(sqlTypeName);
} else {
fields.add(Pair.of(
colName,
typeFactory.createTypeWithNullability(relType, true)
));
ColumnType columnType = ColumnType.fromString(definedCol.sqlStorageType());
if (columnType != null && columnType.getType().equals(ValueType.COMPLEX)) {
relType = RowSignatures.makeComplexType(typeFactory, columnType, sourceField.getType().isNullable());
} else {
relType = RowSignatures.columnTypeToRelDataType(
typeFactory,
columnType,
// this nullability is ignored for complex types for some reason, hence the check for complex above.
sourceField.getType().isNullable()
);
}
}

fields.add(Pair.of(
colName,
typeFactory.createTypeWithNullability(relType, sourceField.getType().isNullable())
));
}

// Perform the SQL-standard check: that the SELECT column can be
@@ -517,7 +645,17 @@ protected void checkTypeAssignment(
ColumnType sourceFieldColumnType = Calcites.getColumnTypeForRelDataType(sourceFielRelDataType);
ColumnType targetFieldColumnType = Calcites.getColumnTypeForRelDataType(targetFieldRelDataType);

if (targetFieldColumnType != ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType)) {
boolean incompatible;
try {
incompatible = !Objects.equals(
targetFieldColumnType,
ColumnType.leastRestrictiveType(targetFieldColumnType, sourceFieldColumnType)
);
}
catch (Types.IncompatibleTypeException e) {
incompatible = true;
}
if (incompatible) {
SqlNode node = getNthExpr(query, i, sourceCount);
String targetTypeString;
String sourceTypeString;