Skip to content

Commit

Permalink
Bump schema-ddl to 0.21.0-M1
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Sep 7, 2023
1 parent 100c4c4 commit 5f56bf6
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import io.circe.Json

import org.apache.spark.sql.Row

import java.sql.{Date, Timestamp}
import java.time.{Instant, LocalDate}

import com.snowplowanalytics.iglu.schemaddl.parquet.Type
import com.snowplowanalytics.iglu.schemaddl.parquet.Caster
Expand All @@ -27,8 +27,8 @@ private[processing] object SparkCaster extends Caster[Any] {
override def doubleValue(v: Double): Double = v
override def decimalValue(unscaled: BigInt, details: Type.Decimal): BigDecimal =
BigDecimal(unscaled, details.scale)
override def timestampValue(v: Timestamp): Timestamp = v
override def dateValue(v: Date): Date = v
override def timestampValue(v: Instant): Instant = v
override def dateValue(v: LocalDate): LocalDate = v
override def arrayValue(vs: List[Any]): List[Any] = vs
override def structValue(vs: List[Caster.NamedValue[Any]]): Row = Row.fromSeq(vs.map(_.value))

Expand Down
2 changes: 1 addition & 1 deletion project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ object Dependencies {
val sentry = "6.25.2"

// Snowplow
val schemaDdl = "0.20.0-M1"
val schemaDdl = "0.21.0-M1"
val badrows = "2.2.0"
val igluClient = "3.0.0"
val tracker = "2.0.0"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import cats.implicits._
import cats.data.{NonEmptyList, Validated, ValidatedNel}
import io.circe.Json

import java.time.Instant
import java.sql.Timestamp

import com.snowplowanalytics.iglu.schemaddl.parquet.{Caster, Field, Type}
import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure => BadRowFailure, FailureDetails, Processor => BadRowProcessor}
import com.snowplowanalytics.snowplow.badrows.{Payload => BadPayload}
Expand Down Expand Up @@ -200,9 +197,9 @@ object Transform {
List[A](
event.app_id.fold[A](caster.nullValue)(caster.stringValue(_)),
event.platform.fold[A](caster.nullValue)(caster.stringValue(_)),
event.etl_tstamp.fold[A](caster.nullValue)(forInstant(caster, _)),
forInstant(caster, event.collector_tstamp),
event.dvce_created_tstamp.fold[A](caster.nullValue)(forInstant(caster, _)),
event.etl_tstamp.fold[A](caster.nullValue)(caster.timestampValue( _)),
caster.timestampValue(event.collector_tstamp),
event.dvce_created_tstamp.fold[A](caster.nullValue)(caster.timestampValue(_)),
event.event.fold[A](caster.nullValue)(caster.stringValue(_)),
caster.stringValue(event.event_id.toString),
event.txn_id.fold[A](caster.nullValue)(caster.intValue(_)),
Expand Down Expand Up @@ -315,28 +312,24 @@ object Transform {
event.mkt_clickid.fold[A](caster.nullValue)(caster.stringValue(_)),
event.mkt_network.fold[A](caster.nullValue)(caster.stringValue(_)),
event.etl_tags.fold[A](caster.nullValue)(caster.stringValue(_)),
event.dvce_sent_tstamp.fold[A](caster.nullValue)(forInstant(caster, _)),
event.dvce_sent_tstamp.fold[A](caster.nullValue)(caster.timestampValue(_)),
event.refr_domain_userid.fold[A](caster.nullValue)(caster.stringValue(_)),
event.refr_dvce_tstamp.fold[A](caster.nullValue)(forInstant(caster, _)),
event.refr_dvce_tstamp.fold[A](caster.nullValue)(caster.timestampValue(_)),
event.domain_sessionid.fold[A](caster.nullValue)(caster.stringValue(_)),
event.derived_tstamp.fold[A](caster.nullValue)(forInstant(caster, _)),
event.derived_tstamp.fold[A](caster.nullValue)(caster.timestampValue(_)),
event.event_vendor.fold[A](caster.nullValue)(caster.stringValue(_)),
event.event_name.fold[A](caster.nullValue)(caster.stringValue(_)),
event.event_format.fold[A](caster.nullValue)(caster.stringValue(_)),
event.event_version.fold[A](caster.nullValue)(caster.stringValue(_)),
event.event_fingerprint.fold[A](caster.nullValue)(caster.stringValue(_)),
event.true_tstamp.fold[A](caster.nullValue)(forInstant(caster, _))
event.true_tstamp.fold[A](caster.nullValue)(caster.timestampValue(_))
)
.zip(AtomicFields.static)
.map { case (value, field) =>
Caster.NamedValue(field.name, value)
}
}.leftMap(castErrorToLoaderIgluError(AtomicFields.schemaKey, _))

// TODO: should we just use Instant?
private def forInstant[A](caster: Caster[A], v: Instant): A =
caster.timestampValue(Timestamp.from(v))

private val monetaryPrecision: Int = Type.DecimalPrecision.toInt(AtomicFields.monetaryDecimal.precision)

private def forMoney[A](caster: Caster[A], d: Double): ValidatedNel[CastError, A] = {
Expand Down

0 comments on commit 5f56bf6

Please sign in to comment.