From ea7b63564b28a42d06ef6c33d835ea4ea5a09880 Mon Sep 17 00:00:00 2001
From: Andreas Chatzistergiou
Date: Tue, 23 Apr 2024 19:57:28 +0200
Subject: [PATCH] Support predicate pushdown and file splitting in scans of
 Delta tables with deletion vectors

Squash of 12 commits. The original commit messages, in order:

1. flush
2. flush
3. First sane version without isRowDeleted
4. Hack RowIndexMarkingFilters
5. Add support for non-vectorized readers
6. Metadata column fix
7. Avoid non-deterministic UDF to filter deleted rows
8. metadata with Expression ID
9. Fix complex views issue
10. Tests
11. cleaning
12. More tests and fixes
---
 .../execution/DeltaTableOperations.scala      |   3 +-
 .../sql/delta/DeltaParquetFileFormat.scala    |   2 +-
 .../sql/delta/PreprocessTableWithDVs.scala    |  74 +++++--
 .../DMLWithDeletionVectorsHelper.scala        |  18 +-
 .../spark/sql/delta/TightBoundsSuite.scala    |  66 ------
 .../DeletionVectorsSuite.scala                | 191 +++++++++++++++++-
 6 files changed, 262 insertions(+), 92 deletions(-)

diff --git a/spark/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala b/spark/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
index bbe4c4da3bb..56f60a28035 100644
--- a/spark/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
+++ b/spark/src/main/scala/io/delta/tables/execution/DeltaTableOperations.scala
@@ -44,8 +44,7 @@ trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
     val delete = DeleteFromTable(
       self.toDF.queryExecution.analyzed,
       condition.getOrElse(Literal.TrueLiteral))
