Changes to support customization of delta file names (delta-io#4)
* Parse file metadata as a separate task

* change version to distinguish this branch
Ryan Murray committed Nov 3, 2020
1 parent eca7fbf commit fe68e6c
Showing 9 changed files with 207 additions and 77 deletions.
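
At a high level, the commit routes every listing of the _delta_log directory through a new LogFileMetaParser abstraction, which returns LogFileMeta records (parsed version, file type, checkpoint part count) alongside the raw FileStatus, instead of re-parsing version numbers out of file names with FileNames at each call site. The following is only a rough sketch of how a caller sees the new API; the table path is a placeholder, a SparkSession named `spark` is assumed to be in scope, and nothing here is taken verbatim from the diff.

import org.apache.spark.sql.delta.{DeltaLog, LogFileMeta}
import org.apache.spark.sql.delta.util.FileNames

// Placeholder table path; DeltaLog.forTable is the usual entry point.
val deltaLog = DeltaLog.forTable(spark, "/tmp/example-table")

// New style: the handler parses file names once and hands back LogFileMeta records,
// so callers filter on file type and read the version field directly.
deltaLog.logFileHandler
  .listFilesFrom(FileNames.deltaFile(deltaLog.logPath, 0))
  .filter(LogFileMeta.isDeltaFile)
  .foreach(meta => println(s"commit ${meta.version} -> ${meta.fileStatus.getPath}"))
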
9 changes: 5 additions & 4 deletions src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -27,7 +27,8 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.DeltaFileOperations
-import org.apache.spark.sql.delta.util.FileNames._
+import org.apache.spark.sql.delta.LogFileMeta.isCheckpointFile
+import org.apache.spark.sql.delta.util.FileNames.{checkpointFileSingular, checkpointFileWithParts, checkpointPrefix, checkpointVersion, numCheckpointParts}
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID}
@@ -176,10 +177,10 @@ trait Checkpoints extends DeltaLogging {
  protected def findLastCompleteCheckpoint(cv: CheckpointInstance): Option[CheckpointInstance] = {
    var cur = math.max(cv.version, 0L)
    while (cur >= 0) {
-      val checkpoints = store.listFrom(checkpointPrefix(logPath, math.max(0, cur - 1000)))
-        .map(_.getPath)
+      val checkpointPrefixPath = checkpointPrefix(logPath, math.max(0, cur - 1000))
+      val checkpoints = logFileHandler.listFilesFrom(checkpointPrefixPath)
        .filter(isCheckpointFile)
-        .map(CheckpointInstance(_))
+        .map(_.asCheckpointInstance())
        .takeWhile(tv => (cur == 0 || tv.version <= cur) && tv.isEarlierThan(cv))
        .toArray
      val lastCheckpoint = getLatestCompleteCheckpointFromList(checkpoints, cv)
51 changes: 29 additions & 22 deletions src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
@@ -26,11 +26,13 @@ import org.apache.spark.sql.delta.actions.{Action, CommitInfo, CommitMarker}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.{DateTimeUtils, FileNames, TimestampFormatter}
-import org.apache.spark.sql.delta.util.FileNames._
+import org.apache.spark.sql.delta.util.FileNames
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.LogFileMeta.{isCheckpointFile, isDeltaFile}
+import org.apache.spark.sql.delta.util.FileNames.deltaFile
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.SerializableConfiguration

@@ -120,7 +122,10 @@
    val commit = if (latestVersion - earliest > 2 * maxKeysPerList) {
      parallelSearch(time, earliest, latestVersion + 1)
    } else {
-      val commits = getCommits(deltaLog.store, deltaLog.logPath, earliest, Some(latestVersion + 1))
+      val commits = getCommits(deltaLog.logFileHandler,
+        deltaLog.logPath,
+        earliest,
+        Some(latestVersion + 1))
      // If it returns empty, we will fail below with `timestampEarlierThanCommitRetention`.
      lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head)
    }
@@ -178,13 +183,14 @@
   * This value must be used as a lower bound.
   */
  private def getEarliestDeltaFile: Long = {
-    val earliestVersionOpt = deltaLog.store.listFrom(FileNames.deltaFile(deltaLog.logPath, 0))
-      .filter(f => FileNames.isDeltaFile(f.getPath))
-      .take(1).toArray.headOption
+    val earliestVersionOpt = deltaLog.logFileHandler
+      .listFilesFrom(FileNames.deltaFile(deltaLog.logPath, 0))
+      .filter(isDeltaFile)
+      .take(1).map(_.version).toArray.headOption
    if (earliestVersionOpt.isEmpty) {
      throw DeltaErrors.noHistoryFound(deltaLog.logPath)
    }
-    FileNames.deltaVersion(earliestVersionOpt.get.getPath)
+    earliestVersionOpt.get
  }

/**
@@ -197,8 +203,8 @@
   * commits are contiguous.
   */
  private[delta] def getEarliestReproducibleCommit: Long = {
-    val files = deltaLog.store.listFrom(FileNames.deltaFile(deltaLog.logPath, 0))
-      .filter(f => FileNames.isDeltaFile(f.getPath) || FileNames.isCheckpointFile(f.getPath))
+    val files = deltaLog.logFileHandler.listFilesFrom(FileNames.deltaFile(deltaLog.logPath, 0))
+      .filter(f => isDeltaFile(f) || isCheckpointFile(f))

    // A map of checkpoint version and number of parts, to number of parts observed
    val checkpointMap = new scala.collection.mutable.HashMap[(Long, Int), Int]()
@@ -209,9 +215,9 @@
    // Checkpoint files come before deltas, so when we see a checkpoint, we remember it and
    // return it once we detect that we've seen a smaller or equal delta version.
    while (files.hasNext) {
-      val nextFilePath = files.next().getPath
-      if (FileNames.isDeltaFile(nextFilePath)) {
-        val version = FileNames.deltaVersion(nextFilePath)
+      val nextFilePath = files.next()
+      if (isDeltaFile(nextFilePath)) {
+        val version = nextFilePath.version
        if (version == 0L) return version
        smallestDeltaVersion = math.min(version, smallestDeltaVersion)

@@ -220,9 +226,9 @@
        if (lastCompleteCheckpoint.exists(_ >= smallestDeltaVersion)) {
          return lastCompleteCheckpoint.get
        }
-      } else if (FileNames.isCheckpointFile(nextFilePath)) {
-        val checkpointVersion = FileNames.checkpointVersion(nextFilePath)
-        val parts = FileNames.numCheckpointParts(nextFilePath)
+      } else if (isCheckpointFile(nextFilePath)) {
+        val checkpointVersion = nextFilePath.version
+        val parts = nextFilePath.numParts
        if (parts.isEmpty) {
          lastCompleteCheckpoint = Some(checkpointVersion)
        } else {
@@ -269,15 +275,15 @@ object DeltaHistoryManager extends DeltaLogging {
   * Exposed for tests.
   */
  private[delta] def getCommits(
-      logStore: LogStore,
-      logPath: Path,
-      start: Long,
-      end: Option[Long] = None): Array[Commit] = {
+      logStore: LogFileMetaParser,
+      logPath: Path,
+      start: Long,
+      end: Option[Long] = None): Array[Commit] = {
    val until = end.getOrElse(Long.MaxValue)
-    val commits = logStore.listFrom(deltaFile(logPath, start))
-      .filter(f => isDeltaFile(f.getPath))
+    val commits = logStore.listFilesFrom(deltaFile(logPath, start))
+      .filter(isDeltaFile)
      .map { fileStatus =>
-        Commit(deltaVersion(fileStatus.getPath), fileStatus.getModificationTime)
+        Commit(fileStatus.version, fileStatus.fileStatus.getModificationTime)
      }
      .takeWhile(_.version < until)

@@ -351,10 +357,11 @@
    import spark.implicits._
    val possibleCommits = spark.range(start, end, step).mapPartitions { startVersions =>
      val logStore = LogStore(SparkEnv.get.conf, conf.value)
+      val logFileHandler = LogFileMetaParser(SparkEnv.get.conf, conf.value, logStore)
      val basePath = new Path(logPath)
      startVersions.map { startVersion =>
        val commits = getCommits(
-          logStore, basePath, startVersion, Some(math.min(startVersion + step, end)))
+          logFileHandler, basePath, startVersion, Some(math.min(startVersion + step, end)))
        lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head)
      }
    }.collect()
17 changes: 9 additions & 8 deletions src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -29,10 +29,12 @@ import com.databricks.spark.util.TagDefinitions._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeLogFileIndex}
+import org.apache.spark.sql.delta.LogFileMeta.isDeltaFile
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStoreProvider
+import org.apache.spark.sql.delta.util.FileNames.deltaFile
import com.google.common.cache.{CacheBuilder, RemovalListener, RemovalNotification}
import org.apache.hadoop.fs.Path

@@ -63,9 +65,8 @@ class DeltaLog private(
  with MetadataCleanup
  with LogStoreProvider
  with SnapshotManagement
-  with ReadChecksum {
-
-  import org.apache.spark.sql.delta.util.FileNames._
+  with ReadChecksum
+  with LogFileMetaProvider {


  private lazy implicit val _clock = clock
@@ -76,6 +77,9 @@

  /** Used to read and write physical log files and checkpoints. */
  lazy val store = createLogStore(spark)
+
+  lazy val logFileHandler = createLogFileMetaParser(spark, store)
+
  /** Direct access to the underlying storage system. */
  private[delta] lazy val fs = logPath.getFileSystem(spark.sessionState.newHadoopConf)

@@ -220,12 +224,9 @@
   * return an empty Iterator.
   */
  def getChanges(startVersion: Long): Iterator[(Long, Seq[Action])] = {
-    val deltas = store.listFrom(deltaFile(logPath, startVersion))
-      .filter(f => isDeltaFile(f.getPath))
+    val deltas = logFileHandler.listFilesFrom(deltaFile(logPath, startVersion)).filter(isDeltaFile)
    deltas.map { status =>
-      val p = status.getPath
-      val version = deltaVersion(p)
-      (version, store.read(p).map(Action.fromJson))
+      (status.version, store.read(status.fileStatus.getPath).map(Action.fromJson))
    }
  }

115 changes: 115 additions & 0 deletions src/main/scala/org/apache/spark/sql/delta/LogFileMetaParser.scala
@@ -0,0 +1,115 @@
/*
* Copyright (2020) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.FileNames.{checkpointVersion, deltaVersion, isCheckpointFile, isChecksumFile, isDeltaFile, numCheckpointParts}
import org.apache.spark.util.Utils

import scala.util.Try

sealed case class DeltaFileType(value: String)

object DeltaFileType {
  object DELTA extends DeltaFileType("DELTA")
  object CHECKPOINT extends DeltaFileType("CHECKPOINT")
  object CHECKSUM extends DeltaFileType("CHECKSUM")
  object UNKNOWN extends DeltaFileType("UNKNOWN")

  val values = Seq(DELTA, CHECKPOINT, CHECKSUM, UNKNOWN)

  def getFileType(path: Path): DeltaFileType = {
    path match {
      case f if isCheckpointFile(f) => DeltaFileType.CHECKPOINT
      case f if isDeltaFile(f) => DeltaFileType.DELTA
      case f if isChecksumFile(f) => DeltaFileType.CHECKSUM
      case _ => DeltaFileType.UNKNOWN
    }
  }
}

case class LogFileMeta(fileStatus: FileStatus,
                       version: Long,
                       fileType: DeltaFileType,
                       numParts: Option[Int]) {

  def asCheckpointInstance(): CheckpointInstance = {
    CheckpointInstance(version, numParts)
  }
}

object LogFileMeta {
  def isCheckpointFile(logFileMeta: LogFileMeta): Boolean = {
    logFileMeta.fileType == DeltaFileType.CHECKPOINT
  }

  def isDeltaFile(logFileMeta: LogFileMeta): Boolean = {
    logFileMeta.fileType == DeltaFileType.DELTA
  }
}


class LogFileMetaParser(logStore: LogStore) {

  def listFilesFrom(logPath: Path): Iterator[LogFileMeta] = {

    logStore.listFrom(logPath).map(fs => {
      LogFileMeta(fs,
        Try(deltaVersion(fs.getPath)).getOrElse(Try(checkpointVersion(fs.getPath)).getOrElse(-1L)),
        DeltaFileType.getFileType(fs.getPath),
        numCheckpointParts(fs.getPath))
    })
  }

}

object LogFileMetaParser extends LogFileMetaProvider
  with Logging {

  def apply(sc: SparkContext, logStore: LogStore): LogFileMetaParser = {
    apply(sc.getConf, sc.hadoopConfiguration, logStore)
  }

  def apply(sparkConf: SparkConf,
            hadoopConf: Configuration,
            logStore: LogStore): LogFileMetaParser = {
    createLogFileMetaParser(sparkConf, hadoopConf, logStore)
  }
}

trait LogFileMetaProvider {

  def createLogFileMetaParser(spark: SparkSession, logStore: LogStore): LogFileMetaParser = {
    val sc = spark.sparkContext
    createLogFileMetaParser(sc.getConf, sc.hadoopConfiguration, logStore)
  }

  def createLogFileMetaParser(sparkConf: SparkConf,
                              hadoopConf: Configuration,
                              logStore: LogStore): LogFileMetaParser = {
    val logStoreClassName = sparkConf.get("spark.delta.logFileHandler.class",
      classOf[LogFileMetaParser].getName)
    val logStoreClass = Utils.classForName(logStoreClassName)
    logStoreClass.getConstructor(classOf[LogStore])
      .newInstance(logStore).asInstanceOf[LogFileMetaParser]
  }
}
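
Because the handler class is resolved reflectively from spark.delta.logFileHandler.class and only needs a single LogStore constructor argument, a deployment that names its delta files differently can plug in its own parser. The sketch below is a hypothetical illustration, not part of this commit: the package, subclass name, alternate naming scheme, and regex are all invented for the example.

package com.example

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.{DeltaFileType, LogFileMeta, LogFileMetaParser}
import org.apache.spark.sql.delta.storage.LogStore

// Hypothetical extension of the new hook: a fork that writes commit files as
// <version>-<uuid>.json parses the version itself and delegates everything else.
class CustomNameLogFileMetaParser(logStore: LogStore) extends LogFileMetaParser(logStore) {

  // Invented naming scheme, e.g. 00000000000000000010-6b69cf57.json
  private val customDeltaFile = """(\d+)-[0-9a-f]+\.json""".r

  override def listFilesFrom(logPath: Path): Iterator[LogFileMeta] = {
    super.listFilesFrom(logPath).map { meta =>
      meta.fileStatus.getPath.getName match {
        case customDeltaFile(version) =>
          meta.copy(version = version.toLong, fileType = DeltaFileType.DELTA)
        case _ => meta
      }
    }
  }
}

// Registered through the new setting, e.g. on spark-submit:
//   --conf spark.delta.logFileHandler.class=com.example.CustomNameLogFileMetaParser
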
21 changes: 10 additions & 11 deletions src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.delta
import java.util.{Calendar, TimeZone}

import org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator
+import org.apache.spark.sql.delta.LogFileMeta.{isCheckpointFile, isDeltaFile}
import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.util.FileNames.checkpointPrefix
import org.apache.commons.lang3.time.DateUtils
import org.apache.hadoop.fs.{FileStatus, Path}

@@ -69,22 +71,19 @@ trait MetadataCleanup extends DeltaLogging {
   * - be older than `fileCutOffTime`
   */
  private def listExpiredDeltaLogs(fileCutOffTime: Long): Iterator[FileStatus] = {
-    import org.apache.spark.sql.delta.util.FileNames._

    val latestCheckpoint = lastCheckpoint
    if (latestCheckpoint.isEmpty) return Iterator.empty
    val threshold = latestCheckpoint.get.version - 1L
-    val files = store.listFrom(checkpointPrefix(logPath, 0))
-      .filter(f => isCheckpointFile(f.getPath) || isDeltaFile(f.getPath))
-    def getVersion(filePath: Path): Long = {
-      if (isCheckpointFile(filePath)) {
-        checkpointVersion(filePath)
-      } else {
-        deltaVersion(filePath)
-      }
-    }
+    val files = logFileHandler.listFilesFrom(checkpointPrefix(logPath, 0))
+      .filter(f => isCheckpointFile(f) || isDeltaFile(f))
+    val versionTuples = files.map(f => (f.fileStatus, f.version)).toArray
+    val versionMap = versionTuples.map(vt => (vt._1.getPath, vt._2)).toMap

-    new BufferingLogDeletionIterator(files, fileCutOffTime, threshold, getVersion)
+    new BufferingLogDeletionIterator(versionTuples.map(_._1).iterator,
+      fileCutOffTime,
+      threshold,
+      versionMap)
  }

/** Truncates a timestamp down to the previous midnight and returns the time and a log string */
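
One design note on the hunk above: where the old code passed a getVersion function into BufferingLogDeletionIterator, the new code materializes the listing once and passes a Map from path to parsed version. Assuming the iterator still takes a Path => Long lookup as its last parameter, a Scala Map can be passed there unchanged, because Map[K, V] is itself a K => V function. The snippet below is only a stand-alone illustration of that language property with invented paths, not code from the commit.

import org.apache.hadoop.fs.Path

// Map[K, V] extends K => V in Scala, so a precomputed path-to-version map can
// stand in wherever a version-lookup function is expected.
val versionMap: Map[Path, Long] = Map(
  new Path("/table/_delta_log/00000000000000000000.json") -> 0L,
  new Path("/table/_delta_log/00000000000000000001.json") -> 1L)

val getVersion: Path => Long = versionMap
getVersion(new Path("/table/_delta_log/00000000000000000001.json"))   // returns 1L
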