
Duplicate Record on Upsert Issue #527

Closed
SonuSingh1411 opened this issue Sep 29, 2020 · 59 comments
Labels: bug (Something isn't working), need author feedback (Issue is waiting for the author to respond)

@SonuSingh1411

Just to give some background: we use AWS EMR to run Spark, and we use PySpark code to run the Delta logic and save the files on S3.
We have been using Delta Lake to ingest data from Oracle into AWS S3 in an incremental fashion. But recently we discovered that Delta is creating some duplicate IDs after the merge. We also checked Oracle and found that the IDs are auto-increment primary keys. Could you please help us debug the issue?
Below is a code snippet from our merge logic:

delta_table.alias("current").merge(
    new_df.alias("new"), "current.ID = new.ID"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

I can share the entire logic that we have if it's needed. But any guidance would be really helpful.

@tdas
Contributor

tdas commented Sep 29, 2020

Are you sure there are no duplicate IDs in the source of the merge, that is, in the new_df dataset? If there are duplicate IDs in the source dataset, and those IDs are not in the target table (that is, not matched), then all the duplicate IDs will be inserted.
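
For reference, a quick way to check the source for duplicate keys in PySpark (a minimal sketch, assuming the new_df and ID column from the snippet above):

from pyspark.sql import functions as F

# Any row returned here means the merge source itself carries a duplicate key
new_df.groupBy("ID").count().filter(F.col("count") > 1).show(truncate=False)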

@SonuSingh1411
Author

Yes, the ID column is a sequence-based primary key in Oracle and hence it cannot have duplicates. I also validated that by looking up the IDs that were duplicated and confirming they are unique in the source.

@tdas
Contributor

tdas commented Sep 29, 2020

Are you possibly running into S3 consistency issues by running merge concurrently from multiple Spark clusters? Writes from multiple clusters/drivers are not yet supported on S3 - https://docs.delta.io/latest/delta-storage.html#amazon-s3

@SonuSingh1411
Author

We have looked at this, and since we are ingesting from Oracle, we run a single Spark application from a single driver, and the data gets inserted or updated from a single cluster. We don't have multiple clusters executing on the same datasets.

@SonuSingh1411
Author

Any idea on the issue?

@arnehuang

arnehuang commented Oct 16, 2020

I have the same issue and verified there are no duplicates from the source. However, on a long-running job over the past 2 months we now have 77 instances of duplicate merge-key rows. PySpark, AWS EMR, S3, Spark 2.4.5, Delta 0.6.1, single Spark application. We cannot reproduce it reliably.

The job is running on YARN, which occasionally restarts it from a checkpoint. Also, I am running multiple merges to different tables in the same application; could either of these be an issue?

@SonuSingh1411
Author

Yeah, it's the same issue with us. We cannot reproduce it; it occurs randomly. I assume this is a bug in Delta, but we have not received any guidance on it.

@SonuSingh1411
Author

SonuSingh1411 commented Oct 21, 2020

@arnehuang I am not sure if merging into different tables would cause an issue. Also, we run one job per application and are still getting this bug, so I assume it is a bug in open source Delta.

@scanry

scanry commented Nov 3, 2020

I have the same problem; I suspect it was due to a task failure.
[screenshot]


@zsxwing
Member

zsxwing commented Nov 3, 2020

@scanry which column has the unique id? srcid? Are you using Delta Lake to read the table?

@zsxwing
Member

zsxwing commented Nov 3, 2020

@SonuSingh1411 Could you show us the code that creates new_df? Does the data source of new_df guarantee no duplicates? Is it Spark's JDBC data source?

@scanry

scanry commented Nov 3, 2020

@scanry which column has the unique id? srcid? Are you using Delta Lake to read the table?

  1. sys_ods_row_id is the unique id; it is a physical unique id.
  2. We read from Delta Lake.
  3. I used dataset.dropDuplicates("sys_ods_row_id").

So I guess Delta Lake didn't take care of this case when a Spark task failed.

@SonuSingh1411
Author

@zsxwing Yes, we use Oracle JDBC to connect, and Oracle does not have duplicate records; the primary key is unique and auto-increment, so new_df should have the correct data.
Below is how we compute new_df:

new_df = (
    self.get_all_data()
    .where(fx.col("UPDATED_ON") >= fx.lit(max_updated_on))
    .withColumn("UPDATED_ON_MONTH", fx.trunc(fx.col("UPDATED_ON"), "mon"))
)

@khannavivek

Hi, I am also facing the same issue,

Did anyone find any solution to it?

Currently, after writing the data I delete the records with duplicate primary keys, keeping the record with the maximum modified date, to avoid any data conflicts.
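
A rough sketch of that clean-up using a window function (the table path, primary key, and modified-date column names here are placeholders, and an active SparkSession named spark is assumed):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Keep only the newest row per primary key, ranked by the modified date
w = Window.partitionBy("id").orderBy(F.col("modified_date").desc())
deduped = (
    spark.read.format("delta").load("s3://bucket/table")
    .withColumn("rn", F.row_number().over(w))
    .filter("rn = 1")
    .drop("rn")
)
deduped.write.format("delta").mode("overwrite").save("s3://bucket/table_deduped")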

@rtjarvis

rtjarvis commented Feb 4, 2021

We're having this issue too. It looks like it happens when the checkpoints are created. Duplicates are created AND records are lost.

@scanry

scanry commented Mar 15, 2021

Hi, is anyone looking into this?

@SonuSingh1411
Author

SonuSingh1411 commented Mar 15, 2021 via email

@rahulsmahadev
Collaborator

@SonuSingh1411 @scanry is it possible for you to help us reproduce this issue?

@scanry

scanry commented Mar 16, 2021

@SonuSingh1411 @scanry is it possible for you to help us reproduce this issue?

The following example occurs in a production environment:
[screenshot]
  • logical unique column for the application layer: company_id
  • physical unique column for the data-platform ODS layer: sys_ods_row_id
  • physical unique column for the current layer: sys_row_id
  • logical unique column for the current layer: sys_id = isUpdate ? old_sys_id : sys_row_id
  • id of the batch job instance: sys_job_id

Processing logic:

merge into dwd_pixxx_company t1
using (
  select
    output columns
  from (
    select
      row_number() over (partition by company_id order by sys_row_id desc) as rnk,
      other columns
    from ods_pixxx_company
    where sys_table_version = ${ods_pixxx_company.tableVersion}
      and sys_job_id = ${ods_pixxx_company.jobId}
  ) t
  where rnk = 1
) t2
on (t1.company_id = t2.company_id)
when matched then
  .....
when not matched then
  .....

@SonuSingh1411
Author

SonuSingh1411 commented Mar 16, 2021 via email

@rahulsmahadev
Collaborator

Thank you @SonuSingh1411. What would be great is a self-contained example or a set of steps to follow to reproduce this issue, because we have not seen this before.

@SonuSingh1411
Author

SonuSingh1411 commented Mar 24, 2021 via email

@rahulsmahadev
Collaborator

If anyone else in this thread is able to help us get a reproducer, that would be great.

@tdas
Contributor

tdas commented Mar 25, 2021

Since this is so hard to reproduce, let me try to outline a debugging strategy that you can use on your side.

  1. As soon as you detect duplicate keys, try to find the names of the files that contain those keys. This can be done using the Spark SQL function input_file_name(). So run something like select *, input_file_name() from table where key = duplicate_key. This should produce the file names. (A PySpark version of this check is sketched after this list.)

  2. If both the duplicate keys are in the same file, then that is a BIG issue - a single merge command definitely produced the duplicates. In that case, please provide your full merge command, with all the matched / not-matched clauses. If possible, please provide screenshots of the SQL plans of the jobs generated by MERGE from the Spark UI > SQL tab.

  3. If the duplicate keys are in different files, then it is possible (not guaranteed) that they were generated by different merge commands. That should not happen unless there was some kind of race condition in committing to the table, which can happen with S3 if the stars aligned in unfortunate ways. To confirm, search for those two file names in the transaction log of the Delta table, that is, in the JSON files in the tablePath/_delta_log/ directory. If the two file names are in the same JSON file (named like 000..000123.json, where 123 is the table version), then both files with duplicates were created by a single MERGE, like case 2 above, so follow the instructions in 2. If they are in different JSON files, then they were generated by two different MERGE commands. Look at the version and timestamp of those two JSON files and see whether they are very close to each other. If they are, it could have been a race condition. Please post the contents of those two JSON files.
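
A PySpark version of the step-1 check above (the table path and the duplicated key value are placeholders):

from pyspark.sql import functions as F

duplicate_key = 12345  # placeholder for one of the duplicated key values

# Step 1: list the parquet files that contain the duplicated key
(spark.read.format("delta").load("s3://bucket/table")   # placeholder table path
    .filter(F.col("ID") == duplicate_key)
    .select("ID", F.input_file_name().alias("file_name"))
    .show(truncate=False))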

let's hope this gives some pointers.

@ericjm24

ericjm24 commented May 3, 2021

Hi @tdas, I'm working with @SonuSingh1411 on the same project. We identified some duplicate data in the latest ingestion, and following your suggestion I tracked down the individual partition files of all the duplicates.

For reference, we are using PySpark 2.4.5, delta 0.7.0, aws EMR, ojdbc8 to ingest from oracle, writing to s3.

First thing to note is again the rarity and seeming randomness of the issue. We ingest dozens of tables and only two had noticeable defects; of those, the first changed ~150 records and had only 2 duplicates, while the second changed ~19,000 records with a total of 5 duplicates.

Duplicates always appeared across two different partition files, and the different records that were duplicated could appear in different partitions from each other. As an example, for our table with two duplicates:

Row(ID=Decimal('818321257.0000000000'), FILE_NAME='../PERS/UPDATED_ON_MONTH=2021-03-01/part-00003-ff3571cd-8466-4881-9456-fa49c9a0f989.c000.snappy.parquet')
Row(ID=Decimal('818321257.0000000000'), FILE_NAME='../PERS/UPDATED_ON_MONTH=2021-03-01/part-00002-d3989b64-983c-40c3-8ea5-20c6525b422e.c000.snappy.parquet')
Row(ID=Decimal('818565974.0000000000'), FILE_NAME='../PERS/UPDATED_ON_MONTH=2021-03-01/part-00006-4f94cdaa-1321-4f66-b1f3-6522496d1653.c000.snappy.parquet')
Row(ID=Decimal('818565974.0000000000'), FILE_NAME='../PERS/UPDATED_ON_MONTH=2021-03-01/part-00005-a19a0712-14a7-430a-bc0a-da19f8889091.c000.snappy.parquet')

The first two records have the same ID (which again is an auto-incrementing primary key in the source, and cannot be duplicated) but appear in different partition files. The second set of duplicates is similar, and the partition files for the first set of records is also independent of the second set. The UPDATED_ON_MONTH partition value agrees for all records.

I then went into the delta logs and identified that all 4 of these files are included in the same delta log json file, indicating that they were all created within the same merge statement. Our merge statement is very simple, as Sonu indicated above:

delta_table.alias("current").merge( new_df.alias("new"), "current.ID = new.ID" ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

tl;dr The duplication issue happens across different partition files within a single merge.
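
For anyone repeating the log check, the same lookup can be done with Spark itself; a sketch, assuming an active SparkSession, with the table path as a placeholder (add.path is the field in the Delta log's add actions):

from pyspark.sql import functions as F

# Find which commit JSON in _delta_log added a given parquet file
log = spark.read.json("s3://bucket/table/_delta_log/*.json")
(log.filter(F.col("add").isNotNull())
    .select(F.input_file_name().alias("commit_file"), F.col("add.path").alias("data_file"))
    .filter(F.col("data_file").contains("part-00003-ff3571cd"))
    .show(truncate=False))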

@dennyglee
Contributor

Thanks for the additional information, we will dive in deeper to see if this can point us in a direction to debug. Is there any chance that this could be repro'ed using a public dataset? Out of curiosity, if you were to re-run the same tests without partitioning, would you still get this error? As well, how wide are your tables? Thanks!

@dennyglee added the bug (Something isn't working) and need author feedback (Issue is waiting for the author to respond) labels on May 5, 2021
@ericjm24

ericjm24 commented May 5, 2021

I am sure it is reproducible on a public dataset, but I am not capable of doing that in my current position. Maybe one of the other people who have commented on this issue can help. @scanry above provided a screenshot of their data, so they might be able to assist further assuming they are experiencing the same bug we are.

I suspect this problem would go away if we kept it to a single file per data partition, but we have time limits on our jobs and cannot afford to attempt that. It would be too slow.

As for table size, most of our tables are in the range of 10-20 columns wide. We have a few that are around 100 but I don't think we've ever seen this issue with those tables (they tend to be very slowly changing). Tables can also be in the 10-100 million range in length, but as noted above we are usually only changing at most a few tens of thousands of records in any given night.

@SonuSingh1411
Author

@Nestor10 thanks a lot for digging into the defect. Hopefully this helps get to the root of the problem. Please do keep us posted if you get any concrete update on the resolution of the issue.

@tdas
Contributor

tdas commented Jun 8, 2021

We are looking into it. Will keep this thread posted if we manage to find the root cause.

@bigdatamoore

bigdatamoore commented Jun 15, 2021

We are experiencing a similar issue in Azure Databricks. We are seeing dupes in a table that we are loading with a merge. We have raised a ticket with Microsoft.
+++
Update on this - I was able to determine that our issue was caused by a user error, not a system one. So false alarm.

@tdas
Contributor

tdas commented Jun 15, 2021

We have not been able to figure out the root cause yet, since it is so hard and rare to reproduce. But one very likely cause of duplicates is non-determinism in the data generated by the source of the merge. Merge operates in two passes on the source: a first pass to identify which files have matching data, and a second pass to rewrite those files. If the data generated by that source plan differs between the two passes, it can produce incorrect results. This non-determinism can occur in various ways:
(i) The underlying table data that is being read in the source may have changed between the two passes.
(ii) The transformation that generates the source may be non-deterministic, e.g. some map operation on the source.

Here are the different follow-up steps to mitigate/debug the issue.

  1. If you want to eliminate chances of non-determinism between the two passes over the source, then you can either
    (a) cache the source before feeding it to merge, or
    (b) write the source to a temporary table on disk, and then use that table to create a new source. If the source data is large and caching it in Spark becomes a concern, this is the more robust option. (A minimal sketch of both options appears after this list.)

  2. If you want to debug the non-determinism (I urge you, please help us debug it), then instead of removing the non-determinism with a cache or temp table, keep it as it is. Additionally, just write the source out to a table to record the exact source data that was used for the merge. If you keep a record of the source for every merge made, then if one of the merges causes duplicates, you can easily recreate that situation using the saved source. For example, if the merge on version X caused duplicates, you can replay the merge for debugging by:

  • Create a new delta table by using version X of the target table
  • Merge the corresponding saved source into this new table.
  • Compare the result of this merge to target table version X+1. If they are the same (that is, duplicates were created), then we have a reproducible situation!!! Report it to us and we can debug it further. If they are different, that is, no duplicates, then it indicates that source non-determinism was the problem -- the source in the original merge produced duplicates, but the "supposedly same" source data does not.
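
A minimal sketch of option 1, reusing the delta_table and new_df names from earlier in the thread (the temporary path in option (b) is a placeholder):

# Option (a): pin the source in memory so both passes of MERGE see identical rows
new_df_cached = new_df.cache()
new_df_cached.count()  # force materialization before the merge

delta_table.alias("current").merge(
    new_df_cached.alias("new"), "current.ID = new.ID"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

# Option (b): for large sources, write to a temporary Delta table and merge from that instead
new_df.write.format("delta").mode("overwrite").save("s3://bucket/tmp/merge_source")
stable_source = spark.read.format("delta").load("s3://bucket/tmp/merge_source")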

@Nestor10

@tdas Thanks for the attention to this issue! I was able to identify a non-deterministic behavior in my data. current_timestamp, yep, that'll do it, so I fall clearly into category (ii) that you outlined above. I spent probably way too much time reproducing this behavior in a mock Spark run, but I had some very interesting results. At first, despite thousands of tries and making my data sources non-deterministic, very non-deterministic, I could not get duplicate events.

I discovered one condition that precluded ever seeing these duplicate rows: the merge source (not the Delta table that we are merging into, but rather the data we wish to merge in) is not empty. Even when merge sources are non-deterministic and being mutated, as long as the merge operated over at least one row in the first pass, duplicate records are never created. It's as if the first pass caches the join results conditionally: it caches if there is data to join with, but skips this caching if the source table is empty (which totally makes sense as an optimization). The second pass seems to re-read the source table only when it was not cached. Not sure if this helps or is just a weird coincidence, but that was my finding. Over the next couple of weeks my free time will be spent listening to Martin Odersky lectures and solving some overly clever Scala problems, but after that, if there is still a need, I believe I could create a standalone unit test for this.
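
To illustrate the category (ii) case above with a hedged sketch: a lazily evaluated current_timestamp() column in the merge source may be re-evaluated on each pass, while a value captured once on the driver and embedded as a literal stays fixed (the load_ts column name is a placeholder):

import datetime
from pyspark.sql import functions as F

# Potentially non-deterministic: may be re-evaluated on each pass over the source
source = new_df.withColumn("load_ts", F.current_timestamp())

# Deterministic alternative: one value computed on the driver, baked in as a literal
batch_ts = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
source = new_df.withColumn("load_ts", F.lit(batch_ts).cast("timestamp"))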

@tdas
Contributor

tdas commented Jun 22, 2021

That corner case is somehow an artifact of how merge is internally implemented, which, obviously, is subject to change. Good job catching it though. And thank you for confirming that it is indeed due to non-determinism. I hope this is the reason for others' problems as well.

We will document this in the Delta docs.

@RohitJadhav95

Hi, thanks for all the questions and information provided.
I recently faced a similar issue wherein I found duplicates in my delta table when merge upserting the data.

I am upserting the data to a delta table in S3 using a Glue Job. There is only one Glue Job that is writing to the delta table and to ensure deduplication from the source, I explicitly dedup the data before merging it to the final table.

However, in my case I observed this issue only when my delta table was partitioned; there were no duplicates in the case of a non-partitioned table.

I am using an external table in Athena created with the manifest file to verify the duplicates and querying data.

This is the merge query that I am using -

df.alias("business").merge(inputDF_partitioned.alias("delta"), "business.id = delta.id").whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

Question - Is there something we need to be aware of while upserting data into partitioned delta tables, or is it handled by delta tables itself?

Thanks in advance

@jgoldman83

Hey guys, I was able to reproduce it and found a solution. Just make sure that the parquet file you are reading for the upserts has the same schema as the source.

@sebastianGruenwald

sebastianGruenwald commented May 6, 2022

[SOLVED -- NOT THE SAME ISSUE, SEE BELOW]
Hi all,
I just wanted to share some insights I had on this issue. I was experiencing the same problem with duplicates after merging into a delta table.

Here is my setup:

  • All happens in Databricks (Runtime 10.4)
  • One Source Delta Table where Events are appended periodically. (Every 5 minutes)
  • One target Table that should consolidate the events (each key has only one entry with the latest received data in it)
  • Target Table is partitioned by date

Implementation:

But when I ran the code I got duplicates after at least two batches of data were processed.

Solution
After trying several things I removed the partition-column condition from the merge condition. The batches take longer to run now, but there are no more duplicates. It seems this issue had something to do with partition pruning.

Code
Sorry guys, this cannot be used directly to reproduce the issue. But I think any source dataset with duplicates and a partitioning that is normally consistent with the key should be enough to do so.

import pyspark.sql.functions as f
from delta import DeltaTable

def write_batch(data, batch_id):
    data_consolidated = (
        data.groupby("ORDERID").agg(
            f.max("sequenceNumber").alias("max_sequence_number"),
            f.max("enqueuedTime").alias("max_enqueued_time"),
            *[
                f.last(x, ignorenulls=True).alias(x)
                for x in sorted(data.columns[:28])
                if x not in ["ORDERID", "sequenceNumber"]
            ]
        )
        .withColumn("BDATE", f.col("BTIMESTAMP").cast("date"))
        .select("BDATE", *data.columns[:28], "max_sequence_number", "max_enqueued_time")
        .drop("ingestTime", "ingestDate")
    )
    data_consolidated.persist()
    spark.catalog.setCurrentDatabase("test")
    if "target_table" not in [x.name for x in spark.catalog.listTables()]:
        (data_consolidated.write.partitionBy("SOMECOL", "BDATE")
            .mode("overwrite").saveAsTable("test.target_table"))
    else:
        target_table = DeltaTable.forName(spark, "test.target_table")
        ## For partition pruning: get all dates which are in the update batch
        bdatus = [x["BDATE"].isoformat()[:10] for x in data_consolidated.select("BDATE").drop_duplicates().collect()]
        if batch_id % 10 == 0:
            spark.sql("optimize test.silver_forecasts")
        if batch_id % 100 == 0:
            spark.sql("optimize test.silver_forecasts zorder by btimestamp")
        (target_table.alias("t")
            .merge(
                data_consolidated.alias("s"),
                f"""t.ORDERID = s.ORDERID"""
                ## This condition caused the issue. If you leave it out the duplicates are gone
                + f""" and t.BDATE in ('{"','".join(bdatus)}')"""
            )
            .whenMatchedUpdateAll("t.max_sequence_number < s.max_sequence_number")
            .whenNotMatchedInsertAll()
            .execute())

stream = spark.readStream.option("maxFilesPerTrigger", "1").table("test.source")
stream.writeStream.outputMode("append").foreachBatch(write_batch).start()

# query to identify duplicates:
spark.read.table("test.target_table").agg(f.countDistinct("ORDERID"), f.count("ORDERID")).display()

@bart-samwel
Collaborator

@sebastianGruenwald Potentially a stupid question: I'm not sure what the .cast("BDATE") does in the 9th line of write_batch. Should that be .cast("DATE")?

@sebastianGruenwald

Yes, of course, you are right. I changed it.

@bart-samwel
Collaborator

@sebastianGruenwald Is BDATE ever NULL? Because the IN based pruning won't work in that case.

@sebastianGruenwald

sebastianGruenwald commented May 19, 2022

No, it can't be NULL. But I think I found the issue. In the original statement I had

.withColumn("BDATE", f.date_trunc("day", "BTIMESTAMP"))

instead of

.withColumn("BDATE", f.col("BTIMESTAMP").cast("date"))

I thought it wouldn't make a big difference, but testing it again, this actually solves the issue. So actually my fault: I forgot that date_trunc gives back a timestamp, and BDATE IN ('2022-05-01') wouldn't match BDATE = '2022-05-01T00:00:00'.
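
A small illustration of that type difference (assuming only an active SparkSession named spark): date_trunc returns a timestamp, while cast("date") returns a date:

from pyspark.sql import functions as F

df = (spark.createDataFrame([("2022-05-01 13:45:00",)], ["BTIMESTAMP"])
        .withColumn("BTIMESTAMP", F.col("BTIMESTAMP").cast("timestamp")))

df.select(
    F.date_trunc("day", "BTIMESTAMP").alias("truncated"),  # still a timestamp: 2022-05-01 00:00:00
    F.col("BTIMESTAMP").cast("date").alias("as_date"),     # a date: 2022-05-01
).printSchema()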

@bart-samwel
Collaborator

That makes sense! So this is not an instance of the issue described in this ticket.

@sebastianGruenwald

Should I remove it?

@bart-samwel
Collaborator

@sebastianGruenwald It's probably better to keep the discussion for history purposes, but maybe you can edit the original message to add a header like [SOLVED -- NOT THE SAME ISSUE, SEE BELOW] so that future readers can skip over the section?

@aleksandraangelova

aleksandraangelova commented Jun 20, 2022

[ UNSOLVED - NOT THE SAME ISSUE - see #1218 ]

@bart-samwel
Collaborator

@aleksandraangelova This ticket is specifically about MERGE. The reason for the issue you are experiencing is certainly different. Could you file a new ticket?

@aleksandraangelova

Sure, I just filed it - #1218
Should I delete my comment here?

@bart-samwel
Collaborator

You can leave your comment around, but maybe update it with a comment at the top, just like #527 (comment) ? Thank you!

@jwbreedlove

Facing this issue in production across multiple different tables from many source (oracle/postgres/db2) applications. Does appear to be isolated to the upsert merges. We are running Spark 3.1.2 w/ Delta 1.0.0 on Kubernetes. Data store is HDFS (not S3). The issue is not repeatable and sporadic. When we notice a drop in counts or duplicates we roll back to a prior snapshot and replay the transactions. Resulting data is then correct. I will go through the debug steps mentioned in this thread and report back findings.

@scanry

scanry commented Aug 22, 2022

Hi, I recently found a pattern in the duplicate cases: duplicates very often occur when the merge is on multiple columns (date, mall, source, type), and input_file_name() shows the same file.
[screenshots]

@allisonport-db
Collaborator

Minor update: we've made one fix w.r.t. non-determinism in merge in fadefea. This ensures all current date or time functions throughout the command are consistent.

@Kiran-G1

I partially replicated this issue with a different use case.

The use case is to update the old record's datetime to the current time and the latest record's datetime to 2050 in Delta, by performing a merge operation when a new record comes in.

But all of the records were considered as new records...

[screenshot]

I noticed that all those records got inserted at the same time (see the load time) into the delta table.

My hunch is that, since the Delta table write happens in parallel in Spark and all these records got inserted at the same time, the merge condition was not satisfied due to this race condition.

@tdas @SonuSingh1411
