From a0faf88bc1138818485563a9b158b3e563ec5537 Mon Sep 17 00:00:00 2001 From: Ryan Murray Date: Wed, 30 Sep 2020 10:37:20 +0100 Subject: [PATCH] log store chooses where checkpoitns go (#6) --- .../org/apache/spark/sql/delta/Checkpoints.scala | 3 ++- .../apache/spark/sql/delta/storage/LogStore.scala | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala index d4c6188414e..592f86d3d75 100644 --- a/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala +++ b/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala @@ -229,7 +229,8 @@ object Checkpoints extends DeltaLogging { val checkpointSize = spark.sparkContext.longAccumulator("checkpointSize") val numOfFiles = spark.sparkContext.longAccumulator("numOfFiles") // Use the string in the closure as Path is not Serializable. - val path = checkpointFileSingular(snapshot.path, snapshot.version).toString + val resolvedPath = deltaLog.store.resolveCheckpointPath(snapshot.path) + val path = checkpointFileSingular(resolvedPath, snapshot.version).toString val base = snapshot.state .repartition(1) .map { action => diff --git a/src/main/scala/org/apache/spark/sql/delta/storage/LogStore.scala b/src/main/scala/org/apache/spark/sql/delta/storage/LogStore.scala index 98a36803b15..b9836e4a72f 100644 --- a/src/main/scala/org/apache/spark/sql/delta/storage/LogStore.scala +++ b/src/main/scala/org/apache/spark/sql/delta/storage/LogStore.scala @@ -83,6 +83,18 @@ trait LogStore { throw new UnsupportedOperationException() } + /** + * Let LogStore decide where checkpoints should be stored. + * + * Typically the checkpoint storage path would be the same as for Delta storage. + * This woudl be inside the _delta_log directory. The LogStore impl may wish to control this path + * and should inform the Checkpoints methods accordingly. This is only required for checkpoints + * as they are the only metadata file not directly written by LogStore. + */ + def resolveCheckpointPath(path: Path): Path = { + path + } + /** * Whether a partial write is visible when writing to `path`. *