-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingestion/glue): delta schemas #10299
feat(ingestion/glue): delta schemas #10299
Conversation
"nullable": true, | ||
"type": { | ||
"type": { | ||
"com.linkedin.pegasus2avro.schema.NullType": {} | ||
"com.linkedin.pegasus2avro.schema.NumberType": {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note for reviewers:
This is for the new mapping I added here
"long": "long", |
num_dataset_schema_invalid: int = 0 | ||
num_dataset_buggy_delta_schema: int = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note for reviewers:
I added these two mainly for the testing. I'm ok to rename or remove even
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is nice. Let's please rename as
num_dataset_schema_invalid -> num_dataset_invalid_delta_schema
num_dataset_buggy_delta_schema -> num_dataset_valid_delta_schema
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
addressed in d634b10
@@ -1796,397 +1796,6 @@ | |||
"lastRunId": "no-run-id-provided" | |||
} | |||
}, | |||
{ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note for the reviewers:
Removed aspects are not lost; they were duplicated and updating the golden file resulted on removing the duplicated aspects.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add config extract_delta_schema_from_parameters
to enable this behavior. can be set to default True. If disabled, none of this delta customization would be used.
Can you confirm if this issue is with particular spark/delta version or all of them ?
Also, if I understand correctly, this workaround can entirely be removed if the issue is fixed in new spark/delta version - for example - if the delta.io PR that you have linked is merged, right ? If so, please add a comment in codebase to mention this.
num_dataset_schema_invalid: int = 0 | ||
num_dataset_buggy_delta_schema: int = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is nice. Let's please rename as
num_dataset_schema_invalid -> num_dataset_invalid_delta_schema
num_dataset_buggy_delta_schema -> num_dataset_valid_delta_schema
@@ -1148,9 +1152,35 @@ def get_s3_tags() -> Optional[GlobalTagsClass]: | |||
return new_tags | |||
|
|||
def get_schema_metadata() -> Optional[SchemaMetadata]: | |||
if not table.get("StorageDescriptor"): | |||
def is_delta_schema(columns: Optional[List[Mapping[str, Any]]]) -> bool: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defining functions within other functions is discouraged as per coding style. Can you please move this function outside ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this be refractored to accept both tableParameters and tableStorageDescriptor and then return boolean ? this function can subsume this check as well -> (provider == "delta") and (num_parts > 0)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Defining functions within other functions is discouraged as per coding style. Can you please move this function outside ?
While I do agree, I just followed the existing pattern in the code. Note get_owner
, get_dataset_properties
, get_s3_tags
are all defined within _extract_record
method.
So, are you suggesting to move is_delta_schema
at _extract_record
level too or at GlueSource
level?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As just moved the is_delta_schema
at _extra_record
level, to keep it aligned with all others existing methods in the codebase.
return None | ||
|
||
def _get_glue_schema_metadata() -> Optional[SchemaMetadata]: | ||
assert table.get("StorageDescriptor") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can remove this assert as this is already checked earlier.
Thanks for the review @mayurinehate . I will address comments soon.
Sure, good point!
Not a particular version but an long-time overall issue. As you can see in the two links listed in the PR description, community is recurrently complaining about this issue.
True. As soon as the hive integration with Spark is correctly providing the schema as expected in the
|
Added the |
@mayurinehate Could you please review again? Thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Minor edits suggested. Everything else looks good to me.
extract_delta_schema_from_parameters: Optional[bool] = Field( | ||
default=False, | ||
description="If enabled, delta schemas can be alternatively fetched from table parameters " | ||
"(https://github.com/delta-io/delta/pull/2310)", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's please move this PR's link as code comment on this config - rather than in description of config,
Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
@mayurinehate Thanks for the feedback and approval. Build is all green but a nifi test. Surprisingly the test complains about not matching golden fine in 3.10 while it works fine in 3.8. Do you think is that related to my updates? or just some random error? |
CI Failures are unrelated to the changes in the PR. |
Co-authored-by: Mayuri Nehate <33225191+mayurinehate@users.noreply.github.com>
Hive sync up in Spark is wrongly reported as
col (array<string>)
. Experience shows that the issue happens only when using Scala and for tables (not views), which is actually a very common case. Given that the correct schema is serialized in thespark.sql.sources.schema.part.{i}
table parameters, this PR parses and processes the schema from those properties so such a valuable metadata is not lost.This is solving the following feature request: https://feature-requests.datahubproject.io/p/glue-crawler-support-for-correct-delta-schema-stored-in-properties
This bug in Spark has been reported many times and never solved. Some references here:
Checklist