Changes to support customization of delta file names (delta-io#4)
* Parse file metadata as a separate task
* Change version to distinguish this branch
* Log store chooses where checkpoints go (delta-io#6)
* Handle snapshot names (delta-io#9)

Signed-off-by: Ryan Murray <rymurr@gmail.com>
Ryan Murray committed Aug 25, 2021
1 parent 4fddd56 commit 25c3250
Showing 7 changed files with 168 additions and 64 deletions.
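
For orientation, the customization hook this commit introduces is a pluggable log-file handler selected through a Spark conf key, defined in the new LogFileMetaParser.scala further down. A minimal sketch of opting in; the com.example.CustomLogFileMetaParser class name is purely illustrative:

import org.apache.spark.sql.SparkSession

// Sketch only: the handler class is hypothetical; the conf key
// "spark.delta.logFileHandler.class" is read by LogFileMetaProvider below.
val spark = SparkSession.builder()
  .appName("delta-custom-file-names")
  .config("spark.delta.logFileHandler.class", "com.example.CustomLogFileMetaParser")
  .getOrCreate()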
23 changes: 16 additions & 7 deletions core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -1,5 +1,5 @@
/*
* Copyright (2020) The Delta Lake Project Authors.
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -27,7 +27,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.DeltaFileOperations
import org.apache.spark.sql.delta.LogFileMeta.isCheckpointFile
import org.apache.spark.sql.delta.storage.LogFileMeta.isCheckpointFile
import org.apache.spark.sql.delta.util.FileNames.{checkpointFileSingular, checkpointFileWithParts, checkpointPrefix, checkpointVersion, numCheckpointParts}
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.fs.Path
@@ -38,6 +38,7 @@ import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.functions.{col, struct, when}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.SerializableConfiguration

@@ -121,12 +122,16 @@ trait Checkpoints extends DeltaLogging {
/** The path to the file that holds metadata about the most recent checkpoint. */
val LAST_CHECKPOINT = new Path(logPath, "_last_checkpoint")

/**
* Creates a checkpoint using the default snapshot.
*/
def checkpoint(): Unit = checkpoint(snapshot)

/**
* Creates a checkpoint using snapshotToCheckpoint. By default it uses the current log version.
*/
def checkpoint(_snapshotToCheckpoint: Option[Snapshot] = None): Unit =
def checkpoint(snapshotToCheckpoint: Snapshot): Unit =
recordDeltaOperation(this, "delta.checkpoint") {
val snapshotToCheckpoint = _snapshotToCheckpoint.getOrElse(snapshot)
if (snapshotToCheckpoint.version < 0) {
throw DeltaErrors.checkpointNonExistTable(dataPath)
}
@@ -343,9 +348,7 @@ object Checkpoints extends DeltaLogging {
val sessionConf = state.sparkSession.sessionState.conf
// We provide fine-grained control using the session conf for now, until users explicitly
// opt in or out of the struct conf.
val includeStructColumns = DeltaConfigs.CHECKPOINT_WRITE_STATS_AS_STRUCT
.fromMetaData(snapshot.metadata)
.getOrElse(sessionConf.getConf(DeltaSQLConf.DELTA_CHECKPOINT_V2_ENABLED))
val includeStructColumns = getWriteStatsAsStructConf(sessionConf, snapshot)
if (includeStructColumns) {
additionalCols ++= CheckpointV2.extractPartitionValues(snapshot.metadata.partitionSchema)
}
@@ -361,6 +364,12 @@
))
)
}

def getWriteStatsAsStructConf(conf: SQLConf, snapshot: Snapshot): Boolean = {
DeltaConfigs.CHECKPOINT_WRITE_STATS_AS_STRUCT
.fromMetaData(snapshot.metadata)
.getOrElse(conf.getConf(DeltaSQLConf.DELTA_CHECKPOINT_V2_ENABLED))
}
}

/**
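For reference, a short sketch of how a caller might exercise the reworked checkpoint API above, which now takes an explicit Snapshot instead of an Option[Snapshot]; the SparkSession and table location are assumed to exist:

import org.apache.spark.sql.delta.{Checkpoints, DeltaLog}

// Sketch only: assumes a SparkSession `spark` and a Delta table at a hypothetical path.
val tablePath = "/tmp/delta/events"
val deltaLog = DeltaLog.forTable(spark, tablePath)

// The no-arg overload checkpoints the log's current snapshot ...
deltaLog.checkpoint()

// ... and the explicit overload checkpoints the snapshot it is given.
deltaLog.checkpoint(deltaLog.snapshot)

// The extracted helper resolves the struct-stats setting from table metadata,
// falling back to the session conf.
val writeStatsAsStruct = Checkpoints.getWriteStatsAsStructConf(
  spark.sessionState.conf, deltaLog.snapshot)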
@@ -1,5 +1,5 @@
/*
* Copyright (2020) The Delta Lake Project Authors.
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,19 +19,16 @@ package org.apache.spark.sql.delta
// scalastyle:off import.ordering.noEmptyLine
import java.io.FileNotFoundException
import java.sql.Timestamp

import scala.collection.mutable

import org.apache.spark.sql.delta.actions.{Action, CommitInfo, CommitMarker}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.storage.{LogFileMetaParser, LogStore}
import org.apache.spark.sql.delta.util.{DateTimeUtils, FileNames, TimestampFormatter}
import org.apache.spark.sql.delta.util.FileNames
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.LogFileMeta.{isCheckpointFile, isDeltaFile}
import org.apache.spark.sql.delta.storage.LogFileMeta.{isCheckpointFile, isDeltaFile}
import org.apache.spark.sql.delta.util.FileNames.deltaFile
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.SerializableConfiguration
42 changes: 14 additions & 28 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -1,5 +1,5 @@
/*
* Copyright (2020) The Delta Lake Project Authors.
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,39 +17,35 @@
package org.apache.spark.sql.delta

// scalastyle:off import.ordering.noEmptyLine
import java.io.{File, FileNotFoundException, IOException}
import java.util.concurrent.{Callable, TimeUnit}
import java.io.{File, IOException}
import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.Future
import scala.util.Try
import scala.util.control.NonFatal

import com.databricks.spark.util.TagDefinitions._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.LogFileMeta.isDeltaFile
import org.apache.spark.sql.delta.storage.LogFileMeta.isDeltaFile
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.{DeltaDataSource, DeltaSQLConf}
import org.apache.spark.sql.delta.storage.LogStoreProvider
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.{LogFileMetaProvider, LogStoreProvider}
import org.apache.spark.sql.delta.util.FileNames.deltaFile
import com.google.common.cache.{CacheBuilder, RemovalListener, RemovalNotification}
import org.apache.hadoop.fs.Path

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisHelper, LocalRelation}
import org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}
import org.apache.spark.util.{Clock, SystemClock}

/**
* Used to query the current state of the log as well as modify it by adding
@@ -225,8 +221,8 @@ class DeltaLog private(
* return an empty Iterator.
*/
def getChanges(
startVersion: Long,
failOnDataLoss: Boolean = false): Iterator[(Long, Seq[Action])] = {
startVersion: Long,
failOnDataLoss: Boolean = false): Iterator[(Long, Seq[Action])] = {
val deltas = logFileHandler.listFilesFrom(deltaFile(logPath, startVersion)).filter(isDeltaFile)
// Subtract 1 to ensure that we have the same check for the inclusive startVersion
var lastSeenVersion = startVersion - 1
@@ -245,13 +241,6 @@
| Protocol validation |
* --------------------- */

/**
* If the given `protocol` is older than that of the client.
*/
private def isProtocolOld(protocol: Protocol): Boolean = protocol != null &&
(Action.readerVersion > protocol.minReaderVersion ||
Action.writerVersion > protocol.minWriterVersion)

/**
* Asserts that the client is up to date with the protocol and
* allowed to read the table that is using the given `protocol`.
@@ -384,14 +373,12 @@ object DeltaLog extends DeltaLogging {
private val deltaLogCache = {
val builder = CacheBuilder.newBuilder()
.expireAfterAccess(60, TimeUnit.MINUTES)
.removalListener(new RemovalListener[Path, DeltaLog] {
override def onRemoval(removalNotification: RemovalNotification[Path, DeltaLog]) = {
.removalListener((removalNotification: RemovalNotification[Path, DeltaLog]) => {
val log = removalNotification.getValue
try log.snapshot.uncache() catch {
case _: java.lang.NullPointerException =>
// Various layers will throw null pointer if the RDD is already gone.
// Various layers will throw null pointer if the RDD is already gone.
}
}
})
sys.props.get("delta.log.cacheSize")
.flatMap(v => Try(v.toLong).toOption)
@@ -474,8 +461,7 @@
// - Different `authority` (e.g., different user tokens in the path)
// - Different mount point.
try {
deltaLogCache.get(path, new Callable[DeltaLog] {
override def call(): DeltaLog = recordDeltaOperation(
deltaLogCache.get(path, () => { recordDeltaOperation(
null, "delta.log.create", Map(TAG_TAHOE_PATH -> path.getParent.toString)) {
AnalysisHelper.allowInvokingTransformsInAnalyzer {
new DeltaLog(path, path.getParent, clock)
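As a usage sketch for the touched getChanges signature (reusing the hypothetical deltaLog from the earlier sketch), the method still yields (version, actions) pairs, now listed through the pluggable logFileHandler:

// Sketch only: iterate commits from version 5 onwards, failing on gaps in the log.
deltaLog.getChanges(startVersion = 5L, failOnDataLoss = true).foreach {
  case (version, actions) =>
    println(s"commit $version carries ${actions.size} actions")
}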
@@ -1,5 +1,5 @@
/*
* Copyright (2020) The Delta Lake Project Authors.
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,7 +19,7 @@ package org.apache.spark.sql.delta
import java.util.{Calendar, TimeZone}

import org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator
import org.apache.spark.sql.delta.LogFileMeta.{isCheckpointFile, isDeltaFile}
import org.apache.spark.sql.delta.storage.LogFileMeta.{isCheckpointFile, isDeltaFile}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.util.FileNames.checkpointPrefix
import org.apache.commons.lang3.time.DateUtils
@@ -1,5 +1,5 @@
/*
* Copyright (2020) The Delta Lake Project Authors.
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,8 +16,9 @@

package org.apache.spark.sql.delta

import java.io.FileNotFoundException
import org.apache.spark.sql.delta.storage.LogFileMeta

import java.io.FileNotFoundException
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}

@@ -30,7 +31,7 @@ import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.SparkContext
import org.apache.spark.sql.delta.LogFileMeta.{isCheckpointFile, isDeltaFile}
import org.apache.spark.sql.delta.storage.LogFileMeta.{isCheckpointFile, isDeltaFile}
import org.apache.spark.sql.delta.util.FileNames.{checkpointPrefix, deltaFile}
import org.apache.spark.sql.{AnalysisException, Dataset}
import org.apache.spark.sql.execution.SQLExecution
@@ -0,0 +1,115 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.storage

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.CheckpointInstance
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.util.Utils
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Try

sealed case class DeltaFileType(value: String)

object DeltaFileType {
object DELTA extends DeltaFileType("DELTA")
object CHECKPOINT extends DeltaFileType("CHECKPOINT")
object CHECKSUM extends DeltaFileType("CHECKSUM")
object UNKNOWN extends DeltaFileType("UNKNOWN")

val values = Seq(DELTA, CHECKPOINT, CHECKSUM, UNKNOWN)

def getFileType(path: Path): DeltaFileType = {
path match {
case f if isCheckpointFile(f) => DeltaFileType.CHECKPOINT
case f if isDeltaFile(f) => DeltaFileType.DELTA
case f if isChecksumFile(f) => DeltaFileType.CHECKSUM
case _ => DeltaFileType.UNKNOWN
}
}
}

case class LogFileMeta(fileStatus: FileStatus,
version: Long,
fileType: DeltaFileType,
numParts: Option[Int]) {

def asCheckpointInstance(): CheckpointInstance = {
CheckpointInstance(version, numParts)
}
}

object LogFileMeta {
def isCheckpointFile(logFileMeta: LogFileMeta): Boolean = {
logFileMeta.fileType == DeltaFileType.CHECKPOINT
}

def isDeltaFile(logFileMeta: LogFileMeta): Boolean = {
logFileMeta.fileType == DeltaFileType.DELTA
}
}


class LogFileMetaParser(logStore: LogStore) {

def listFilesFrom(logPath: Path): Iterator[LogFileMeta] = {

logStore.listFrom(logPath).map(fs => {
LogFileMeta(fs,
Try(deltaVersion(fs.getPath)).getOrElse(Try(checkpointVersion(fs.getPath)).getOrElse(-1L)),
DeltaFileType.getFileType(fs.getPath),
numCheckpointParts(fs.getPath))
})
}

}

object LogFileMetaParser extends LogFileMetaProvider
with Logging {

def apply(sc: SparkContext, logStore: LogStore): LogFileMetaParser = {
apply(sc.getConf, sc.hadoopConfiguration, logStore)
}

def apply(sparkConf: SparkConf,
hadoopConf: Configuration,
logStore: LogStore): LogFileMetaParser = {
createLogFileMetaParser(sparkConf, hadoopConf, logStore)
}
}

trait LogFileMetaProvider {

def createLogFileMetaParser(spark: SparkSession, logStore: LogStore): LogFileMetaParser = {
val sc = spark.sparkContext
createLogFileMetaParser(sc.getConf, sc.hadoopConfiguration, logStore)
}

def createLogFileMetaParser(sparkConf: SparkConf,
hadoopConf: Configuration,
logStore: LogStore): LogFileMetaParser = {
val logStoreClassName = sparkConf.get("spark.delta.logFileHandler.class",
classOf[LogFileMetaParser].getName)
val logStoreClass = Utils.classForName(logStoreClassName)
logStoreClass.getConstructor(classOf[LogStore])
.newInstance(logStore).asInstanceOf[LogFileMetaParser]
}
}
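
Since LogFileMetaProvider resolves the handler from spark.delta.logFileHandler.class through a single-argument LogStore constructor, an alternative file-naming scheme can be plugged in with a small subclass. A minimal sketch; the package, class name, and pass-through logic are illustrative only:

package com.example

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.storage.{LogFileMeta, LogFileMetaParser, LogStore}

// Hypothetical handler for a custom delta-file naming scheme. It must expose a
// constructor taking a LogStore, because createLogFileMetaParser instantiates it
// reflectively via getConstructor(classOf[LogStore]).
class CustomLogFileMetaParser(logStore: LogStore) extends LogFileMetaParser(logStore) {

  override def listFilesFrom(logPath: Path): Iterator[LogFileMeta] = {
    // Delegate to the default parsing; a real implementation would map the
    // custom file names to versions, file types, and checkpoint parts here.
    super.listFilesFrom(logPath)
  }
}

It would then be selected with the spark.delta.logFileHandler.class setting shown in the sketch near the top.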