
[SPARK-43039][SQL] Support custom fields in the file source _metadata column. #40677

Conversation

ryan-johnson-databricks
Contributor

What changes were proposed in this pull request?

Allow FileFormat instances to define the schema of the _metadata column they expose.
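For illustration, a minimal sketch of the kind of override this enables, assuming the metadataSchemaFields hook and the FileSourceConstantMetadataStructField helper discussed later in this PR (the base class choice, field name, and import paths are assumptions, not part of this change):

```scala
import org.apache.spark.sql.catalyst.expressions.FileSourceConstantMetadataStructField
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{StringType, StructField}

// Hypothetical custom file format that adds its own constant field to the
// file source _metadata column. Only metadataSchemaFields and the
// FileSourceConstantMetadataStructField helper come from this PR; the class
// name and "ingest_batch_id" field are made up for illustration.
class MyFileFormat extends ParquetFileFormat {
  override def metadataSchemaFields: Seq[StructField] =
    super.metadataSchemaFields :+
      FileSourceConstantMetadataStructField("ingest_batch_id", StringType, nullable = true)
}
```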

Why are the changes needed?

Today, the schema of the file source _metadata column depends on the file format (e.g., the Parquet file format supports _metadata.row_index), but this is hard-wired into the FileFormat itself. Not only is this an ugly design, it also prevents custom file formats from adding their own fields to the _metadata column.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New unit tests.

@@ -264,9 +261,11 @@ object FileFormat {
      fileSize: Long,
      fileBlockStart: Long,
      fileBlockLength: Long,
-     fileModificationTime: Long): InternalRow = {
+     fileModificationTime: Long,
+     otherConstantMetadataColumnValues: Map[String, Any]): InternalRow = {
Contributor

How is otherConstantMetadataColumnValues generated? FileFormat doesn't have an API for it.

Contributor Author

See the unit tests -- FileIndex.listFiles is responsible for providing it as part of the PartitionDirectory it creates for each file.
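Roughly, that wiring looks like the sketch below (hypothetical: the delegating index, metadata key, and the copy(...) call are assumptions layered on the FileStatusWithMetadata and PartitionDirectory shapes in this diff):

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusWithMetadata, PartitionDirectory}
import org.apache.spark.sql.types.StructType

// Hypothetical FileIndex that wraps another index and attaches a constant
// per-file metadata entry, which the file source can then surface as a
// custom _metadata field. Names and the copy(...) call are assumptions.
class MetadataAttachingFileIndex(delegate: FileIndex) extends FileIndex {
  override def listFiles(
      partitionFilters: Seq[Expression],
      dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
    delegate.listFiles(partitionFilters, dataFilters).map { dir =>
      dir.copy(files = dir.files.map { f =>
        FileStatusWithMetadata(f.fileStatus, Map("ingest_batch_id" -> "batch-0042"))
      })
    }
  }

  // The remaining members simply forward to the wrapped index.
  override def rootPaths: Seq[Path] = delegate.rootPaths
  override def inputFiles: Array[String] = delegate.inputFiles
  override def refresh(): Unit = delegate.refresh()
  override def sizeInBytes: Long = delegate.sizeInBytes
  override def partitionSchema: StructType = delegate.partitionSchema
}
```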

@HyukjinKwon HyukjinKwon changed the title [SPARK-43039] Support custom fields in the file source _metadata column. [SPARK-43039][SQL] Support custom fields in the file source _metadata column. Apr 10, 2023
case _: LongType | _: IntegerType | _: ShortType | _: ByteType => true
case _: DoubleType | _: FloatType => true
case _: StringType => true
case _: TimestampType => true // really just Long
Member

Should we also add DateType (int), DayTimeIntervalType (long) and YearMonthIntervalType (int)?

Contributor Author

Good catch. Fixed by using Literal, PhysicalType, and ColumnVectorUtils.populate, which also has the nice side effect of simplifying the code.

*/
def createFileMetadataCol(): AttributeReference = {
// Strip out the fields' metadata to avoid exposing it to the user. [[FileSourceStrategy]]
// avoids confusion by mapping back to [[metadataSchemaFields]].
Member

Nit, but in these regular comments we could just use backticks. [[...]] is Scaladoc syntax, not regular comment syntax.

Contributor Author

I personally find the brackets more readable (and my editor likes them better than backticks as well).
Is there a rule against using them in normal comments?

// Other metadata columns use the file-provided value (if any). As a courtesy, convert any
// normal strings to the required [[UTF8String]].
//
// TODO(frj): Do we need to potentially support value-producing functions?
Member

Could we file a JIRA and fix it like TODO(SPARK-XXXXX) instead?

Contributor Author

Removing for now, because the TODO doesn't add any meaningful information.
If/when the need arises, it will be obvious enough that this code needs to change.

* custom file-constant metadata columns, but in general tasks and readers can use the per-file
* metadata however they see fit.
*/
case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty) {
Contributor

Let's think more about the API design. I think it's too fragile to use Any in the API without a well-defined rule for which values are actually allowed.

I'd suggest using Map[String, Literal]. Then we can remove def isSupportedType as all types can be supported.

Contributor Author

See my TODO above... we may need to consider supporting value-producing functions, to allow full pruning in cases where the value is somehow expensive to compute. Requiring Literal would block that (and AFAIK only Any could capture both Literal and () => Literal).

The FILE_PATH case that calls Path.toString, and the call sites of PartitionedFile, are a small example of the possibility that got me thinking -- what if, instead of passing length, path, etc. as arguments, we just passed the actual file status and used the extractors on it? Probably doesn't make sense to actually do that for the hard-wired cases, though.

Contributor Author

I do like the idea of supporting Literal as one of the supported cases -- it simplifies the type checking a bit, in that the "supported" primitive types are merely those for which the implementation will automatically create the Literal wrapper as a courtesy (similar to string vs. UTF8String).

Contributor Author

Update: I remember now another reason why I had added isSupportedDataType -- ConstantColumnVector (needed by FileScanRDD...createMetadataColumnVector below) supports a limited subset of types, and relies on type-specific getters and setters. Even if I wrote the (complex recursive) code to handle structs, maps, and arrays... we still wouldn't have complete coverage for all types.

Do we know for certain that ConstantColumnVector supports all types that can ever be encountered during vectorized execution? If not, we must keep the isSupportedDataType method I introduced, regardless of whether we choose to add support for metadata fields with complex types in this PR.

Contributor Author

Update: ConstantColumnVector looks like an incompletely implemented API... it "supports" array/map/struct on the surface (e.g. ConstantColumnVectorSuite has superficial tests for it), but e.g. ColumnVectorUtils.populate doesn't actually handle them and ColumnVectorUtilsSuite.scala has negative tests to verify that they cannot be used in practice.

As far as I can tell, the class really only supports data types that can be used as partition columns.

Contributor Author

Updated the doc comment here to explain that file-source metadata fields are only one possible use of the extra file metadata (which is conceptually at a deeper layer than catalyst and Literal).

Also updated the isSupportedType doc comment to explain why not all types are supported.

Relevant implementation details:

  1. It would take a lot of work to support all data types, regardless of whether we use Literal vs. Any.
  2. Either way, we end up wrapping the provided value in a call to Literal(_), because doing so simplifies null handling by making null-because-missing equivalent to null-because-null. At that point, we get wrapping of primitive values "for free" if we happen to pass Any instead.
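A simplified illustration of that wrapping (not the actual PR code; the helper below is hypothetical):

```scala
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.{DataType, StringType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical helper showing the idea: a missing value and an explicit null
// both collapse to a null Literal, Literals pass through unchanged, and plain
// Scala/Java values are promoted to Literals as a courtesy.
def toConstantLiteral(value: Option[Any], dataType: DataType): Literal = value match {
  case None | Some(null)  => Literal(null, dataType)                        // missing == null
  case Some(lit: Literal) => lit                                            // already wrapped
  case Some(s: String)    => Literal(UTF8String.fromString(s), StringType)  // courtesy conversion
  case Some(other)        => Literal(other)                                 // supported primitives
}
```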

case PhysicalNullType => true
case PhysicalBooleanType => true
case PhysicalByteType | PhysicalShortType | PhysicalIntegerType | PhysicalLongType => true
case PhysicalFloatType | PhysicalDoubleType => true
Contributor

nit: case _: PhysicalPrimitiveType => true

Contributor Author

Hmm... it's currently true that ColumnVectorUtils.populate supports all physical primitive types, but somebody neglected to make the latter a sealed trait. I'll fix that and simplify the match here.

* NOTE: It is not possible to change the semantics of the base metadata fields by overriding this
* method. Technically, a file format could choose to suppress them, but that is not recommended.
*/
def metadataSchemaFields: Seq[StructField] = FileFormat.BASE_METADATA_FIELDS
Contributor

I'm wondering if we should have 2 APIs:

def constantMetadataColumns: Seq[StructField]
def generatedMetadataColumns: Seq[StructField]

Then Spark can add the metadata fields itself, which means less work for the implementations.

Contributor

Thinking about it more, how can a file source define custom constant metadata columns? The file listing logic is shared for all file sources and I can't think of a way to customize it for certain file sources.

Contributor Author

It needs a custom FileIndex to go with the FileFormat (see the unit test for an example).

Contributor

Got it. How about my first comment? Or do we expect the implementations to properly separate constant and generated metadata columns by using those util objects?

Contributor Author

Could you elaborate on what we gain by splitting out the two lists? For generated columns, in particular, we must use the helper object because the user should specify the physical column name to use.

Contributor

I'm trying to make it easier for third-party file sources to implement the new functions. The fewer internal details we expose through the API, the more API stability we have.

Contributor

but we don't have a choice here. The implementation needs to specify the physical column name, and we must expose these details.

Contributor Author

At least the surface is pretty minimal (nobody needs to know the specific metadata tags that get used): Instead of saying

StructField(name, dataType, nullable)

they pick one of:

FileSourceConstantMetadataStructField(name, dataType, nullable)
FileSourceGeneratedMetadataStructField(name, internalName, dataType, nullable)
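For example, a hypothetical metadataSchemaFields override using both helpers (the field names and internal column name are invented; only the two signatures above come from the PR):

```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField}

// Hypothetical override: "ingest_batch_id" is a constant value supplied per
// file by the FileIndex; "record_offset" is generated by the reader and
// surfaced under an internal column name chosen purely for illustration.
override def metadataSchemaFields: Seq[StructField] =
  super.metadataSchemaFields ++ Seq(
    FileSourceConstantMetadataStructField("ingest_batch_id", StringType, nullable = true),
    FileSourceGeneratedMetadataStructField(
      "record_offset", "_tmp_record_offset", LongType, nullable = false))
```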

@cloud-fan
Contributor

thanks, merging to master!

tdas pushed a commit to delta-io/delta that referenced this pull request Apr 23, 2024
…sting DV Information (#2888)


#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Previously, we relied on an [expensive broadcast of DV files](#1542) to pass the DV files to the associated Parquet files. With support for [adding custom metadata to files](apache/spark#40677), introduced in Spark 3.5, we can now pass the DV through the custom metadata field, which is expected to improve the performance of DV reads in Delta.
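Purely as an illustration of that idea (not the actual Delta code; the key and value are made up), the per-file DV information rides along with the file listing instead of a broadcast:

```scala
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.execution.datasources.FileStatusWithMetadata

// Hypothetical sketch: the Delta file index attaches the DV descriptor for a
// Parquet file as custom file metadata rather than broadcasting it.
val parquetFileStatus = new FileStatus(
  1024L, false, 1, 128L * 1024 * 1024, 0L,
  new Path("s3://bucket/table/part-00000.parquet"))
val fileWithDv = FileStatusWithMetadata(
  parquetFileStatus,
  Map("__delta_dv_descriptor" -> "<serialized DV descriptor>"))
```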
## How was this patch tested?

Adjusted the existing UTs that cover our changes.
## Does this PR introduce _any_ user-facing changes?
No.