-    val a = toDataset(sparkSession, delete)
-    a
+    toDataset(sparkSession, delete)
   }
 }
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
index a7debad78dc..ea331c47640 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
@@ -423,7 +423,7 @@ case class DeltaParquetFileFormat(
       isRowDeletedColumn.foreach { columnMetadata =>
         rowIndexFilter.get
           .materializeIntoVector(
-            columnarRow.rowId, // TODO check this
+            0,
             columnarRow.getLong(rowIndexColumnIndex),
             tempVector)
 
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
index 38c8d9ed5c5..e55e4dea748 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project
 import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.types.{LongType, StructField, StructType}
 
 /**
  * Plan transformer to inject a filter that removes the rows marked as deleted according to
@@ -51,9 +52,28 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRela
  */
 trait PreprocessTableWithDVs extends SubqueryTransformerHelper {
   def
preprocessTablesWithDVs(plan: LogicalPlan): LogicalPlan = { - transformWithSubqueries(plan) { + val newPlan = transformWithSubqueries(plan) { case ScanWithDeletionVectors(dvScan) => dvScan } + + /* + transformWithSubqueries(newPlan) { + case a: AttributeReference if a.name == METADATA_NAME => newPlan + } + */ + /* + transformWithSubqueries(newPlan) { + case a: AttributeReference if a.name == METADATA_NAME => + val x = UnresolvedAttribute(a.qualifier :+ a.name) + val newPlan = FakeLogicalPlan(Seq(x), newPlan) + val spark = SparkSession.getActiveSession.get + val y = spark.sessionState.analyzer.execute(newPlan) + // val resolvedExprs = resolveExprs(Seq(x), newPlan) + newPlan + } + */ + + newPlan } } @@ -120,24 +140,40 @@ object ScanWithDeletionVectors { // Add a column for SKIP_ROW to the base output. Value of 0 means the row needs be kept, any // other values mean the row needs be skipped. val skipRowField = IS_ROW_DELETED_STRUCT_FIELD + // val rowIndexCol = AttributeReference( + // s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}", + // ROW_INDEX_STRUCT_FIELD.dataType)() // val rowIndexField = ParquetFileFormat.ROW_INDEX_FIELD - val newScanOutput = if (inputScan.output.map(_.name).contains(METADATA_NAME)) { - inputScan.output :+ AttributeReference(skipRowField.name, skipRowField.dataType)() + + val withReplacedMetadata = inputScan.output.collect { + case a: AttributeReference if a.name == METADATA_NAME && + !a.dataType.asInstanceOf[StructType].fieldNames.contains(ParquetFileFormat.ROW_INDEX) => + fileFormat.createFileMetadataCol().withExprId(a.exprId) + case o => o + } + + val newScanOutput = if (withReplacedMetadata.map(_.name).contains(METADATA_NAME)) { + withReplacedMetadata ++ Seq(AttributeReference(skipRowField.name, skipRowField.dataType)()) } else { - val fileMetadataCol = fileFormat.createFileMetadataCol() - /* - val rowIndexCol = AttributeReference( - s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}", - ROW_INDEX_STRUCT_FIELD.dataType)() - */ - inputScan.output ++ - Seq(AttributeReference(skipRowField.name, skipRowField.dataType)(), fileMetadataCol) + withReplacedMetadata ++ + Seq(AttributeReference(skipRowField.name, skipRowField.dataType)(), + fileFormat.createFileMetadataCol()) } + /* + val newScanOutput = inputScan.output.filterNot(_.name == METADATA_NAME) ++ + Seq( + AttributeReference(skipRowField.name, skipRowField.dataType)(), + fileMetadataCol.withExprId(aaa.exprId)) + */ + // Data schema and scan schema could be different. The scan schema may contain additional // columns such as `_metadata.file_path` (metadata columns) which are populated in Spark scan // operator after the data is read from the underlying file reader. 
- val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField) + val rowIndexField = + StructField(s"${ParquetFileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME}", LongType) + // ParquetFileFormat.ROW_INDEX_FIELD + val newDataSchema = hadoopFsRelation.dataSchema.add(skipRowField) // .add(rowIndexField) val newFileFormat = fileFormat.disableSplittingAndPushdown(tahoeFileIndex.path.toString) val newRelation = hadoopFsRelation.copy( @@ -153,7 +189,19 @@ object ScanWithDeletionVectors { require(skipRowColumnRefs.size == 1, s"Expected only one column with name=$IS_ROW_DELETED_COLUMN_NAME") val skipRowColumnRef = skipRowColumnRefs.head - + + /* + val rowIndexCol = AttributeReference( + s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}", + ROW_INDEX_STRUCT_FIELD.dataType)() + */ + // val rowIndexCol = Column(s"${METADATA_NAME}.${ParquetFileFormat.ROW_INDEX}").expr + // val spark = SparkSession.getActiveSession.get + // val projection = Project(Seq(Alias(rowIndexCol, ParquetFileFormat.ROW_INDEX)()), newScan) + // val analyzed = spark.sessionState.analyzer.execute(projection) + // val rowIndexProjection = Project(Seq(rowIndexCol), newScan) + // val newPlan = spark.sessionState.analyzer.execute(rowIndexProjection) + Filter(EqualTo(skipRowColumnRef, Literal(RowIndexFilter.KEEP_ROW_VALUE)), newScan) } } diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala index c4e06d4c044..f93cff30492 100644 --- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala +++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala @@ -91,18 +91,19 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand { fileMetadataCol = format.createFileMetadataCol() // Take the existing schema and add additional metadata columns val newDataSchema = - StructType(hfsr.dataSchema).add(ROW_INDEX_STRUCT_FIELD) + StructType(hfsr.dataSchema) // .add(ROW_INDEX_STRUCT_FIELD) val finalOutput = l.output :+ fileMetadataCol // Seq(rowIndexCol, fileMetadataCol) - /* + // Disable splitting and filter pushdown in order to generate the row-indexes - val newFormat = format.copy(isSplittable = true, disablePushDowns = false) + // val newFormat = format.copy(isSplittable = true, disablePushDowns = false) val newBaseRelation = hfsr.copy( - location = fileIndex, - dataSchema = newDataSchema, - fileFormat = newFormat)(hfsr.sparkSession) - */ + location = fileIndex // , + // dataSchema = newDataSchema, + // fileFormat = newFormat + )(hfsr.sparkSession) + - l.copy(relation = hfsr, output = finalOutput) + l.copy(relation = newBaseRelation, output = finalOutput) case p @ Project(projectList, _) => if (fileMetadataCol == null) { throw new IllegalStateException("File metadata column is not yet created.") @@ -111,7 +112,6 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand { p.copy(projectList = newProjectList) } newTarget - // target } /** diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala index 629bb7ac936..f9e515bda8b 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/TightBoundsSuite.scala @@ -16,9 +16,6 @@ package org.apache.spark.sql.delta -import org.apache.spark.sql.internal.SQLConf -import 
org.apache.spark.sql.internal.SQLConf.FILES_MAX_PARTITION_BYTES - import scala.collection.mutable.ArrayBuffer // scalastyle:off import.ordering.noEmptyLine @@ -287,69 +284,6 @@ class TightBoundsSuite assert(statsAfterDelete === expectedStatsAfterDelete) } } - - test("TEST") { - withTempDeltaTable( - // .repartition(1) - dataDF = spark.range(0, 50000000, 1, 1).toDF("id"), - // dataDF = spark.range(0, 25000000, 1, 1).toDF("id"), - // dataDF = spark.range(0, 100000000, 1, 1).toDF("id"), - enableDVs = true - ) { (targetTable, targetLog) => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> false.toString, - SQLConf.FILES_MAX_PARTITION_BYTES.key -> "128MB") { - targetTable().delete("id == 40000000") - - // val d = targetTable().toDF.filter("id != 1").queryExecution.executedPlan - // .filter("id != 1") - val a = targetTable().toDF.filter("id != 1").collect() - val c = targetLog.update().allFiles.collect() - val b = 1 - assert(a.length === 49999998) - // assert(a.length === 29999999) - - // a(40000000).getLong(0) - assert(a(1).getLong(0) === 2) - assert(a(39999998).getLong(0) === 39999999) - assert(a(39999999).getLong(0) === 40000001) - // assert(!a.map(_.getLong(0)).toSeq.contains(40000000)) - // assert(a === Seq(0, 100000000).drop(2)) - } - } - } - - test("TEST 2") { - withTempDeltaTable( - // .repartition(1) - dataDF = spark.range(0, 100, 1, 1).toDF("id"), - enableDVs = true - ) { (targetTable, targetLog) => - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> true.toString, - SQLConf.FILES_MAX_PARTITION_BYTES.key -> "128MB") { - targetTable().delete("id == 4") - targetTable().delete("id == 5") - - val a = 1 - } - } - } - - test(s"TEST COMPLEX TMP VIEW") { - import testImplicits._ - withTempView("v") { - withTable("tab") { - Seq((0, 3), (1, 2)).toDF("key", "value") - .write - .option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, true.toString) - .format("delta") - .saveAsTable("tab") - sql(s"CREATE OR REPLACE TEMP VIEW v AS SELECT value as key, key as value FROM tab") - sql(s"DELETE FROM v WHERE key >= 1 and value < 3") - spark.read.format("delta").table("v") - } - } - } - } class TightBoundsColumnMappingSuite extends TightBoundsSuite with DeltaColumnMappingEnableIdMode diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala index db776ddf76b..9b8b5f9cead 100644 --- a/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala @@ -27,7 +27,9 @@ import org.apache.spark.sql.delta.deletionvectors.DeletionVectorsSuite._ import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.{DeltaExceptionTestUtils, DeltaSQLCommandTest} import org.apache.spark.sql.delta.test.DeltaTestImplicits._ +import org.apache.spark.sql.execution.FileSourceScanExec import org.apache.spark.sql.delta.util.JsonUtils +import org.apache.spark.sql.internal.SQLConf import com.fasterxml.jackson.databind.node.ObjectNode import io.delta.tables.DeltaTable import org.apache.commons.io.FileUtils @@ -47,6 +49,24 @@ class DeletionVectorsSuite extends QueryTest with DeltaExceptionTestUtils { import testImplicits._ + // ~200MBs. Should contain 2 row groups. 
+  val multiRowgroupTable = "multiRowgroupTable"
+  val multiRowgroupTableRowsNum = 50000000
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    spark.range(0, multiRowgroupTableRowsNum, 1, 1).toDF("id")
+      .write
+      .option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, true.toString)
+      .format("delta")
+      .saveAsTable(multiRowgroupTable)
+  }
+
+  override def afterAll(): Unit = {
+    super.afterAll()
+    sql(s"DROP TABLE IF EXISTS $multiRowgroupTable")
+  }
+
   test(s"read Delta table with deletion vectors") {
     def verifyVersion(version: Int, expectedData: Seq[Int]): Unit = {
       checkAnswer(
@@ -86,8 +106,9 @@ class DeletionVectorsSuite extends QueryTest
   }
 
   test("select metadata columns from a Delta table with deletion vectors") {
-    val a = spark.read.format("delta").load(table1Path).distinct().count()
+    // val a = spark.read.format("delta").load(table1Path).distinct().count()
+
+    // spark.read.format("delta").load(table1Path).distinct().count()
     assert(spark.read.format("delta").load(table1Path)
       .select("_metadata.file_path").distinct().count() == 22)
   }
@@ -710,6 +731,174 @@ class DeletionVectorsSuite extends QueryTest
     }
   }
 
+  private def testPredicatePushDown(
+      deletePredicates: Seq[String],
+      selectPredicate: Option[String],
+      expectedNumRows: Long,
+      validationPredicate: String,
+      vectorizedReaderEnabled: Boolean,
+      readColumnarBatchAsRows: Boolean): Unit = {
+    withTempDir { dir =>
+      // This forces Spark not to use whole-stage codegen. As a result, Spark sets options to get
+      // rows instead of columnar batches from the Parquet reader. This allows testing the
+      // relevant code path in DeltaParquetFileFormat.
+      val codeGenMaxFields = if (readColumnarBatchAsRows) "0" else "100"
+      withSQLConf(
+        SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> codeGenMaxFields,
+        SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorizedReaderEnabled.toString) {
+        sql(s"CREATE TABLE delta.`${dir.getCanonicalPath}` SHALLOW CLONE $multiRowgroupTable")
+
+        val targetTable = io.delta.tables.DeltaTable.forPath(dir.getCanonicalPath)
+
+        // Execute multiple delete statements. These require reconciling the metadata column
+        // between the DV writing and scanning operations.
+        deletePredicates.foreach(targetTable.delete)
+
+        val targetTableDF = selectPredicate.map(targetTable.toDF.filter).getOrElse(targetTable.toDF)
+        assertPredicatesArePushedDown(targetTableDF)
+        // Make sure there are splits.
+        assert(targetTableDF.rdd.partitions.size > 1)
+
+        withTempDir { resultDir =>
+          // Write the results to a table without DVs for validation. We do this to avoid
+          // loading the dataset into memory.
+          targetTableDF
+            .write
+            .option(DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.key, false.toString)
+            .format("delta")
+            .save(resultDir.getCanonicalPath)
+
+          val resultsTable = io.delta.tables.DeltaTable.forPath(resultDir.getCanonicalPath)
+
+          assert(resultsTable.toDF.count() === expectedNumRows)
+          // The deleted/filtered rows should not exist.
+          assert(resultsTable.toDF.filter(validationPredicate).count() === 0)
+        }
+      }
+    }
+  }
+
+  for {
+    vectorizedReaderEnabled <- BOOLEAN_DOMAIN
+    readColumnarBatchAsRows <- if (vectorizedReaderEnabled) BOOLEAN_DOMAIN else Seq(false)
+  } test("PredicatePushdown: Single deletion at the first row group. 
" + + s"vectorizedReaderEnabled: $vectorizedReaderEnabled " + + s"readColumnarBatchAsRows: $readColumnarBatchAsRows") { + testPredicatePushDown( + deletePredicates = Seq("id == 100"), + selectPredicate = None, + expectedNumRows = multiRowgroupTableRowsNum - 1, + validationPredicate = "id == 100", + vectorizedReaderEnabled = vectorizedReaderEnabled, + readColumnarBatchAsRows = readColumnarBatchAsRows) + } + + for { + vectorizedReaderEnabled <- BOOLEAN_DOMAIN + readColumnarBatchAsRows <- if (vectorizedReaderEnabled) BOOLEAN_DOMAIN else Seq(false) + } test("PredicatePushdown: Single deletion at the second row group. " + + s"vectorizedReaderEnabled: $vectorizedReaderEnabled " + + s"readColumnarBatchAsRows: $readColumnarBatchAsRows") { + testPredicatePushDown( + deletePredicates = Seq("id == 40000000"), + selectPredicate = None, + expectedNumRows = multiRowgroupTableRowsNum - 1, + // (rowId, Expected value). + validationPredicate = "id == 40000000", + vectorizedReaderEnabled = vectorizedReaderEnabled, + readColumnarBatchAsRows = readColumnarBatchAsRows) + } + + for { + vectorizedReaderEnabled <- BOOLEAN_DOMAIN + readColumnarBatchAsRows <- if (vectorizedReaderEnabled) BOOLEAN_DOMAIN else Seq(false) + } test("PredicatePushdown: Single delete statement with multiple ids. " + + s"vectorizedReaderEnabled: $vectorizedReaderEnabled " + + s"readColumnarBatchAsRows: $readColumnarBatchAsRows") { + testPredicatePushDown( + deletePredicates = Seq("id in (200, 2000, 20000, 20000000, 40000000)"), + selectPredicate = None, + expectedNumRows = multiRowgroupTableRowsNum - 5, + validationPredicate = "id in (200, 2000, 20000, 20000000, 40000000)", + vectorizedReaderEnabled = vectorizedReaderEnabled, + readColumnarBatchAsRows = readColumnarBatchAsRows) + } + + for { + vectorizedReaderEnabled <- BOOLEAN_DOMAIN + readColumnarBatchAsRows <- if (vectorizedReaderEnabled) BOOLEAN_DOMAIN else Seq(false) + } test("PredicatePushdown: Multiple delete statements. " + + s"vectorizedReaderEnabled: $vectorizedReaderEnabled " + + s"readColumnarBatchAsRows: $readColumnarBatchAsRows") { + testPredicatePushDown( + deletePredicates = + Seq("id = 200", "id = 2000", "id = 20000", "id = 20000000", "id = 40000000"), + selectPredicate = None, + expectedNumRows = multiRowgroupTableRowsNum - 5, + validationPredicate = "id in (200, 2000, 20000, 20000000, 40000000)", + vectorizedReaderEnabled = vectorizedReaderEnabled, + readColumnarBatchAsRows = readColumnarBatchAsRows) + } + + for { + vectorizedReaderEnabled <- BOOLEAN_DOMAIN + readColumnarBatchAsRows <- if (vectorizedReaderEnabled) BOOLEAN_DOMAIN else Seq(false) + } test("PredicatePushdown: Scan with predicates. " + + s"vectorizedReaderEnabled: $vectorizedReaderEnabled " + + s"readColumnarBatchAsRows: $readColumnarBatchAsRows") { + testPredicatePushDown( + deletePredicates = Seq("id = 200", "id = 2000", "id = 40000000"), + selectPredicate = Some("id not in (20000, 20000000)"), + expectedNumRows = multiRowgroupTableRowsNum - 5, + validationPredicate = "id in (200, 2000, 20000, 20000000, 40000000)", + vectorizedReaderEnabled = vectorizedReaderEnabled, + readColumnarBatchAsRows = readColumnarBatchAsRows) + } + + for { + vectorizedReaderEnabled <- BOOLEAN_DOMAIN + readColumnarBatchAsRows <- if (vectorizedReaderEnabled) BOOLEAN_DOMAIN else Seq(false) + } test("PredicatePushdown: Scan with predicates - no deletes. 
" + + s"vectorizedReaderEnabled: $vectorizedReaderEnabled " + + s"readColumnarBatchAsRows: $readColumnarBatchAsRows") { + testPredicatePushDown( + deletePredicates = Seq.empty, + selectPredicate = Some("id not in (200, 2000, 20000, 20000000, 40000000)"), + expectedNumRows = multiRowgroupTableRowsNum - 5, + validationPredicate = "id in (200, 2000, 20000, 20000000, 40000000)", + vectorizedReaderEnabled = vectorizedReaderEnabled, + readColumnarBatchAsRows = readColumnarBatchAsRows) + } + + test("Predicate pushdown works on queries that select metadata fields") { + withTempDir { dir => + withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> true.toString) { + sql(s"CREATE TABLE delta.`${dir.getCanonicalPath}` SHALLOW CLONE $multiRowgroupTable") + + val targetTable = io.delta.tables.DeltaTable.forPath(dir.getCanonicalPath) + targetTable.delete("id == 40000000") + + val r1 = targetTable.toDF.select("id", "_metadata.row_index").count() + assert(r1 === multiRowgroupTableRowsNum - 1) + + val r2 = targetTable.toDF.select("id", "_metadata.row_index", "_metadata.file_path").count() + assert(r2 === multiRowgroupTableRowsNum - 1) + + val r3 = targetTable + .toDF + .select("id", "_metadata.file_block_start", "_metadata.file_path").count() + assert(r3 === multiRowgroupTableRowsNum - 1) + } + } + } + + private def assertPredicatesArePushedDown(df: DataFrame): Unit = { + val scan = df.queryExecution.executedPlan.collectFirst { + case scan: FileSourceScanExec => scan + } + assert(scan.map(_.dataFilters.nonEmpty).getOrElse(true)) + } + private sealed case class DeleteUsingDVWithResults( scale: String, sqlRule: String,