Add DeletionVectorStore to read from and write DVs to storage
This PR is part of the feature: Support reading Delta tables with deletion vectors (more details at delta-io#1485)

It adds a `DeletionVectorStore`, which contains APIs to load DVs from and write DVs to a Hadoop FS-compliant file system. The format of the DV file is described in the protocol [here](https://github.com/delta-io/delta/blob/master/PROTOCOL.md#deletion-vector-file-storage-format).

Added a test suite.
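For context, the protocol's storage format is a 1-byte format version at the start of the file, followed by one length-prefixed, CRC-checked byte range per DV. Below is a minimal illustrative sketch of that layout, not the `DeletionVectorStore` API this commit adds; `readAndVerify` is a made-up helper, and the assumption that the descriptor offset points at the 4-byte length prefix is mine:

```scala
import java.util.zip.CRC32
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.delta.DeltaErrors

// Sketch of reading one DV per the protocol layout: a 4-byte big-endian
// length prefix, the serialized bitmap bytes, then a 4-byte CRC-32 of
// those bytes. Assumes `offset` points at the length prefix.
def readAndVerify(fs: FileSystem, path: Path, offset: Int, sizeInBytes: Int): Array[Byte] = {
  val stream = fs.open(path)
  try {
    stream.seek(offset)
    if (stream.readInt() != sizeInBytes) {
      throw DeltaErrors.deletionVectorSizeMismatch()
    }
    val data = new Array[Byte](sizeInBytes)
    stream.readFully(data)
    val storedChecksum = stream.readInt()
    val crc = new CRC32()
    crc.update(data)
    if (crc.getValue.toInt != storedChecksum) {
      throw DeltaErrors.deletionVectorChecksumMismatch()
    }
    data
  } finally {
    stream.close()
  }
}
```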

GitOrigin-RevId: 72340c9854f7d0376ea2aeec0c4bbba08ce78259
vkorukanti committed Dec 28, 2022
1 parent 6d255c4 commit e950b2d
Showing 6 changed files with 610 additions and 2 deletions.
12 changes: 12 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -492,6 +492,18 @@
],
"sqlState" : "42000"
},
"DELTA_DELETION_VECTOR_CHECKSUM_MISMATCH" : {
"message" : [
"Could not verify deletion vector integrity, CRC checksum verification failed."
],
"sqlState" : "22000"
},
"DELTA_DELETION_VECTOR_SIZE_MISMATCH" : {
"message" : [
"Deletion vector integrity check failed. Encountered a size mismatch."
],
"sqlState" : "22000"
},
"DELTA_DROP_COLUMN_AT_INDEX_LESS_THAN_ZERO" : {
"message" : [
"Index <columnIndex> to drop column is lower than 0"
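Both new entries follow the existing Delta error-class pattern: a stable error class, a message template, and SQL state 22000 (a data exception). As a quick sketch, the class name resolves to its message through `DeltaThrowableHelper`, the same lookup `DeltaChecksumException` performs in the next file:

```scala
import org.apache.spark.sql.delta.DeltaThrowableHelper

// Resolve an error class registered in delta-error-classes.json to its
// user-facing message; these two classes take no message parameters.
val message = DeltaThrowableHelper.getMessage(
  "DELTA_DELETION_VECTOR_CHECKSUM_MISMATCH",
  Array.empty[String])
// => "Could not verify deletion vector integrity, CRC checksum verification failed."
```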
29 changes: 27 additions & 2 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -30,13 +30,13 @@ import org.apache.spark.sql.delta.schema.{DeltaInvariantViolationException, Inva
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.JsonUtils
import io.delta.sql.DeltaSparkSessionExtension
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{ChecksumException, Path}
import org.json4s.JValue

import org.apache.spark.{SparkConf, SparkEnv, SparkException}
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -2606,6 +2606,21 @@ trait DeltaErrorsBase
new DeltaRuntimeException(
errorClass = "DELTA_CANNOT_RECONSTRUCT_PATH_FROM_URI",
messageParameters = Array(uri))


def deletionVectorSizeMismatch(): Throwable = {
new DeltaChecksumException(
errorClass = "DELTA_DELETION_VECTOR_SIZE_MISMATCH",
messageParameters = Array.empty,
pos = 0)
}

def deletionVectorChecksumMismatch(): Throwable = {
new DeltaChecksumException(
errorClass = "DELTA_DELETION_VECTOR_CHECKSUM_MISMATCH",
messageParameters = Array.empty,
pos = 0)
}
}

object DeltaErrors extends DeltaErrorsBase
@@ -2891,6 +2906,16 @@ class ColumnMappingUnsupportedException(msg: String)
case class ColumnMappingException(msg: String, mode: DeltaColumnMappingMode)
extends AnalysisException(msg)

class DeltaChecksumException(
errorClass: String,
messageParameters: Array[String] = Array.empty,
pos: Long)
extends ChecksumException(
DeltaThrowableHelper.getMessage(errorClass, messageParameters), pos)
with DeltaThrowable {
override def getErrorClass: String = errorClass
}

/**
* Errors thrown when an operation is not supported with column mapping schema changes
* (rename / drop column).
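Since `DeltaChecksumException` extends Hadoop's `ChecksumException` (itself an `IOException`), callers with existing checksum or I/O error handling keep working unchanged. A hypothetical call site for the two new helpers (placeholder values, not code from this commit):

```scala
import org.apache.hadoop.fs.ChecksumException
import org.apache.spark.sql.delta.DeltaErrors

// Hypothetical integrity check: validate the size first, then the CRC,
// mirroring the two error classes introduced above.
def verifyDvIntegrity(
    expectedSize: Int,
    storedSize: Int,
    computedCrc: Int,
    storedCrc: Int): Unit = {
  if (storedSize != expectedSize) throw DeltaErrors.deletionVectorSizeMismatch()
  if (computedCrc != storedCrc) throw DeltaErrors.deletionVectorChecksumMismatch()
}

// Existing handlers for Hadoop's ChecksumException catch the new exception:
try verifyDvIntegrity(expectedSize = 4, storedSize = 4, computedCrc = 1, storedCrc = 1)
catch { case e: ChecksumException => throw e } // corrupt DV file
```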
@@ -0,0 +1,97 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.deletionvectors

import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.fs.Path


/**
* Bitmap for a Deletion Vector, implemented as a thin wrapper around a Deletion Vector
* Descriptor. The bitmap can be empty, inline or on-disk. In case of on-disk deletion
* vectors, `tableDataPath` must be set to the data path of the Delta table, which is where
* deletion vectors are stored.
*/
case class StoredBitmap(
dvDescriptor: DeletionVectorDescriptor,
tableDataPath: Option[Path] = None) {
require(tableDataPath.isDefined || !dvDescriptor.isOnDisk,
"Table path is required for on-disk deletion vectors")

/**
* Load this bitmap into memory.
*
* Use `dvStore` if this variant is in cloud storage, otherwise just deserialize.
*/
def load(dvStore: DeletionVectorStore): RoaringBitmapArray = {
if (isEmpty) {
new RoaringBitmapArray()
} else if (isInline) {
RoaringBitmapArray.readFrom(dvDescriptor.inlineData)
} else {
assert(isOnDisk)
dvStore.read(onDiskPath.get, dvDescriptor.offset.getOrElse(0), dvDescriptor.sizeInBytes)
}
}

/**
* The serialized size of the stored bitmap in bytes.
*
* Can be used for planning memory management without a round-trip to cloud storage.
*/
def size: Int = dvDescriptor.sizeInBytes

/**
* Number of entries in the bitmap.
*/
def cardinality: Long = dvDescriptor.cardinality

/** Returns a unique identifier for this bitmap (the Deletion Vector serialized as a JSON object). */
def getUniqueId(): String = JsonUtils.toJson(dvDescriptor)

private def isEmpty: Boolean = dvDescriptor.isEmpty

private def isInline: Boolean = dvDescriptor.isInline

private def isOnDisk: Boolean = dvDescriptor.isOnDisk

/** The absolute path for on-disk deletion vectors. */
private lazy val onDiskPath: Option[Path] = tableDataPath.map(dvDescriptor.absolutePath(_))
}

object StoredBitmap {
/** The stored bitmap of an empty deletion vector. */
final val EMPTY = new StoredBitmap(DeletionVectorDescriptor.EMPTY, None)


/** Factory for inline deletion vectors. */
def inline(dvDescriptor: DeletionVectorDescriptor): StoredBitmap = {
require(dvDescriptor.isInline)
new StoredBitmap(dvDescriptor, None)
}

/** Factory for deletion vectors. */
def create(dvDescriptor: DeletionVectorDescriptor, tablePath: Path): StoredBitmap = {
if (dvDescriptor.isOnDisk) {
new StoredBitmap(dvDescriptor, Some(tablePath))
} else {
new StoredBitmap(dvDescriptor, None)
}
}
}
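A usage sketch tying the pieces together (illustrative only: the descriptor, table path, and store come from the surrounding scan code, and `RoaringBitmapArray.contains` is an assumed bitmap API, not shown in this commit):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.actions.DeletionVectorDescriptor
import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, StoredBitmap}
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore

// Load the bitmap behind a descriptor and test whether a row index is deleted.
// `contains` on RoaringBitmapArray is an assumption here.
def isRowDeleted(
    descriptor: DeletionVectorDescriptor,
    tableDataPath: Path,
    dvStore: DeletionVectorStore,
    rowIndex: Long): Boolean = {
  val bitmap: RoaringBitmapArray =
    StoredBitmap.create(descriptor, tableDataPath).load(dvStore)
  bitmap.contains(rowIndex)
}
```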