Delta: Support data skipping on tstamp columns (close #9)
istreeter committed Sep 15, 2023
1 parent faec16f commit 28a01bb
Showing 9 changed files with 55 additions and 14 deletions.
10 changes: 10 additions & 0 deletions config/config.azure.reference.hocon
@@ -22,6 +22,16 @@
# -- - For Azure blob storage, the uri should start with `abfs://`
# -- - For local development, this can be a path to a local file e.g. `file:///path/to/events`
"location": "abfs://container1@streeterdev1.dfs.core.windows.net/events"

# -- Atomic columns which should be brought to the "left-hand-side" of the events table, to
# -- enable Delta's Data Skipping feature.
# -- The Delta table property `delta.dataSkippingNumIndexedCols` will be set to the size of the list
"dataSkippingColumns": [
"load_tstamp"
"collector_tstamp"
"derived_tstamp"
"dvce_created_tstamp"
]
}

"bad": {
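The new `dataSkippingColumns` option works together with the Delta table property mentioned in the comment above. A minimal sketch of that relationship in Scala, assuming the default column list from this commit (the actual wiring appears in the SparkUtils changes further down):

// Delta collects per-file min/max statistics only for the first
// `delta.dataSkippingNumIndexedCols` columns of a table, so the loader moves
// the configured columns to the front of the schema and sets the property to
// the de-duplicated size of the list.
val dataSkippingColumns = List("load_tstamp", "collector_tstamp", "derived_tstamp", "dvce_created_tstamp")
val numIndexedCols = dataSkippingColumns.toSet.size.toString // "4"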
10 changes: 10 additions & 0 deletions config/config.pubsub.reference.hocon
@@ -27,6 +27,16 @@
# -- - For Azure blob storage, the uri should start with `abfs://`
# -- - For local development, this can be a path to a local file e.g. `file:///path/to/events`
"location": "abfs://container1@streeterdev1.dfs.core.windows.net/events"

# -- Atomic columns which should be brought to the "left-hand-side" of the events table, to
# -- enable Delta's Data Skipping feature.
# -- The Delta table property `delta.dataSkippingNumIndexedCols` will be set to the size of the list
"dataSkippingColumns": [
"load_tstamp"
"collector_tstamp"
"derived_tstamp"
"dvce_created_tstamp"
]
}

"bad": {
3 changes: 0 additions & 3 deletions modules/azure/src/main/resources/application.conf
@@ -8,9 +8,6 @@
}
}
"output": {
"good": {
"type": "Delta"
}
"bad": {
"producerConf": {
"client.id": "snowplow-lake-loader"
12 changes: 12 additions & 0 deletions modules/core/src/main/resources/reference.conf
@@ -1,4 +1,16 @@
{
"output": {
"good": {
"type": "Delta"
"dataSkippingColumns": [
"load_tstamp"
"collector_tstamp"
"derived_tstamp"
"dvce_created_tstamp"
]
}
}

"inMemBatchBytes": 25600000
"cpuParallelismFraction": 0.75
"windowing": "5 minutes"
@@ -40,7 +40,11 @@ object Config {
def location: URI
}

case class Delta(location: URI) extends Target
case class Delta(
location: URI,
dataSkippingColumns: List[String]
) extends Target

sealed trait Iceberg extends Target

case class IcebergSnowflake(
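With the extra field, a `Delta` target is now constructed from both a location and the column list. A hedged construction example using values taken from the reference config above (the HOCON decoding itself is outside this diff):

import java.net.URI

val target = Config.Delta(
  location            = URI.create("abfs://container1@streeterdev1.dfs.core.windows.net/events"),
  dataSkippingColumns = List("load_tstamp", "collector_tstamp", "derived_tstamp", "dvce_created_tstamp")
)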
@@ -11,6 +11,7 @@ import org.apache.spark.sql.types._

import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
import com.snowplowanalytics.snowplow.loaders.{AtomicFields, TypedTabledEntity}
import com.snowplowanalytics.snowplow.lakes.Config

private[processing] object SparkSchema {

@@ -40,12 +41,21 @@ private[processing] object SparkSchema {
* Ordered spark Fields corresponding to the output of this loader
*
* Includes fields added by the loader, e.g. `load_tstamp`
*
* @param config
* The Delta config, whose `dataSkippingColumns` param tells us which columns must go first in
* the table definition. See Delta's data skipping feature to understand why.
*/
def atomicPlusTimestamps: List[StructField] =
atomic :+ StructField("load_tstamp", TimestampType, nullable = false)
def fieldsForDeltaCreate(config: Config.Delta): List[StructField] = {
val (withStats, noStats) = AtomicFields.withLoadTstamp.partition { f =>
config.dataSkippingColumns.contains(f.name)
}
(withStats ::: noStats).map(asSparkField)
}

/** String representation of the atomic schema for creating a table using the SQL dialect */
def ddlForTableCreate: String = StructType(atomicPlusTimestamps).toDDL
def ddlForIcebergCreate: String =
StructType(AtomicFields.withLoadTstamp.map(asSparkField)).toDDL

private def asSparkField(ddlField: Field): StructField = {
val normalizedName = Field.normalize(ddlField).name
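The scaladoc above notes that the configured columns must come first in the Delta table definition. A self-contained sketch of the ordering idiom used by `fieldsForDeltaCreate`, with an illustrative (not complete) atomic field list:

// `partition` preserves the table's original column order within each group:
// the configured columns move to the front, but are not re-sorted into the
// order they appear in the config.
val atomicNames         = List("app_id", "platform", "collector_tstamp", "derived_tstamp", "geo_country", "load_tstamp")
val dataSkippingColumns = List("load_tstamp", "collector_tstamp", "derived_tstamp", "dvce_created_tstamp")
val (withStats, noStats) = atomicNames.partition(dataSkippingColumns.contains)
val ordered = withStats ::: noStats
// ordered == List("collector_tstamp", "derived_tstamp", "load_tstamp", "app_id", "platform", "geo_country")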
@@ -54,7 +54,7 @@ private[processing] object SparkUtils {

private def configureSparkForTarget(builder: SparkSession.Builder, target: Config.Target): Unit =
target match {
case Config.Delta(_) =>
case Config.Delta(_, _) =>
builder
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog"): Unit
@@ -98,8 +98,9 @@ private[processing] object SparkUtils {
.partitionedBy("load_tstamp_date", "event_name")
.location(target.location.toString)
.tableName("events_internal_id") // The name does not matter
.property("delta.dataSkippingNumIndexedCols", target.dataSkippingColumns.toSet.size.toString)

SparkSchema.atomicPlusTimestamps.foreach(builder.addColumn(_))
SparkSchema.fieldsForDeltaCreate(target).foreach(builder.addColumn(_))

// This column needs special treatment because of the `generatedAlwaysAs` clause
builder.addColumn {
@@ -144,7 +145,7 @@ private[processing] object SparkUtils {
spark.sql(s"CREATE DATABASE IF NOT EXISTS $db"): Unit
spark.sql(s"""
CREATE TABLE IF NOT EXISTS $name
(${SparkSchema.ddlForTableCreate})
(${SparkSchema.ddlForIcebergCreate})
USING ICEBERG
PARTITIONED BY (date(load_tstamp), event_name)
TBLPROPERTIES($tblProperties)
@@ -63,7 +63,7 @@ object TestSparkEnvironment {
Resource.raiseError[IO, Nothing, Throwable](new RuntimeException("http failure"))
}

private def targetConfig(tmp: Path) = Config.Delta(tmp.resolve("events").toUri)
private def targetConfig(tmp: Path) = Config.Delta(tmp.resolve("events").toUri, List("load_tstamp", "collector_tstamp"))

val appInfo = new AppInfo {
def name = "lake-loader-test"
3 changes: 0 additions & 3 deletions modules/gcp/src/main/resources/application.conf
@@ -7,9 +7,6 @@
"maxDurationPerAckExtension": "600 seconds"
}
"output": {
"good": {
"type": "Delta"
}
"bad": {
"batchSize": 100
"requestByteThreshold": 1000000
