More efficient atomic fields builder
istreeter committed Jul 9, 2023
1 parent a90d19c commit ff0c9a3
Showing 6 changed files with 191 additions and 39 deletions.
4 changes: 2 additions & 2 deletions modules/core/src/main/resources/reference.conf
@@ -1,14 +1,14 @@
{
"inMemBatchBytes": 51200000
"inMemBatchBytes": 25600000
"cpuParallelismFraction": 0.5
"windows": "5 minutes"
"spark": {
"taskRetries": 3
"threads": 2
"conf": {
"spark.ui.enabled": "false"
"spark.local.dir": "/tmp" # This is the default but it's important for us
"spark.serializer": "org.apache.spark.serializer.KryoSerializer"
"spark.memory.fraction": "0.2"
}
}
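Note (not part of the commit, object name illustrative): these defaults follow the usual Lightbend Config convention, where reference.conf ships the defaults and a deployment's application.conf or -Dconfig.file override wins per key. A minimal sketch of how the values above would resolve, assuming that convention:

import com.typesafe.config.ConfigFactory

object DefaultsSketch extends App {
  // Per-key overrides from application.conf / system properties win;
  // otherwise the reference.conf values shown above are used.
  val conf = ConfigFactory.load()
  println(conf.getLong("inMemBatchBytes"))          // 25600000 unless overridden
  println(conf.getDouble("cpuParallelismFraction")) // 0.5
  println(conf.getString("windows"))                // "5 minutes"
}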

@@ -66,7 +66,6 @@ object Config {

case class Spark(
taskRetries: Int,
threads: Int,
conf: Map[String, String]
)

Expand Up @@ -13,7 +13,8 @@ import com.snowplowanalytics.iglu.schemaddl.parquet.Type.DecimalPrecision
import com.snowplowanalytics.iglu.schemaddl.parquet.Type.Nullability.{Nullable, Required}

private[processing] object AtomicFields {
private val customDecimal = Type.Decimal(DecimalPrecision.Digits18, 2)
val customDecimal = Type.Decimal(DecimalPrecision.Digits18, 2)
val customDecimalPrecision = Type.DecimalPrecision.toInt(customDecimal.precision)

val schemaKey: SchemaKey = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1, 0, 0))

@@ -38,7 +38,7 @@ private[processing] object SparkUtils {
SparkSession
.builder()
.appName("snowplow-lake-loader")
.master(s"local[${config.threads}, ${config.taskRetries}]")
.master(s"local[*, ${config.taskRetries}]")

configureSparkForTarget(builder, target)
configureSparkWithExtras(builder, config.conf)
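For context, a standalone sketch (not from this commit, object name illustrative): Spark's local[*, N] master URL runs with as many worker threads as there are logical cores and allows up to N attempts per task, which is why the fixed threads setting could be dropped in favour of the wildcard.

import org.apache.spark.sql.SparkSession

object LocalMasterSketch extends App {
  // "local[*, 3]" = all logical cores, up to 3 attempts per task
  // (3 matches the taskRetries default in reference.conf above).
  val spark = SparkSession
    .builder()
    .appName("local-master-sketch")
    .master("local[*, 3]")
    .getOrCreate()

  println(spark.sparkContext.defaultParallelism) // number of logical cores
  spark.stop()
}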
@@ -13,6 +13,8 @@ import io.circe.Json

import org.apache.spark.sql.Row

import java.sql.Timestamp

import com.snowplowanalytics.iglu.schemaddl.parquet.Field
import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure => BadRowFailure, FailureDetails, Processor => BadRowProcessor}
import com.snowplowanalytics.snowplow.badrows.{Payload => BadPayload}
@@ -40,16 +42,15 @@ private[processing] object Transform {
event: Event,
batchInfo: NonAtomicFields
): Either[BadRow, Row] =
failForResolverErrors(processor, event, batchInfo.igluFailures).flatMap { _ =>
failForResolverErrors(processor, event, batchInfo.igluFailures) *>
(forAtomic(event), forEntities(event, batchInfo.fields))
.mapN { case (atomic, nonAtomic) =>
buildRow(atomic ::: nonAtomic)
Row.fromSeq(atomic ::: nonAtomic.map(extractFieldValue))
}
.toEither
.leftMap { nel =>
BadRow.LoaderIgluError(processor, BadRowFailure.LoaderIgluErrors(nel), BadPayload.LoaderPayload(event))
}
}

private def failForResolverErrors(
processor: BadRowProcessor,
@@ -81,35 +82,19 @@
}
}

private def buildRow(fieldValues: List[FieldValue]): Row = {
def extractFieldValue(fv: FieldValue): Any = fv match {
case FieldValue.NullValue => null
case FieldValue.StringValue(v) => v
case FieldValue.BooleanValue(v) => v
case FieldValue.IntValue(v) => v
case FieldValue.LongValue(v) => v
case FieldValue.DoubleValue(v) => v
case FieldValue.DecimalValue(v, _) => v
case FieldValue.TimestampValue(v) => v
case FieldValue.DateValue(v) => v
case FieldValue.ArrayValue(vs) => vs.map(extractFieldValue)
case FieldValue.StructValue(vs) => buildRow(vs.map(_.value))
case FieldValue.JsonValue(v) => v.noSpaces
}
Row.fromSeq(fieldValues.map(extractFieldValue))
}

private def forAtomic(
event: Event
): ValidatedNel[FailureDetails.LoaderIgluError, List[FieldValue]] = {
val atomicJsonValues = event.atomic
AtomicFields.static
.map { atomicField =>
val jsonFieldValue = atomicJsonValues.getOrElse(atomicField.name, Json.Null)
FieldValue.cast(atomicField)(jsonFieldValue)
}
.sequence
.leftMap(castErrorToLoaderIgluError(AtomicFields.schemaKey, _))
private def extractFieldValue(fv: FieldValue): Any = fv match {
case FieldValue.NullValue => null
case FieldValue.StringValue(v) => v
case FieldValue.BooleanValue(v) => v
case FieldValue.IntValue(v) => v
case FieldValue.LongValue(v) => v
case FieldValue.DoubleValue(v) => v
case FieldValue.DecimalValue(v, _) => v
case FieldValue.TimestampValue(v) => v
case FieldValue.DateValue(v) => v
case FieldValue.ArrayValue(vs) => vs.map(extractFieldValue)
case FieldValue.StructValue(vs) => Row.fromSeq(vs.map(v => extractFieldValue(v.value)))
case FieldValue.JsonValue(v) => v.noSpaces
}

private def forEntities(
@@ -198,4 +183,167 @@ private[processing] object Transform {
obj.add("_schema_version", Json.fromString(sdd.schema.version.asString))
}

/**
* Although this code looks prone to error, it is a very efficient way to build a Row from an
* Event
*
* It is much more cpu-efficient than going via intermediate Json.
*
* TODO: implement this using Shapeless to make it less fragile
*/
private def forAtomic(event: Event): ValidatedNel[FailureDetails.LoaderIgluError, List[Any]] =
(
event.tr_total.traverse(doubleToDecimal),
event.tr_tax.traverse(doubleToDecimal),
event.tr_shipping.traverse(doubleToDecimal),
event.ti_price.traverse(doubleToDecimal),
event.tr_total_base.traverse(doubleToDecimal),
event.tr_tax_base.traverse(doubleToDecimal),
event.tr_shipping_base.traverse(doubleToDecimal),
event.ti_price_base.traverse(doubleToDecimal)
).mapN { case (trTotal, trTax, trShipping, tiPrice, trTotalBase, trTaxBase, trShippingBase, tiPriceBase) =>
List[Any](
event.app_id.orNull,
event.platform.orNull,
event.etl_tstamp.map(Timestamp.from).orNull,
Timestamp.from(event.collector_tstamp),
event.dvce_created_tstamp,
event.event.orNull,
event.event_id.toString,
event.txn_id.getOrElse(null),
event.name_tracker.orNull,
event.v_tracker.orNull,
event.v_collector,
event.v_etl,
event.user_id,
event.user_ipaddress,
event.user_fingerprint,
event.domain_userid.orNull,
event.domain_sessionidx.getOrElse(null),
event.network_userid.orNull,
event.geo_country.orNull,
event.geo_region.orNull,
event.geo_city.orNull,
event.geo_zipcode.orNull,
event.geo_latitude.getOrElse(null),
event.geo_longitude.getOrElse(null),
event.geo_region_name.orNull,
event.ip_isp.orNull,
event.ip_organization.orNull,
event.ip_domain.orNull,
event.ip_netspeed.orNull,
event.page_url.orNull,
event.page_title.orNull,
event.page_referrer.orNull,
event.page_urlscheme.orNull,
event.page_urlhost.orNull,
event.page_urlport.getOrElse(null),
event.page_urlpath.orNull,
event.page_urlquery.orNull,
event.page_urlfragment.orNull,
event.refr_urlscheme.orNull,
event.refr_urlhost.orNull,
event.refr_urlport.getOrElse(null),
event.refr_urlpath.orNull,
event.refr_urlquery.orNull,
event.refr_urlfragment.orNull,
event.refr_medium.orNull,
event.refr_source.orNull,
event.refr_term.orNull,
event.mkt_medium.orNull,
event.mkt_source.orNull,
event.mkt_term.orNull,
event.mkt_content.orNull,
event.mkt_campaign.orNull,
event.se_category.orNull,
event.se_action.orNull,
event.se_label.orNull,
event.se_property.orNull,
event.se_value.getOrElse(null),
event.tr_orderid.orNull,
event.tr_affiliation.orNull,
trTotal.orNull,
trTax.orNull,
trShipping.orNull,
event.tr_city.orNull,
event.tr_state.orNull,
event.tr_country.orNull,
event.ti_orderid.orNull,
event.ti_sku.orNull,
event.ti_name.orNull,
event.ti_category.orNull,
tiPrice.orNull,
event.ti_quantity.getOrElse(null),
event.pp_xoffset_min.getOrElse(null),
event.pp_xoffset_max.getOrElse(null),
event.pp_yoffset_min.getOrElse(null),
event.pp_yoffset_max.getOrElse(null),
event.useragent.orNull,
event.br_name.orNull,
event.br_family.orNull,
event.br_version.orNull,
event.br_type.orNull,
event.br_renderengine.orNull,
event.br_lang.orNull,
event.br_features_pdf.getOrElse(null),
event.br_features_flash.getOrElse(null),
event.br_features_java.getOrElse(null),
event.br_features_director.orNull,
event.br_features_quicktime.orNull,
event.br_features_realplayer.orNull,
event.br_features_windowsmedia.orNull,
event.br_features_gears.orNull,
event.br_features_silverlight.orNull,
event.br_cookies.orNull,
event.br_colordepth.orNull,
event.br_viewwidth.orNull,
event.br_viewheight.orNull,
event.os_name.orNull,
event.os_family.orNull,
event.os_manufacturer.orNull,
event.os_timezone.orNull,
event.dvce_type.orNull,
event.dvce_ismobile.orNull,
event.dvce_screenwidth.orNull,
event.dvce_screenheight.orNull,
event.doc_charset.orNull,
event.doc_width.orNull,
event.doc_height.orNull,
event.tr_currency.orNull,
trTotalBase.orNull,
trTaxBase.orNull,
trShippingBase.orNull,
event.ti_currency.orNull,
tiPriceBase.orNull,
event.base_currency.orNull,
event.geo_timezone.orNull,
event.mkt_clickid.orNull,
event.mkt_network.orNull,
event.etl_tags.orNull,
event.dvce_sent_tstamp.map(Timestamp.from).orNull,
event.refr_domain_userid.orNull,
event.refr_dvce_tstamp.map(Timestamp.from).orNull,
event.domain_sessionid.orNull,
event.derived_tstamp.map(Timestamp.from).orNull,
event.event_vendor.orNull,
event.event_name.orNull,
event.event_format.orNull,
event.event_version.orNull,
event.event_fingerprint.orNull,
event.true_tstamp.map(Timestamp.from).orNull
)
}.leftMap(castErrorToLoaderIgluError(AtomicFields.schemaKey, _))

private def doubleToDecimal(d: Double): ValidatedNel[CastError, BigDecimal] = {
val bigDec = BigDecimal(d)
Either.catchOnly[java.lang.ArithmeticException] {
bigDec.setScale(AtomicFields.customDecimal.scale, BigDecimal.RoundingMode.UNNECESSARY)
} match {
case Right(scaled) if scaled.precision <= AtomicFields.customDecimalPrecision =>
scaled.validNel
case _ =>
CastError.WrongType(Json.fromDoubleOrNull(d), AtomicFields.customDecimal).invalidNel
}
}

}
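A standalone illustration (not code from the commit, object name illustrative) of the scale/precision check in doubleToDecimal above: with the atomic DECIMAL(18, 2) type, a value needing more than two decimal places, or more than 18 significant digits, is rejected as a cast error rather than silently rounded, because RoundingMode.UNNECESSARY throws.

import scala.math.BigDecimal.RoundingMode

object DecimalCheckSketch extends App {
  // Mirrors the check above: scale 2, maximum precision 18, no silent rounding.
  def fits(d: Double): Boolean =
    try BigDecimal(d).setScale(2, RoundingMode.UNNECESSARY).precision <= 18
    catch { case _: ArithmeticException => false }

  println(fits(1.23))    // true  -> loads as a DECIMAL(18,2) value
  println(fits(1.234))   // false -> surfaces as a CastError.WrongType bad row
  println(fits(1.23e17)) // false -> 20 significant digits exceed precision 18
}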
@@ -60,14 +60,16 @@ class ProcessingSparkSpec extends Specification with CatsEffect {
val inputEventIds = inputEvents.flatten.map(_.event_id.toString)
val outputEventIds = df.select("event_id").as[String].collect().toSeq
val loadTstamps = df.select("load_tstamp").as[java.sql.Timestamp].collect().toSeq
val trTotals = df.select("tr_total").as[BigDecimal].collect().toSeq

List[MatchResult[Any]](
cols must contain("event_id"),
cols must contain("load_tstamp"),
df.count() must beEqualTo(4L),
outputEventIds.toSeq must containTheSameElementsAs(inputEventIds),
loadTstamps.toSet must haveSize(1), // single timestamp for entire window
loadTstamps.head must not beNull
loadTstamps.head must not beNull,
trTotals must contain(BigDecimal(1.23))
).reduce(_ and _)
}
}
@@ -85,7 +87,9 @@ object ProcessingSparkSpec {
eventId2 <- IO.randomUUID
collectorTstamp <- IO.realTimeInstant
} yield {
val event1 = Event.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0")
val event1 = Event
.minimal(eventId1, collectorTstamp, "0.0.0", "0.0.0")
.copy(tr_total = Some(1.23))
val event2 = Event.minimal(eventId2, collectorTstamp, "0.0.0", "0.0.0")
val serialized = List(event1, event2).map { e =>
e.toTsv.getBytes(StandardCharsets.UTF_8)