diff --git a/CHANGELOG b/CHANGELOG
index 9cacf12..90360f8 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,8 @@
+Version 0.1.1 (2023-09-13)
+--------------------------
+Add license file to packaged jar (#5)
+Fix loading when a schema needs recovery columns (#4)
+
 Version 0.1.0 (2023-09-10)
 --------------------------
 Initial release
diff --git a/README.md b/README.md
index 23f737c..0e583d9 100644
--- a/README.md
+++ b/README.md
@@ -8,7 +8,7 @@
 This project contains applications required to load Snowplow data into Open Table Formats.
 
-Lake Loader 0.1.0 supports [Delta](https://docs.delta.io/latest/index.html) format only. Future releases will add support for [Iceberg](https://iceberg.apache.org/docs/latest/) and [Hudi](https://hudi.apache.org/docs/overview/) as output formats.
+Lake Loader 0.1.1 supports [Delta](https://docs.delta.io/latest/index.html) format only. Future releases will add support for [Iceberg](https://iceberg.apache.org/docs/latest/) and [Hudi](https://hudi.apache.org/docs/overview/) as output formats.
 
 Check out [the example config files](./config) for how to configure your lake loader.
@@ -22,7 +22,7 @@ Basic usage:
 docker run \
   -v /path/to/config.hocon:/var/config.hocon \
   -v /path/to/iglu.json:/var/iglu.json \
-  snowplow/lake-loader-azure:0.1.0 \
+  snowplow/lake-loader-azure:0.1.1 \
   --config /var/config.hocon \
   --iglu-config /var/iglu.json
 ```
@@ -35,7 +35,7 @@ The GCP lake loader reads the stream of enriched events from Pubsub and writes t
 docker run \
   -v /path/to/config.hocon:/var/config.hocon \
   -v /path/to/iglu.json:/var/iglu.json \
-  snowplow/lake-loader-gcp:0.1.0 \
+  snowplow/lake-loader-gcp:0.1.1 \
   --config /var/config.hocon \
   --iglu-config /var/iglu.json
 ```
@@ -66,7 +66,7 @@
 Licensed under the [Snowplow Community License](https://docs.snowplow.io/communi
 
 [build-image]: https://github.com/snowplow-incubator/snowplow-lake-loader/workflows/CI/badge.svg
 [build]: https://github.com/snowplow-incubator/snowplow-lake-loader/actions/workflows/ci.yml
-[release-image]: https://img.shields.io/badge/release-0.1.0-blue.svg?style=flat
+[release-image]: https://img.shields.io/badge/release-0.1.1-blue.svg?style=flat
 [releases]: https://github.com/snowplow-incubator/snowplow-lake-loader/releases
 [license]: https://docs.snowplow.io/docs/contributing/community-license-faq/
diff --git a/build.sbt b/build.sbt
index 3575295..5b6c323 100755
--- a/build.sbt
+++ b/build.sbt
@@ -10,6 +10,8 @@ lazy val root = project.in(file("."))
   .aggregate(
     streams,
     kafka,
+    pubsub,
+    loadersCommon,
     core,
     azure,
     gcp
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala
index a9d7cc3..79faf87 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Processing.scala
@@ -143,7 +143,7 @@ object Processing {
 
   private def rememberColumnNames[F[_]](ref: Ref[F, WindowState], fields: List[TypedTabledEntity]): F[Unit] = {
     val colNames = fields.flatMap { typedTabledEntity =>
-      typedTabledEntity.mergedField.name :: typedTabledEntity.recoveries.values.map(_.name).toList
+      typedTabledEntity.mergedField.name :: typedTabledEntity.recoveries.map(_._2.name)
    }.toSet
     ref.update(state => state.copy(nonAtomicColumnNames = state.nonAtomicColumnNames ++ colNames))
   }
@@ -224,7 +224,7 @@ object Processing {
       }
       .map { results =>
         val (bad, good) = results.separate
-        (bad, RowsWithSchema(good, SparkSchema.forBatch(entities.fields.map(_.mergedField))))
+        (bad, RowsWithSchema(good, SparkSchema.forBatch(entities.fields)))
       }
 
   private def sendAndDropFailedEvents[F[_]: Applicative, A](env: Environment[F]): Pipe[F, (List[BadRow], A), A] =
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkSchema.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkSchema.scala
index f17c5f9..854ff46 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkSchema.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkSchema.scala
@@ -10,7 +10,7 @@ package com.snowplowanalytics.snowplow.lakes.processing
 import org.apache.spark.sql.types._
 
 import com.snowplowanalytics.iglu.schemaddl.parquet.{Field, Type}
-import com.snowplowanalytics.snowplow.loaders.AtomicFields
+import com.snowplowanalytics.snowplow.loaders.{AtomicFields, TypedTabledEntity}
 
 private[processing] object SparkSchema {
 
@@ -19,8 +19,12 @@ private[processing] object SparkSchema {
    *
    * The returned schema includes atomic fields and non-atomic fields but not the load_tstamp column
    */
-  def forBatch(entities: List[Field]): StructType =
-    StructType(atomic ::: entities.map(asSparkField))
+  def forBatch(entities: List[TypedTabledEntity]): StructType = {
+    val nonAtomicFields = entities.flatMap { tte =>
+      tte.mergedField :: tte.recoveries.map(_._2)
+    }
+    StructType(atomic ::: nonAtomicFields.map(asSparkField))
+  }
 
   /**
    * Ordered Fields corresponding to the output from Enrich
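A note on the two hunks above: before this change `forBatch` only received the merged field for each schema family, so any recovery columns created for a breaking evolution never made it into the Spark schema, and the load failed (the CHANGELOG's #4). The sketch below uses simplified stand-in types, not the real `Field`/`TypedTabledEntity` from loaders-common, purely to illustrate the flattening the new `forBatch` performs; the column names are the ones exercised by the new test further down.

```scala
// Illustration only: simplified stand-ins, not the schema-ddl / loaders-common types.
object ForBatchSketch extends App {
  final case class FieldSketch(name: String)
  final case class EntitySketch(mergedField: FieldSketch, recoveries: List[((Int, Int), FieldSketch)])

  // Mirrors the shape of the new forBatch: the merged field first, then any recovery fields.
  def fieldsForBatch(entities: List[EntitySketch]): List[FieldSketch] =
    entities.flatMap(e => e.mergedField :: e.recoveries.map(_._2))

  val batch = List(
    EntitySketch(FieldSketch("unstruct_event_myvendor_goodschema_7"), Nil),
    EntitySketch(
      FieldSketch("unstruct_event_myvendor_badevolution_1"),
      List(((0, 1), FieldSketch("unstruct_event_myvendor_badevolution_1_recovered_1_0_1_164698669")))
    )
  )

  fieldsForBatch(batch).map(_.name).foreach(println)
  // unstruct_event_myvendor_goodschema_7
  // unstruct_event_myvendor_badevolution_1
  // unstruct_event_myvendor_badevolution_1_recovered_1_0_1_164698669
}
```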
"http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "self": { + "vendor": "myvendor", + "name": "goodschema", + "format": "jsonschema", + "version": "7-0-0" + }, + "type": "object", + "properties": { + "col_a": {"type": "string"} + }, + "required": ["col_a"] +} diff --git a/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/goodschema/jsonschema/7-0-1 b/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/goodschema/jsonschema/7-0-1 new file mode 100644 index 0000000..aea9db0 --- /dev/null +++ b/modules/core/src/test/resources/iglu-client-embedded/schemas/myvendor/goodschema/jsonschema/7-0-1 @@ -0,0 +1,15 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "self": { + "vendor": "myvendor", + "name": "goodschema", + "format": "jsonschema", + "version": "7-0-1" + }, + "type": "object", + "properties": { + "col_a": {"type": "string"}, + "col_b": {"type": "string"} + }, + "required": ["col_a", "col_b"] +} diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSparkSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSparkSpec.scala index 7ee4fdc..0fb3bc3 100644 --- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSparkSpec.scala +++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSparkSpec.scala @@ -10,6 +10,7 @@ package com.snowplowanalytics.snowplow.lakes.processing import cats.effect.IO import cats.effect.kernel.Resource import cats.effect.testing.specs2.CatsEffect +import io.circe.Json import fs2.Stream import org.specs2.Specification import org.specs2.matcher.MatchResult @@ -20,7 +21,8 @@ import io.delta.tables.DeltaTable import scala.concurrent.duration.DurationInt import java.nio.charset.StandardCharsets -import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} +import com.snowplowanalytics.snowplow.analytics.scalasdk.{Event, SnowplowEvent} import com.snowplowanalytics.snowplow.lakes.TestSparkEnvironment import com.snowplowanalytics.snowplow.sources.TokenedEvents @@ -32,6 +34,7 @@ class ProcessingSparkSpec extends Specification with CatsEffect { def is = sequential ^ s2""" The lake loader should: Write a single window to a delta table $e1 + Successfully write parquet file when there is an invalid schema evolution $e2 """ def e1 = { @@ -65,8 +68,9 @@ class ProcessingSparkSpec extends Specification with CatsEffect { List[MatchResult[Any]]( cols must contain("event_id"), cols must contain("load_tstamp"), + cols must contain("unstruct_event_myvendor_goodschema_7"), df.count() must beEqualTo(4L), - outputEventIds.toSeq must containTheSameElementsAs(inputEventIds), + outputEventIds must containTheSameElementsAs(inputEventIds), loadTstamps.toSet must haveSize(1), // single timestamp for entire window loadTstamps.head must not beNull, trTotals must contain(BigDecimal(1.23)) @@ -75,6 +79,48 @@ class ProcessingSparkSpec extends Specification with CatsEffect { } } } + + def e2 = { + + val resources = for { + inputs <- Resource.eval(generateEventsBadEvolution.take(2).compile.toList) + env <- TestSparkEnvironment.build(List(inputs.map(_._1))) + } yield (inputs.map(_._2), env) + + val result = resources.use { case (inputEvents, env) => + Processing + .stream(env.environment) + .compile + .drain + 
@@ -75,6 +79,48 @@
         ).reduce(_ and _)
       }
     }
   }
+
+  def e2 = {
+
+    val resources = for {
+      inputs <- Resource.eval(generateEventsBadEvolution.take(2).compile.toList)
+      env <- TestSparkEnvironment.build(List(inputs.map(_._1)))
+    } yield (inputs.map(_._2), env)
+
+    val result = resources.use { case (inputEvents, env) =>
+      Processing
+        .stream(env.environment)
+        .compile
+        .drain
+        .as((inputEvents, env.tmpDir))
+    }
+
+    result.flatMap { case (inputEvents, tmpDir) =>
+      sparkForAssertions.use { spark =>
+        IO.delay {
+          import spark.implicits._
+          val tbl = DeltaTable.forPath(spark, tmpDir.resolve("events").toString)
+          val df = tbl.toDF
+          val cols = df.columns.toSeq
+
+          val inputEventIds = inputEvents.flatten.map(_.event_id.toString)
+          val outputEventIds = df.select("event_id").as[String].collect().toSeq
+          val loadTstamps = df.select("load_tstamp").as[java.sql.Timestamp].collect().toSeq
+
+          List[MatchResult[Any]](
+            cols must contain("event_id"),
+            cols must contain("load_tstamp"),
+            cols must contain("unstruct_event_myvendor_badevolution_1"),
+            cols must contain("unstruct_event_myvendor_badevolution_1_recovered_1_0_1_164698669"),
+            df.count() must beEqualTo(4L),
+            outputEventIds must containTheSameElementsAs(inputEventIds),
+            loadTstamps.toSet must haveSize(1), // single timestamp for entire window
+            loadTstamps.head must not beNull
+          ).reduce(_ and _)
+        }
+      }
+    }
+  }
 }
 
 object ProcessingSparkSpec {
@@ -90,7 +136,31 @@ object ProcessingSparkSpec {
       val event1 = Event
         .minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0")
         .copy(tr_total = Some(1.23))
-      val event2 = Event.minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0")
+        .copy(unstruct_event = ueGood700)
+      val event2 = Event
+        .minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0")
+        .copy(unstruct_event = ueGood701)
       val serialized = List(event1, event2).map { e =>
         e.toTsv.getBytes(StandardCharsets.UTF_8)
       }
+      (TokenedEvents(serialized, ack), List(event1, event2))
+    }
+  }.repeat
+
+  def generateEventsBadEvolution: Stream[IO, (TokenedEvents, List[Event])] =
+    Stream.eval {
+      for {
+        ack <- IO.unique
+        eventId1 <- IO.randomUUID
+        eventId2 <- IO.randomUUID
+        collectorTstamp <- IO.realTimeInstant
+      } yield {
+        val event1 = Event
+          .minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0")
+          .copy(unstruct_event = ueBadEvolution100)
+        val event2 = Event
+          .minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0")
+          .copy(unstruct_event = ueBadEvolution101)
+        val serialized = List(event1, event2).map { e =>
+          e.toTsv.getBytes(StandardCharsets.UTF_8)
+        }
@@ -112,4 +182,51 @@ object ProcessingSparkSpec {
     Resource.make(io)(s => IO.blocking(s.close()))
   }
 
+  /** Some unstructured events * */
+
+  val ueGood700 = SnowplowEvent.UnstructEvent(
+    Some(
+      SelfDescribingData(
+        SchemaKey("myvendor", "goodschema", "jsonschema", SchemaVer.Full(7, 0, 0)),
+        Json.obj(
+          "col_a" -> Json.fromString("xyz")
+        )
+      )
+    )
+  )
+
+  val ueGood701 = SnowplowEvent.UnstructEvent(
+    Some(
+      SelfDescribingData(
+        SchemaKey("myvendor", "goodschema", "jsonschema", SchemaVer.Full(7, 0, 1)),
+        Json.obj(
+          "col_a" -> Json.fromString("xyz"),
+          "col_b" -> Json.fromString("abc")
+        )
+      )
+    )
+  )
+
+  val ueBadEvolution100 = SnowplowEvent.UnstructEvent(
+    Some(
+      SelfDescribingData(
+        SchemaKey("myvendor", "badevolution", "jsonschema", SchemaVer.Full(1, 0, 0)),
+        Json.obj(
+          "col_a" -> Json.fromString("xyz")
+        )
+      )
+    )
+  )
+
+  val ueBadEvolution101 = SnowplowEvent.UnstructEvent(
+    Some(
+      SelfDescribingData(
+        SchemaKey("myvendor", "badevolution", "jsonschema", SchemaVer.Full(1, 0, 1)),
+        Json.obj(
+          "col_a" -> Json.fromInt(123)
+        )
+      )
+    )
+  )
+
 }
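The new test writes events for `badevolution` 1-0-0 and 1-0-1 (a string column redefined as an integer) and asserts that both the merged column and a recovered column land in the Delta table. If you want to inspect such a table written by this version, a standalone Spark application along these lines should work; the table path and the local-mode session are placeholders, while `DeltaTable.forPath` and the column names come from the test above.

```scala
import io.delta.tables.DeltaTable
import org.apache.spark.sql.SparkSession

object InspectRecoveredColumn extends App {
  // Local session for ad-hoc inspection; the two config entries enable Delta support.
  val spark = SparkSession.builder
    .appName("inspect-recovered-column")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()

  // Placeholder path: wherever the loader wrote its "events" Delta table.
  val df = DeltaTable.forPath(spark, "/tmp/lake/events").toDF

  df.select(
    "event_id",
    "unstruct_event_myvendor_badevolution_1",
    "unstruct_event_myvendor_badevolution_1_recovered_1_0_1_164698669"
  ).show(truncate = false)

  spark.stop()
}
```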
diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala
index eafa617..eece532 100644
--- a/project/BuildSettings.scala
+++ b/project/BuildSettings.scala
@@ -29,7 +29,14 @@
     scalacOptions += "-Ywarn-macros:after",
     addCompilerPlugin(Dependencies.betterMonadicFor),
     ThisBuild / dynverVTagPrefix := false, // Otherwise git tags required to have v-prefix
-    ThisBuild / dynverSeparator := "-" // to be compatible with docker
+    ThisBuild / dynverSeparator := "-", // to be compatible with docker
+
+    Compile / resourceGenerators += Def.task {
+      val license = (Compile / resourceManaged).value / "META-INF" / "LICENSE"
+      IO.copyFile(file("LICENSE.md"), license)
+      Seq(license)
+    }.taskValue
+
   )
 
   lazy val logSettings = Seq(
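The `resourceGenerators` task above is what closes #5: `LICENSE.md` is copied into managed resources as `META-INF/LICENSE`, so it ends up inside the packaged jar. One way to sanity-check an assembled jar is sketched below; the jar path is hypothetical, so point it at whatever artifact your build actually produces.

```scala
import java.util.jar.JarFile
import scala.jdk.CollectionConverters._
import scala.util.Using

object CheckLicenseEntry extends App {
  // Hypothetical jar location; adjust to the artifact produced by your build.
  val jarPath = "modules/azure/target/scala-2.13/lake-loader-azure.jar"

  Using.resource(new JarFile(jarPath)) { jar =>
    val hasLicense = jar.entries().asScala.exists(_.getName == "META-INF/LICENSE")
    println(s"META-INF/LICENSE present: $hasLicense")
  }
}
```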
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 9934e80..7690664 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -50,6 +50,7 @@
     val protobuf = "3.24.2"
     val snappy = "1.1.10.2"
     val thrift = "0.18.1"
+    val jackson = "2.14.2"
 
     // tests
     val specs2 = "4.20.0"
@@ -94,6 +95,7 @@
     val snappy = "org.xerial.snappy" % "snappy-java" % V.snappy
     val hadoopYarn = "org.apache.hadoop" % "hadoop-yarn-server-resourcemanager" % V.hadoop
     val thrift = "org.apache.thrift" % "libthrift" % V.thrift
+    val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % V.jackson // snowplow: Note jackson-databind 2.14.x is incompatible with Spark
 
     val badrows = "com.snowplowanalytics" %% "snowplow-badrows" % V.badrows
@@ -150,7 +152,8 @@
     trackerEmit,
     specs2,
     catsEffectSpecs2,
-    slf4j % Test
+    slf4j % Test,
+    jackson % Test // only needed because we excluded jackson because of the spark issue
   )
 
   val coreDependencies = Seq(
diff --git a/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/Metrics.scala b/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/Metrics.scala
index 6302bfb..e8b4cb9 100644
--- a/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/Metrics.scala
+++ b/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/Metrics.scala
@@ -18,7 +18,11 @@ import java.net.{DatagramPacket, DatagramSocket, InetAddress}
 import java.nio.charset.StandardCharsets.UTF_8
 
 import scala.concurrent.duration.{DurationInt, FiniteDuration}
 
-abstract class Metrics[F[_]: Async, S <: Metrics.State](ref: Ref[F, S], emptyState: S, config: Option[Metrics.StatsdConfig]) {
+abstract class Metrics[F[_]: Async, S <: Metrics.State](
+  ref: Ref[F, S],
+  emptyState: S,
+  config: Option[Metrics.StatsdConfig]
+) {
   def report: Stream[F, Nothing] = {
     val stream = for {
       reporters <- Stream.resource(Metrics.makeReporters[F](config))
diff --git a/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/NonAtomicFields.scala b/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/NonAtomicFields.scala
index 6e2aa7d..7f10fe9 100644
--- a/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/NonAtomicFields.scala
+++ b/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/NonAtomicFields.scala
@@ -21,9 +21,9 @@
    *
    * @param fields
    *   field type information about each family of Iglu schema. E.g. if a batch contains versions
-   *   1-0-0, 1-0-1 and 1-1-0 of a schema, they will be present as a single item of this list. If the
-   *   batch also contains version 2-0-0 of that schema, it will be present as an extra item of this
-   *   list.
+   *   1-0-0, 1-0-1 and 1-1-0 of a schema, they will be present as a single item of this list. If
+   *   the batch also contains version 2-0-0 of that schema, it will be present as an extra item of
+   *   this list.
    * @param igluFailures
    *   details of schemas that were present in the batch but could not be looked up by the Iglu
    *   resolver.
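The scaladoc tweak above is only a re-wrap, but the rule it documents is central to this release: all sub-versions within one major schema model collapse into a single field entry, while a new major model becomes a separate entry. A small illustration of that grouping using `SchemaKey` from iglu-core; the grouping key here is a simplification of what the loader computes internally.

```scala
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer}

object SchemaFamilyGrouping extends App {
  val keys = List(
    SchemaKey("myvendor", "goodschema", "jsonschema", SchemaVer.Full(1, 0, 0)),
    SchemaKey("myvendor", "goodschema", "jsonschema", SchemaVer.Full(1, 0, 1)),
    SchemaKey("myvendor", "goodschema", "jsonschema", SchemaVer.Full(1, 1, 0)),
    SchemaKey("myvendor", "goodschema", "jsonschema", SchemaVer.Full(2, 0, 0))
  )

  // One entry per (vendor, name, major model): 1-0-0, 1-0-1 and 1-1-0 share an entry, 2-0-0 gets its own.
  val families = keys.groupBy(k => (k.vendor, k.name, k.version.asString.takeWhile(_ != '-')))

  families.toList.sortBy(_._1._3).foreach { case ((vendor, name, model), members) =>
    println(s"$vendor/$name model $model -> ${members.map(_.version.asString).mkString(", ")}")
  }
}
```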
diff --git a/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/Telemetry.scala b/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/Telemetry.scala
index 60b864d..8a57d5f 100644
--- a/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/Telemetry.scala
+++ b/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/Telemetry.scala
@@ -40,7 +40,6 @@
     moduleVersion: Option[String]
   )
 
-
   private implicit def unsafeLogger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F]
 
@@ -54,7 +53,7 @@
     else {
       val stream = for {
         uuid <- Stream.eval(Async[F].delay(UUID.randomUUID))
-        sdj = makeHeartbeatEvent( config, appInfo, uuid)
+        sdj = makeHeartbeatEvent(config, appInfo, uuid)
         tracker <- Stream.resource(initTracker(config, appInfo.name, httpClient))
         _ <- Stream.unit ++ Stream.fixedDelay[F](config.interval)
         _ <- Stream.eval(tracker.trackSelfDescribingEvent(unstructEvent = sdj))
diff --git a/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/Transform.scala b/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/Transform.scala
index da42f4d..2651afe 100644
--- a/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/Transform.scala
+++ b/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/Transform.scala
@@ -88,7 +88,7 @@
   ): ValidatedNel[FailureDetails.LoaderIgluError, List[Caster.NamedValue[A]]] =
     entities.flatMap { case TypedTabledEntity(entity, field, subVersions, recoveries) =>
       val head = forEntity(caster, entity, field, subVersions, event)
-      val tail = recoveries.toList.map { case (recoveryVersion, recoveryField) =>
+      val tail = recoveries.map { case (recoveryVersion, recoveryField) =>
        forEntity(caster, entity, recoveryField, Set(recoveryVersion), event)
       }
       head :: tail
@@ -197,7 +197,7 @@
     List[A](
       event.app_id.fold[A](caster.nullValue)(caster.stringValue(_)),
       event.platform.fold[A](caster.nullValue)(caster.stringValue(_)),
-      event.etl_tstamp.fold[A](caster.nullValue)(caster.timestampValue( _)),
+      event.etl_tstamp.fold[A](caster.nullValue)(caster.timestampValue(_)),
       caster.timestampValue(event.collector_tstamp),
       event.dvce_created_tstamp.fold[A](caster.nullValue)(caster.timestampValue(_)),
       event.event.fold[A](caster.nullValue)(caster.stringValue(_)),
diff --git a/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/TypedTabledEntity.scala b/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/TypedTabledEntity.scala
index 703dfa4..6205508 100644
--- a/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/TypedTabledEntity.scala
+++ b/snowplow-common-internal/loaders-common/src/main/scala/com.snowplowanalytics.snowplow.loaders/TypedTabledEntity.scala
@@ -37,7 +37,7 @@ case class TypedTabledEntity(
   tabledEntity: TabledEntity,
   mergedField: Field,
   mergedVersions: Set[SchemaSubVersion],
-  recoveries: Map[SchemaSubVersion, Field]
+  recoveries: List[(SchemaSubVersion, Field)]
 )
 
 object TypedTabledEntity {
@@ -61,11 +61,9 @@
   ): TypedTabledEntity = {
     // Schemas need to be ordered by key to merge in correct order.
     val NonEmptyList(root, tail) = schemas.sorted
-    val columnGroup =
-      TypedTabledEntity(tabledEntity, fieldFromSchema(tabledEntity, root.schema), Set(keyToSubVersion(root.schemaKey)), Map.empty)
-    tail
+    val tte = tail
       .map(schemaWithKey => (fieldFromSchema(tabledEntity, schemaWithKey.schema), schemaWithKey.schemaKey))
-      .foldLeft(columnGroup) { case (columnGroup, (field, schemaKey)) =>
+      .foldLeft(initColumnGroup(tabledEntity, root)) { case (columnGroup, (field, schemaKey)) =>
         val subversion = keyToSubVersion(schemaKey)
         Migrations.mergeSchemas(columnGroup.mergedField, field) match {
           case Left(_) =>
@@ -74,7 +72,7 @@
             // typedField always has a single element in matchingKeys
             val recoverPoint = schemaKey.version.asString.replaceAll("-", "_")
             val newName = s"${field.name}_recovered_${recoverPoint}_$hash"
-            columnGroup.copy(recoveries = columnGroup.recoveries + (subversion -> field.copy(name = newName)))
+            columnGroup.copy(recoveries = (subversion -> field.copy(name = newName)) :: columnGroup.recoveries)
           } else {
             // do not create a recovered column if that type were not in the batch
             columnGroup
@@ -83,8 +81,12 @@
             columnGroup.copy(mergedField = mergedField, mergedVersions = columnGroup.mergedVersions + subversion)
         }
       }
+    tte.copy(recoveries = tte.recoveries.reverse)
   }
 
+  private def initColumnGroup(tabledEntity: TabledEntity, root: SchemaWithKey): TypedTabledEntity =
+    TypedTabledEntity(tabledEntity, fieldFromSchema(tabledEntity, root.schema), Set(keyToSubVersion(root.schemaKey)), Nil)
+
   private def fieldFromSchema(tabledEntity: TabledEntity, schema: Schema): Field = {
     val sdkEntityType = tabledEntity.entityType match {
       case TabledEntity.UnstructEvent => SdkData.UnstructEvent
diff --git a/snowplow-common-internal/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/NonAtomicFieldsSpec.scala b/snowplow-common-internal/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/NonAtomicFieldsSpec.scala
index dd7fb19..226bd9a 100644
--- a/snowplow-common-internal/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/NonAtomicFieldsSpec.scala
+++ b/snowplow-common-internal/loaders-common/src/test/scala/com.snowplowanalytics.snowplow.loaders/NonAtomicFieldsSpec.scala
@@ -47,7 +47,7 @@
         tabledEntity,
         expectedField,
         Set((0, 0), (0, 1), (1, 0)),
-        Map.empty
+        Nil
       )
 
     NonAtomicFields.resolveTypes(resolver, entities).map { case NonAtomicFields.Result(fields, failures) =>
@@ -84,7 +84,7 @@
         tabledEntity,
         expectedField,
         Set((0, 0), (0, 1), (1, 0)),
-        Map.empty
+        Nil
      )
 
     NonAtomicFields.resolveTypes(resolver, entities).map { case NonAtomicFields.Result(fields, failures) =>
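Two things change in `TypedTabledEntity` above: `recoveries` becomes an ordered `List` (built by prepending during the fold and reversed at the end, so recovery columns keep the order in which versions were merged), and recovered columns keep the `<merged name>_recovered_<version>_<hash>` naming rule. A sketch of that naming rule follows; how the `hash` suffix is derived sits outside the hunks shown, so it is passed in here as an argument.

```scala
object RecoveredColumnName extends App {
  // Mirrors the naming rule in TypedTabledEntity; the type-hash suffix is supplied by the caller.
  def recoveredName(mergedName: String, versionAsString: String, typeHash: Int): String = {
    val recoverPoint = versionAsString.replaceAll("-", "_")
    s"${mergedName}_recovered_${recoverPoint}_$typeHash"
  }

  // Reproduces the column name asserted by the new ProcessingSparkSpec test.
  println(recoveredName("unstruct_event_myvendor_badevolution_1", "1-0-1", 164698669))
  // unstruct_event_myvendor_badevolution_1_recovered_1_0_1_164698669
}
```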
diff --git a/snowplow-common-internal/pubsub/src/main/scala/com.snowplowanalytics.snowplow/sources/PubsubSource.scala b/snowplow-common-internal/pubsub/src/main/scala/com.snowplowanalytics.snowplow/sources/PubsubSource.scala
index f7b3838..8ec36d5 100644
--- a/snowplow-common-internal/pubsub/src/main/scala/com.snowplowanalytics.snowplow/sources/PubsubSource.scala
+++ b/snowplow-common-internal/pubsub/src/main/scala/com.snowplowanalytics.snowplow/sources/PubsubSource.scala
@@ -106,7 +106,7 @@
       override def failed(from: ApiService.State, failure: Throwable): Unit =
         dispatcher.unsafeRunSync {
           Logger[F].error(failure)("Error from Pubsub subscriber") *>
-          sig.complete(Left(failure)).void
+            sig.complete(Left(failure)).void
         }
     }
 
@@ -115,7 +115,7 @@
     queue: Queue[F, SingleMessage[F]],
     dispatcher: Dispatcher[F],
     semaphore: Semaphore[F],
-    sig: Deferred[F, Either[Throwable, Unit]],
+    sig: Deferred[F, Either[Throwable, Unit]]
   ): Stream[F, Unit] = {
     val name = ProjectSubscriptionName.of(config.subscription.projectId, config.subscription.subscriptionId)
     val receiver = messageReceiver(queue, dispatcher, semaphore, sig)
 
@@ -123,33 +123,33 @@
     for {
       executor <- Stream.bracket(Sync[F].delay(scheduledExecutorService))(s => Sync[F].delay(s.shutdown()))
       subscriber <- Stream.eval(Sync[F].delay {
-        Subscriber
-          .newBuilder(name, receiver)
-          .setMaxAckExtensionPeriod(convertDuration(config.maxAckExtensionPeriod))
-          .setMaxDurationPerAckExtension(convertDuration(config.maxDurationPerAckExtension))
-          .setMinDurationPerAckExtension(convertDuration(config.minDurationPerAckExtension))
-          .setParallelPullCount(config.parallelPullCount)
-          .setExecutorProvider {
-            new ExecutorProvider {
-              def shouldAutoClose: Boolean = true
-              def getExecutor: ScheduledExecutorService = executor
-            }
-          }
-          .setFlowControlSettings {
-            // Switch off any flow control, because we handle it ourselves with the semaphore
-            FlowControlSettings.getDefaultInstance
-          }
-          .build
-      })
+                      Subscriber
+                        .newBuilder(name, receiver)
+                        .setMaxAckExtensionPeriod(convertDuration(config.maxAckExtensionPeriod))
+                        .setMaxDurationPerAckExtension(convertDuration(config.maxDurationPerAckExtension))
+                        .setMinDurationPerAckExtension(convertDuration(config.minDurationPerAckExtension))
+                        .setParallelPullCount(config.parallelPullCount)
+                        .setExecutorProvider {
+                          new ExecutorProvider {
+                            def shouldAutoClose: Boolean = true
+                            def getExecutor: ScheduledExecutorService = executor
+                          }
+                        }
+                        .setFlowControlSettings {
+                          // Switch off any flow control, because we handle it ourselves with the semaphore
+                          FlowControlSettings.getDefaultInstance
+                        }
+                        .build
+                    })
       _ <- Stream.eval(Sync[F].delay {
-        subscriber.addListener(errorListener(dispatcher, sig), MoreExecutors.directExecutor)
-      })
+             subscriber.addListener(errorListener(dispatcher, sig), MoreExecutors.directExecutor)
+           })
       _ <- Stream.bracket(Sync[F].delay(subscriber.startAsync())) { apiService =>
-        for {
-          _ <- Sync[F].delay(apiService.stopAsync())
-          _ <- drainQueue(queue)
-        } yield ()
-      }
+             for {
+               _ <- Sync[F].delay(apiService.stopAsync())
+               _ <- drainQueue(queue)
+             } yield ()
+           }
     } yield ()
   }
 
@@ -168,7 +168,6 @@
     }
   }
 
-
   private def messageReceiver[F[_]: Async](
     queue: QueueSink[F, SingleMessage[F]],
     dispatcher: Dispatcher[F],
@@ -180,7 +179,8 @@
       val put = semaphore.acquireN(message.getData.size.toLong) *>
         queue.offer(SingleMessage(message.getData.toByteArray, ackReply))
 
-      val io = put.race(sig.get)
+      val io = put
+        .race(sig.get)
         .flatMap {
           case Right(_) =>
             FutureInterop.fromFuture(ackReply.nack())