Spark: Dropping partition column from old partition table corrupts entire table #10234
Comments
Which Spark version are you using?
I was originally trying with Spark 3.3.0 and Iceberg 1.2.1. Later I tried with the Spark-Iceberg docker images.
I don't think dropping a column should be allowed while you still have data partitioned by that column.
Once a partition column is dropped or replaced in the partition spec, it should behave like DROP COLUMN. Is that a correct assumption?
@EXPEbdodla To clarify, are you dropping a column that's part of the current partition spec? I did some work to fix this for previous partition specs: #5399
Thanks for sharing the steps to reproduce this. I think the problem here is that the spec is not the current one, but it is still in use. This does look like a bug and should be fixed.
The issue is we don't persist the partition spec along with its schema in
I was able to reproduce this with a short example, and I agree that this is a bug we need to fix:
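The original snippet is not preserved in this copy of the thread. A minimal sketch of the kind of sequence that triggers the failure, assuming a Spark session with the Iceberg SQL extensions enabled, a catalog named `demo`, and a v2 table (all names illustrative, not the reporter's), might look like this:

```scala
// Illustrative only: catalog "demo" and table "demo.db.events" are assumed names.
spark.sql("""
  CREATE TABLE demo.db.events (
    id BIGINT,
    event_date DATE,
    category STRING)
  USING iceberg
  PARTITIONED BY (event_date)
  TBLPROPERTIES ('format-version'='2')
""")

spark.sql("INSERT INTO demo.db.events VALUES (1L, DATE '2024-05-01', 'a')")

// Evolve the spec so event_date is no longer in the current partition spec,
// although the existing data file was written with the old spec.
spark.sql("ALTER TABLE demo.db.events DROP PARTITION FIELD event_date")

// The drop is accepted because only the current spec is validated ...
spark.sql("ALTER TABLE demo.db.events DROP COLUMN event_date")

// ... but later reads and writes fail: NullPointerException on SELECT,
// ValidationException ("Cannot find source column for partition field") on appends.
spark.sql("SELECT * FROM demo.db.events").show()
```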
Hi, may I ask whether the repair method is to traverse all schemas?
According to #10352, I assume it traverses all manifests to find whether any manifest file's partitionSpec matches a historical partitionSpec whose referenced source column is about to be dropped. If there is a match, it throws an exception to prevent dropping a column that is still referenced by an active partitionSpec. Though the PR hasn't been merged, I think it's a good repair approach.
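For illustration only, here is a rough sketch of that kind of guard against the public Iceberg API. It is not the actual code from #10352 (which reportedly inspects manifests so that only specs still backed by live data files block the drop); for brevity this variant checks every spec recorded in the table metadata. The helper name is hypothetical.

```scala
import org.apache.iceberg.Table
import org.apache.iceberg.exceptions.ValidationException

// Hypothetical helper, not part of Iceberg: fail fast if any partition spec
// recorded in the table metadata still uses `columnName` as a source column.
def checkColumnNotUsedByAnySpec(table: Table, columnName: String): Unit = {
  val fieldId = table.schema().findField(columnName).fieldId()
  table.specs().values().forEach { spec =>
    spec.fields().forEach { field =>
      if (field.sourceId() == fieldId) {
        throw new ValidationException(
          "Cannot drop column %s: still referenced by partition spec %s as %s",
          columnName, Integer.valueOf(spec.specId()), field.name())
      }
    }
  }
}
```

A manifest-based check is stricter in the right way: a spec that exists in metadata but no longer backs any data files would not need to block the drop.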
Thank you very much!
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
Apache Iceberg version
1.5.1 (latest release)
Query engine
Spark
Please describe the bug 🐞
Problem: Dropping a column referenced by an old partition spec corrupts the table. Metadata queries fail, SELECT fails, and DataFrame appends fail. Verified this on Iceberg 1.2.1, 1.4.3, and 1.5.1.
What to expect: dropping a column that is still referenced by an older partition spec should be rejected (or at least should not leave the table unreadable).
How to reproduce:
Try_Iceberg_partition_column_delete.md
Exported the notebook as an MD file and attached it above.
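To show the append failure path concretely, a hypothetical DataFrame write against a table set up like the sketch in the comments above (catalog and table names are assumed, not taken from the attached notebook) fails on the executors with the ValidationException shown further below:

```scala
// Illustrative only: "demo.db.events" is an assumed table whose old identity
// partition column has already been dropped, as in the earlier sketch.
// Assumes a spark-shell style session where `spark` is the SparkSession.
import spark.implicits._

val df = Seq((2L, "b")).toDF("id", "category")

// The write plan is built fine, but tasks fail while rebinding the old
// partition spec: "Cannot find source column for partition field".
df.writeTo("demo.db.events").append()
```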
Stack traces:
For SELECT:
Caused by: java.lang.NullPointerException: Type cannot be null
    at org.apache.iceberg.relocated.com.google.common.base.Preconditions.checkNotNull(Preconditions.java:907)
    at org.apache.iceberg.types.Types$NestedField.<init>(Types.java:447)
    at org.apache.iceberg.types.Types$NestedField.optional(Types.java:416)
    at org.apache.iceberg.PartitionSpec.partitionType(PartitionSpec.java:132)
    at org.apache.iceberg.Partitioning.buildPartitionProjectionType(Partitioning.java:286)
    at org.apache.iceberg.Partitioning.partitionType(Partitioning.java:254)
    at org.apache.iceberg.spark.source.SparkTable.metadataColumns(SparkTable.java:251)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.metadataOutput$lzycompute(DataSourceV2Relation.scala:61)
    at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation.metadataOutput(DataSourceV2Relation.scala:51)
    at org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias.metadataOutput(basicLogicalOperators.scala:1339)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$3(Analyzer.scala:966)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$3$adapted(Analyzer.scala:966)
    at scala.collection.Iterator.exists(Iterator.scala:969)
    at scala.collection.Iterator.exists$(Iterator.scala:967)
    at scala.collection.AbstractIterator.exists(Iterator.scala:1431)
    at scala.collection.IterableLike.exists(IterableLike.scala:79)
    at scala.collection.IterableLike.exists$(IterableLike.scala:78)
    at scala.collection.AbstractIterable.exists(Iterable.scala:56)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$2(Analyzer.scala:966)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$2$adapted(Analyzer.scala:961)
    at org.apache.spark.sql.catalyst.trees.TreeNode.exists(TreeNode.scala:346)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$exists$1(TreeNode.scala:349)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$exists$1$adapted(TreeNode.scala:349)
    at scala.collection.Iterator.exists(Iterator.scala:969)
    at scala.collection.Iterator.exists$(Iterator.scala:967)
    at scala.collection.AbstractIterator.exists(Iterator.scala:1431)
    at scala.collection.IterableLike.exists(IterableLike.scala:79)
    at scala.collection.IterableLike.exists$(IterableLike.scala:78)
    at scala.collection.AbstractIterable.exists(Iterable.scala:56)
    at org.apache.spark.sql.catalyst.trees.TreeNode.exists(TreeNode.scala:349)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$exists$1(TreeNode.scala:349)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$exists$1$adapted(TreeNode.scala:349)
    at scala.collection.Iterator.exists(Iterator.scala:969)
    at scala.collection.Iterator.exists$(Iterator.scala:967)
    at scala.collection.AbstractIterator.exists(Iterator.scala:1431)
    at scala.collection.IterableLike.exists(IterableLike.scala:79)
    at scala.collection.IterableLike.exists$(IterableLike.scala:78)
    at scala.collection.AbstractIterable.exists(Iterable.scala:56)
    at org.apache.spark.sql.catalyst.trees.TreeNode.exists(TreeNode.scala:349)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$1(Analyzer.scala:961)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$1$adapted(Analyzer.scala:961)
    at scala.collection.LinearSeqOptimized.exists(LinearSeqOptimized.scala:95)
    at scala.collection.LinearSeqOptimized.exists$(LinearSeqOptimized.scala:92)
    at scala.collection.immutable.Stream.exists(Stream.scala:204)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.org$apache$spark$sql$catalyst$analysis$Analyzer$AddMetadataColumns$$hasMetadataCol(Analyzer.scala:961)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$$anonfun$apply$12.applyOrElse(Analyzer.scala:931)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$$anonfun$apply$12.applyOrElse(Analyzer.scala:928)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$2(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsDownWithPruning$1(AnalysisHelper.scala:170)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning(AnalysisHelper.scala:168)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsDownWithPruning$(AnalysisHelper.scala:164)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsDownWithPruning(LogicalPlan.scala:30)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.apply(Analyzer.scala:928)
    at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.apply(Analyzer.scala:924)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:211)
    at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
    at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
    at scala.collection.immutable.List.foldLeft(List.scala:91)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:208)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:200)
    at scala.collection.immutable.List.foreach(List.scala:431)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:200)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:231)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:227)
    at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:227)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:179)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:212)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:211)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:185)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
    ... 25 more
For Spark DataFrame append:
org.apache.iceberg.exceptions.ValidationException: Cannot find source column for partition field: 1000: event_date: identity(6)
    at org.apache.iceberg.exceptions.ValidationException.check(ValidationException.java:49)
    at org.apache.iceberg.PartitionSpec.checkCompatibility(PartitionSpec.java:587)
    at org.apache.iceberg.PartitionSpec$Builder.build(PartitionSpec.java:568)
    at org.apache.iceberg.UnboundPartitionSpec.bind(UnboundPartitionSpec.java:46)
    at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:71)
    at org.apache.iceberg.PartitionSpecParser.lambda$fromJson$1(PartitionSpecParser.java:88)
    at org.apache.iceberg.util.JsonUtil.parse(JsonUtil.java:95)
    at org.apache.iceberg.PartitionSpecParser.lambda$fromJson$2(PartitionSpecParser.java:88)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406)
    at java.base/java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1908)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108)
    at org.apache.iceberg.shaded.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62)
    at org.apache.iceberg.PartitionSpecParser.fromJson(PartitionSpecParser.java:86)
    at org.apache.iceberg.SerializableTable.lambda$specs$1(SerializableTable.java:192)
    at java.base/java.util.HashMap.forEach(HashMap.java:1337)
    at org.apache.iceberg.SerializableTable.specs(SerializableTable.java:190)
    at org.apache.iceberg.SerializableTable.spec(SerializableTable.java:180)
    at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:638)
    at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:632)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:430)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:136)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)