
[BUG] Unstable error messages when merging null values on NOT NULL columns #1279

pedrosmv opened this issue Jul 18, 2022 · 1 comment
Bug

Describe the problem


We have a merge operation using Delta + PySpark that processes CDC data, mostly inserts and updates. During testing we found that the behaviour when merging null values into NOT NULL columns is erratic: depending on which merge clauses are used, we get different errors.

Steps to reproduce

Spark setup:

import pyspark
from delta import configure_spark_with_delta_pip


def spark_loader():
    builder = pyspark.sql.SparkSession.builder.appName("testing-schema-evo") \
        .master("local[4]") \
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
        .config("spark.sql.catalog.spark_catalog",
                "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
        .config("spark.databricks.delta.schema.autoMerge.enabled", "true")

    return configure_spark_with_delta_pip(builder).getOrCreate()
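The snippets below assume the session comes from this helper (the original steps don't show this line, so it is an assumed one-liner):

spark = spark_loader()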


Create the table:

spark.sql(f"""
CREATE TABLE IF NOT EXISTS delta.`testing-schema-evo` (
name STRING NOT NULL,
age STRING NOT NULL
) USING DELTA
""")


Load data into the table:

from pyspark.sql import Row
from delta.tables import DeltaTable

test_data = [
    {"name": "charly", "age": "sixteen"},
    {"name": "fabien", "age": "one"},
    {"name": "sam", "age": "two"},
    {"name": "sam", "age": "three"},
]

# Create the dataframe
test_data_df = spark.createDataFrame(Row(**data_row) for data_row in test_data)

# Then we merge using the first df:
condition = "current.name = new.name"
deltaTable = DeltaTable.forPath(spark, "delta-tables/test_table")
deltaTable.alias("current") \
    .merge(test_data_df.alias("new"), condition) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()
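To sanity-check the initial load (a sketch, assuming the same path):

# Should show the four initial rows, including the two "sam" entries.
spark.read.format("delta").load("delta-tables/test_table").show()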


After loading the initial data, we run the same merge again, this time with null values in the source data:

test_data_with_null = [
    {"name": "joson", "age": "eleven"},
    {"name": "icarson", "age": None},
    {"name": "sam", "age": None},
]

test_data_with_null_df = spark.createDataFrame(Row(**data_row) for data_row in test_data_with_null)

condition = "current.name = new.name"
deltaTable = DeltaTable.forPath(spark, "delta-tables/test_table")
deltaTable.alias("current") \
    .merge(test_data_with_null_df.alias("new"), condition) \
    .whenMatchedUpdateAll() \
    .whenNotMatchedInsertAll() \
    .execute()

Observed results


Running the test with the complete statement (both whenMatchedUpdateAll() and whenNotMatchedInsertAll()), the result is the following:

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2352)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2351)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2351)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1109)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1109)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2591)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2533)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2522)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:898)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:218)
	... 53 more
Caused by: org.apache.spark.SparkException: Task failed while writing rows.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:500)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:321)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: java.lang.RuntimeException: Error while decoding: java.lang.NullPointerException
createexternalrow(input[0, string, false].toString, input[1, string, false].toString, StructField(name,StringType,false), StructField(age,StringType,false))
	at org.apache.spark.sql.errors.QueryExecutionErrors$.expressionDecodingError(QueryExecutionErrors.scala:1047)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:184)
	at org.apache.spark.sql.delta.commands.MergeIntoCommand$JoinedRowProcessor.$anonfun$processPartition$8(MergeIntoCommand.scala:821)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:304)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
	... 9 more
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificSafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Deserializer.apply(ExpressionEncoder.scala:181)
	... 20 more

When the merge has only whenNotMatchedInsertAll(), the result is the expected InvariantViolationException:

deltaTable.alias("current") \
    .merge(test_data_with_null_df.alias("new"), condition) \
    .whenNotMatchedInsertAll() \
    .execute()
py4j.protocol.Py4JJavaError: An error occurred while calling o98.execute.
: org.apache.spark.sql.delta.schema.InvariantViolationException: NOT NULL constraint violated for column: age.
	at org.apache.spark.sql.delta.schema.InvariantViolationException$.apply(InvariantViolationException.scala:49)
	at org.apache.spark.sql.delta.schema.InvariantViolationException.apply(InvariantViolationException.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.delta.constraints.DeltaInvariantCheckerExec.$anonfun$doExecute$3(DeltaInvariantCheckerExec.scala:87)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:92)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:304)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1496)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:311)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$16(FileFormatWriter.scala:229)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Expected results

We expected the InvariantViolationException in both cases, since both merges try to write null values into NOT NULL columns.
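For anyone reproducing this, a hedged pytest sketch of that expectation (it reuses deltaTable, condition and test_data_with_null_df from the steps above; the Scala exception reaches Python wrapped in a Py4JJavaError):

import pytest
from py4j.protocol import Py4JJavaError

def test_merge_nulls_raises_invariant_violation():
    # Expectation from this report: the merge should surface
    # InvariantViolationException, not a bare NullPointerException.
    with pytest.raises(Py4JJavaError) as exc_info:
        deltaTable.alias("current") \
            .merge(test_data_with_null_df.alias("new"), condition) \
            .whenMatchedUpdateAll() \
            .whenNotMatchedInsertAll() \
            .execute()
    assert "InvariantViolationException" in str(exc_info.value.java_exception)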

Further details
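As a stopgap, validating the merge source before executing the merge gives a stable, descriptive failure regardless of which clauses run. A sketch with a hypothetical helper (assert_no_nulls and its column list are ours, not part of the report):

from pyspark.sql import functions as F

def assert_no_nulls(df, not_null_columns):
    # Hypothetical helper: fail fast if the merge source carries nulls
    # in columns the target declares as NOT NULL.
    predicate = None
    for column in not_null_columns:
        clause = F.col(column).isNull()
        predicate = clause if predicate is None else predicate | clause
    bad_rows = df.filter(predicate).count()
    if bad_rows:
        raise ValueError(
            f"{bad_rows} source row(s) have nulls in NOT NULL columns {not_null_columns}"
        )

assert_no_nulls(test_data_with_null_df, ["name", "age"])  # raises before the merge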

Environment information

  • Delta Lake version: 1.2.1
  • Spark version: 3.2.0
  • Scala version: 2.12

Willingness to contribute


The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
pedrosmv added the bug label on Jul 18, 2022

nkarpov (Collaborator) commented Aug 12, 2022

Hi @pedrosmv - thank you for reporting this. Confirming we can reproduce this based on your steps. Since an error is still being thrown and no data corruption occurs, we'll keep this open for now for anyone who might like to contribute and prioritize accordingly otherwise. Thanks!
