Commit

Issue 337: Remove compact() operation (#374)
osopardo1 committed Aug 2, 2024
1 parent 97008f4 commit 330879f
Showing 12 changed files with 17 additions and 442 deletions.
9 changes: 0 additions & 9 deletions docs/AdvancedConfiguration.md
@@ -238,15 +238,6 @@ You can change the number of retries for the LocalKeeper in order to test it.
--conf spark.qbeast.index.numberOfRetries=10000
```

## Min/Max file size for compaction

You can set the minimum and maximum size of your files for the compaction process.

```shell
--conf spark.qbeast.compact.minFileSizeInBytes=1 \
--conf spark.qbeast.compact.maxFileSizeInBytes=10000
```

## Data Staging
You can set up the `SparkSession` with a **data staging area** for all your Qbeast table writes.
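
For instance, a minimal sketch of how such a staging area might be configured, assuming the `spark.qbeast.index.stagingSizeInBytes` entry that appears further down in this commit (the size value is purely illustrative):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: reserve roughly 1 GB as staging for Qbeast table writes.
// The config key comes from org/apache/spark/qbeast/config.scala in this commit.
val spark = SparkSession
  .builder()
  .config("spark.qbeast.index.stagingSizeInBytes", 1000000000L)
  .getOrCreate()
```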

22 changes: 0 additions & 22 deletions docs/QbeastFormat.md
@@ -279,28 +279,6 @@ revisions.foreach(revision =>
```
> Note that **Revision ID number 0 is reserved for the Staging Area** (non-indexed files). This ensures compatibility with underlying table formats.
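
As a hedged illustration of that convention (assuming each revision object exposes a `revisionID` field, in line with the `RevisionID` type used elsewhere in this commit), the staging revision can be filtered out when inspecting indexed revisions:

```scala
// Sketch only: revision 0 is the staging (non-indexed) revision, so skip it.
val indexedRevisions = revisions.filterNot(_.revisionID == 0L)
indexedRevisions.foreach(revision => println(revision))
```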

## Compaction (<v0.6.0)

> Compaction is **NOT available from version 0.6.0**. Although the method is still present, it calls the `optimize` command underneath.
> Read all the reasoning and changes in the [Qbeast Format 0.6.0](./QbeastFormat0.6.0.md) document and check issue [#294](https://github.com/Qbeast-io/qbeast-spark/issues/294) for more info.

From [Delta Lake's documentation](https://docs.delta.io/latest/best-practices.html):

If you continuously write data to a table, it will over time accumulate a large number of files, especially if you add data in small batches.
This can have an adverse effect on the efficiency of table reads, and it can also affect the performance of your file system.

Ideally, **a large number of small files should be rewritten into a smaller number of larger files** on a regular basis.
This is known as `compaction`.


`Compaction` can be performed on the staging revision (the subset of non-indexed files) to group small delta files following the [Bin-Packing strategy](https://docs.databricks.com/en/delta/optimize.html):
```scala
import io.qbeast.spark.QbeastTable

val table = QbeastTable.forPath(spark, "/pathToTable/")
table.compact(0)
```
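
Since this commit removes `compact()`, the staging revision is handled through `optimize` instead; a minimal sketch, assuming the `optimize(revisionID)` overload added to `QbeastTable` in this change:

```scala
import io.qbeast.spark.QbeastTable

// From 0.6.0 onwards, optimizing revision 0 covers what compact(0) used to do.
val table = QbeastTable.forPath(spark, "/pathToTable/")
table.optimize(0L)
```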


## Index Replication (<v0.6.0)

16 changes: 0 additions & 16 deletions docs/Quickstart.md
@@ -125,22 +125,6 @@ qbeastTable.analyze()
qbeastTable.optimize()
```

## Compaction

Based on the [Delta Lake command](https://docs.delta.io/2.0.0/optimizations-oss.html#compaction-bin-packing), many small files can be rearranged (compacted) into fewer, bigger ones in order to **avoid the performance problems of reading a lot of small files**.

This operation is **beneficial when there are many WRITE operations** on a table, because each write creates a set of new files whose size depends on the data. You can set up a **Compaction** task to read those small files and write them out as new, bigger ones without corrupting any ongoing operation.

```scala
import io.qbeast.spark.QbeastTable

val qbeastTable = QbeastTable.forPath(spark, qbeastTablePath)

// compacts the small files into bigger ones
qbeastTable.compact()
```
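
With this commit the `compact()` call above goes away; a hedged sketch of the equivalent calls through the new `optimize` overloads added to `QbeastTable` in this change (revision ID and file names are placeholders):

```scala
import io.qbeast.spark.QbeastTable

val qbeastTable = QbeastTable.forPath(spark, qbeastTablePath)

qbeastTable.optimize()                         // latest revision, default options
qbeastTable.optimize(1L)                       // a specific revision ID (placeholder)
qbeastTable.optimize(Seq("file-0001.parquet")) // specific index files (placeholder names)
```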
See [OTreeAlgorithm](OTreeAlgorithm.md) and [QbeastFormat](QbeastFormat.md) for more details.

# Other DML Operations

DML (Data Manipulation Language) is the part of SQL used to manipulate the data itself; for example, insert, update, and delete are DML instructions.
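
As a generic illustration (not specific to Qbeast), such a DML statement can be issued through Spark SQL; the table name and values are placeholders:

```scala
// Placeholder table and values; any SQL-visible table would do.
spark.sql("INSERT INTO students VALUES (1, 'Alice', 21)")
```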
4 changes: 2 additions & 2 deletions src/main/scala/io/qbeast/core/model/DataWriter.scala
@@ -49,7 +49,7 @@ trait DataWriter[DATA, DataSchema, FileDescriptor] {
tableChanges: TableChanges): IISeq[FileDescriptor]

/**
* Compact the files
* Optimize the files
* @param tableID
* the table identifier
* @param schema
@@ -63,7 +63,7 @@ trait DataWriter[DATA, DataSchema, FileDescriptor] {
* @return
* the sequence of files written and deleted
*/
def compact(
def optimize(
tableID: QTableID,
schema: DataSchema,
revision: Revision,
35 changes: 13 additions & 22 deletions src/main/scala/io/qbeast/spark/QbeastTable.scala
@@ -23,7 +23,6 @@ import io.qbeast.core.model.RevisionID
import io.qbeast.core.model.StagingUtils
import io.qbeast.spark.delta.DeltaQbeastSnapshot
import io.qbeast.spark.internal.commands.AnalyzeTableCommand
import io.qbeast.spark.internal.commands.CompactTableCommand
import io.qbeast.spark.internal.commands.OptimizeTableCommand
import io.qbeast.spark.table._
import io.qbeast.spark.utils.IndexMetrics
@@ -83,10 +82,16 @@ class QbeastTable private (
}
}

def optimize(options: Map[String, String] = Map.empty): Unit = {
if (!isStaging(latestRevisionAvailableID)) {
optimize(latestRevisionAvailableID, options)
}
def optimize(revisionID: RevisionID): Unit = {
optimize(revisionID, Map.empty[String, String])
}

def optimize(options: Map[String, String]): Unit = {
optimize(latestRevisionAvailableID, options)
}

def optimize(): Unit = {
optimize(Map.empty[String, String])
}

/**
@@ -99,6 +104,9 @@
def optimize(files: Seq[String], options: Map[String, String]): Unit =
indexedTable.optimize(files, options)

def optimize(files: Seq[String]): Unit =
optimize(files, Map.empty[String, String])

/**
* The analyze operation should analyze the index structure and find the cubes that need
* optimization
@@ -123,23 +131,6 @@
else analyze(latestRevisionAvailableID)
}

/**
* The compact operation should compact the small files in the table
* @param revisionID
* the identifier of the revision to optimize. If it doesn't exist or none is specified, the
* last available revision is used
*/
def compact(revisionID: RevisionID, options: Map[String, String]): Unit = {
checkRevisionAvailable(revisionID)
CompactTableCommand(revisionID, indexedTable, options)
.run(sparkSession)
.map(_.getString(0))
}

def compact(options: Map[String, String] = Map.empty): Unit = {
compact(latestRevisionAvailableID, options)
}

/**
* Outputs the indexed columns of the table
* @param revisionID
@@ -238,7 +238,7 @@ object RollupDataWriter
.getOrElse(file)
}

override def compact(
override def optimize(
tableId: QTableID,
schema: StructType,
revision: Revision,

This file was deleted.

10 changes: 1 addition & 9 deletions src/main/scala/io/qbeast/spark/table/IndexedTable.scala
@@ -131,11 +131,6 @@ trait IndexedTable {
* the index files to optimize
*/
def optimize(files: Seq[String], options: Map[String, String]): Unit

/**
* Compacts the small files for a given table
*/
def compact(revisionID: RevisionID, options: Map[String, String]): Unit
}

/**
@@ -503,14 +498,11 @@ private[table] class IndexedTableImpl(
append = true) {
val tableChanges = BroadcastedTableChanges(None, indexStatus, Map.empty, Map.empty)
val fileActions =
dataWriter.compact(tableID, schema, revision, indexStatus, indexFiles)
dataWriter.optimize(tableID, schema, revision, indexStatus, indexFiles)
(tableChanges, fileActions)
}
}
}
}

override def compact(revisionID: RevisionID, options: Map[String, String]): Unit =
optimize(revisionID, options)

}
18 changes: 0 additions & 18 deletions src/main/scala/org/apache/spark/qbeast/config.scala
@@ -40,18 +40,6 @@ package object config {
.intConf
.createWithDefault(2)

private[config] val minCompactionFileSizeInBytes: ConfigEntry[Int] =
ConfigBuilder("spark.qbeast.compact.minFileSizeInBytes")
.version("0.2.0")
.intConf
.createWithDefault(1024 * 1024 * 1024)

private[config] val maxCompactionFileSizeInBytes: ConfigEntry[Int] =
ConfigBuilder("spark.qbeast.compact.maxFileSizeInBytes")
.version("0.2.0")
.intConf
.createWithDefault(1024 * 1024 * 1024)

private[config] val stagingSizeInBytes: OptionalConfigEntry[Long] =
ConfigBuilder("spark.qbeast.index.stagingSizeInBytes")
.version("0.2.0")
@@ -79,12 +67,6 @@
def CUBE_WEIGHTS_BUFFER_CAPACITY: Long = QbeastContext.config
.get(cubeWeightsBufferCapacity)

def MIN_COMPACTION_FILE_SIZE_IN_BYTES: Int =
QbeastContext.config.get(minCompactionFileSizeInBytes)

def MAX_COMPACTION_FILE_SIZE_IN_BYTES: Int =
QbeastContext.config.get(maxCompactionFileSizeInBytes)

def STAGING_SIZE_IN_BYTES: Option[Long] = QbeastContext.config.get(stagingSizeInBytes)

def COLUMN_SELECTOR_ENABLED: Boolean = QbeastContext.config.get(columnsToIndexSelectorEnabled)
22 changes: 0 additions & 22 deletions src/test/scala/io/qbeast/spark/utils/ConvertToQbeastTest.scala
@@ -242,26 +242,4 @@ class ConvertToQbeastTest
assertLargeDatasetEquality(qbeastDf, sourceDf, orderedComparison = false)
})

"Compacting the staging revision" should "reduce the number of delta AddFiles" in
withSparkAndTmpDir((spark, tmpDir) => {
val fileFormat = "delta"
convertFromFormat(spark, fileFormat, tmpDir)

// Perform compaction
val qbeastTable = QbeastTable.forPath(spark, tmpDir)
qbeastTable.compact()

// Compare DataFrames
val sourceDf = spark.read.format(fileFormat).load(tmpDir)
val qbeastDf = spark.read.format("qbeast").load(tmpDir)
assertLargeDatasetEquality(qbeastDf, sourceDf, orderedComparison = false)

// Standard staging revision behavior
val qs = getQbeastSnapshot(spark, tmpDir)
val stagingCs = qs.loadLatestIndexFiles

stagingCs.count() shouldBe 1L
stagingCs.head.blocks.size shouldBe <(numSparkPartitions)
})

}