Release/0.1.1 #6

Merged: 3 commits, Sep 13, 2023
5 changes: 5 additions & 0 deletions CHANGELOG
@@ -1,3 +1,8 @@
Version 0.1.1 (2023-09-13)
--------------------------
Add license file to packaged jar (#5)
Fix loading when a schema needs recovery columns (#4)

Version 0.1.0 (2023-09-10)
--------------------------
Initial release
8 changes: 4 additions & 4 deletions README.md
@@ -8,7 +8,7 @@

This project contains applications required to load Snowplow data into Open Table Formats.

Lake Loader 0.1.0 supports [Delta](https://docs.delta.io/latest/index.html) format only. Future releases will add support for [Iceberg](https://iceberg.apache.org/docs/latest/) and [Hudi](https://hudi.apache.org/docs/overview/) as output formats.
Lake Loader 0.1.1 supports [Delta](https://docs.delta.io/latest/index.html) format only. Future releases will add support for [Iceberg](https://iceberg.apache.org/docs/latest/) and [Hudi](https://hudi.apache.org/docs/overview/) as output formats.

Check out [the example config files](./config) for how to configure your lake loader.

@@ -22,7 +22,7 @@ Basic usage:
docker run \
-v /path/to/config.hocon:/var/config.hocon \
-v /path/to/iglu.json:/var/iglu.json \
snowplow/lake-loader-azure:0.1.0 \
snowplow/lake-loader-azure:0.1.1 \
--config /var/config.hocon \
--iglu-config /var/iglu.json
```
@@ -35,7 +35,7 @@ The GCP lake loader reads the stream of enriched events from Pubsub and writes t
docker run \
-v /path/to/config.hocon:/var/config.hocon \
-v /path/to/iglu.json:/var/iglu.json \
snowplow/lake-loader-gcp:0.1.0 \
snowplow/lake-loader-gcp:0.1.1 \
--config /var/config.hocon \
--iglu-config /var/iglu.json
```
@@ -66,7 +66,7 @@ Licensed under the [Snowplow Community License](https://docs.snowplow.io/communi
[build-image]: https://github.com/snowplow-incubator/snowplow-lake-loader/workflows/CI/badge.svg
[build]: https://github.com/snowplow-incubator/snowplow-lake-loader/actions/workflows/ci.yml

[release-image]: https://img.shields.io/badge/release-0.1.0-blue.svg?style=flat
[release-image]: https://img.shields.io/badge/release-0.1.1-blue.svg?style=flat
[releases]: https://github.com/snowplow-incubator/snowplow-lake-loader/releases

[license]: https://docs.snowplow.io/docs/contributing/community-license-faq/
2 changes: 2 additions & 0 deletions build.sbt
@@ -10,6 +10,8 @@ lazy val root = project.in(file("."))
.aggregate(
streams,
kafka,
pubsub,
loadersCommon,
core,
azure,
gcp
@@ -143,7 +143,7 @@ object Processing {

private def rememberColumnNames[F[_]](ref: Ref[F, WindowState], fields: List[TypedTabledEntity]): F[Unit] = {
val colNames = fields.flatMap { typedTabledEntity =>
typedTabledEntity.mergedField.name :: typedTabledEntity.recoveries.values.map(_.name).toList
typedTabledEntity.mergedField.name :: typedTabledEntity.recoveries.map(_._2.name)
}.toSet
ref.update(state => state.copy(nonAtomicColumnNames = state.nonAtomicColumnNames ++ colNames))
}
@@ -224,7 +224,7 @@ object Processing {
}
.map { results =>
val (bad, good) = results.separate
(bad, RowsWithSchema(good, SparkSchema.forBatch(entities.fields.map(_.mergedField))))
(bad, RowsWithSchema(good, SparkSchema.forBatch(entities.fields)))
}

private def sendAndDropFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, (List[BadRow], A), A] =
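With this change, `rememberColumnNames` collects recovery column names alongside the merged column names. As a minimal sketch, the remembered name set for the `myvendor/badevolution` entity exercised by the new spec further down would contain both columns (names copied from the test assertions, not computed here):

```
object RememberedColumnsSketch {
  // Illustrative only: what the remembered set looks like for a window containing
  // the badevolution 1-0-0 and 1-0-1 events from ProcessingSparkSpec below.
  val colNames: Set[String] = Set(
    "unstruct_event_myvendor_badevolution_1", // merged column
    "unstruct_event_myvendor_badevolution_1_recovered_1_0_1_164698669" // recovery column
  )
}
```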
@@ -10,7 +10,7 @@ package com.snowplowanalytics.snowplow.lakes.processing
import org.apache.spark.sql.types._

import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
import com.snowplowanalytics.snowplow.loaders.AtomicFields
import com.snowplowanalytics.snowplow.loaders.{AtomicFields, TypedTabledEntity}

private[processing] object SparkSchema {

@@ -19,8 +19,12 @@ private[processing] object SparkSchema {
*
* The returned schema includes atomic fields and non-atomic fields but not the load_tstamp column
*/
def forBatch(entities: List[Field]): StructType =
StructType(atomic ::: entities.map(asSparkField))
def forBatch(entities: List[TypedTabledEntity]): StructType = {
val nonAtomicFields = entities.flatMap { tte =>
tte.mergedField :: tte.recoveries.map(_._2)
}
StructType(atomic ::: nonAtomicFields.map(asSparkField))
}

/**
* Ordered Fields corresponding to the output from Enrich
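The reworked `forBatch` now receives the full `TypedTabledEntity` values and flattens each one into its merged field plus any recovery fields before building the `StructType`. A simplified sketch of that flattening with stand-in types (these are not the real schema-ddl classes):

```
object ForBatchSketch {
  // Stand-in types; the real code uses schemaddl's Field and the loaders' TypedTabledEntity.
  final case class FieldStub(name: String)
  final case class EntityStub(mergedField: FieldStub, recoveries: List[(String, FieldStub)])

  // Mirrors the flatMap in SparkSchema.forBatch: merged column first, then recovered columns.
  def nonAtomicFields(entities: List[EntityStub]): List[FieldStub] =
    entities.flatMap(tte => tte.mergedField :: tte.recoveries.map(_._2))
}
```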
@@ -0,0 +1,14 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "myvendor",
"name": "badevolution",
"format": "jsonschema",
"version": "1-0-0"
},
"type": "object",
"properties": {
"col_a": {"type": "string"}
},
"required": ["col_a"]
}
@@ -0,0 +1,14 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "myvendor",
"name": "badevolution",
"format": "jsonschema",
"version": "1-0-1"
},
"type": "object",
"properties": {
"col_a": {"type": "integer"}
},
"required": ["col_a"]
}
@@ -0,0 +1,14 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "myvendor",
"name": "goodschema",
"format": "jsonschema",
"version": "7-0-0"
},
"type": "object",
"properties": {
"col_a": {"type": "string"}
},
"required": ["col_a"]
}
@@ -0,0 +1,15 @@
{
"$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
"self": {
"vendor": "myvendor",
"name": "goodschema",
"format": "jsonschema",
"version": "7-0-1"
},
"type": "object",
"properties": {
"col_a": {"type": "string"},
"col_b": {"type": "string"}
},
"required": ["col_a", "col_b"]
}
@@ -10,6 +10,7 @@ package com.snowplowanalytics.snowplow.lakes.processing
import cats.effect.IO
import cats.effect.kernel.Resource
import cats.effect.testing.specs2.CatsEffect
import io.circe.Json
import fs2.Stream
import org.specs2.Specification
import org.specs2.matcher.MatchResult
@@ -20,7 +21,8 @@ import io.delta.tables.DeltaTable
import scala.concurrent.duration.DurationInt
import java.nio.charset.StandardCharsets

import com.snowplowanalytics.snowplow.analytics.scalasdk.Event
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, SnowplowEvent}
import com.snowplowanalytics.snowplow.lakes.TestSparkEnvironment
import com.snowplowanalytics.snowplow.sources.TokenedEvents

@@ -32,6 +34,7 @@ class ProcessingSparkSpec extends Specification with CatsEffect {
def is = sequential ^ s2"""
The lake loader should:
Write a single window to a delta table $e1
Successfully write parquet file when there is an invalid schema evolution $e2
"""

def e1 = {
@@ -65,8 +68,9 @@ class ProcessingSparkSpec extends Specification with CatsEffect {
List[MatchResult[Any]](
cols must contain("event_id"),
cols must contain("load_tstamp"),
cols must contain("unstruct_event_myvendor_goodschema_7"),
df.count() must beEqualTo(4L),
outputEventIds.toSeq must containTheSameElementsAs(inputEventIds),
outputEventIds must containTheSameElementsAs(inputEventIds),
loadTstamps.toSet must haveSize(1), // single timestamp for entire window
loadTstamps.head must not beNull,
trTotals must contain(BigDecimal(1.23))
@@ -75,6 +79,48 @@
}
}
}

def e2 = {

val resources = for {
inputs <- Resource.eval(generateEventsBadEvolution.take(2).compile.toList)
env <- TestSparkEnvironment.build(List(inputs.map(_._1)))
} yield (inputs.map(_._2), env)

val result = resources.use { case (inputEvents, env) =>
Processing
.stream(env.environment)
.compile
.drain
.as((inputEvents, env.tmpDir))
}

result.flatMap { case (inputEvents, tmpDir) =>
sparkForAssertions.use { spark =>
IO.delay {
import spark.implicits._
val tbl = DeltaTable.forPath(spark, tmpDir.resolve("events").toString)
val df = tbl.toDF
val cols = df.columns.toSeq

val inputEventIds = inputEvents.flatten.map(_.event_id.toString)
val outputEventIds = df.select("event_id").as[String].collect().toSeq
val loadTstamps = df.select("load_tstamp").as[java.sql.Timestamp].collect().toSeq

List[MatchResult[Any]](
cols must contain("event_id"),
cols must contain("load_tstamp"),
cols must contain("unstruct_event_myvendor_badevolution_1"),
cols must contain("unstruct_event_myvendor_badevolution_1_recovered_1_0_1_164698669"),
df.count() must beEqualTo(4L),
outputEventIds must containTheSameElementsAs(inputEventIds),
loadTstamps.toSet must haveSize(1), // single timestamp for entire window
loadTstamps.head must not beNull
).reduce(_ and _)
}
}
}
}
}

object ProcessingSparkSpec {
@@ -90,7 +136,31 @@ object ProcessingSparkSpec {
val event1 = Event
.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0")
.copy(tr_total = Some(1.23))
val event2 = Event.minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0")
.copy(unstruct_event = ueGood700)
val event2 = Event
.minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0")
.copy(unstruct_event = ueGood701)
val serialized = List(event1, event2).map { e =>
e.toTsv.getBytes(StandardCharsets.UTF_8)
}
(TokenedEvents(serialized, ack), List(event1, event2))
}
}.repeat

def generateEventsBadEvolution: Stream[IO, (TokenedEvents, List[Event])] =
Stream.eval {
for {
ack <- IO.unique
eventId1 <- IO.randomUUID
eventId2 <- IO.randomUUID
collectorTstamp <- IO.realTimeInstant
} yield {
val event1 = Event
.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0")
.copy(unstruct_event = ueBadEvolution100)
val event2 = Event
.minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0")
.copy(unstruct_event = ueBadEvolution101)
val serialized = List(event1, event2).map { e =>
e.toTsv.getBytes(StandardCharsets.UTF_8)
}
@@ -112,4 +182,51 @@
Resource.make(io)(s => IO.blocking(s.close()))
}

/** Some unstructured events */

val ueGood700 = SnowplowEvent.UnstructEvent(
Some(
SelfDescribingData(
SchemaKey("myvendor", "goodschema", "jsonschema", SchemaVer.Full(7, 0, 0)),
Json.obj(
"col_a" -> Json.fromString("xyz")
)
)
)
)

val ueGood701 = SnowplowEvent.UnstructEvent(
Some(
SelfDescribingData(
SchemaKey("myvendor", "goodschema", "jsonschema", SchemaVer.Full(7, 0, 1)),
Json.obj(
"col_a" -> Json.fromString("xyz"),
"col_b" -> Json.fromString("abc")
)
)
)
)

val ueBadEvolution100 = SnowplowEvent.UnstructEvent(
Some(
SelfDescribingData(
SchemaKey("myvendor", "badevolution", "jsonschema", SchemaVer.Full(1, 0, 0)),
Json.obj(
"col_a" -> Json.fromString("xyz")
)
)
)
)

val ueBadEvolution101 = SnowplowEvent.UnstructEvent(
Some(
SelfDescribingData(
SchemaKey("myvendor", "badevolution", "jsonschema", SchemaVer.Full(1, 0, 1)),
Json.obj(
"col_a" -> Json.fromInt(123)
)
)
)
)

}
9 changes: 8 additions & 1 deletion project/BuildSettings.scala
@@ -29,7 +29,14 @@ object BuildSettings {
scalacOptions += "-Ywarn-macros:after",
addCompilerPlugin(Dependencies.betterMonadicFor),
ThisBuild / dynverVTagPrefix := false, // Otherwise git tags required to have v-prefix
ThisBuild / dynverSeparator := "-" // to be compatible with docker
ThisBuild / dynverSeparator := "-", // to be compatible with docker

Compile / resourceGenerators += Def.task {
val license = (Compile / resourceManaged).value / "META-INF" / "LICENSE"
IO.copyFile(file("LICENSE.md"), license)
Seq(license)
}.taskValue

)

lazy val logSettings = Seq(
5 changes: 4 additions & 1 deletion project/Dependencies.scala
@@ -50,6 +50,7 @@ object Dependencies {
val protobuf = "3.24.2"
val snappy = "1.1.10.2"
val thrift = "0.18.1"
val jackson = "2.14.2"

// tests
val specs2 = "4.20.0"
@@ -94,6 +95,7 @@
val snappy = "org.xerial.snappy" % "snappy-java" % V.snappy
val hadoopYarn = "org.apache.hadoop" % "hadoop-yarn-server-resourcemanager" % V.hadoop
val thrift = "org.apache.thrift" % "libthrift" % V.thrift
val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson

// snowplow: Note jackson-databind 2.14.x is incompatible with Spark
val badrows = "com.snowplowanalytics" %% "snowplow-badrows" % V.badrows
@@ -150,7 +152,8 @@
trackerEmit,
specs2,
catsEffectSpecs2,
slf4j % Test
slf4j % Test,
jackson % Test // only needed because jackson is excluded elsewhere due to the Spark compatibility issue
)

val coreDependencies = Seq(
@@ -18,7 +18,11 @@ import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets.UTF_8
import scala.concurrent.duration.{DurationInt, FiniteDuration}

abstract class Metrics[F[_]: Async, S <: Metrics.State](ref: Ref[F, S], emptyState: S, config: Option[Metrics.StatsdConfig]) {
abstract class Metrics[F[_]: Async, S <: Metrics.State](
ref: Ref[F, S],
emptyState: S,
config: Option[Metrics.StatsdConfig]
) {
def report: Stream[F, Nothing] = {
val stream = for {
reporters <- Stream.resource(Metrics.makeReporters[F](config))
@@ -21,9 +21,9 @@ object NonAtomicFields {
*
* @param fields
* field type information about each family of Iglu schema. E.g. if a batch contains versions
* 1-0-0, 1-0-1 and 1-1-0 of a schema, they will be present as a single item of this list. If the
* batch also contains version 2-0-0 of that schema, it will be present as an extra item of this
* list.
* 1-0-0, 1-0-1 and 1-1-0 of a schema, they will be present as a single item of this list. If
* the batch also contains version 2-0-0 of that schema, it will be present as an extra item of
* this list.
* @param igluFailures
* details of schemas that were present in the batch but could not be looked up by the Iglu
* resolver.
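The Scaladoc above groups schema versions into families: versions that share a major (model) number collapse into one list item, while a new major version becomes an extra item. A small sketch of that grouping rule, using a simplified stand-in for `SchemaVer`:

```
object SchemaFamilySketch {
  // Stand-in for iglu-core's SchemaVer.Full; only the grouping rule is illustrated.
  final case class VerStub(model: Int, revision: Int, addition: Int)

  val versions = List(
    VerStub(1, 0, 0), VerStub(1, 0, 1), VerStub(1, 1, 0), // one family (same model)
    VerStub(2, 0, 0) // an extra family
  )

  // families(1) holds three versions as a single item; families(2) holds one.
  val families: Map[Int, List[VerStub]] = versions.groupBy(_.model)
}
```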