Retry writing delta after concurrent modification exceptions (close #7) #8

Merged 3 commits on Sep 18, 2023
5 changes: 5 additions & 0 deletions CHANGELOG
@@ -1,3 +1,8 @@
Version 0.1.2 (2023-09-18)
--------------------------
Delta: Support data skipping on tstamp columns (#9)
Retry writing delta after concurrent modification exceptions (#7)

Version 0.1.1 (2023-09-13)
--------------------------
Add license file to packaged jar (#5)
8 changes: 4 additions & 4 deletions README.md
@@ -8,7 +8,7 @@

This project contains applications required to load Snowplow data into Open Table Formats.

Lake Loader 0.1.1 supports [Delta](https://docs.delta.io/latest/index.html) format only. Future releases will add support for [Iceberg](https://iceberg.apache.org/docs/latest/) and [Hudi](https://hudi.apache.org/docs/overview/) as output formats.
Lake Loader 0.1.2 supports [Delta](https://docs.delta.io/latest/index.html) format only. Future releases will add support for [Iceberg](https://iceberg.apache.org/docs/latest/) and [Hudi](https://hudi.apache.org/docs/overview/) as output formats.

Check out [the example config files](./config) for how to configure your lake loader.

@@ -22,7 +22,7 @@ Basic usage:
docker run \
-v /path/to/config.hocon:/var/config.hocon \
-v /path/to/iglu.json:/var/iglu.json \
snowplow/lake-loader-azure:0.1.1 \
snowplow/lake-loader-azure:0.1.2 \
--config /var/config.hocon \
--iglu-config /var/iglu.json
```
@@ -35,7 +35,7 @@ The GCP lake loader reads the stream of enriched events from Pubsub and writes t
docker run \
-v /path/to/config.hocon:/var/config.hocon \
-v /path/to/iglu.json:/var/iglu.json \
snowplow/lake-loader-gcp:0.1.1 \
snowplow/lake-loader-gcp:0.1.2 \
--config /var/config.hocon \
--iglu-config /var/iglu.json
```
@@ -66,7 +66,7 @@ Licensed under the [Snowplow Community License](https://docs.snowplow.io/communi
[build-image]: https://github.com/snowplow-incubator/snowplow-lake-loader/workflows/CI/badge.svg
[build]: https://github.com/snowplow-incubator/snowplow-lake-loader/actions/workflows/ci.yml

[release-image]: https://img.shields.io/badge/release-0.1.1-blue.svg?style=flat
[release-image]: https://img.shields.io/badge/release-0.1.2-blue.svg?style=flat
[releases]: https://github.com/snowplow-incubator/snowplow-lake-loader/releases

[license]: https://docs.snowplow.io/docs/contributing/community-license-faq/
10 changes: 10 additions & 0 deletions config/config.azure.reference.hocon
@@ -22,6 +22,16 @@
# -- - For Azure blob storage, the uri should start with `abfs://`
# -- - For local development, this can be a path to a local file e.g. `file:///path/to/events`
"location": "abfs://container1@streeterdev1.dfs.core.windows.net/events"

# -- Atomic columns which should be brought to the "left-hand-side" of the events table, to
# -- enable Delta's Data Skipping feature.
# -- The Delta table property `delta.dataSkippingNumIndexedCols` will be set to the size of the list
"dataSkippingColumns": [
"load_tstamp"
"collector_tstamp"
"derived_tstamp"
"dvce_created_tstamp"
]
}

"bad": {
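The comment in the hunk above describes how the new `dataSkippingColumns` list feeds Delta's data skipping: the loader sets the table property `delta.dataSkippingNumIndexedCols` to the number of distinct columns in the list. The sketch below is not the loader's own config machinery; it just parses an equivalent HOCON snippet with the Typesafe Config library to show how the list resolves. The object name `DataSkippingColumnsExample` and the shortened config paths are made up for illustration.

```scala
import com.typesafe.config.ConfigFactory
import scala.jdk.CollectionConverters._

// Hypothetical standalone example; the real loader decodes its full config file instead.
object DataSkippingColumnsExample extends App {

  // Same shape as the "good" output block above, with the surrounding keys omitted
  val hocon =
    """
      |good: {
      |  dataSkippingColumns: [
      |    "load_tstamp"
      |    "collector_tstamp"
      |    "derived_tstamp"
      |    "dvce_created_tstamp"
      |  ]
      |}
      |""".stripMargin

  val columns: List[String] =
    ConfigFactory.parseString(hocon).getStringList("good.dataSkippingColumns").asScala.toList

  // The loader sets the Delta table property to the number of distinct configured columns
  val numIndexedCols: Int = columns.toSet.size

  println(s"delta.dataSkippingNumIndexedCols = $numIndexedCols") // 4
}
```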
10 changes: 10 additions & 0 deletions config/config.pubsub.reference.hocon
@@ -27,6 +27,16 @@
# -- - For Azure blob storage, the uri should start with `abfs://`
# -- - For local development, this can be a path to a local file e.g. `file:///path/to/events`
"location": "abfs://container1@streeterdev1.dfs.core.windows.net/events"

# -- Atomic columns which should be brought to the "left-hand-side" of the events table, to
# -- enable Delta's Data Skipping feature.
# -- The Delta table property `delta.dataSkippingNumIndexedCols` will be set to the size of the list
"dataSkippingColumns": [
"load_tstamp"
"collector_tstamp"
"derived_tstamp"
"dvce_created_tstamp"
]
}

"bad": {
3 changes: 0 additions & 3 deletions modules/azure/src/main/resources/application.conf
@@ -8,9 +8,6 @@
}
}
"output": {
"good": {
"type": "Delta"
}
"bad": {
"producerConf": {
"client.id": "snowplow-lake-loader"
12 changes: 12 additions & 0 deletions modules/core/src/main/resources/reference.conf
@@ -1,4 +1,16 @@
{
"output": {
"good": {
"type": "Delta"
"dataSkippingColumns": [
"load_tstamp"
"collector_tstamp"
"derived_tstamp"
"dvce_created_tstamp"
]
}
}

"inMemBatchBytes": 25600000
"cpuParallelismFraction": 0.75
"windowing": "5 minutes"
@@ -40,7 +40,11 @@ object Config {
def location: URI
}

case class Delta(location: URI) extends Target
case class Delta(
location: URI,
dataSkippingColumns: List[String]
) extends Target

sealed trait Iceberg extends Target

case class IcebergSnowflake(
@@ -11,6 +11,7 @@ import org.apache.spark.sql.types._

import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
import com.snowplowanalytics.snowplow.loaders.{AtomicFields, TypedTabledEntity}
import com.snowplowanalytics.snowplow.lakes.Config

private[processing] object SparkSchema {

@@ -40,12 +41,21 @@ private[processing] object SparkSchema {
* Ordered spark Fields corresponding to the output of this loader
*
* Includes fields added by the loader, e.g. `load_tstamp`
*
* @param config
* The Delta config, whose `dataSkippingColumns` param tells us which columns must go first in
* the table definition. See Delta's data skipping feature to understand why.
*/
def atomicPlusTimestamps: List[StructField] =
atomic :+ StructField("load_tstamp", TimestampType, nullable = false)
def fieldsForDeltaCreate(config: Config.Delta): List[StructField] = {
val (withStats, noStats) = AtomicFields.withLoadTstamp.partition { f =>
config.dataSkippingColumns.contains(f.name)
}
(withStats ::: noStats).map(asSparkField)
}

/** String representation of the atomic schema for creating a table using SQL dialect */
def ddlForTableCreate: String = StructType(atomicPlusTimestamps).toDDL
def ddlForIcebergCreate: String =
StructType(AtomicFields.withLoadTstamp.map(asSparkField)).toDDL

private def asSparkField(ddlField: Field): StructField = {
val normalizedName = Field.normalize(ddlField).name
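The scaladoc above says the configured data-skipping columns must come first in the table definition. The short sketch below illustrates the reordering that `fieldsForDeltaCreate` performs, using made-up column names in place of the real `Field` objects from `AtomicFields.withLoadTstamp`; the object name `ReorderForDataSkipping` is hypothetical. It shows why a single `partition` is enough: relative order is preserved within each group, and the indexed columns simply move to the front.

```scala
// Illustration only: the real code partitions Field objects from AtomicFields.withLoadTstamp
object ReorderForDataSkipping extends App {
  val dataSkippingColumns: Set[String] = Set("load_tstamp", "collector_tstamp")

  // A shortened stand-in for the atomic columns, in their usual order
  val allColumns: List[String] =
    List("app_id", "platform", "collector_tstamp", "event_id", "load_tstamp")

  // partition keeps relative order, so indexed columns move to the front of the
  // CREATE TABLE column list and everything else keeps its original order
  val (withStats, noStats) = allColumns.partition(dataSkippingColumns.contains)

  println(withStats ::: noStats)
  // List(collector_tstamp, load_tstamp, app_id, platform, event_id)
}
```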
@@ -18,7 +18,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.current_timestamp
import org.apache.spark.sql.types.StructType
import org.apache.spark.SnowplowOverrideShutdownHook
import org.apache.spark.sql.delta.DeltaAnalysisException
import org.apache.spark.sql.delta.{DeltaAnalysisException, DeltaConcurrentModificationException}
import io.delta.tables.DeltaTable

import com.snowplowanalytics.snowplow.lakes.Config
@@ -54,7 +54,7 @@

private def configureSparkForTarget(builder: SparkSession.Builder, target: Config.Target): Unit =
target match {
case Config.Delta(_) =>
case Config.Delta(_, _) =>
builder
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"): Unit
@@ -98,8 +98,9 @@
.partitionedBy("load_tstamp_date", "event_name")
.location(target.location.toString)
.tableName("events_internal_id") // The name does not matter
.property("delta.dataSkippingNumIndexedCols", target.dataSkippingColumns.toSet.size.toString)

SparkSchema.atomicPlusTimestamps.foreach(builder.addColumn(_))
SparkSchema.fieldsForDeltaCreate(target).foreach(builder.addColumn(_))

// This column needs special treatment because of the `generatedAlwaysAs` clause
builder.addColumn {
@@ -144,7 +145,7 @@
spark.sql(s"CREATE DATABASE IF NOT EXISTS $db"): Unit
spark.sql(s"""
CREATE TABLE IF NOT EXISTS $name
(${SparkSchema.ddlForTableCreate})
(${SparkSchema.ddlForIcebergCreate})
USING ICEBERG
PARTITIONED BY (date(load_tstamp), event_name)
TBLPROPERTIES($tblProperties)
@@ -187,14 +188,8 @@

private def sinkForTarget[F[_]: Sync](target: Config.Target, df: DataFrame): F[Unit] =
target match {
case Config.Delta(location) =>
Sync[F].blocking {
df.write
.format("delta")
.mode("append")
.option("mergeSchema", true)
.save(location.toString)
}
case delta: Config.Delta =>
sinkDelta(delta, df)
case iceberg: Config.Iceberg =>
Sync[F].blocking {
df.write
@@ -206,6 +201,33 @@
}
}

/**
* Sink to delta with retries
*
* Retry is needed if a concurrent writer updated the table metadata. It is only needed during
* schema evolution, when the pipeline starts tracking a new schema for the first time.
*
* Retry happens immediately, with no delay, because for this type of exception there is no
* reason to wait before trying again.
*/
private def sinkDelta[F[_]: Sync](target: Config.Delta, df: DataFrame): F[Unit] =
Sync[F].untilDefinedM {
Sync[F]
.blocking[Option[Unit]] {
df.write
.format("delta")
.mode("append")
.option("mergeSchema", true)
.save(target.location.toString)
Some(())
}
.recoverWith { case e: DeltaConcurrentModificationException =>
Logger[F]
.warn(s"Retryable error writing to delta table: DeltaConcurrentModificationException with conflict type ${e.conflictType}")
.as(None)
}
}

def dropViews[F[_]: Sync](spark: SparkSession, dataFramesOnDisk: List[DataFrameOnDisk]): F[Unit] =
Logger[F].info(s"Removing ${dataFramesOnDisk.size} spark data frames from local disk...") >>
Sync[F].blocking {
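The scaladoc on `sinkDelta` explains the retry shape this PR introduces: the write is wrapped in `untilDefinedM`, the happy path yields `Some(())`, and a `DeltaConcurrentModificationException` is logged and mapped to `None` so the loop runs again immediately. Below is a minimal, self-contained sketch of the same pattern using cats-effect `IO`; the `RetryableWriteError` class and the `RetryWithoutDelayExample` object are made up for the sketch and stand in for the Delta exception and the real sink.

```scala
import cats.effect.{IO, IOApp, Sync}

// Stand-in for DeltaConcurrentModificationException in this sketch
final class RetryableWriteError(msg: String) extends Exception(msg)

object RetryWithoutDelayExample extends IOApp.Simple {

  // Retry immediately (no backoff) while the action fails with a retryable error,
  // mirroring the untilDefinedM pattern used by sinkDelta
  def retryOnConflict[A](action: IO[A]): IO[A] =
    Sync[IO].untilDefinedM {
      action
        .map[Option[A]](Some(_))
        .recoverWith { case e: RetryableWriteError =>
          IO.println(s"Retryable error, trying again: ${e.getMessage}")
            .as(Option.empty[A])
        }
    }

  // Fails twice, then succeeds; the retry loop absorbs both failures
  def run: IO[Unit] =
    IO.ref(0).flatMap { attempts =>
      val flakyWrite: IO[Int] =
        attempts.updateAndGet(_ + 1).flatMap { n =>
          if (n < 3) IO.raiseError(new RetryableWriteError(s"conflict on attempt $n"))
          else IO.pure(n)
        }
      retryOnConflict(flakyWrite).flatMap(n => IO.println(s"Write committed on attempt $n"))
    }
}
```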
@@ -63,7 +63,7 @@ object TestSparkEnvironment {
Resource.raiseError[IO, Nothing, Throwable](new RuntimeException("http failure"))
}

private def targetConfig(tmp: Path) = Config.Delta(tmp.resolve("events").toUri)
private def targetConfig(tmp: Path) = Config.Delta(tmp.resolve("events").toUri, List("load_tstamp", "collector_tstamp"))

val appInfo = new AppInfo {
def name = "lake-loader-test"
3 changes: 0 additions & 3 deletions modules/gcp/src/main/resources/application.conf
@@ -7,9 +7,6 @@
"maxDurationPerAckExtension": "600 seconds"
}
"output": {
"good": {
"type": "Delta"
}
"bad": {
"batchSize": 100
"requestByteThreshold": 1000000