
[SUPPORT] why is the schema evolution done while not setting hoodie.schema.on.read.enable #8018

Open
menna224 opened this issue Feb 22, 2023 · 16 comments
Labels: schema-and-data-types, writer-core

@menna224

menna224 commented Feb 22, 2023

We have a Glue streaming job that writes to a Hudi table, and we are trying out schema evolution. When we add a new column to any record, it works fine and the new column is shown when querying the table. The thing is, we expected it not to evolve the schema, because we didn't set the config hoodie.schema.on.read.enable, and as we understand it this config defaults to false. As per the Hudi docs:

"Enables support for Schema Evolution feature
Default Value: false (Optional)
Config Param: SCHEMA_EVOLUTION_ENABLE"

So since we didn't define it in our config, it shouldn't allow schema evolution and the addition of new columns, right?
We even tried explicitly setting it to false in our connection options, but still, when we add a new column it shows up in our table.
To Reproduce

Steps to reproduce the behavior:

  1. run the Glue streaming job
  2. add a record with a new column/attribute (attribute in the case of DynamoDB)
  3. query the Hudi table

Expected behavior

It shouldn't show the added columns/attributes, since we disabled schema evolution, and the column/attribute also shouldn't exist in the schema of the table in the data lake.

Environment Description

  • Hudi version : 0.12

  • Spark version : 3

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

  • glue version: 4

Additional context

our connection options are:

hudiWriteConfig = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.write.table.type': 'MERGE_ON_READ',
    'hoodie.table.name': hudi_table_name,
    'hoodie.datasource.write.table.name': hudi_table_name,
    'hoodie.datasource.write.precombine.field': 'timestamp',
    'hoodie.datasource.write.recordkey.field': 'user_id',
    'hoodie.datasource.write.operation': 'upsert',
    #"hoodie.compact.schedule.inline":"true",
    'hoodie.datasource.hive_sync.use_jdbc':'false',
    'hoodie.datasource.hive_sync.mode':'hms',
    "hoodie.compact.inline": "true",
    "hoodie.compact.inline.max.delta.commits":"3",
    "hoodie.schema.on.read.enable":"false",
    "hoodie.deltastreamer.schemaprovider.source.schema.file":"s3://hudi-test-table/menna/src.acsv",
    "hoodie.deltastreamer.schemaprovider.target.schema.file":"s3://hudi-test-table/menna/target.acsv"
    #'hoodie.datasource.write.partitionpath.field': 'year:SIMPLE,month:SIMPLE,day:SIMPLE',
    #'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
    #'hoodie.deltastreamer.keygen.timebased.timestamp.type': 'DATE_STRING',
    #'hoodie.deltastreamer.keygen.timebased.input.dateformat': 'yyyy-mm-dd',
    #'hoodie.deltastreamer.keygen.timebased.output.dateformat': 'yyyy/MM/dd'
}

hudiGlueConfig = {
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.sync_as_datasource': 'true',
    'hoodie.datasource.hive_sync.database': database_name,
    'hoodie.datasource.hive_sync.table': hudi_table_name,
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.hive_style_partitioning': 'false',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    #'hoodie.datasource.hive_sync.partition_fields': 'year,month,day'
}

commonConfig = {
    'path': s3_path_hudi_table
}

combinedConf = {
    **commonConfig,
    **hudiWriteConfig,
    **hudiGlueConfig
}

In the Glue streaming job we use:

glueContext.forEachBatch(
    frame=data_frame_DataSource0,
    batch_function=processBatch,
    options={
        "windowSize": window_size,
        "checkpointLocation": s3_path_spark_checkpoints
    }
)

and:

data_frame_DataSource0 = glueContext.create_data_frame.from_catalog(
    database=database_name,
    table_name=kinesis_table_name,
    transformation_ctx="DataSource0",
    additional_options={"inferSchema": "true", "startingPosition": starting_position_of_kinesis_iterator}
)

and the way we write our Hudi table is:

 kinesis_data_frame.write.format("hudi").options(**combinedConf).mode("append").save()

sometimes we write it as follows but it gives the same behaviour:

glueContext.write_dynamic_frame.from_options(
    frame=DynamicFrame.fromDF(kinesis_data_frame, glueContext, "evolved_kinesis_data_frame"),
    connection_type="custom.spark",
    connection_options=combinedConf
)
@kazdy
Contributor

kazdy commented Feb 22, 2023

Hi,

Hudi has optional "schema on read" and "out of the box schema evolution", which is the default.
With "out of the box schema evolution", adding new columns is supported by default; see this link. Currently you cannot fully "disable" schema evolution, as far as I know.

@menna224
Author

hi @kazdy, thanks for your reply. Sorry, can you please elaborate a bit more? I understand that "out of the box" means that when we add new records with a new column it's automatically added to the table, along with the other options mentioned in the link you shared. But in that case, how is schema on read different from this? If we enabled hoodie.schema.on.read.enable, how would the behaviour differ?

@danielfordfc

To add to this, it's not just adding new columns that is handled natively: adding complex types like arrays and maps, valid type conversions, extending enums, etc.

I was just browsing the issues and was also planning on asking this same question!

@danny0405
Contributor

There is another option for the writer.
[screenshot of a writer config option; from the follow-up discussion, this appears to be hoodie.datasource.write.reconcile.schema]
Maybe that is what you need.

@kazdy
Contributor

kazdy commented Feb 24, 2023

It feels like target table schema enforcement is needed (this topic also comes up from time to time in the Hudi Slack).
AFAIK Hudi does not support such a thing in general, but it does do this for the MERGE INTO statement (new columns are dropped to fit the target table schema).
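As a hedged illustration of that MERGE INTO behaviour (the table and view names are hypothetical, and this assumes a SparkSession configured with the Hudi Spark SQL extensions): columns that exist only in the source are not written, per the note above.

# Hedged sketch only: 'hudi_users' is an existing Hudi table in the catalog and
# 'staging_users' is a temp view whose schema carries extra columns not present in the target.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("hudi-merge-into-demo")
    .config("spark.sql.extensions", "org.apache.hudi.HoodieSparkSessionExtension")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .enableHiveSupport()
    .getOrCreate()
)

spark.sql("""
    MERGE INTO hudi_users AS t
    USING staging_users AS s
    ON t.user_id = s.user_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")
# Columns present only in 'staging_users' are dropped to fit the target table schema,
# as described above.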

@menna224
Author

menna224 commented Feb 27, 2023

hey @danny0405, thanks for your reply. What I understood from the description of hoodie.datasource.write.reconcile.schema is that we need to set it to true to enable it, since if it's disabled it will update the schema. We tried setting it to true as follows:

'hoodie.datasource.write.reconcile.schema': "true"

but unfortunately it's still adding the new column to the table. Is there any config option we need to add along with hoodie.datasource.write.reconcile.schema so that it keeps the schema? The description says "writer-schema will be picked such that table's schema (after txn) is either kept the same or extended, meaning that we'll always prefer the schema that either adds new columns or stays the same." I don't understand how, if it's enabled, it can do both, keeping the schema the same or extending it; isn't extending the same as disabling the feature in the first place?

So how do we keep it the same?

@danny0405
Contributor

when we add a new col it's shown to our table

Did you change the table schema through the Spark catalog, or did you just ingest a data frame with a different schema?

@menna224
Author

menna224 commented Mar 1, 2023

@danny0405 we just ingested data with a different schema, like this:

  kinesis_data_frame.write.format("hudi").options(**combinedConf).mode("append").save()

and it's updating the schema

@voonhous
Member

voonhous commented Mar 2, 2023

@menna224

Prior to Hudi Full schema evolution (HFSE), which is enabled using hoodie.schema.on.read.enable=true, Hudi relies on Avro's Schema resolution (ASR) to perform schema evolution.

What you are encountering is basically schema evolution performed via ASR. I've previously written some tests and fixes (unrelated to your issue) where you can get an idea of how it differs from HFSE here:
#7444

I noticed that you are using 0.12.*; I'm not sure which minor version, but it doesn't really matter. In the 0.12.* branches, HFSE is an experimental feature, and the behaviour you wanted, governed by hoodie.datasource.write.reconcile.schema, was essentially changed and adapted for HFSE's use.

As such, you will not be able to use hoodie.datasource.write.reconcile.schema to prevent schemas from being implicitly evolved.

I hope this answers your questions.

With regard to how one can prevent the schema from being evolved implicitly when ASR is used... I don't think it's possible in 0.12.*. (Anyone in the community who is more familiar with this, please correct me if I am wrong.)
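For readers following along, a hedged sketch of the contrast (the database, table and column names are hypothetical): changes that Avro schema resolution can express, such as appending a nullable column or promoting int to long, happen implicitly on a normal write, whereas HFSE (hoodie.schema.on.read.enable=true) is what enables DDL-style changes such as renames and drops through Spark SQL.

# Hedged sketch: assumes a SparkSession with the Hudi Spark SQL extensions enabled and an
# existing Hudi table 'my_db.hudi_users' registered in the catalog.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("hfse-ddl-demo")
    .config("spark.sql.extensions", "org.apache.hudi.HoodieSparkSessionExtension")
    .enableHiveSupport()
    .getOrCreate()
)

# Full schema evolution (schema on read) has to be switched on for the session first:
spark.sql("set hoodie.schema.on.read.enable=true")

# ...after which backwards-incompatible changes that ASR cannot express become possible:
spark.sql("ALTER TABLE my_db.hudi_users RENAME COLUMN firstname TO first_name")
spark.sql("ALTER TABLE my_db.hudi_users DROP COLUMN operation")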

@danny0405
Contributor

I inspected the master code base and the option hoodie.schema.on.read.enable=false should prevent the schema changes.

@kazdy
Contributor

kazdy commented Mar 3, 2023

@danny0405 isn't it the case that with hoodie.schema.on.read.enable=false Hudi falls back on the default "out of the box" schema evolution that uses Avro schema resolution (which allows new columns to be added at the end of the schema)?

When it comes to schema reconciliation, when I was playing with it in 0.10 and 0.11 it allowed wider schemas on write, but when incoming columns were missing, they were added back to match the "current" target schema.
So for me schema reconciliation worked like this:
wider schema -> accept as the new table schema
missing columns -> add the missing columns to match the current table schema

There's a hacky way to prevent schema evolution.
One can get the schema from the metastore, or from a file containing the Avro schema definition for the table, read it in your Spark job, and pass it to https://hudi.apache.org/docs/configurations#hoodiewriteschema, which should override the writer schema and effectively drop new columns.
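A hedged sketch of that workaround (the bucket, key and variable names are invented; combinedConf and kinesis_data_frame are the ones shown earlier in this issue): load the frozen Avro schema definition from a file and pass the JSON string through hoodie.write.schema so the writer sticks to it.

# Hedged illustration: assumes the "frozen" table schema is kept as an Avro schema (JSON)
# file on S3; the bucket and key below are made up.
import boto3

s3 = boto3.client("s3")
obj = s3.get_object(Bucket="my-config-bucket", Key="schemas/users_table.avsc")
frozen_schema_json = obj["Body"].read().decode("utf-8")

write_conf = {
    **combinedConf,  # the options shown earlier in this issue
    "hoodie.write.schema": frozen_schema_json,  # the full Avro schema, as a JSON string
}

kinesis_data_frame.write.format("hudi").options(**write_conf).mode("append").save()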

Again, MERGE INTO stmt enforces the target table schema when writing records.

@danny0405
Contributor

I guess we need a clear doc to elaborate the schema evolution details for 0.13.0

@menna224
Author

menna224 commented Mar 5, 2023

thank you @kazdy for your reply. I tried to pass the schema to the config you mentioned this way, but I get an error. I'm not totally sure how to pass it; can you please help?

schema = "user_id: string, firstname: string, operation: string, timestamp: double"
 'hoodie.write.schema':schema

I got the following error:

Caused by: org.apache.avro.SchemaParseException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'user_id': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"user_id: string, firstname: string, operation: string, timestamp: double"; line: 1, column: 8]

It seems like I shouldn't pass it in that format, but I couldn't figure out from the docs how I should pass it.
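For what it's worth, the error suggests the value is handed straight to Avro's schema parser, so hoodie.write.schema appears to expect a full Avro record schema as a JSON string rather than a DDL-style column list. A hedged sketch for the fields listed above (the record name/namespace are arbitrary and the types are guesses):

import json

# Hypothetical Avro record schema for the fields mentioned above; adjust names and types as needed.
avro_schema = {
    "type": "record",
    "name": "users_record",
    "namespace": "hoodie.example",
    "fields": [
        {"name": "user_id",   "type": "string"},
        {"name": "firstname", "type": ["null", "string"], "default": None},
        {"name": "operation", "type": ["null", "string"], "default": None},
        {"name": "timestamp", "type": "double"},
    ],
}

# hudiWriteConfig as defined earlier in this issue
hudiWriteConfig["hoodie.write.schema"] = json.dumps(avro_schema)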

Also, regarding the part where you mentioned "missing columns -> add missing columns to match current table schema":

did you need to add extra logic in your code, or were the missing columns added by default? Or is it just by adding 'hoodie.datasource.write.reconcile.schema': "true"?

And if the missing columns were added without extra logic in your code, were you using PySpark + Glue? Or what did you use exactly? Thanks.

@Limess

Limess commented Mar 8, 2023

I guess we need a clear doc to elaborate the schema evolution details for 0.13.0

Outside of the original issue here, we'd find that extremely useful.

We haven't found the release notes clear on exactly what has changed, what the new behaviour is, or how to configure it.

We also don't understand how exactly hoodie.datasource.write.reconcile.schema and hoodie.avro.schema.validate work in either case. Specific examples would help here for both flags.

Ideally I'd like a table of possible operations for the "old" (avro) and "new" schema evolutions, with examples of adding/removing columns on a sample dataset. Also a direct comparison of how they differ.

@vinothchandar
Member

+1 on @kazdy's notes above on ASR. Hudi has always supported some automatic schema evolution to deal with streaming data, similar to what the Kafka/Schema Registry model achieves. The reason was that users found it inconvenient to coordinate pausing pipelines and doing manual maintenance/backfills when, say, new columns were added. What we call full schema evolution/schema-on-read is orthogonal; it just allows more backwards-incompatible evolutions to go through as well.

Now, on 0.13, I think the reconcile flag simply allows skipping some columns in the incoming write (partial-write scenarios), and Hudi reconciles this with the table schema while still respecting the automatic schema evolution. I think this is what 0.13 changes: https://hudi.apache.org/releases/release-0.13.0#schema-handling-in-write-path
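As a hedged illustration of that partial-write reconciliation (the path, table and column names are invented): with hoodie.datasource.write.reconcile.schema=true, a batch that omits a column the table already has can still be written, and the write is reconciled against the wider table schema rather than narrowing it.

# Hedged sketch: assumes an existing Hudi table at 'table_path' whose schema already
# contains user_id, timestamp, ds and firstname.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("reconcile-schema-demo").getOrCreate()
table_path = "/tmp/hudi/users_demo"

partial_opts = {
    "hoodie.table.name": "users_demo",
    "hoodie.datasource.write.recordkey.field": "user_id",
    "hoodie.datasource.write.partitionpath.field": "ds",
    "hoodie.datasource.write.precombine.field": "timestamp",
    "hoodie.datasource.write.operation": "upsert",
    "hoodie.datasource.write.reconcile.schema": "true",
}

# This batch omits 'firstname'; with reconcile enabled the incoming schema is reconciled
# against the (wider) table schema instead of replacing it.
partial_batch = spark.createDataFrame([("u3", 3.0, "2023-02-23")], ["user_id", "timestamp", "ds"])
partial_batch.write.format("hudi").options(**partial_opts).mode("append").save(table_path)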

We also don't understand how exactly hoodie.datasource.write.reconcile.schema and hoodie.avro.schema.validate work in either case. Specific examples would help here for both flags.

Note to @nfarah86 and @nsivabalan to cover this in the schema docs page that is being worked on now.

@vinothchandar
Member

@menna224 For your original issue about not adding the new column: it's not something that has come up before, so we would need to provide some way to alter the behavior to ignore the extra columns. Is that the behavior you would expect?
