From ff0c9a37415aac997e4f8a7654b09483f5629fe6 Mon Sep 17 00:00:00 2001
From: Ian Streeter
Date: Sun, 9 Jul 2023 17:55:47 +0100
Subject: [PATCH] More efficient atomic fields builder

---
 .../core/src/main/resources/reference.conf    |   4 +-
 .../Config.scala                              |   1 -
 .../processing/AtomicFields.scala             |   3 +-
 .../processing/SparkUtils.scala               |   2 +-
 .../processing/Transform.scala                | 212 +++++++++++++++---
 .../processing/ProcessingSparkSpec.scala      |   8 +-
 6 files changed, 191 insertions(+), 39 deletions(-)

diff --git a/modules/core/src/main/resources/reference.conf b/modules/core/src/main/resources/reference.conf
index 76128c9..c106dbb 100644
--- a/modules/core/src/main/resources/reference.conf
+++ b/modules/core/src/main/resources/reference.conf
@@ -1,14 +1,14 @@
 {
-  "inMemBatchBytes": 51200000
+  "inMemBatchBytes": 25600000
   "cpuParallelismFraction": 0.5
   "windows": "5 minutes"
 
   "spark": {
     "taskRetries": 3
-    "threads": 2
     "conf": {
       "spark.ui.enabled": "false"
       "spark.local.dir": "/tmp" # This is the default but it's important for us
       "spark.serializer": "org.apache.spark.serializer.KryoSerializer"
+      "spark.memory.fraction": "0.2"
     }
   }
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala
index 32d0290..7717087 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/Config.scala
@@ -66,7 +66,6 @@ object Config {
 
   case class Spark(
     taskRetries: Int,
-    threads: Int,
     conf: Map[String, String]
   )
 
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/AtomicFields.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/AtomicFields.scala
index ac47f25..317eede 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/AtomicFields.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/AtomicFields.scala
@@ -13,7 +13,8 @@
 import com.snowplowanalytics.iglu.schemaddl.parquet.Type.DecimalPrecision
 import com.snowplowanalytics.iglu.schemaddl.parquet.Type.Nullability.{Nullable, Required}
 
 private[processing] object AtomicFields {
-  private val customDecimal = Type.Decimal(DecimalPrecision.Digits18, 2)
+  val customDecimal = Type.Decimal(DecimalPrecision.Digits18, 2)
+  val customDecimalPrecision = Type.DecimalPrecision.toInt(customDecimal.precision)
 
   val schemaKey: SchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1, 0, 0))
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala
index 1452cde..10607ac 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/SparkUtils.scala
@@ -38,7 +38,7 @@ private[processing] object SparkUtils {
       SparkSession
        .builder()
        .appName("snowplow-lake-loader")
-       .master(s"local[${config.threads}, ${config.taskRetries}]")
+       .master(s"local[*, ${config.taskRetries}]")
 
     configureSparkForTarget(builder, target)
     configureSparkWithExtras(builder, config.conf)
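
Note on the configuration changes above: with the fixed "threads" setting gone, the local Spark master sizes itself from the machine. Below is a minimal sketch of how the slimmed-down Spark config (just taskRetries plus the raw conf map) could drive the session builder; the object and method names are illustrative only, and the real wiring lives in SparkUtils.

import org.apache.spark.sql.SparkSession

// Illustrative sketch: "local[*, N]" uses every available core and lets a failed
// task be retried N times, replacing the removed fixed "threads" setting.
object LocalSessionSketch {
  def build(taskRetries: Int, conf: Map[String, String]): SparkSession = {
    val builder = SparkSession
      .builder()
      .appName("snowplow-lake-loader")
      .master(s"local[*, $taskRetries]")
    // apply the raw overrides from the "spark.conf" block, e.g. spark.memory.fraction
    conf.foreach { case (k, v) => builder.config(k, v) }
    builder.getOrCreate()
  }
}

For example, build(3, Map("spark.memory.fraction" -> "0.2")) mirrors the defaults now in reference.conf.
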
diff --git a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Transform.scala b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Transform.scala
index 4d83759..b004717 100644
--- a/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Transform.scala
+++ b/modules/core/src/main/scala/com.snowplowanalytics.snowplow.lakes/processing/Transform.scala
@@ -13,6 +13,8 @@
 import io.circe.Json
 import org.apache.spark.sql.Row
+import java.sql.Timestamp
+
 import com.snowplowanalytics.iglu.schemaddl.parquet.Field
 import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure => BadRowFailure, FailureDetails, Processor => BadRowProcessor}
 import com.snowplowanalytics.snowplow.badrows.{Payload => BadPayload}
@@ -40,16 +42,15 @@
     event: Event,
     batchInfo: NonAtomicFields
   ): Either[BadRow, Row] =
-    failForResolverErrors(processor, event, batchInfo.igluFailures).flatMap { _ =>
+    failForResolverErrors(processor, event, batchInfo.igluFailures) *>
       (forAtomic(event), forEntities(event, batchInfo.fields))
        .mapN { case (atomic, nonAtomic) =>
-          buildRow(atomic ::: nonAtomic)
+          Row.fromSeq(atomic ::: nonAtomic.map(extractFieldValue))
        }
        .toEither
        .leftMap { nel =>
          BadRow.LoaderIgluError(processor, BadRowFailure.LoaderIgluErrors(nel), BadPayload.LoaderPayload(event))
        }
-    }
 
   private def failForResolverErrors(
     processor: BadRowProcessor,
@@ -81,35 +82,19 @@
     }
   }
 
-  private def buildRow(fieldValues: List[FieldValue]): Row = {
-    def extractFieldValue(fv: FieldValue): Any = fv match {
-      case FieldValue.NullValue => null
-      case FieldValue.StringValue(v) => v
-      case FieldValue.BooleanValue(v) => v
-      case FieldValue.IntValue(v) => v
-      case FieldValue.LongValue(v) => v
-      case FieldValue.DoubleValue(v) => v
-      case FieldValue.DecimalValue(v, _) => v
-      case FieldValue.TimestampValue(v) => v
-      case FieldValue.DateValue(v) => v
-      case FieldValue.ArrayValue(vs) => vs.map(extractFieldValue)
-      case FieldValue.StructValue(vs) => buildRow(vs.map(_.value))
-      case FieldValue.JsonValue(v) => v.noSpaces
-    }
-    Row.fromSeq(fieldValues.map(extractFieldValue))
-  }
-
-  private def forAtomic(
-    event: Event
-  ): ValidatedNel[FailureDetails.LoaderIgluError, List[FieldValue]] = {
-    val atomicJsonValues = event.atomic
-    AtomicFields.static
-      .map { atomicField =>
-        val jsonFieldValue = atomicJsonValues.getOrElse(atomicField.name, Json.Null)
-        FieldValue.cast(atomicField)(jsonFieldValue)
-      }
-      .sequence
-      .leftMap(castErrorToLoaderIgluError(AtomicFields.schemaKey, _))
+  private def extractFieldValue(fv: FieldValue): Any = fv match {
+    case FieldValue.NullValue => null
+    case FieldValue.StringValue(v) => v
+    case FieldValue.BooleanValue(v) => v
+    case FieldValue.IntValue(v) => v
+    case FieldValue.LongValue(v) => v
+    case FieldValue.DoubleValue(v) => v
+    case FieldValue.DecimalValue(v, _) => v
+    case FieldValue.TimestampValue(v) => v
+    case FieldValue.DateValue(v) => v
+    case FieldValue.ArrayValue(vs) => vs.map(extractFieldValue)
+    case FieldValue.StructValue(vs) => Row.fromSeq(vs.map(v => extractFieldValue(v.value)))
+    case FieldValue.JsonValue(v) => v.noSpaces
   }
 
   private def forEntities(
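
An aside before the next hunk, on the combinators used in transformEvent above: swapping flatMap for *> keeps the fail-fast resolver check, while the (forAtomic, forEntities) pairing under mapN still accumulates cast errors from both halves of the row. Here is a standalone sketch with stand-in values; none of these names come from the codebase.

import cats.data.NonEmptyList
import cats.implicits._

object CombinatorSketch {
  def main(args: Array[String]): Unit = {
    // stand-ins for the results of failForResolverErrors, forAtomic and forEntities
    val resolverCheck: Either[NonEmptyList[String], Unit] = Right(())
    val atomic   = List[Any](1, 2).validNel[String]
    val entities = List[Any](3).validNel[String]

    // *> sequences the Either and keeps only the right-hand result
    val row = resolverCheck *> (atomic, entities).mapN(_ ::: _).toEither
    println(row) // Right(List(1, 2, 3))

    // mapN over Validated accumulates failures instead of stopping at the first one
    val failed = ("bad atomic".invalidNel[List[Any]], "bad entity".invalidNel[List[Any]]).mapN(_ ::: _)
    println(failed) // Invalid(NonEmptyList(bad atomic, bad entity))
  }
}
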
@@ -198,4 +183,167 @@
     obj.add("_schema_version", Json.fromString(sdd.schema.version.asString))
   }
 
+  /**
+   * Although this code looks prone to error, it is a very efficient way to build a Row from an
+   * Event
+   *
+   * It is much more cpu-efficient than going via intermediate Json.
+   *
+   * TODO: implement this using Shapeless to make it less fragile
+   */
+  private def forAtomic(event: Event): ValidatedNel[FailureDetails.LoaderIgluError, List[Any]] =
+    (
+      event.tr_total.traverse(doubleToDecimal),
+      event.tr_tax.traverse(doubleToDecimal),
+      event.tr_shipping.traverse(doubleToDecimal),
+      event.ti_price.traverse(doubleToDecimal),
+      event.tr_total_base.traverse(doubleToDecimal),
+      event.tr_tax_base.traverse(doubleToDecimal),
+      event.tr_shipping_base.traverse(doubleToDecimal),
+      event.ti_price_base.traverse(doubleToDecimal)
+    ).mapN { case (trTotal, trTax, trShipping, tiPrice, trTotalBase, trTaxBase, trShippingBase, tiPriceBase) =>
+      List[Any](
+        event.app_id.orNull,
+        event.platform.orNull,
+        event.etl_tstamp.map(Timestamp.from).orNull,
+        Timestamp.from(event.collector_tstamp),
+        event.dvce_created_tstamp.map(Timestamp.from).orNull,
+        event.event.orNull,
+        event.event_id.toString,
+        event.txn_id.getOrElse(null),
+        event.name_tracker.orNull,
+        event.v_tracker.orNull,
+        event.v_collector,
+        event.v_etl,
+        event.user_id.orNull,
+        event.user_ipaddress.orNull,
+        event.user_fingerprint.orNull,
+        event.domain_userid.orNull,
+        event.domain_sessionidx.getOrElse(null),
+        event.network_userid.orNull,
+        event.geo_country.orNull,
+        event.geo_region.orNull,
+        event.geo_city.orNull,
+        event.geo_zipcode.orNull,
+        event.geo_latitude.getOrElse(null),
+        event.geo_longitude.getOrElse(null),
+        event.geo_region_name.orNull,
+        event.ip_isp.orNull,
+        event.ip_organization.orNull,
+        event.ip_domain.orNull,
+        event.ip_netspeed.orNull,
+        event.page_url.orNull,
+        event.page_title.orNull,
+        event.page_referrer.orNull,
+        event.page_urlscheme.orNull,
+        event.page_urlhost.orNull,
+        event.page_urlport.getOrElse(null),
+        event.page_urlpath.orNull,
+        event.page_urlquery.orNull,
+        event.page_urlfragment.orNull,
+        event.refr_urlscheme.orNull,
+        event.refr_urlhost.orNull,
+        event.refr_urlport.getOrElse(null),
+        event.refr_urlpath.orNull,
+        event.refr_urlquery.orNull,
+        event.refr_urlfragment.orNull,
+        event.refr_medium.orNull,
+        event.refr_source.orNull,
+        event.refr_term.orNull,
+        event.mkt_medium.orNull,
+        event.mkt_source.orNull,
+        event.mkt_term.orNull,
+        event.mkt_content.orNull,
+        event.mkt_campaign.orNull,
+        event.se_category.orNull,
+        event.se_action.orNull,
+        event.se_label.orNull,
+        event.se_property.orNull,
+        event.se_value.getOrElse(null),
+        event.tr_orderid.orNull,
+        event.tr_affiliation.orNull,
+        trTotal.orNull,
+        trTax.orNull,
+        trShipping.orNull,
+        event.tr_city.orNull,
+        event.tr_state.orNull,
+        event.tr_country.orNull,
+        event.ti_orderid.orNull,
+        event.ti_sku.orNull,
+        event.ti_name.orNull,
+        event.ti_category.orNull,
+        tiPrice.orNull,
+        event.ti_quantity.getOrElse(null),
+        event.pp_xoffset_min.getOrElse(null),
+        event.pp_xoffset_max.getOrElse(null),
+        event.pp_yoffset_min.getOrElse(null),
+        event.pp_yoffset_max.getOrElse(null),
+        event.useragent.orNull,
+        event.br_name.orNull,
+        event.br_family.orNull,
+        event.br_version.orNull,
+        event.br_type.orNull,
+        event.br_renderengine.orNull,
+        event.br_lang.orNull,
+        event.br_features_pdf.getOrElse(null),
+        event.br_features_flash.getOrElse(null),
+        event.br_features_java.getOrElse(null),
+        event.br_features_director.getOrElse(null),
+        event.br_features_quicktime.getOrElse(null),
+        event.br_features_realplayer.getOrElse(null),
+        event.br_features_windowsmedia.getOrElse(null),
+        event.br_features_gears.getOrElse(null),
+        event.br_features_silverlight.getOrElse(null),
+        event.br_cookies.getOrElse(null),
+        event.br_colordepth.orNull,
+        event.br_viewwidth.getOrElse(null),
+        event.br_viewheight.getOrElse(null),
+        event.os_name.orNull,
+        event.os_family.orNull,
+        event.os_manufacturer.orNull,
+        event.os_timezone.orNull,
+        event.dvce_type.orNull,
+        event.dvce_ismobile.getOrElse(null),
+        event.dvce_screenwidth.getOrElse(null),
+        event.dvce_screenheight.getOrElse(null),
+        event.doc_charset.orNull,
+        event.doc_width.getOrElse(null),
+        event.doc_height.getOrElse(null),
+        event.tr_currency.orNull,
+        trTotalBase.orNull,
+        trTaxBase.orNull,
+        trShippingBase.orNull,
+        event.ti_currency.orNull,
+        tiPriceBase.orNull,
+        event.base_currency.orNull,
+        event.geo_timezone.orNull,
+        event.mkt_clickid.orNull,
+        event.mkt_network.orNull,
+        event.etl_tags.orNull,
+        event.dvce_sent_tstamp.map(Timestamp.from).orNull,
+        event.refr_domain_userid.orNull,
+        event.refr_dvce_tstamp.map(Timestamp.from).orNull,
+        event.domain_sessionid.orNull,
+        event.derived_tstamp.map(Timestamp.from).orNull,
+        event.event_vendor.orNull,
+        event.event_name.orNull,
+        event.event_format.orNull,
+        event.event_version.orNull,
+        event.event_fingerprint.orNull,
+        event.true_tstamp.map(Timestamp.from).orNull
+      )
+    }.leftMap(castErrorToLoaderIgluError(AtomicFields.schemaKey, _))
+
+  private def doubleToDecimal(d: Double): ValidatedNel[CastError, BigDecimal] = {
+    val bigDec = BigDecimal(d)
+    Either.catchOnly[java.lang.ArithmeticException] {
+      bigDec.setScale(AtomicFields.customDecimal.scale, BigDecimal.RoundingMode.UNNECESSARY)
+    } match {
+      case Right(scaled) if scaled.precision <= AtomicFields.customDecimalPrecision =>
+        scaled.validNel
+      case _ =>
+        CastError.WrongType(Json.fromDoubleOrNull(d), AtomicFields.customDecimal).invalidNel
+    }
+  }
+
 }
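
To make the rounding rule in doubleToDecimal concrete, here is a cats-free sketch of the same guard using plain Either; the bounds mirror the DECIMAL(18,2) type from AtomicFields (precision 18, scale 2), and the object and method names are illustrative only.

import scala.math.BigDecimal.RoundingMode

object DecimalGuardSketch {
  val precision = 18
  val scale     = 2

  def toDecimal(d: Double): Either[String, BigDecimal] =
    try {
      // RoundingMode.UNNECESSARY throws if the value does not already fit the target scale
      val scaled = BigDecimal(d).setScale(scale, RoundingMode.UNNECESSARY)
      if (scaled.precision <= precision) Right(scaled)
      else Left(s"$d has more than $precision significant digits")
    } catch {
      case _: ArithmeticException => Left(s"$d cannot be represented at scale $scale")
    }

  def main(args: Array[String]): Unit = {
    println(toDecimal(1.23))  // Right(1.23): accepted for tr_total
    println(toDecimal(1.234)) // Left(...): would need rounding, so the loader reports a CastError
  }
}

This works because scala.math.BigDecimal(d: Double) is built from the Double's decimal string representation, so a value like 1.23 lands exactly on scale 2 rather than on its binary expansion.
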
diff --git a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSparkSpec.scala b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSparkSpec.scala
index a8cfefa..7ee4fdc 100644
--- a/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSparkSpec.scala
+++ b/modules/core/src/test/scala/com.snowplowanalytics.snowplow.lakes/processing/ProcessingSparkSpec.scala
@@ -60,6 +60,7 @@
         val inputEventIds = inputEvents.flatten.map(_.event_id.toString)
         val outputEventIds = df.select("event_id").as[String].collect().toSeq
         val loadTstamps = df.select("load_tstamp").as[java.sql.Timestamp].collect().toSeq
+        val trTotals = df.select("tr_total").as[BigDecimal].collect().toSeq
 
         List[MatchResult[Any]](
           cols must contain("event_id"),
@@ -67,7 +68,8 @@
           df.count() must beEqualTo(4L),
           outputEventIds.toSeq must containTheSameElementsAs(inputEventIds),
           loadTstamps.toSet must haveSize(1), // single timestamp for entire window
-          loadTstamps.head must not beNull
+          loadTstamps.head must not beNull,
+          trTotals must contain(BigDecimal(1.23))
         ).reduce(_ and _)
       }
     }
@@ -85,7 +87,9 @@ object ProcessingSparkSpec {
       eventId2 <- IO.randomUUID
       collectorTstamp <- IO.realTimeInstant
     } yield {
-      val event1 = Event.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0")
+      val event1 = Event
+        .minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0")
+        .copy(tr_total = Some(1.23))
      val event2 = Event.minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0")
      val serialized = List(event1, event2).map { e =>
        e.toTsv.getBytes(StandardCharsets.UTF_8)
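
One illustrative note on the new assertion: specs2's contain matcher checks membership with ==, and Scala BigDecimal equality compares numeric value rather than scale, so the scale-2 decimal written for tr_total should compare equal to the literal BigDecimal(1.23) once read back from the lake. The object name below is only for the sketch.

object TrTotalAssertionSketch {
  def main(args: Array[String]): Unit = {
    // roughly what doubleToDecimal produces for tr_total = Some(1.23): a BigDecimal at scale 2
    val written = BigDecimal(1.23).setScale(2)
    println(written == BigDecimal(1.23))             // true: same numeric value
    println(Seq(written).contains(BigDecimal(1.23))) // true: the membership check the spec relies on
  }
}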