
Fix file format checks to be exact and handle Delta Lake column mapping [databricks] #9279

Merged: 5 commits, Sep 22, 2023

Conversation

jlowe (Contributor) commented on Sep 20, 2023:

Fixes #9255.

This fixes the following problems:

  • Format checks were allowing derived classes of the CPU class to be considered supported
  • DeltaParquetFileFormat semantics were being completely ignored (i.e., column mapping and deletion vectors)

File format checks were updated to compare against specific CPU classes, so we don't accidentally assume we know how to replace the semantics when a derived class that alters the behavior is used instead.
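For illustration, a minimal sketch of the distinction (the class names are real; the two method bodies are illustrative, not the merged code). A subclass-permissive check matches DeltaParquetFileFormat, which extends ParquetFileFormat but alters its semantics, while an exact-class check does not:

    import org.apache.spark.sql.execution.datasources.FileFormat
    import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat

    // Subclass-permissive: DeltaParquetFileFormat extends ParquetFileFormat,
    // so it would incorrectly be treated as a plain Parquet read here.
    def isSupportedFormatLoose(format: Class[_ <: FileFormat]): Boolean =
      classOf[ParquetFileFormat].isAssignableFrom(format)

    // Exact: only the specific CPU class whose semantics we know how to
    // replace is accepted; derived classes stay on the CPU.
    def isSupportedFormatExact(format: Class[_ <: FileFormat]): Boolean =
      format == classOf[ParquetFileFormat]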

The interface for Delta Lake providers was updated to handle read interfaces, and ExternalSource was updated accordingly.
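A hedged sketch of the shape of that provider interface (isSupportedFormat and createMultiFileReaderFactory are method names that appear in this PR; the parameter list below is an assumption for illustration, not the verbatim merged interface):

    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.sql.connector.read.PartitionReaderFactory
    import org.apache.spark.sql.execution.datasources.FileFormat
    import org.apache.spark.util.SerializableConfiguration

    trait DeltaProvider {
      // Which exact file format classes this provider knows how to handle.
      def isSupportedFormat(format: Class[_ <: FileFormat]): Boolean

      // Read-side hook: build the GPU multi-file reader factory for a scan
      // over a Delta table. GpuFileSourceScanExec is the RAPIDS GPU scan
      // exec; this parameter list is illustrative.
      def createMultiFileReaderFactory(
          format: FileFormat,
          broadcastedConf: Broadcast[SerializableConfiguration],
          fileScan: GpuFileSourceScanExec): PartitionReaderFactory
    }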

Signed-off-by: Jason Lowe <jlowe@nvidia.com>
jlowe self-assigned this on Sep 20, 2023.
jlowe (Contributor, Author) commented on Sep 20, 2023:

build

revans2 (Collaborator) left a comment:

Looks good, just one question. There are also some test failures, so I am not going to approve it yet.

@@ -41,4 +49,31 @@ object Delta20xProvider extends DeltaIOProvider {
.disabledByDefault("Delta Lake update support is experimental")
).map(r => (r.getClassFor.asSubclass(classOf[RunnableCommand]), r)).toMap
}

override def isSupportedFormat(format: Class[_ <: FileFormat]): Boolean = {
format == classOf[DeltaParquetFileFormat] || format == classOf[GpuDelta20xParquetFileFormat]

revans2 (Collaborator) commented on this diff:

When would we ever have replaced the file format and be tagging the plan again? I understand if we are just being cautious, but with the other checks for an exact class it feels like we would have made some horrible franken mix of CPU exec and GPU FileFormat to hit this. That or I don't really understand the context that this is called in. If that is the case, then we need some better docs for the method definition.

jlowe (Contributor, Author) replied:

This checks for the GPU version of the format because isSupportedFormat is how ExternalSource figures out which source, of many, is supposed to handle a particular call. For example, one of the interfaces is createMultiFileReaderFactory and that happens after we've already converted to a GPU format. I'll add some comments to the various isSupported methods of the provider interfaces to make this clearer.
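
For example, a hedged sketch of that dispatch pattern (the object name, provider list, and signature here are illustrative, following the DeltaProvider sketch above):

    object ExternalSourceSketch {
      // Illustrative: each Delta Lake provider claims the format classes it
      // owns, including the GPU format classes after plan conversion.
      private val providers: Seq[DeltaProvider] = Seq(Delta20xProvider)

      def createMultiFileReaderFactory(
          format: FileFormat,
          broadcastedConf: Broadcast[SerializableConfiguration],
          fileScan: GpuFileSourceScanExec): PartitionReaderFactory =
        providers.find(_.isSupportedFormat(format.getClass))
          .map(_.createMultiFileReaderFactory(format, broadcastedConf, fileScan))
          .getOrElse(throw new IllegalStateException(
            s"No provider supports format ${format.getClass}"))
    }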

revans2 (Collaborator) replied:

Okay, could you add some docs to the isSupportedFormat declaration about who could call this and what is expected?

jlowe (Contributor, Author) replied:

I decided this API is inherently confusing and fragile given it's looking at non-CPU formats. I ended up solving this via #9283 and will update this PR to be based on that. I'll also update the isSupportedFormat calls to check for the explicit CPU class we're expecting.

jlowe marked this pull request as draft on September 21, 2023 at 20:34.
jlowe (Contributor, Author) commented on Sep 21, 2023:

Putting this in draft until it's been upmerged with #9283.

@LIN-Yu-Ting commented:

@jlowe, very interesting development. I saw that:

  1. you modified GpuFileSourceScanExec.scala to handle DeltaParquetFileFormat via ExternalSource.scala;
  2. the DeltaProvider inside ExternalSource, implemented by DeltaProviderImpl.scala, executes the createMultiFileReaderFactory function;
  3. inside, createMultiFileReaderFactory is dispatched based on the Delta table version, and each version ultimately calls GpuDeltaParquetFileFormat to return the reader factory.

At the end, you prepare metadata such as the StructType for the Delta table with columnMappingMode:

    GpuParquetMultiFilePartitionReaderFactory(
      fileScan.conf,
      broadcastedConf,
      prepareSchema(fileScan.relation.dataSchema),
      prepareSchema(fileScan.requiredSchema),
      prepareSchema(fileScan.readPartitionSchema),
      pushedFilters,
      fileScan.rapidsConf,
      fileScan.allMetrics,
      fileScan.queryUsesInputFile,
      fileScan.alluxioPathsMap)

which is different from the original return value inside GpuFileSourceScanExec:

      GpuParquetMultiFilePartitionReaderFactory(
        sqlConf,
        broadcastedHadoopConf,
        relation.dataSchema,
        requiredSchema,
        readPartitionSchema,
        pushedDownFilters.toArray,
        rapidsConf,
        allMetrics,
        queryUsesInputFile,
        alluxioPathReplacementMap)

jlowe (Contributor, Author) commented on Sep 22, 2023:

> you prepare metadata such as StructType for DeltaTable with columnMappingMode

Yes, this reflects the behavior implemented by DeltaParquetFileFormat, the format used by the CPU when reading Delta Lake tables. See https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala#L128-L136
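
Roughly, that prepareSchema step translates the logical read schema to the physical column names stored in the Parquet data files when columnMapping.mode = name. A minimal sketch of the idea (delta.columnMapping.physicalName is Delta Lake's field-metadata key; the body is illustrative, ignores nested fields, and is not the verbatim implementation):

    import org.apache.spark.sql.types.{StructField, StructType}

    // Illustrative: rewrite each top-level field to the physical name that
    // Delta records in the field metadata when columnMapping.mode = name.
    def prepareSchema(schema: StructType): StructType = {
      val key = "delta.columnMapping.physicalName"
      StructType(schema.fields.map { field =>
        if (field.metadata.contains(key)) {
          field.copy(name = field.metadata.getString(key))
        } else {
          field
        }
      })
    }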

jlowe marked this pull request as ready for review on September 22, 2023 at 15:23.
jlowe (Contributor, Author) commented on Sep 22, 2023:

build

Labels: bug (Something isn't working)

Successfully merging this pull request may close these issues:

  • [BUG] Unable to read DeltaTable with columnMapping.mode = name

4 participants