
Handle ConcurrentAppendException in qbeast-spark #441

Closed · JosepSampe opened this issue Oct 22, 2024 · 1 comment
Labels: type: bug (Something isn't working)

JosepSampe (Member):
Since PR #402 (Issue #405: DataWriter refactory), a ConcurrentAppendException started appearing when a write() and an optimize() run concurrently:

io.delta.exceptions.ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
Conflicting commit: {"timestamp":1728639379364,"operation":"WRITE","operationParameters":{"mode":"Append"},"readVersion":4,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"3","numOutputRows":"160724","numOutputBytes":"120799894"},"engineInfo":"Apache-Spark/3.5.0 Delta-Lake/3.1.0","txnId":"27fd66c8-5027-4e2e-b5e7-5611887b2f73"}
Refer to https://docs.delta.io/latest/concurrency-control.html for more details.
  at org.apache.spark.sql.delta.DeltaErrorsBase.concurrentAppendException(DeltaErrors.scala:2300)
  at org.apache.spark.sql.delta.DeltaErrorsBase.concurrentAppendException$(DeltaErrors.scala:2291)
  at org.apache.spark.sql.delta.DeltaErrors$.concurrentAppendException(DeltaErrors.scala:3203)
  at org.apache.spark.sql.delta.ConflictChecker.$anonfun$checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn$1(ConflictChecker.scala:305)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.delta.ConflictChecker.recordTime(ConflictChecker.scala:499)
  at org.apache.spark.sql.delta.ConflictChecker.checkForAddedFilesThatShouldHaveBeenReadByCurrentTxn(ConflictChecker.scala:276)
  at org.apache.spark.sql.delta.ConflictChecker.checkConflicts(ConflictChecker.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.checkForConflictsAgainstVersion(OptimisticTransaction.scala:1882)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.checkForConflictsAgainstVersion$(OptimisticTransaction.scala:1872)
  at org.apache.spark.sql.delta.OptimisticTransaction.checkForConflictsAgainstVersion(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$checkForConflicts$10(OptimisticTransaction.scala:1860)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$checkForConflicts$10$adapted(OptimisticTransaction.scala:1856)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$checkForConflicts$2(OptimisticTransaction.scala:1856)
  at org.apache.spark.sql.delta.catalog.DeltaTableV2$.withEnrichedUnsupportedTableException(DeltaTableV2.scala:364)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$checkForConflicts$1(OptimisticTransaction.scala:1825)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordFrameProfile(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
  at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
  at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.checkForConflicts(OptimisticTransaction.scala:1825)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.checkForConflicts$(OptimisticTransaction.scala:1815)
  at org.apache.spark.sql.delta.OptimisticTransaction.checkForConflicts(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$4(OptimisticTransaction.scala:1654)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordFrameProfile(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
  at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
  at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$3(OptimisticTransaction.scala:1652)
  at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:158)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$2(OptimisticTransaction.scala:1648)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordFrameProfile(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
  at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
  at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$doCommitRetryIteratively$1(OptimisticTransaction.scala:1648)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.lockCommitIfEnabled(OptimisticTransaction.scala:1626)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively(OptimisticTransaction.scala:1642)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.doCommitRetryIteratively$(OptimisticTransaction.scala:1638)
  at org.apache.spark.sql.delta.OptimisticTransaction.doCommitRetryIteratively(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.liftedTree1$1(OptimisticTransaction.scala:1128)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.$anonfun$commitImpl$1(OptimisticTransaction.scala:1056)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:140)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:138)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordFrameProfile(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:133)
  at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
  at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:132)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:122)
  at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:112)
  at org.apache.spark.sql.delta.OptimisticTransaction.recordDeltaOperation(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.commitImpl(OptimisticTransaction.scala:1053)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.commitImpl$(OptimisticTransaction.scala:1048)
  at org.apache.spark.sql.delta.OptimisticTransaction.commitImpl(OptimisticTransaction.scala:142)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit(OptimisticTransaction.scala:1042)
  at org.apache.spark.sql.delta.OptimisticTransactionImpl.commit$(OptimisticTransaction.scala:1038)
  at org.apache.spark.sql.delta.OptimisticTransaction.commit(OptimisticTransaction.scala:142)
  at io.qbeast.spark.delta.DeltaMetadataWriter.$anonfun$writeWithTransaction$5(DeltaMetadataWriter.scala:198)
  at io.qbeast.spark.delta.DeltaMetadataWriter.$anonfun$writeWithTransaction$5$adapted(DeltaMetadataWriter.scala:172)
  at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:223)
  at io.qbeast.spark.delta.DeltaMetadataWriter.writeWithTransaction(DeltaMetadataWriter.scala:172)
  at io.qbeast.spark.delta.DeltaMetadataManager$.updateWithTransaction(DeltaMetadataManager.scala:43)
  at io.qbeast.table.IndexedTableImpl.$anonfun$optimize$2(IndexedTable.scala:511)
  at io.qbeast.table.IndexedTableImpl.$anonfun$optimize$2$adapted(IndexedTable.scala:501)
  at scala.collection.Iterator.foreach(Iterator.scala:943)
  at scala.collection.Iterator.foreach$(Iterator.scala:943)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
  at scala.collection.IterableLike.foreach(IterableLike.scala:74)
  at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
  at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
  at io.qbeast.table.IndexedTableImpl.optimize(IndexedTable.scala:501)
  at io.qbeast.table.QbeastTable.optimize(QbeastTable.scala:100)
  at io.qbeast.use.managed.index.LeveledCompaction.$anonfun$doOptimize$3(LeveledCompaction.scala:98)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at io.qbeast.use.utils.TimeUtils.time(TimeUtils.scala:10)
  at io.qbeast.use.utils.TimeUtils.time$(TimeUtils.scala:8)
  at io.qbeast.use.managed.index.LeveledCompaction.time(LeveledCompaction.scala:31)
  at io.qbeast.use.managed.index.LeveledCompaction.doOptimize(LeveledCompaction.scala:98)
  at io.qbeast.use.managed.index.ContinuousExecution.optimize(ContinuousExecution.scala:29)
  at io.qbeast.use.managed.index.ContinuousExecution.optimize$(ContinuousExecution.scala:26)
  at io.qbeast.use.managed.index.LeveledCompaction.optimize(LeveledCompaction.scala:31)
  at io.qbeast.use.catalog.commands.LeveledCompactionTableCommand.run(LeveledCompactionTableCommand.scala:82)
  at io.qbeast.use.streaming.optimization.OptimizationRunnable.runOnce(OptimizationRunnable.scala:64)
  at io.qbeast.use.streaming.optimization.OptimizationRunnable.run(OptimizationRunnable.scala:95)
  at io.qbeast.use.streaming.optimization.OptimizationThread.run(OptimizationThread.scala:23)
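
For context, a minimal sketch of the access pattern that triggers the conflict: an append and an optimize racing on the same table. The table path, schema, thread setup, and row count below are illustrative assumptions, not the actual workload (which here was a streaming optimization thread):

```scala
import io.qbeast.table.QbeastTable
import org.apache.spark.sql.SparkSession

object ConcurrentAppendRepro extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  val tablePath = "/tmp/qbeast-repro" // hypothetical path; assumes the table already exists

  // Writer thread: appends a batch. Its commit carries AddFile actions with
  // dataChange = true, which is correct for newly added data.
  val writer = new Thread(() => {
    spark.range(100000).toDF("id")
      .write
      .format("qbeast")
      .option("columnsToIndex", "id")
      .mode("append")
      .save(tablePath)
  })

  // Optimizer thread: rewrites existing files. After the refactor, this
  // commit also carried dataChange = true, so Delta's ConflictChecker saw
  // the concurrent append as a conflicting update and threw
  // ConcurrentAppendException.
  val optimizer = new Thread(() => {
    QbeastTable.forPath(spark, tablePath).optimize()
  })

  writer.start(); optimizer.start()
  writer.join(); optimizer.join()
}
```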
JosepSampe added the type: bug label Oct 22, 2024
JosepSampe (Member, Author) commented Oct 22, 2024:

I've found the source of the ConcurrentAppendException. Delta's AddFile action carries a boolean flag called dataChange.

- In the new version, both Table.write() and Table.optimize() call RollupDataWriter.write(), which returns AddFile actions with dataChange = true.
- In the old version, Table.write() called RollupDataWriter.write(), which returned AddFile(dataChange = true), while Table.optimize() called RollupDataWriter.optimize(), which returned AddFile(dataChange = false).

Since the current code always sets the flag to true, Delta's conflict checker treats the optimize commit as a conflicting data change and raises the exception. I've run several tests keeping the flags as before (true for write, false for optimize) and haven't encountered the exception again; a sketch of that idea follows below.
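
A minimal sketch of the fix idea, using Delta's AddFile action; the helper name is hypothetical, not the actual qbeast-spark code:

```scala
import org.apache.spark.sql.delta.actions.AddFile

// Hypothetical helper illustrating the fix: optimize() only rewrites
// existing rows into new files, so its AddFile actions should declare
// dataChange = false. With that flag, Delta's conflict checker no longer
// treats a concurrent blind append as a conflicting update.
def asOptimizeActions(files: Seq[AddFile]): Seq[AddFile] =
  files.map(_.copy(dataChange = false))

// write() keeps dataChange = true so that downstream readers (e.g.
// streaming queries) still pick up the newly appended rows.
```

This also matches how Delta's own OPTIMIZE behaves: compaction commits mark their files with dataChange = false, which is what the fix restores for Table.optimize().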

JosepSampe self-assigned this Oct 22, 2024
osopardo1 pushed a commit that referenced this issue Oct 24, 2024
* Set dataChange to false on Table.optimize()
JosepSampe added a commit that referenced this issue Oct 24, 2024
* Issue #424: Add sampling fraction option for optimization (#426)

* Add sampling fraction option for optimization and remove analyze from QbeastTable

* Issue #430: Simplify denormalized blocks creation (#431)

* Simplify Denormalized Blocks

* Issue #416: Add CDFQuantile Transformers and Transformations (#413)

* Issue 264: Update qviz for multiblock files (#437)

* Update Qbeast Visualiser (qviz) with multiblock files

---------

Co-authored-by: Jorge Marín <jorge.marin.rodenas@estudiantat.upc.edu>
Co-authored-by: Jorge Marín <100561030+jorgeMarin1@users.noreply.github.com>

* Issue #441: Fix dataChange flag in optimize (#444)

* Merge from main branch

---------

Co-authored-by: jiawei <47899566+Jiaweihu08@users.noreply.github.com>
Co-authored-by: Paola Pardo <paolapardoat@gmail.com>
Co-authored-by: Jorge Marín <jorge.marin.rodenas@estudiantat.upc.edu>
Co-authored-by: Jorge Marín <100561030+jorgeMarin1@users.noreply.github.com>
fpj closed this as completed Oct 24, 2024
JosepSampe added a commit to JosepSampe/qbeast-spark that referenced this issue Oct 24, 2024
* Set dataChange to false on Table.optimize()