-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
INSERT/REPLACE can omit clustering when catalog has default #16260
Changes from all commits
fafcc76
fe2c407
80151fc
357e6a7
fd6cb24
3012773
853ea76
7b20b83
9890d91
758a414
0401766
b042bb6
fdf2140
736a7c8
000e015
7ad8289
c4bb77d
6181eef
87b4dd2
f9f6b7b
009b684
7cc749a
03838bf
3b9d78d
659cac0
a0880bc
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -40,10 +40,12 @@ | |
import org.apache.calcite.sql.SqlUtil; | ||
import org.apache.calcite.sql.SqlWindow; | ||
import org.apache.calcite.sql.SqlWith; | ||
import org.apache.calcite.sql.fun.SqlStdOperatorTable; | ||
import org.apache.calcite.sql.parser.SqlParserPos; | ||
import org.apache.calcite.sql.type.SqlTypeName; | ||
import org.apache.calcite.sql.type.SqlTypeUtil; | ||
import org.apache.calcite.sql.validate.IdentifierNamespace; | ||
import org.apache.calcite.sql.validate.SelectNamespace; | ||
import org.apache.calcite.sql.validate.SqlNonNullableAccessors; | ||
import org.apache.calcite.sql.validate.SqlValidatorException; | ||
import org.apache.calcite.sql.validate.SqlValidatorNamespace; | ||
|
@@ -53,6 +55,7 @@ | |
import org.apache.calcite.util.Static; | ||
import org.apache.calcite.util.Util; | ||
import org.apache.druid.catalog.model.facade.DatasourceFacade; | ||
import org.apache.druid.catalog.model.table.ClusterKeySpec; | ||
import org.apache.druid.common.utils.IdUtils; | ||
import org.apache.druid.error.InvalidSqlInput; | ||
import org.apache.druid.java.util.common.StringUtils; | ||
|
@@ -68,9 +71,11 @@ | |
import org.apache.druid.sql.calcite.run.EngineFeature; | ||
import org.apache.druid.sql.calcite.table.DatasourceTable; | ||
import org.apache.druid.sql.calcite.table.RowSignatures; | ||
import org.apache.druid.utils.CollectionUtils; | ||
import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Objects; | ||
|
@@ -281,6 +286,37 @@ public void validateInsert(final SqlInsert insert) | |
} | ||
} | ||
|
||
@Override | ||
protected SelectNamespace createSelectNamespace( | ||
SqlSelect select, | ||
SqlNode enclosingNode) | ||
{ | ||
if (enclosingNode instanceof DruidSqlIngest) { | ||
// The target is a new or existing datasource. | ||
// The target namespace is both the target table ID and the row type for that table. | ||
final SqlValidatorNamespace targetNamespace = Objects.requireNonNull( | ||
getNamespace(enclosingNode), | ||
() -> "namespace for " + enclosingNode | ||
); | ||
final IdentifierNamespace insertNs = (IdentifierNamespace) targetNamespace; | ||
SqlIdentifier identifier = insertNs.getId(); | ||
SqlValidatorTable catalogTable = getCatalogReader().getTable(identifier.names); | ||
if (catalogTable != null) { | ||
final DatasourceTable table = catalogTable.unwrap(DatasourceTable.class); | ||
|
||
// An existing datasource may have metadata. | ||
final DatasourceFacade tableMetadata = table == null | ||
? null | ||
: table.effectiveMetadata().catalogMetadata(); | ||
// Convert CLUSTERED BY, or the catalog equivalent, to an ORDER BY clause | ||
final SqlNodeList catalogClustering = convertCatalogClustering(tableMetadata); | ||
rewriteClusteringToOrderBy(select, (DruidSqlIngest) enclosingNode, catalogClustering); | ||
return new SelectNamespace(this, select, enclosingNode); | ||
} | ||
} | ||
return super.createSelectNamespace(select, enclosingNode); | ||
} | ||
|
||
/** | ||
* Validate the target table. Druid {@code INSERT/REPLACE} can create a new datasource, | ||
* or insert into an existing one. If the target exists, it must be a datasource. If it | ||
|
@@ -349,6 +385,32 @@ private DatasourceTable validateInsertTarget( | |
} | ||
} | ||
|
||
/** | ||
* Clustering is a kind of ordering. We push the CLUSTERED BY clause into the source query as | ||
* an ORDER BY clause. In an ideal world, clustering would be outside of SELECT: it is an operation | ||
* applied after the data is selected. For now, we push it into the SELECT, then MSQ pulls it back | ||
* out. This is unfortunate as errors will be attributed to ORDER BY, which the user did not | ||
* actually specify (it is an error to do so.) However, with the current hybrid structure, it is | ||
* not possible to add the ORDER by later: doing so requires access to the order by namespace | ||
* which is not visible to subclasses. | ||
*/ | ||
private void rewriteClusteringToOrderBy(SqlNode source, DruidSqlIngest ingestNode, @Nullable SqlNodeList catalogClustering) | ||
zachjsh marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ | ||
SqlNodeList clusteredBy = ingestNode.getClusteredBy(); | ||
if (clusteredBy == null || clusteredBy.getList().isEmpty()) { | ||
if (catalogClustering == null || catalogClustering.getList().isEmpty()) { | ||
return; | ||
} | ||
clusteredBy = catalogClustering; | ||
} | ||
while (source instanceof SqlWith) { | ||
source = ((SqlWith) source).getOperandList().get(1); | ||
} | ||
final SqlSelect select = (SqlSelect) source; | ||
|
||
select.setOrderBy(clusteredBy); | ||
} | ||
|
||
/** | ||
* Gets the effective PARTITIONED BY granularity. Resolves the granularity from the granularity specified on the | ||
* ingest node, and on the table metadata as stored in catalog, if any. Mismatches between the 2 granularities are | ||
|
@@ -397,6 +459,34 @@ private Granularity getEffectiveGranularity( | |
return effectiveGranularity; | ||
} | ||
|
||
@Nullable | ||
private SqlNodeList convertCatalogClustering(final DatasourceFacade tableMetadata) | ||
{ | ||
if (tableMetadata == null) { | ||
return null; | ||
} | ||
final List<ClusterKeySpec> keyCols = tableMetadata.clusterKeys(); | ||
if (CollectionUtils.isNullOrEmpty(keyCols)) { | ||
return null; | ||
} | ||
final SqlNodeList keyNodes = new SqlNodeList(SqlParserPos.ZERO); | ||
for (ClusterKeySpec keyCol : keyCols) { | ||
final SqlIdentifier colIdent = new SqlIdentifier( | ||
Collections.singletonList(keyCol.expr()), | ||
null, SqlParserPos.ZERO, | ||
Collections.singletonList(SqlParserPos.ZERO) | ||
); | ||
Comment on lines
+474
to
+478
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was wondering what will happen in the following case:
but it seems like the column in the select list will take precedence. one more thing I was wondering about: do we have a check that all There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. About whether there is a check that al keyCols are present in the selected column list, see the following tests:
Do these cover the cases you are talking about? About the join issue, do you have a concrete query in example, just to clarify? |
||
final SqlNode keyNode; | ||
if (keyCol.desc()) { | ||
keyNode = SqlStdOperatorTable.DESC.createCall(SqlParserPos.ZERO, colIdent); | ||
} else { | ||
keyNode = colIdent; | ||
} | ||
keyNodes.add(keyNode); | ||
} | ||
return keyNodes; | ||
} | ||
|
||
/** | ||
* Compute and validate the target type. In normal SQL, the engine would insert | ||
* a project operator after the SELECT before the write to cast columns from the | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wouldn't the fall-through from this conditional cause the
CLUSTER BY
on the ingestNode
not to be applied (line 399 right now), even if it's there — is that okay?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if the ingestNode already has the clustering columns, they will be used. There are existing tests which verify that the clustering columns are used in the plan returned from the DML query, when clustering is defined at query time, whether or not the table is in the catalog. Let me know if this covers the issue that you think could occur.