diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml
new file mode 100644
index 00000000000..112a011b856
--- /dev/null
+++ b/.github/workflows/main.yml
@@ -0,0 +1,40 @@
+name: Main CI
+
+on:
+  push:
+    branches:
+      - master
+      - branch-0.6
+
+jobs:
+  java:
+    name: Scala/sbt
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up JDK 1.8
+        uses: actions/setup-java@v1
+        with:
+          java-version: 1.8
+      - uses: olafurpg/setup-scala@v5
+      - name: Cache local sbt repository
+        uses: actions/cache@v2
+        with:
+          path: |
+            ~/.ivy2/cache
+            ~/.sbt
+          key: ${{ runner.os }}-sbt-${{ hashFiles('**/build.sbt') }}
+      - name: Build with sbt
+        run: sbt test publishM2
+        env:
+          GITHUB_TOKEN: ${{ github.token }}
+      - name: Push with maven
+        run: |
+          export VERSION=`cat version.sbt|awk '{print $5}'|sed 's/"//g'`
+          # maven doesn't like pushing from local repository so we copy before pushing
+          cp /home/runner/.m2/repository/io/delta/delta-core_2.12/$VERSION/delta-core_2.12-$VERSION.* .
+          mvn -B deploy:deploy-file -DpomFile=delta-core_2.12-$VERSION.pom -DrepositoryId=github -Dfile=delta-core_2.12-$VERSION.jar -Durl=https://maven.pkg.github.com/projectnessie/delta
+        env:
+          GITHUB_TOKEN: ${{ github.token }}
+
diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml
new file mode 100644
index 00000000000..010e6d9745a
--- /dev/null
+++ b/.github/workflows/pull-request.yml
@@ -0,0 +1,28 @@
+name: Pull Request CI Test
+
+on:
+  pull_request:
+
+jobs:
+  java:
+    name: Scala/sbt
+    runs-on: ubuntu-latest
+
+    steps:
+      - uses: actions/checkout@v2
+      - name: Set up JDK 1.8
+        uses: actions/setup-java@v1
+        with:
+          java-version: 1.8
+      - uses: olafurpg/setup-scala@v5
+      - name: Cache local sbt repository
+        uses: actions/cache@v2
+        with:
+          path: |
+            ~/.ivy2/cache
+            ~/.sbt
+          key: ${{ runner.os }}-sbt-${{ hashFiles('**/build.sbt') }}
+      - name: Build with sbt
+        run: sbt compile test
+        env:
+          GITHUB_TOKEN: ${{ github.token }}
diff --git a/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
index e4f77095e05..58e781f7a3d 100644
--- a/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -25,7 +25,8 @@ import org.apache.spark.sql.delta.actions.{Action, Metadata, SingleAction}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.storage.LogStore
 import org.apache.spark.sql.delta.util.DeltaFileOperations
-import org.apache.spark.sql.delta.util.FileNames._
+import org.apache.spark.sql.delta.LogFileMeta.isCheckpointFile
+import org.apache.spark.sql.delta.util.FileNames.{checkpointFileSingular, checkpointFileWithParts, checkpointPrefix, checkpointVersion, numCheckpointParts}
 import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.{JobConf, TaskAttemptContextImpl, TaskAttemptID}
@@ -171,10 +172,10 @@ trait Checkpoints extends DeltaLogging {
   protected def findLastCompleteCheckpoint(cv: CheckpointInstance): Option[CheckpointInstance] = {
     var cur = math.max(cv.version, 0L)
     while (cur >= 0) {
-      val checkpoints = store.listFrom(checkpointPrefix(logPath, math.max(0, cur - 1000)))
-        .map(_.getPath)
+      val checkpointPrefixPath = checkpointPrefix(logPath, math.max(0, cur - 1000))
+      val checkpoints = logFileHandler.listFilesFrom(checkpointPrefixPath)
         .filter(isCheckpointFile)
-        .map(CheckpointInstance(_))
+        .map(_.asCheckpointInstance())
         .takeWhile(tv => (cur == 0 || tv.version <= cur) && tv.isEarlierThan(cv))
         .toArray
       val lastCheckpoint = getLatestCompleteCheckpointFromList(checkpoints, cv)
diff --git a/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala b/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
index 3c24b1702cf..be88cd118f9 100644
--- a/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/DeltaHistoryManager.scala
@@ -26,11 +26,13 @@
 import org.apache.spark.sql.delta.actions.{Action, CommitInfo, CommitMarker}
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.storage.LogStore
 import org.apache.spark.sql.delta.util.{DateTimeUtils, FileNames, TimestampFormatter}
-import org.apache.spark.sql.delta.util.FileNames._
+import org.apache.spark.sql.delta.util.FileNames
 import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.SparkEnv
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.LogFileMeta.{isCheckpointFile, isDeltaFile}
+import org.apache.spark.sql.delta.util.FileNames.deltaFile
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.util.SerializableConfiguration
@@ -117,7 +119,10 @@ class DeltaHistoryManager(
     val commit = if (latestVersion - earliest > 2 * maxKeysPerList) {
       parallelSearch(time, earliest, latestVersion + 1)
     } else {
-      val commits = getCommits(deltaLog.store, deltaLog.logPath, earliest, Some(latestVersion + 1))
+      val commits = getCommits(deltaLog.logFileHandler,
+        deltaLog.logPath,
+        earliest,
+        Some(latestVersion + 1))
       // If it returns empty, we will fail below with `timestampEarlierThanCommitRetention`.
       lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head)
     }
@@ -171,13 +176,14 @@ class DeltaHistoryManager(
    * This value must be used as a lower bound.
    */
   private def getEarliestDeltaFile: Long = {
-    val earliestVersionOpt = deltaLog.store.listFrom(FileNames.deltaFile(deltaLog.logPath, 0))
-      .filter(f => FileNames.isDeltaFile(f.getPath))
-      .take(1).toArray.headOption
+    val earliestVersionOpt = deltaLog.logFileHandler
+      .listFilesFrom(FileNames.deltaFile(deltaLog.logPath, 0))
+      .filter(isDeltaFile)
+      .take(1).map(_.version).toArray.headOption
     if (earliestVersionOpt.isEmpty) {
       throw DeltaErrors.noHistoryFound(deltaLog.logPath)
     }
-    FileNames.deltaVersion(earliestVersionOpt.get.getPath)
+    earliestVersionOpt.get
   }
 
   /**
@@ -190,8 +196,8 @@
    * commits are contiguous.
    */
   private def getEarliestReproducibleCommit: Long = {
-    val files = deltaLog.store.listFrom(FileNames.deltaFile(deltaLog.logPath, 0))
-      .filter(f => FileNames.isDeltaFile(f.getPath) || FileNames.isCheckpointFile(f.getPath))
+    val files = deltaLog.logFileHandler.listFilesFrom(FileNames.deltaFile(deltaLog.logPath, 0))
+      .filter(f => isDeltaFile(f) || isCheckpointFile(f))
 
     // A map of checkpoint version and number of parts, to number of parts observed
     val checkpointMap = new scala.collection.mutable.HashMap[(Long, Int), Int]()
@@ -202,9 +208,9 @@
     // Checkpoint files come before deltas, so when we see a checkpoint, we remember it and
     // return it once we detect that we've seen a smaller or equal delta version.
     while (files.hasNext) {
-      val nextFilePath = files.next().getPath
-      if (FileNames.isDeltaFile(nextFilePath)) {
-        val version = FileNames.deltaVersion(nextFilePath)
+      val nextFilePath = files.next()
+      if (isDeltaFile(nextFilePath)) {
+        val version = nextFilePath.version
         if (version == 0L) return version
         smallestDeltaVersion = math.min(version, smallestDeltaVersion)
@@ -213,9 +219,9 @@
         if (lastCompleteCheckpoint.exists(_ >= smallestDeltaVersion)) {
           return lastCompleteCheckpoint.get
         }
-      } else if (FileNames.isCheckpointFile(nextFilePath)) {
-        val checkpointVersion = FileNames.checkpointVersion(nextFilePath)
-        val parts = FileNames.numCheckpointParts(nextFilePath)
+      } else if (isCheckpointFile(nextFilePath)) {
+        val checkpointVersion = nextFilePath.version
+        val parts = nextFilePath.numParts
         if (parts.isEmpty) {
           lastCompleteCheckpoint = Some(checkpointVersion)
         } else {
@@ -262,15 +268,15 @@ object DeltaHistoryManager extends DeltaLogging {
    * Exposed for tests.
    */
   private[delta] def getCommits(
-      logStore: LogStore,
-      logPath: Path,
-      start: Long,
-      end: Option[Long] = None): Array[Commit] = {
+      logStore: LogFileMetaParser,
+      logPath: Path,
+      start: Long,
+      end: Option[Long] = None): Array[Commit] = {
     val until = end.getOrElse(Long.MaxValue)
-    val commits = logStore.listFrom(deltaFile(logPath, start))
-      .filter(f => isDeltaFile(f.getPath))
+    val commits = logStore.listFilesFrom(deltaFile(logPath, start))
+      .filter(isDeltaFile)
       .map { fileStatus =>
-        Commit(deltaVersion(fileStatus.getPath), fileStatus.getModificationTime)
+        Commit(fileStatus.version, fileStatus.fileStatus.getModificationTime)
       }
       .takeWhile(_.version < until)
@@ -344,10 +350,11 @@ object DeltaHistoryManager extends DeltaLogging {
     import spark.implicits._
     val possibleCommits = spark.range(start, end, step).mapPartitions { startVersions =>
       val logStore = LogStore(SparkEnv.get.conf, conf.value)
+      val logFileHandler = LogFileMetaParser(SparkEnv.get.conf, conf.value, logStore)
       val basePath = new Path(logPath)
       startVersions.map { startVersion =>
         val commits = getCommits(
-          logStore, basePath, startVersion, Some(math.min(startVersion + step, end)))
+          logFileHandler, basePath, startVersion, Some(math.min(startVersion + step, end)))
         lastCommitBeforeTimestamp(commits, time).getOrElse(commits.head)
       }
     }.collect()
diff --git a/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala b/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
index 31e5081cd03..ec277a981ed 100644
--- a/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -29,10 +29,12 @@
 import com.databricks.spark.util.TagDefinitions._
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.commands.WriteIntoDelta
 import org.apache.spark.sql.delta.files.{TahoeBatchFileIndex, TahoeLogFileIndex}
+import org.apache.spark.sql.delta.LogFileMeta.isDeltaFile
 import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.storage.LogStoreProvider
+import org.apache.spark.sql.delta.util.FileNames.deltaFile
 import com.google.common.cache.{CacheBuilder, RemovalListener, RemovalNotification}
 import org.apache.hadoop.fs.Path
@@ -64,9 +66,8 @@ class DeltaLog private(
   with MetadataCleanup
   with LogStoreProvider
   with SnapshotManagement
-  with ReadChecksum {
-
-  import org.apache.spark.sql.delta.util.FileNames._
+  with ReadChecksum
+  with LogFileMetaProvider {
 
   private lazy implicit val _clock = clock
@@ -77,6 +78,9 @@ class DeltaLog private(
   /** Used to read and write physical log files and checkpoints. */
   lazy val store = createLogStore(spark)
+
+  lazy val logFileHandler = createLogFileMetaParser(spark, store)
+
   /** Direct access to the underlying storage system. */
   private[delta] lazy val fs = logPath.getFileSystem(spark.sessionState.newHadoopConf)
@@ -224,12 +228,9 @@ class DeltaLog private(
    * return an empty Iterator.
    */
  def getChanges(startVersion: Long): Iterator[(Long, Seq[Action])] = {
-    val deltas = store.listFrom(deltaFile(logPath, startVersion))
-      .filter(f => isDeltaFile(f.getPath))
+    val deltas = logFileHandler.listFilesFrom(deltaFile(logPath, startVersion)).filter(isDeltaFile)
     deltas.map { status =>
-      val p = status.getPath
-      val version = deltaVersion(p)
-      (version, store.read(p).map(Action.fromJson))
+      (status.version, store.read(status.fileStatus.getPath).map(Action.fromJson))
     }
   }
@@ -305,9 +306,9 @@ class DeltaLog private(
   def isValid(): Boolean = {
     val expectedExistingFile = deltaFile(logPath, currentSnapshot.version)
     try {
-      store.listFrom(expectedExistingFile)
+      logFileHandler.listFilesFrom(expectedExistingFile)
         .take(1)
-        .exists(_.getPath.getName == expectedExistingFile.getName)
+        .exists(_.version == currentSnapshot.version)
     } catch {
       case _: FileNotFoundException =>
         // Parent of expectedExistingFile doesn't exist
diff --git a/src/main/scala/org/apache/spark/sql/delta/LogFileMetaParser.scala b/src/main/scala/org/apache/spark/sql/delta/LogFileMetaParser.scala
new file mode 100644
index 00000000000..b2746e09447
--- /dev/null
+++ b/src/main/scala/org/apache/spark/sql/delta/LogFileMetaParser.scala
@@ -0,0 +1,115 @@
+/*
+ * Copyright (2020) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.storage.LogStore
+import org.apache.spark.sql.delta.util.FileNames.{checkpointVersion, deltaVersion, isCheckpointFile, isChecksumFile, isDeltaFile, numCheckpointParts}
+import org.apache.spark.util.Utils
+
+import scala.util.Try
+
+sealed case class DeltaFileType(value: String)
+
+object DeltaFileType {
+  object DELTA extends DeltaFileType("DELTA")
+  object CHECKPOINT extends DeltaFileType("CHECKPOINT")
+  object CHECKSUM extends DeltaFileType("CHECKSUM")
+  object UNKNOWN extends DeltaFileType("UNKNOWN")
+
+  val values = Seq(DELTA, CHECKPOINT, CHECKSUM, UNKNOWN)
+
+  def getFileType(path: Path): DeltaFileType = {
+    path match {
+      case f if isCheckpointFile(f) => DeltaFileType.CHECKPOINT
+      case f if isDeltaFile(f) => DeltaFileType.DELTA
+      case f if isChecksumFile(f) => DeltaFileType.CHECKSUM
+      case _ => DeltaFileType.UNKNOWN
+    }
+  }
+}
+
+case class LogFileMeta(fileStatus: FileStatus,
+                       version: Long,
+                       fileType: DeltaFileType,
+                       numParts: Option[Int]) {
+
+  def asCheckpointInstance(): CheckpointInstance = {
+    CheckpointInstance(version, numParts)
+  }
+}
+
+object LogFileMeta {
+  def isCheckpointFile(logFileMeta: LogFileMeta): Boolean = {
+    logFileMeta.fileType == DeltaFileType.CHECKPOINT
+  }
+
+  def isDeltaFile(logFileMeta: LogFileMeta): Boolean = {
+    logFileMeta.fileType == DeltaFileType.DELTA
+  }
+}
+
+
+class LogFileMetaParser(logStore: LogStore) {
+
+  def listFilesFrom(logPath: Path): Iterator[LogFileMeta] = {
+
+    logStore.listFrom(logPath).map(fs => {
+      LogFileMeta(fs,
+        Try(deltaVersion(fs.getPath)).getOrElse(Try(checkpointVersion(fs.getPath)).getOrElse(-1L)),
+        DeltaFileType.getFileType(fs.getPath),
+        numCheckpointParts(fs.getPath))
+    })
+  }
+
+}
+
+object LogFileMetaParser extends LogFileMetaProvider
+  with Logging {
+
+  def apply(sc: SparkContext, logStore: LogStore): LogFileMetaParser = {
+    apply(sc.getConf, sc.hadoopConfiguration, logStore)
+  }
+
+  def apply(sparkConf: SparkConf,
+            hadoopConf: Configuration,
+            logStore: LogStore): LogFileMetaParser = {
+    createLogFileMetaParser(sparkConf, hadoopConf, logStore)
+  }
+}
+
+trait LogFileMetaProvider {
+
+  def createLogFileMetaParser(spark: SparkSession, logStore: LogStore): LogFileMetaParser = {
+    val sc = spark.sparkContext
+    createLogFileMetaParser(sc.getConf, sc.hadoopConfiguration, logStore)
+  }
+
+  def createLogFileMetaParser(sparkConf: SparkConf,
+                              hadoopConf: Configuration,
+                              logStore: LogStore): LogFileMetaParser = {
+    val logStoreClassName = sparkConf.get("spark.delta.logFileHandler.class",
+      classOf[LogFileMetaParser].getName)
+    val logStoreClass = Utils.classForName(logStoreClassName)
+    logStoreClass.getConstructor(classOf[LogStore])
+      .newInstance(logStore).asInstanceOf[LogFileMetaParser]
+  }
+}
diff --git a/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala b/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
index cbd1e7e0a7b..ac85a68a4f4 100644
--- a/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/MetadataCleanup.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.delta
 import java.util.{Calendar, TimeZone}
 
 import org.apache.spark.sql.delta.DeltaHistoryManager.BufferingLogDeletionIterator
+import org.apache.spark.sql.delta.LogFileMeta.{isCheckpointFile, isDeltaFile}
 import org.apache.spark.sql.delta.metering.DeltaLogging
+import org.apache.spark.sql.delta.util.FileNames.checkpointPrefix
 import org.apache.commons.lang3.time.DateUtils
 import org.apache.hadoop.fs.{FileStatus, Path}
@@ -69,22 +71,19 @@ trait MetadataCleanup extends DeltaLogging {
    *  - be older than `fileCutOffTime`
    */
   private def listExpiredDeltaLogs(fileCutOffTime: Long): Iterator[FileStatus] = {
-    import org.apache.spark.sql.delta.util.FileNames._
     val latestCheckpoint = lastCheckpoint
     if (latestCheckpoint.isEmpty) return Iterator.empty
     val threshold = latestCheckpoint.get.version - 1L
-    val files = store.listFrom(checkpointPrefix(logPath, 0))
-      .filter(f => isCheckpointFile(f.getPath) || isDeltaFile(f.getPath))
-    def getVersion(filePath: Path): Long = {
-      if (isCheckpointFile(filePath)) {
-        checkpointVersion(filePath)
-      } else {
-        deltaVersion(filePath)
-      }
-    }
+    val files = logFileHandler.listFilesFrom(checkpointPrefix(logPath, 0))
+      .filter(f => isCheckpointFile(f) || isDeltaFile(f))
+    val versionTuples = files.map(f => (f.fileStatus, f.version)).toArray
+    val versionMap = versionTuples.map(vt => (vt._1.getPath, vt._2)).toMap
 
-    new BufferingLogDeletionIterator(files, fileCutOffTime, threshold, getVersion)
+    new BufferingLogDeletionIterator(versionTuples.map(_._1).iterator,
+      fileCutOffTime,
+      threshold,
+      versionMap)
   }
 
   /** Truncates a timestamp down to the previous midnight and returns the time and a log string */
diff --git a/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala b/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
index 12e18f4ab65..49f8d46bb1a 100644
--- a/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/SnapshotManagement.scala
@@ -24,11 +24,12 @@ import scala.util.{Failure, Success, Try}
 import com.databricks.spark.util.TagDefinitions.TAG_ASYNC
 import org.apache.spark.sql.delta.actions.{Metadata, SingleAction}
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
-import org.apache.spark.sql.delta.util.FileNames._
 import org.apache.spark.sql.delta.util.JsonUtils
 import org.apache.hadoop.fs.FileStatus
 
 import org.apache.spark.SparkContext
+import org.apache.spark.sql.delta.LogFileMeta.{isCheckpointFile, isDeltaFile}
+import org.apache.spark.sql.delta.util.FileNames.{checkpointPrefix, deltaFile}
 import org.apache.spark.sql.{AnalysisException, Dataset}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.util.{ThreadUtils, Utils}
@@ -76,37 +77,38 @@ trait SnapshotManagement { self: DeltaLog =>
       startCheckpoint: Option[Long],
       versionToLoad: Option[Long] = None): LogSegment = {
-    val newFiles = store
+    val newFiles = logFileHandler
       // List from the starting checkpoint. If a checkpoint doesn't exist, this will still return
       // deltaVersion=0.
-      .listFrom(checkpointPrefix(logPath, startCheckpoint.getOrElse(0L)))
+      .listFilesFrom(checkpointPrefix(logPath, startCheckpoint.getOrElse(0L)))
       // Pick up all checkpoint and delta files
-      .filter { file => isCheckpointFile(file.getPath) || isDeltaFile(file.getPath) }
+      .filter { file => isCheckpointFile(file) || isDeltaFile(file) }
      // filter out files that aren't atomically visible. Checkpoint files of 0 size are invalid
-      .filterNot { file => isCheckpointFile(file.getPath) && file.getLen == 0 }
+      .filterNot { file => isCheckpointFile(file) && file.fileStatus.getLen == 0 }
       // take files until the version we want to load
-      .takeWhile(f => versionToLoad.forall(v => getFileVersion(f.getPath) <= v))
+      .takeWhile(f => versionToLoad.forall(v => f.version <= v))
       .toArray
 
     if (newFiles.isEmpty && startCheckpoint.isEmpty) {
       throw DeltaErrors.emptyDirectoryException(logPath.toString)
     }
 
-    val (checkpoints, deltas) = newFiles.partition(f => isCheckpointFile(f.getPath))
+    val (checkpoints, deltas) = newFiles.partition(isCheckpointFile)
     // Find the latest checkpoint in the listing that is not older than the versionToLoad
     val lastChkpoint = versionToLoad.map(CheckpointInstance(_, None))
       .getOrElse(CheckpointInstance.MaxValue)
-    val checkpointFiles = checkpoints.map(f => CheckpointInstance(f.getPath))
+    val checkpointFiles = checkpoints.map(_.asCheckpointInstance())
     val newCheckpoint = getLatestCompleteCheckpointFromList(checkpointFiles, lastChkpoint)
     if (newCheckpoint.isDefined) {
       // If there is a new checkpoint, start new lineage there.
+      val newCheckpointVersion = newCheckpoint.get.version
       val newCheckpointPaths = newCheckpoint.get.getCorrespondingFiles(logPath).toSet
       val deltasAfterCheckpoint = deltas.filter { file =>
-        deltaVersion(file.getPath) > newCheckpointVersion
+        file.version > newCheckpointVersion
       }
-      val deltaVersions = deltasAfterCheckpoint.map(f => deltaVersion(f.getPath))
+      val deltaVersions = deltasAfterCheckpoint.map(_.version)
 
       // We may just be getting a checkpoint file after the filtering
       if (deltaVersions.nonEmpty) {
@@ -119,20 +121,21 @@ trait SnapshotManagement { self: DeltaLog =>
         }
       }
       val newVersion = deltaVersions.lastOption.getOrElse(newCheckpoint.get.version)
-      val newCheckpointFiles = checkpoints.filter(f => newCheckpointPaths.contains(f.getPath))
+      val newCheckpointFiles = checkpoints.filter(
+        f => newCheckpointPaths.contains(f.fileStatus.getPath))
       assert(newCheckpointFiles.length == newCheckpointPaths.size,
         "Failed in getting the file information for:\n" +
          newCheckpointPaths.mkString(" -", "\n -", "") + "\n" +
-         "among\n" + checkpoints.map(_.getPath).mkString(" -", "\n -", ""))
+         "among\n" + checkpoints.map(_.fileStatus.getPath).mkString(" -", "\n -", ""))
 
       // In the case where `deltasAfterCheckpoint` is empty, `deltas` should still not be empty,
       // they may just be before the checkpoint version unless we have a bug in log cleanup
-      val lastCommitTimestamp = deltas.last.getModificationTime
+      val lastCommitTimestamp = deltas.last.fileStatus.getModificationTime
 
       LogSegment(
         newVersion,
-        deltasAfterCheckpoint,
-        newCheckpointFiles,
+        deltasAfterCheckpoint.map(_.fileStatus),
+        newCheckpointFiles.map(_.fileStatus),
         newCheckpoint.map(_.version),
         lastCommitTimestamp)
     } else {
@@ -145,7 +148,7 @@ trait SnapshotManagement { self: DeltaLog =>
           s"Checkpoint file to load version: ${startCheckpoint.get} is missing."))
       }
 
-      val deltaVersions = deltas.map(f => deltaVersion(f.getPath))
+      val deltaVersions = deltas.map(_.version)
       verifyDeltaVersions(deltaVersions)
       if (deltaVersions.head != 0) {
         throw DeltaErrors.logFileNotFoundException(
@@ -158,11 +161,11 @@ trait SnapshotManagement { self: DeltaLog =>
       val latestCommit = deltas.last
       LogSegment(
-        deltaVersion(latestCommit.getPath), // deltas is not empty, so can call .last
-        deltas,
+        latestCommit.version, // deltas is not empty, so can call .last
+        deltas.map(_.fileStatus),
         Nil,
         None,
-        latestCommit.getModificationTime)
+        latestCommit.fileStatus.getModificationTime)
     }
   }
diff --git a/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala b/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala
index a146454db26..d9004a04535 100644
--- a/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala
+++ b/src/main/scala/org/apache/spark/sql/delta/util/DeltaFileOperations.scala
@@ -32,6 +32,7 @@
 import org.apache.parquet.hadoop.{Footer, ParquetFileReader}
 
 import org.apache.spark.{SparkEnv, TaskContext}
 import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.delta.LogFileMetaParser
 import org.apache.spark.sql.{Dataset, SparkSession}
 import org.apache.spark.util.{SerializableConfiguration, ThreadUtils}
@@ -117,7 +118,7 @@ object DeltaFileOperations extends DeltaLogging {
 
   /** Iterate through the contents of directories. */
   private def listUsingLogStore(
-      logStore: LogStore,
+      logStore: LogFileMetaParser,
       subDirs: Iterator[String],
       recurse: Boolean,
       hiddenFileNameFilter: String => Boolean): Iterator[SerializableFileStatus] = {
@@ -125,9 +126,9 @@ object DeltaFileOperations extends DeltaLogging {
     def list(dir: String, tries: Int): Iterator[SerializableFileStatus] = {
       logInfo(s"Listing $dir")
       try {
-        logStore.listFrom(new Path(dir, "\u0000"))
-          .filterNot(f => hiddenFileNameFilter(f.getPath.getName))
-          .map(SerializableFileStatus.fromStatus)
+        logStore.listFilesFrom(new Path(dir, "\u0000"))
+          .filterNot(f => hiddenFileNameFilter(f.fileStatus.getPath.getName))
+          .map(f => SerializableFileStatus.fromStatus(f.fileStatus))
       } catch {
         case NonFatal(e) if isThrottlingError(e) && tries > 0 =>
           randomBackoff("listing", e)
@@ -151,7 +152,7 @@ object DeltaFileOperations extends DeltaLogging {
 
  /** Given an iterator of files and directories, recurse directories with its contents. */
   private def recurseDirectories(
-      logStore: LogStore,
+      logStore: LogFileMetaParser,
      filesAndDirs: Iterator[SerializableFileStatus],
      hiddenFileNameFilter: String => Boolean): Iterator[SerializableFileStatus] = {
    filesAndDirs.flatMap {
@@ -193,12 +194,14 @@ object DeltaFileOperations extends DeltaLogging {
     val listParallelism = fileListingParallelism.getOrElse(spark.sparkContext.defaultParallelism)
     val dirsAndFiles = spark.sparkContext.parallelize(subDirs).mapPartitions { dirs =>
       val logStore = LogStore(SparkEnv.get.conf, hadoopConf.value.value)
-      listUsingLogStore(logStore, dirs, recurse = false, hiddenFileNameFilter)
+      val logFileHandler = LogFileMetaParser(SparkEnv.get.conf, hadoopConf.value.value, logStore)
+      listUsingLogStore(logFileHandler, dirs, recurse = false, hiddenFileNameFilter)
     }.repartition(listParallelism) // Initial list of subDirs may be small
     val allDirsAndFiles = dirsAndFiles.mapPartitions { firstLevelDirsAndFiles =>
       val logStore = LogStore(SparkEnv.get.conf, hadoopConf.value.value)
-      recurseDirectories(logStore, firstLevelDirsAndFiles, hiddenFileNameFilter)
+      val logFileHandler = LogFileMetaParser(SparkEnv.get.conf, hadoopConf.value.value, logStore)
+      recurseDirectories(logFileHandler, firstLevelDirsAndFiles, hiddenFileNameFilter)
     }
     spark.createDataset(allDirsAndFiles)
   }
diff --git a/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala b/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala
index 8bf1e6f6f3e..90ef3b6ca5b 100644
--- a/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala
+++ b/src/test/scala/org/apache/spark/sql/delta/DeltaTimeTravelSuite.scala
@@ -122,7 +122,7 @@ class DeltaTimeTravelSuite extends QueryTest
       start - 2.seconds, // adjusts to start + 4 ms
       start + 10.seconds)
 
-    val commits = DeltaHistoryManager.getCommits(deltaLog.store, deltaLog.logPath, 0, None)
+    val commits = DeltaHistoryManager.getCommits(deltaLog.logFileHandler, deltaLog.logPath, 0, None)
     assert(commits.map(_.timestamp) === Seq(start, start + 1.millis,
       start + 2.millis, start + 3.millis, start + 4.millis, start + 10.seconds))
   }
diff --git a/version.sbt b/version.sbt
index ada7f1a42e4..aa797b0e7d7 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-version in ThisBuild := "0.6.2-SNAPSHOT"
\ No newline at end of file
+version in ThisBuild := "0.6.2-nessie-SNAPSHOT"
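
A note on the extension point this patch introduces (commentary, not part of the diff): DeltaLog now mixes in LogFileMetaProvider, whose createLogFileMetaParser reads the SparkConf key spark.delta.logFileHandler.class and reflectively instantiates the named class through a single LogStore constructor argument, falling back to the default LogFileMetaParser. The sketch below is a hypothetical illustration of how a custom handler could hook in; the package, class name, and the auditing behaviour are invented for the example and are not part of the patch.

    package org.example.delta

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.delta.{LogFileMeta, LogFileMetaParser}
    import org.apache.spark.sql.delta.storage.LogStore

    // Hypothetical custom handler: reuses the default listing and parsing but
    // reports what was listed. It keeps the single-LogStore-argument constructor
    // so the reflective factory in LogFileMetaProvider can instantiate it.
    class AuditingLogFileMetaParser(logStore: LogStore) extends LogFileMetaParser(logStore) {

      override def listFilesFrom(logPath: Path): Iterator[LogFileMeta] = {
        val metas = super.listFilesFrom(logPath).toVector
        // Purely illustrative side effect; a real implementation might filter,
        // re-order, or serve the listing from an external catalog instead.
        println(s"listed ${metas.size} Delta log files under $logPath")
        metas.iterator
      }
    }

Selecting such a handler would then be a matter of Spark configuration when the session is created, e.g. --conf spark.delta.logFileHandler.class=org.example.delta.AuditingLogFileMetaParser (the fully qualified class name here is, again, only an example).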