Retry writing delta after concurrent modification exceptions (close #7)
istreeter committed Sep 14, 2023
1 parent 42081d0 commit dd3f703
Showing 1 changed file with 28 additions and 9 deletions.
@@ -18,7 +18,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.functions.current_timestamp
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.SnowplowOverrideShutdownHook
-import org.apache.spark.sql.delta.DeltaAnalysisException
+import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaConcurrentModificationException}
 import io.delta.tables.DeltaTable

 import com.snowplowanalytics.snowplow.lakes.Config
@@ -187,14 +187,8 @@ private[processing] object SparkUtils {

   private def sinkForTarget[F[_]: Sync](target: Config.Target, df: DataFrame): F[Unit] =
     target match {
-      case Config.Delta(location) =>
-        Sync[F].blocking {
-          df.write
-            .format("delta")
-            .mode("append")
-            .option("mergeSchema", true)
-            .save(location.toString)
-        }
+      case delta: Config.Delta =>
+        sinkDelta(delta, df)
       case iceberg: Config.Iceberg =>
         Sync[F].blocking {
           df.write
@@ -206,6 +200,31 @@
         }
     }

+  /**
+   * Sink to delta with retries
+   *
+   * A retry is needed if a concurrent writer updated the table metadata. It is only needed
+   * during schema evolution, when the pipeline starts tracking a new schema for the first time.
+   *
+   * The retry happens immediately, with no delay: for this type of exception there is no
+   * reason to wait before retrying.
+   */
+  private def sinkDelta[F[_]: Sync](target: Config.Delta, df: DataFrame): F[Unit] =
+    Sync[F].untilDefinedM {
+      Sync[F]
+        .blocking[Option[Unit]] {
+          df.write
+            .format("delta")
+            .mode("append")
+            .option("mergeSchema", true)
+            .save(target.location.toString)
+          Some(())
+        }
+        .recoverWith { case e: DeltaConcurrentModificationException =>
+          Logger[F].warn(s"Retryable error writing to delta table: ${e.getMessage}").as(None)
+        }
+    }
+
   def dropViews[F[_]: Sync](spark: SparkSession, dataFramesOnDisk: List[DataFrameOnDisk]): F[Unit] =
     Logger[F].info(s"Removing ${dataFramesOnDisk.size} spark data frames from local disk...") >>
       Sync[F].blocking {
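The retry loop in sinkDelta is driven by cats' Monad#untilDefinedM, which re-runs an effect of type F[Option[A]] until it yields a Some: the recovered DeltaConcurrentModificationException becomes None ("try again"), while a successful write becomes Some(()) ("done"). Below is a minimal, self-contained sketch of the same pattern, assuming cats-effect IO; the flaky "attempt" action is hypothetical and stands in for the Delta write.

import cats.Monad
import cats.effect.{IO, IOApp, Ref}

// A sketch of the retry-until-Some pattern used by sinkDelta, assuming
// cats-effect IO. `attempt` is a hypothetical action standing in for the
// Delta write: it "fails" twice (returning None, like the recovered
// DeltaConcurrentModificationException) before succeeding.
object RetryUntilDefined extends IOApp.Simple {

  def run: IO[Unit] =
    Ref.of[IO, Int](0).flatMap { attempts =>
      val attempt: IO[Option[String]] =
        attempts.updateAndGet(_ + 1).map {
          case n if n < 3 => None                             // retryable failure: loop again
          case n          => Some(s"succeeded on attempt $n") // Some ends the loop
        }

      // Re-runs `attempt` immediately, with no delay, until it yields Some,
      // mirroring Sync[F].untilDefinedM in the commit.
      Monad[IO].untilDefinedM(attempt).flatMap(msg => IO.println(msg))
    }
}

Because the recoverWith in the commit matches only DeltaConcurrentModificationException, any other exception still fails the effect, so the loop cannot spin on non-retryable errors.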
