-
Notifications
You must be signed in to change notification settings - Fork 514
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve BQ typed support #5529
base: main
Are you sure you want to change the base?
Improve BQ typed support #5529
Conversation
case t if t =:= typeOf[LocalTime] => | ||
q"_root_.com.spotify.scio.bigquery.Time($tree)" | ||
q"_root_.com.spotify.scio.bigquery.Time.micros($tree)" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so it was returning String before?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed, by changing this, I created a behavior change.
When extracting from avro, we disabled the logical types -> those types are returned as STRING (mentioned here). This is very annoying because the same record can't be written back in the table.
Setting useAvroLogicalTypes
on the IO enables that. This will however be breaking for all pipelines reading BQ DATE
, TIME
and DATETIME
that expects string
and will get logical type.
This is also not in sync with storage
API that uses logical types for such columns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated the PR to keep the same behavior.
Introduced new Format.GenericRecordWithLogicalType
to enable BQ avro read with desired setup.
Change BigQueryTyped.Table
to always use logical types
35a1b7e
to
158c9bf
Compare
@@ -203,7 +203,7 @@ final class SCollectionGenericRecordOps[T <: GenericRecord](private val self: SC | |||
self | |||
.covary[GenericRecord] | |||
.write( | |||
BigQueryTypedTable(table, Format.GenericRecord)( | |||
BigQueryTypedTable(table, Format.GenericRecordWithLogicalTypes)( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On write, logical types were enabled by default
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is aligned with the BQ storage API as alluded to in another comment?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I'm just keeping old behavior from here
@@ -334,7 +334,7 @@ private[types] object TypeProvider { | |||
q"override def schema: ${p(c, GModel)}.TableSchema = ${p(c, SUtil)}.parseSchema(${schema.toString})" | |||
} | |||
val defAvroSchema = | |||
q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(${cName.toString}, this.schema.getFields)" | |||
q"override def avroSchema: org.apache.avro.Schema = ${p(c, BigQueryUtils)}.toGenericAvroSchema(this.schema, true)" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use logical types by default for typed BQ avro
val schemaFactory = Functions.serializableFn[TableSchema, org.apache.avro.Schema] { _ => | ||
BigQueryType[T].avroSchema | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use strict schema from the annotated class instead of relying on the table conversion
158c9bf
to
22908a8
Compare
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #5529 +/- ##
==========================================
- Coverage 61.31% 61.25% -0.07%
==========================================
Files 314 314
Lines 11249 11262 +13
Branches 770 796 +26
==========================================
+ Hits 6897 6898 +1
- Misses 4352 4364 +12 ☔ View full report in Codecov by Sentry. |
6da6ac2
to
aadb8fb
Compare
@@ -250,6 +257,9 @@ object DateTime { | |||
/** Convert BigQuery `DATETIME` string to `LocalDateTime`. */ | |||
def parse(datetime: String): LocalDateTime = | |||
Parser.parseLocalDateTime(datetime) | |||
|
|||
// For BigQueryType macros only, do not use directly | |||
def format(datetime: LocalDateTime): String = apply(datetime) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can't be private[scio]
for some reason?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unfortunatelly no. This is used by code generated in macros, meaning it can be referenced from any package.
@@ -48,7 +48,8 @@ object TypedBigQueryIT { | |||
timestamp: Instant, | |||
date: LocalDate, | |||
time: LocalTime, | |||
datetime: LocalDateTime, | |||
// BQ DATETIME is problematic with avro: export as 'string(datetime)', load as '(long)local-timestamp-micros' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make this comment slightly more verbose
@@ -203,7 +203,7 @@ final class SCollectionGenericRecordOps[T <: GenericRecord](private val self: SC | |||
self | |||
.covary[GenericRecord] | |||
.write( | |||
BigQueryTypedTable(table, Format.GenericRecord)( | |||
BigQueryTypedTable(table, Format.GenericRecordWithLogicalTypes)( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is aligned with the BQ storage API as alluded to in another comment?
Leverages upstream changes available for BQ:
Annotated BQ typed avro translation leverages logical-type to have symmetric read/write. This fixes integration testfailure introduced in #5523