From e8b6a4a3693c213a7fa13d78b538ea009d8ee2ba Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Thu, 23 Mar 2023 16:16:43 +0800
Subject: [PATCH] Fix read from range column partition table error (#2639)
 (#2656)

---
 .../spark/sql/execution/CoprocessorRDD.scala |  5 +-
 .../spark/sql/PartitionTableSuite.scala      | 12 +++++
 docs/features/partition_table.md             | 39 +++++++++++++++
 .../tikv/expression/PartitionPruner.java     | 48 ++++++++++++++++++-
 4 files changed, 101 insertions(+), 3 deletions(-)
 create mode 100644 docs/features/partition_table.md

diff --git a/core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala b/core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala
index 41319c2cc6..be47814ff6 100644
--- a/core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala
+++ b/core/src/main/scala/org/apache/spark/sql/execution/CoprocessorRDD.scala
@@ -56,9 +56,12 @@ trait LeafColumnarExecRDD extends LeafExecNode {
       }
       b.append("]")
       b.toString
-    } else {
+    } else if (tiRDDs.lengthCompare(1) == 0) {
       s"${dagRequest.getStoreType.name()} $nodeName{$dagRequest}" +
         s"${TiUtil.getReqEstCountStr(dagRequest)}"
+    } else {
+      // Return a placeholder description when tiRDDs is empty.
+      "Empty TiRDD"
     }

   def dagRequest: TiDAGRequest = tiRDDs.head.dagRequest
diff --git a/core/src/test/scala/org/apache/spark/sql/PartitionTableSuite.scala b/core/src/test/scala/org/apache/spark/sql/PartitionTableSuite.scala
index 32e28b53e9..4ac87583da 100644
--- a/core/src/test/scala/org/apache/spark/sql/PartitionTableSuite.scala
+++ b/core/src/test/scala/org/apache/spark/sql/PartitionTableSuite.scala
@@ -32,6 +32,18 @@ class PartitionTableSuite extends BasePlanTest {
     super.afterAll()
   }

+  test("reading from range column partition") {
+    tidbStmt.execute("drop table if exists range_column_test")
+    tidbStmt.execute(
+      "create table range_column_test (id varchar(10)) partition by RANGE COLUMNS(`id`) (PARTITION `p1` VALUES LESS THAN ('''CN001'''),PARTITION `p2` VALUES LESS THAN ('CN002'))")
+    tidbStmt.execute("insert into `range_column_test` values('CN001')")
+    tidbStmt.execute("insert into `range_column_test` values('''CN001''')")
+
+    judge("select * from range_column_test where id = 'CN001'")
+    judge("select * from range_column_test where id = '\\'CN001\\''")
+    judge("select * from range_column_test where id = 'CN002'")
+  }
+
   test("reading from hash partition") {
     enablePartitionForTiDB()
     tidbStmt.execute("drop table if exists t")
diff --git a/docs/features/partition_table.md b/docs/features/partition_table.md
new file mode 100644
index 0000000000..6bd7406671
--- /dev/null
+++ b/docs/features/partition_table.md
@@ -0,0 +1,39 @@
+# TiSpark partition table
+
+## Read from partition table
+
+TiSpark supports reading range, hash, and list partition tables from TiDB.
+
+TiSpark does not support the MySQL/TiDB partition selection syntax `select col_name from table_name partition(partition_name)`, but you can still use a `where` condition to filter partitions.
+
+## Partition pruning in reading
+
+TiSpark decides whether to apply partition pruning according to the partition type and the partition expression associated with the table. If partition pruning is not applied, TiSpark's read is equivalent to a table scan over all partitions.
+
+TiSpark only supports partition pruning with the following partition expressions in **range** partitions (see the example after this list):
+
++ a column expression
++ `YEAR(col)`, where `col` has type datetime, string, or date and its values can be parsed as datetime
++ `TO_DAYS(col)`, where `col` has type datetime, string, or date and its values can be parsed as datetime
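+
+For example, here is a minimal sketch of a pruned read. The database, table, and column names are illustrative, and the query assumes the TiDB catalog is configured under the name `tidb_catalog`:
+
+```scala
+// Assumed to exist in TiDB, created through a TiDB client rather than Spark:
+//   CREATE TABLE `test`.`sales` (`dt` DATETIME) PARTITION BY RANGE (YEAR(`dt`)) (
+//     PARTITION `p2021` VALUES LESS THAN (2022),
+//     PARTITION `p2022` VALUES LESS THAN (2023),
+//     PARTITION `pmax` VALUES LESS THAN MAXVALUE);
+
+// The `where` filter matches the YEAR(`dt`) range partition expression above,
+// so TiSpark can prune the read down to partition `p2021` instead of scanning
+// every partition.
+val df = spark.sql("select * from tidb_catalog.test.sales where dt < '2021-06-01'")
+df.show()
+```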
+
+### Limitations
+
+- TiSpark does not support partition pruning for hash and list partitions.
+- TiSpark cannot apply partition pruning when a partition definition contains certain special characters. For example, a partition definition containing the double-quote character `"`, such as `partition p0 values less than ('"string"')`, cannot be pruned.
+
+## Write into partition table
+
+Currently, TiSpark only supports writing into range and hash partition tables under the following conditions:
++ the partition expression is a column expression
++ the partition expression is `YEAR($argument)`, where the argument is a column whose type is datetime, or a string that can be parsed as datetime
+
+There are two ways to write into a partition table:
+1. Use the datasource API, which supports replace and append semantics (see the sketch after this list).
+2. Use a delete statement with Spark SQL.
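+
+A minimal sketch of way 1, a datasource-API write; the TiDB address, credentials, and database/table names are illustrative:
+
+```scala
+// Append the rows of an existing DataFrame `df` to the partitioned table test.sales.
+df.write
+  .format("tidb")
+  .option("tidb.addr", "127.0.0.1")
+  .option("tidb.port", "4000")
+  .option("tidb.user", "root")
+  .option("tidb.password", "")
+  .option("database", "test")
+  .option("table", "sales")
+  // Replace semantics: rows whose unique key already exists are overwritten.
+  // Drop this option for plain append semantics.
+  .option("replace", "true")
+  .mode("append")
+  .save()
+```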

+
+> [!NOTE]
+> Because different character sets and collations have different sort orders, the character sets
+> and collations in use may affect which partition of a table partitioned by RANGE COLUMNS a
+> given row is stored in when string columns are used as the partitioning columns.
+> For supported character sets and collations, see [Limitations](../README.md#limitations)
\ No newline at end of file
diff --git a/tikv-client/src/main/java/com/pingcap/tikv/expression/PartitionPruner.java b/tikv-client/src/main/java/com/pingcap/tikv/expression/PartitionPruner.java
index d1077cd3ff..fbc7d9273e 100644
--- a/tikv-client/src/main/java/com/pingcap/tikv/expression/PartitionPruner.java
+++ b/tikv-client/src/main/java/com/pingcap/tikv/expression/PartitionPruner.java
@@ -68,6 +68,13 @@ public static List<TiPartitionDef> prune(TiTableInfo tableInfo, List<Expression
       return tableInfo.getPartitionInfo().getDefs();
     }

+    // Pruning cannot handle a double quote (\") in a partition definition yet; return all partitions.
+    for (int i = 0; i < tableInfo.getPartitionInfo().getDefs().size(); i++) {
+      TiPartitionDef pDef = tableInfo.getPartitionInfo().getDefs().get(i);
+      if (pDef.getLessThan().get(0).contains("\"")) {
+        return tableInfo.getPartitionInfo().getDefs();
+      }
+    }
     RangeColumnPartitionPruner pruner = new RangeColumnPartitionPruner(tableInfo);
     return pruner.prune(filters);
   }
@@ -88,7 +95,7 @@ static void generateRangeExprs(
     //    partExprColRefs.addAll(PredicateUtils.extractColumnRefFromExpression(partExpr));
     for (int i = 0; i < partInfo.getDefs().size(); i++) {
       TiPartitionDef pDef = partInfo.getDefs().get(i);
-      String current = pDef.getLessThan().get(lessThanIdx);
+      String current = wrapValue(pDef.getLessThan().get(lessThanIdx));
       String leftHand;
       if (current.equals("MAXVALUE")) {
         leftHand = "true";
@@ -98,7 +105,7 @@ static void generateRangeExprs(
       if (i == 0) {
         partExprs.add(parser.parseExpression(leftHand));
       } else {
-        String previous = partInfo.getDefs().get(i - 1).getLessThan().get(lessThanIdx);
+        String previous = wrapValue(partInfo.getDefs().get(i - 1).getLessThan().get(lessThanIdx));
         String and =
             String.format("%s >= %s and %s", wrapColumnName(partExprStr), previous, leftHand);
         partExprs.add(parser.parseExpression(and));
@@ -116,4 +123,41 @@ private static String wrapColumnName(String columnName) {
       return String.format("`%s`", columnName);
     }
   }
+
+  /**
+   * Spark SQL parses string literals without handling escapes, so we need to parse partition
+   * definitions without escape characters too.
+   *
+   * <p>wrapValue replaces the outer pair of single quotes with double quotes, so that ANTLR does
+   * not treat an embedded single quote as the end of the string literal.
+   *
+   * <p>wrapValue also removes the escape characters inside the string literal: an escaped single
+   * quote ('') becomes a plain single quote (').
+   *
+   * <p>For example: 'string' -> "string", '''string''' -> "'string'", 'string''' -> "string'".
+   *
+   * <p>It cannot handle values that contain a double quote. For example, '"string"' ->
+   * ""string"", and parseExpression parses ""string"" as an empty string, while '"string"'
+   * should parse to "string".
+   *
+   * @param value a LESS THAN bound from a partition definition, possibly a single-quoted literal
+   * @return the value rewritten as a double-quoted literal with single-quote escapes removed
+   */
+  private static String wrapValue(String value) {
+    if (value.startsWith("'") && value.endsWith("'")) {
+      // Swap the outer single quotes for double quotes.
+      String newValue = String.format("\"%s\"", value.substring(1, value.length() - 1));
+      StringBuilder valueWithoutEscape = new StringBuilder();
+      for (int i = 0; i < newValue.length(); i++) {
+        if (newValue.charAt(i) != '\'') {
+          valueWithoutEscape.append(newValue.charAt(i));
+        } else {
+          // A doubled single quote ('') is an escape; keep only the second quote.
+          if (i + 1 < newValue.length()) {
+            valueWithoutEscape.append(newValue.charAt(i + 1));
+          }
+          i++;
+        }
+      }
+      return valueWithoutEscape.toString();
+    } else {
+      // Unquoted values (numbers, MAXVALUE) pass through unchanged.
+      return value;
+    }
+  }
 }