Changes to support customization of delta file names - 0.6 branch (delta-io#5)

* pick workflows onto branch-0.6

* Parse file metadata as a separate task

* change version to distinguish this branch
Ryan Murray committed Sep 27, 2020
1 parent 7cdc891 commit d6c9e24
Showing 11 changed files with 272 additions and 75 deletions.
40 changes: 40 additions & 0 deletions .github/workflows/main.yml
@@ -0,0 +1,40 @@
name: Main CI

on:
  push:
    branches:
      - master
      - branch-0.6

jobs:
  java:
    name: Scala/sbt
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2
      - name: Set up JDK 1.8
        uses: actions/setup-java@v1
        with:
          java-version: 1.8
      - uses: olafurpg/setup-scala@v5
      - name: Cache local sbt repository
        uses: actions/cache@v2
        with:
          path: |
            ~/.ivy2/cache
            ~/.sbt
          key: ${{ runner.os }}-sbt-${{ hashFiles('**/build.sbt') }}
      - name: Build with sbt
        run: sbt test publishM2
        env:
          GITHUB_TOKEN: ${{ github.token }}
      - name: Push with maven
        run: |
          export VERSION=`cat version.sbt|awk '{print $5}'|sed 's/"//g'`
          # maven doesn't like pushing from local repository so we copy before pushing
          cp /home/runner/.m2/repository/io/delta/delta-core_2.12/$VERSION/delta-core_2.12-$VERSION.* .
          mvn -B deploy:deploy-file -DpomFile=delta-core_2.12-$VERSION.pom -DrepositoryId=github -Dfile=delta-core_2.12-$VERSION.jar -Durl=https://maven.pkg.github.com/projectnessie/delta
        env:
          GITHUB_TOKEN: ${{ github.token }}
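
Note: the "Push with maven" step above derives VERSION from version.sbt with awk '{print $5}'. That only works if version.sbt is a single sbt assignment whose fifth whitespace-separated token is the quoted version string, roughly like the hypothetical snippet below (the actual value on this branch is not shown in this diff):

    // version.sbt -- hypothetical layout assumed by the awk/sed pipeline above;
    // tokens: version(1) in(2) ThisBuild(3) :=(4) "0.6.1-nessie"(5), so awk '{print $5}'
    // picks the quoted version and sed strips the quotes.
    version in ThisBuild := "0.6.1-nessie"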

28 changes: 28 additions & 0 deletions .github/workflows/pull-request.yml
@@ -0,0 +1,28 @@
name: Pull Request CI Test

on:
  pull_request:

jobs:
  java:
    name: Scala/sbt
    runs-on: ubuntu-latest

    steps:
      - uses: actions/checkout@v2
      - name: Set up JDK 1.8
        uses: actions/setup-java@v1
        with:
          java-version: 1.8
      - uses: olafurpg/setup-scala@v5
      - name: Cache local sbt repository
        uses: actions/cache@v2
        with:
          path: |
            ~/.ivy2/cache
            ~/.sbt
          key: ${{ runner.os }}-sbt-${{ hashFiles('**/build.sbt') }}
      - name: Build with sbt
        run: sbt compile test
        env:
          GITHUB_TOKEN: ${{ github.token }}
9 changes: 5 additions & 4 deletions src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.delta.actions.{Action, Metadata, SingleAction}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.DeltaFileOperations
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.sql.delta.LogFileMeta.isCheckpointFile
import org.apache.spark.sql.delta.util.FileNames.{checkpointFileSingular, checkpointFileWithParts, checkpointPrefix, checkpointVersion, numCheckpointParts}
import org.apache.spark.sql.delta.util.JsonUtils
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID}
@@ -171,10 +172,10 @@ trait Checkpoints extends DeltaLogging {
protected def findLastCompleteCheckpoint(cv: CheckpointInstance): Option[CheckpointInstance] = {
var cur = math.max(cv.version, 0L)
while (cur >= 0) {
val checkpoints = store.listFrom(checkpointPrefix(logPath, math.max(0, cur - 1000)))
.map(_.getPath)
val checkpointPrefixPath = checkpointPrefix(logPath, math.max(0, cur - 1000))
val checkpoints = logFileHandler.listFilesFrom(checkpointPrefixPath)
.filter(isCheckpointFile)
.map(CheckpointInstance(_))
.map(_.asCheckpointInstance())
.takeWhile(tv => (cur == 0 || tv.version <= cur) && tv.isEarlierThan(cv))
.toArray
val lastCheckpoint = getLatestCompleteCheckpointFromList(checkpoints, cv)
51 changes: 29 additions & 22 deletions src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
@@ -26,11 +26,13 @@ import org.apache.spark.sql.delta.actions.{Action, CommitInfo, CommitMarker}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.{DateTimeUtils, FileNames, TimestampFormatter}
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.sql.delta.util.FileNames
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.SparkEnv
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.LogFileMeta.{isCheckpointFile, isDeltaFile}
import org.apache.spark.sql.delta.util.FileNames.deltaFile
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.SerializableConfiguration

@@ -117,7 +119,10 @@ class DeltaHistoryManager(
val commit = if (latestVersion - earliest > 2 * maxKeysPerList) {
parallelSearch(time, earliest, latestVersion + 1)
} else {
val commits = getCommits(deltaLog.store, deltaLog.logPath, earliest, Some(latestVersion + 1))
val commits = getCommits(deltaLog.logFileHandler,
deltaLog.logPath,
earliest,
Some(latestVersion + 1))
// If it returns empty, we will fail below with `timestampEarlierThanCommitRetention`.
lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head)
}
@@ -171,13 +176,14 @@ class DeltaHistoryManager(
* This value must be used as a lower bound.
*/
private def getEarliestDeltaFile: Long = {
val earliestVersionOpt = deltaLog.store.listFrom(FileNames.deltaFile(deltaLog.logPath, 0))
.filter(f => FileNames.isDeltaFile(f.getPath))
.take(1).toArray.headOption
val earliestVersionOpt = deltaLog.logFileHandler
.listFilesFrom(FileNames.deltaFile(deltaLog.logPath, 0))
.filter(isDeltaFile)
.take(1).map(_.version).toArray.headOption
if (earliestVersionOpt.isEmpty) {
throw DeltaErrors.noHistoryFound(deltaLog.logPath)
}
FileNames.deltaVersion(earliestVersionOpt.get.getPath)
earliestVersionOpt.get
}

/**
@@ -190,8 +196,8 @@ class DeltaHistoryManager(
* commits are contiguous.
*/
private def getEarliestReproducibleCommit: Long = {
val files = deltaLog.store.listFrom(FileNames.deltaFile(deltaLog.logPath, 0))
.filter(f => FileNames.isDeltaFile(f.getPath) || FileNames.isCheckpointFile(f.getPath))
val files = deltaLog.logFileHandler.listFilesFrom(FileNames.deltaFile(deltaLog.logPath, 0))
.filter(f => isDeltaFile(f) || isCheckpointFile(f))

// A map of checkpoint version and number of parts, to number of parts observed
val checkpointMap = new scala.collection.mutable.HashMap[(Long, Int), Int]()
@@ -202,9 +208,9 @@ class DeltaHistoryManager(
// Checkpoint files come before deltas, so when we see a checkpoint, we remember it and
// return it once we detect that we've seen a smaller or equal delta version.
while (files.hasNext) {
val nextFilePath = files.next().getPath
if (FileNames.isDeltaFile(nextFilePath)) {
val version = FileNames.deltaVersion(nextFilePath)
val nextFilePath = files.next()
if (isDeltaFile(nextFilePath)) {
val version = nextFilePath.version
if (version == 0L) return version
smallestDeltaVersion = math.min(version, smallestDeltaVersion)

@@ -213,9 +219,9 @@ class DeltaHistoryManager(
if (lastCompleteCheckpoint.exists(_ >= smallestDeltaVersion)) {
return lastCompleteCheckpoint.get
}
} else if (FileNames.isCheckpointFile(nextFilePath)) {
val checkpointVersion = FileNames.checkpointVersion(nextFilePath)
val parts = FileNames.numCheckpointParts(nextFilePath)
} else if (isCheckpointFile(nextFilePath)) {
val checkpointVersion = nextFilePath.version
val parts = nextFilePath.numParts
if (parts.isEmpty) {
lastCompleteCheckpoint = Some(checkpointVersion)
} else {
@@ -262,15 +268,15 @@ object DeltaHistoryManager extends DeltaLogging {
* Exposed for tests.
*/
private[delta] def getCommits(
logStore: LogStore,
logPath: Path,
start: Long,
end: Option[Long] = None): Array[Commit] = {
logStore: LogFileMetaParser,
logPath: Path,
start: Long,
end: Option[Long] = None): Array[Commit] = {
val until = end.getOrElse(Long.MaxValue)
val commits = logStore.listFrom(deltaFile(logPath, start))
.filter(f => isDeltaFile(f.getPath))
val commits = logStore.listFilesFrom(deltaFile(logPath, start))
.filter(isDeltaFile)
.map { fileStatus =>
Commit(deltaVersion(fileStatus.getPath), fileStatus.getModificationTime)
Commit(fileStatus.version, fileStatus.fileStatus.getModificationTime)
}
.takeWhile(_.version < until)

@@ -344,10 +350,11 @@ object DeltaHistoryManager extends DeltaLogging {
import spark.implicits._
val possibleCommits = spark.range(start, end, step).mapPartitions { startVersions =>
val logStore = LogStore(SparkEnv.get.conf, conf.value)
val logFileHandler = LogFileMetaParser(SparkEnv.get.conf, conf.value, logStore)
val basePath = new Path(logPath)
startVersions.map { startVersion =>
val commits = getCommits(
logStore, basePath, startVersion, Some(math.min(startVersion + step, end)))
logFileHandler, basePath, startVersion, Some(math.min(startVersion + step, end)))
lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head)
}
}.collect()
21 changes: 11 additions & 10 deletions src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -29,10 +29,12 @@ import com.databricks.spark.util.TagDefinitions._
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.commands.WriteIntoDelta
import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeLogFileIndex}
import org.apache.spark.sql.delta.LogFileMeta.isDeltaFile
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStoreProvider
import org.apache.spark.sql.delta.util.FileNames.deltaFile
import com.google.common.cache.{CacheBuilder, RemovalListener, RemovalNotification}
import org.apache.hadoop.fs.Path

@@ -64,9 +66,8 @@ class DeltaLog private(
with MetadataCleanup
with LogStoreProvider
with SnapshotManagement
with ReadChecksum {

import org.apache.spark.sql.delta.util.FileNames._
with ReadChecksum
with LogFileMetaProvider {


private lazy implicit val _clock = clock
@@ -77,6 +78,9 @@

/** Used to read and write physical log files and checkpoints. */
lazy val store = createLogStore(spark)

lazy val logFileHandler = createLogFileMetaParser(spark, store)

/** Direct access to the underlying storage system. */
private[delta] lazy val fs = logPath.getFileSystem(spark.sessionState.newHadoopConf)

@@ -224,12 +228,9 @@
* return an empty Iterator.
*/
def getChanges(startVersion: Long): Iterator[(Long, Seq[Action])] = {
val deltas = store.listFrom(deltaFile(logPath, startVersion))
.filter(f => isDeltaFile(f.getPath))
val deltas = logFileHandler.listFilesFrom(deltaFile(logPath, startVersion)).filter(isDeltaFile)
deltas.map { status =>
val p = status.getPath
val version = deltaVersion(p)
(version, store.read(p).map(Action.fromJson))
(status.version, store.read(status.fileStatus.getPath).map(Action.fromJson))
}
}

@@ -305,9 +306,9 @@
def isValid(): Boolean = {
val expectedExistingFile = deltaFile(logPath, currentSnapshot.version)
try {
store.listFrom(expectedExistingFile)
logFileHandler.listFilesFrom(expectedExistingFile)
.take(1)
.exists(_.getPath.getName == expectedExistingFile.getName)
.exists(_.version == currentSnapshot.version)
} catch {
case _: FileNotFoundException =>
// Parent of expectedExistingFile doesn't exist
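For context, the refactored getChanges keeps its public shape: callers still receive (version, actions) pairs, only the file listing now goes through logFileHandler. A minimal caller sketch (hypothetical table path; DeltaLog.forTable is the existing public entry point, not part of this diff):

    val log = org.apache.spark.sql.delta.DeltaLog.forTable(spark, "/tmp/delta-table")
    log.getChanges(0L).foreach { case (version, actions) =>
      println(s"commit $version carries ${actions.size} actions")
    }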
115 changes: 115 additions & 0 deletions src/main/scala/org/apache/spark/sql/delta/LogFileMetaParser.scala
@@ -0,0 +1,115 @@
/*
* Copyright (2020) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.FileNames.{checkpointVersion, deltaVersion, isCheckpointFile, isChecksumFile, isDeltaFile, numCheckpointParts}
import org.apache.spark.util.Utils

import scala.util.Try

sealed case class DeltaFileType(value: String)

object DeltaFileType {
  object DELTA extends DeltaFileType("DELTA")
  object CHECKPOINT extends DeltaFileType("CHECKPOINT")
  object CHECKSUM extends DeltaFileType("CHECKSUM")
  object UNKNOWN extends DeltaFileType("UNKNOWN")

  val values = Seq(DELTA, CHECKPOINT, CHECKSUM, UNKNOWN)

  def getFileType(path: Path): DeltaFileType = {
    path match {
      case f if isCheckpointFile(f) => DeltaFileType.CHECKPOINT
      case f if isDeltaFile(f) => DeltaFileType.DELTA
      case f if isChecksumFile(f) => DeltaFileType.CHECKSUM
      case _ => DeltaFileType.UNKNOWN
    }
  }
}

case class LogFileMeta(fileStatus: FileStatus,
                       version: Long,
                       fileType: DeltaFileType,
                       numParts: Option[Int]) {

  def asCheckpointInstance(): CheckpointInstance = {
    CheckpointInstance(version, numParts)
  }
}

object LogFileMeta {
  def isCheckpointFile(logFileMeta: LogFileMeta): Boolean = {
    logFileMeta.fileType == DeltaFileType.CHECKPOINT
  }

  def isDeltaFile(logFileMeta: LogFileMeta): Boolean = {
    logFileMeta.fileType == DeltaFileType.DELTA
  }
}


class LogFileMetaParser(logStore: LogStore) {

  def listFilesFrom(logPath: Path): Iterator[LogFileMeta] = {

    logStore.listFrom(logPath).map(fs => {
      LogFileMeta(fs,
        Try(deltaVersion(fs.getPath)).getOrElse(Try(checkpointVersion(fs.getPath)).getOrElse(-1L)),
        DeltaFileType.getFileType(fs.getPath),
        numCheckpointParts(fs.getPath))
    })
  }

}

object LogFileMetaParser extends LogFileMetaProvider
  with Logging {

  def apply(sc: SparkContext, logStore: LogStore): LogFileMetaParser = {
    apply(sc.getConf, sc.hadoopConfiguration, logStore)
  }

  def apply(sparkConf: SparkConf,
            hadoopConf: Configuration,
            logStore: LogStore): LogFileMetaParser = {
    createLogFileMetaParser(sparkConf, hadoopConf, logStore)
  }
}

trait LogFileMetaProvider {

  def createLogFileMetaParser(spark: SparkSession, logStore: LogStore): LogFileMetaParser = {
    val sc = spark.sparkContext
    createLogFileMetaParser(sc.getConf, sc.hadoopConfiguration, logStore)
  }

  def createLogFileMetaParser(sparkConf: SparkConf,
                              hadoopConf: Configuration,
                              logStore: LogStore): LogFileMetaParser = {
    val logStoreClassName = sparkConf.get("spark.delta.logFileHandler.class",
      classOf[LogFileMetaParser].getName)
    val logStoreClass = Utils.classForName(logStoreClassName)
    logStoreClass.getConstructor(classOf[LogStore])
      .newInstance(logStore).asInstanceOf[LogFileMetaParser]
  }
}
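
The spark.delta.logFileHandler.class property read in LogFileMetaProvider is the extension point this commit adds: the configured class is instantiated reflectively through a (LogStore) constructor, so a custom handler has to expose that signature. A hypothetical plug-in (not part of this commit) could look like:

    package com.example.delta // hypothetical package and class, for illustration only

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.delta.{LogFileMeta, LogFileMetaParser}
    import org.apache.spark.sql.delta.storage.LogStore

    // Reuses the default parsing but is free to recognise customised delta file names.
    class CustomLogFileMetaParser(logStore: LogStore) extends LogFileMetaParser(logStore) {
      override def listFilesFrom(logPath: Path): Iterator[LogFileMeta] =
        super.listFilesFrom(logPath) // adjust version/type extraction here
    }

    // Selected at runtime via SparkConf, e.g.
    //   --conf spark.delta.logFileHandler.class=com.example.delta.CustomLogFileMetaParser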
