Skip to content

Commit

Permalink
Stable TableRow converted from BQ types
Browse files Browse the repository at this point in the history
Coder[TableRow] is destructive (it is a dummy JSON serializer), so we
should make sure that the TableRow object converted from a BQ model is
stable after serialization.

We currently have issues with:
- longs, which are serialized as strings to avoid overflow
- floats, which are read back as doubles
- JSON, which is read back as a nested TableRow
  • Loading branch information
RustedBones committed Dec 19, 2024
1 parent 4d701c1 commit e854bff
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,10 +387,10 @@ private[types] object ConverterProvider {
case t if provider.shouldOverrideType(c)(t) => q"$tree.toString"
case t if t =:= typeOf[Boolean] => tree
case t if t =:= typeOf[Int] => tree
case t if t =:= typeOf[Long] => tree
case t if t =:= typeOf[Float] => tree
case t if t =:= typeOf[Double] => tree
case t if t =:= typeOf[String] => tree
case t if t =:= typeOf[Long] => q"$tree.toString" // json doesn't support long
case t if t =:= typeOf[Float] => q"$tree.toDouble" // json doesn't support float
case t if t =:= typeOf[Double] => tree
case t if t =:= typeOf[String] => tree

case t if t =:= typeOf[BigDecimal] =>
q"_root_.com.spotify.scio.bigquery.Numeric($tree).toString"
Expand All @@ -412,7 +412,7 @@ private[types] object ConverterProvider {
case t if t =:= typeOf[Geography] =>
q"$tree.wkt"
case t if t =:= typeOf[Json] =>
// for TableRow/json, use JSON to prevent escaping
// for TableRow/json, use parsed JSON to prevent escaping
q"_root_.com.spotify.scio.bigquery.types.Json.parse($tree)"
case t if t =:= typeOf[BigNumeric] =>
// for TableRow/json, use string to avoid precision loss (like numeric)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package com.spotify.scio.bigquery

import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import com.spotify.scio.coders.Coder
import org.apache.avro.Conversions.DecimalConversion
import org.apache.avro.LogicalTypes
import org.apache.beam.sdk.extensions.gcp.util.Transport
import org.typelevel.scalaccompat.annotation.nowarn

import java.math.MathContext
Expand Down Expand Up @@ -63,10 +63,11 @@ package object types {
*/
case class Json(wkt: String)
// Companion helpers that build a Json from a structured value and parse it back.
// NOTE(review): this span comes from a diff view, so the ObjectMapper/JsonNode members and
// the JsonFactory/TableRow members appear side by side — confirm which set is current.
object Json {
// Jackson mapper used by the JsonNode-based apply/parse below.
private lazy val mapper = new ObjectMapper()
// JSON factory obtained from Transport; used by the TableRow-based apply/parse below.
@transient
private lazy val jsonFactory = Transport.getJsonFactory

// Serializes the node to a string and wraps it in a Json.
def apply(node: JsonNode): Json = Json(mapper.writeValueAsString(node))
// Parses the wrapped string (json.wkt) into a JsonNode tree.
def parse(json: Json): JsonNode = mapper.readTree(json.wkt)
// Serializes the row to a string and wraps it in a Json.
def apply(row: TableRow): Json = Json(jsonFactory.toString(row))
// Parses the wrapped string (json.wkt) into a TableRow.
def parse(json: Json): TableRow = jsonFactory.fromString(json.wkt, classOf[TableRow])
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
package com.spotify.scio.bigquery.types

import com.fasterxml.jackson.databind.node.{JsonNodeFactory, ObjectNode}
import com.google.protobuf.ByteString
import com.spotify.scio.bigquery._
import org.joda.time.{Instant, LocalDate, LocalDateTime, LocalTime}
import org.scalatest.matchers.should.Matchers
import org.scalatest.flatspec.AnyFlatSpec

Expand Down Expand Up @@ -74,6 +76,13 @@ class ConverterProviderTest extends AnyFlatSpec with Matchers {
RequiredWithMethod.fromTableRow(TableRow("a" -> "")) shouldBe RequiredWithMethod("")
BigQueryType.toTableRow[RequiredWithMethod](RequiredWithMethod("")) shouldBe TableRow("a" -> "")
}

it should "convert to stable types for the coder" in {
  import com.spotify.scio.testing.CoderAssertions._
  // Encoding through Coder[TableRow] is lossy, so the row produced by the BigQueryType
  // conversion must already use the representation that survives a coder round trip.
  val converted = AllTypes.toTableRow(AllTypes())
  converted coderShould roundtrip()
}
}

object ConverterProviderTest {
Expand Down Expand Up @@ -102,4 +111,23 @@ object ConverterProviderTest {
def accessorMethod: String = ""
def method(x: String): String = x
}

/**
 * Test fixture with one field per type listed below. Every field has a default, so
 * `AllTypes()` can be built without arguments (as the coder round-trip test does).
 *
 * The temporal defaults are read from the current clock, so their values differ between runs.
 */
@BigQueryType.toTable
case class AllTypes(
bool: Boolean = true,
int: Int = 1,
long: Long = 2L,
float: Float = 3.3f,
double: Double = 4.4,
numeric: BigDecimal = BigDecimal(5),
string: String = "6",
byteString: ByteString = ByteString.copyFromUtf8("7"),
// Clock-based defaults: evaluated each time an instance is created without these arguments.
timestamp: Instant = Instant.now(),
date: LocalDate = LocalDate.now(),
time: LocalTime = LocalTime.now(),
datetime: LocalDateTime = LocalDateTime.now(),
geography: Geography = Geography("POINT (8 8)"),
// JSON payload given as a raw string.
json: Json = Json("""{"key": 9,"value": 10}"""),
bigNumeric: BigNumeric = BigNumeric(BigDecimal(11))
)
}

0 comments on commit e854bff

Please sign in to comment.