[SPARK-43039][SQL] Support custom fields in the file source _metadata column. #40677

Closed
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.plans.logical.EventTimeWatermark
import org.apache.spark.sql.catalyst.trees.TreePattern
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, METADATA_COL_ATTR_KEY}
import org.apache.spark.sql.types._
import org.apache.spark.util.collection.BitSet
@@ -554,6 +555,27 @@ object FileSourceMetadataAttribute {
metadata.getBoolean(FILE_SOURCE_METADATA_COL_ATTR_KEY)
}

/**
* True if the given data type is supported in file source metadata attributes.
*
* The set of supported types is limited by [[ColumnVectorUtils.populate]], which the constant
* file metadata implementation relies on. In general, types that can be partition columns are
* supported (including most primitive types). Notably unsupported types include [[ObjectType]],
* [[UserDefinedType]], and the complex types ([[StructType]], [[MapType]], [[ArrayType]]).
*/
def isSupportedType(dataType: DataType): Boolean = dataType.physicalDataType match {
// PhysicalPrimitiveType covers: Boolean, Byte, Double, Float, Integer, Long, Null, Short
case _: PhysicalPrimitiveType | _: PhysicalDecimalType => true
case PhysicalBinaryType | PhysicalStringType | PhysicalCalendarIntervalType => true
case _ => false
}

/** Returns the type unchanged if valid; otherwise throws [[IllegalArgumentException]]. */
def validateType(dataType: DataType): DataType = {
require(isSupportedType(dataType), s"Unsupported data type: $dataType")
dataType
}
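
Illustrative sketch (not part of this PR): how the two helpers above are expected to behave, assuming `FileSourceMetadataAttribute` is importable from `org.apache.spark.sql.catalyst.expressions`; the wrapper object name is invented.

```scala
import org.apache.spark.sql.catalyst.expressions.FileSourceMetadataAttribute._
import org.apache.spark.sql.types._

object MetadataTypeSupportExample {
  def main(args: Array[String]): Unit = {
    // Primitive, decimal, string, and binary types map to supported physical types.
    assert(isSupportedType(LongType))
    assert(isSupportedType(TimestampType))
    assert(isSupportedType(DecimalType(10, 2)))
    // Complex types are rejected because ColumnVectorUtils.populate cannot fill them.
    assert(!isSupportedType(ArrayType(LongType)))
    assert(!isSupportedType(new StructType().add("a", LongType)))
    // validateType returns a supported type unchanged and throws IllegalArgumentException otherwise.
    assert(validateType(StringType) == StringType)
  }
}
```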

private def removeInternalMetadata(metadata: Metadata) = new MetadataBuilder()
.withMetadata(metadata)
.remove(METADATA_COL_ATTR_KEY)
@@ -574,17 +596,18 @@ object FileSourceConstantMetadataStructField {

/** Constructs a new metadata struct field of the given type; nullable by default */
def apply(name: String, dataType: DataType, nullable: Boolean = true): StructField =
StructField(name, dataType, nullable, metadata(name))
StructField(name, FileSourceMetadataAttribute.validateType(dataType), nullable, metadata(name))

def unapply(field: StructField): Option[StructField] =
if (isValid(field.metadata)) Some(field) else None
if (isValid(field.dataType, field.metadata)) Some(field) else None

def metadata(name: String): Metadata = new MetadataBuilder()
.withMetadata(FileSourceMetadataAttribute.metadata(name))
.putBoolean(FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY, value = true)
.build()

def isValid(metadata: Metadata): Boolean = {
def isValid(dataType: DataType, metadata: Metadata): Boolean = {
FileSourceMetadataAttribute.isSupportedType(dataType) &&
FileSourceMetadataAttribute.isValid(metadata) &&
metadata.contains(FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY) &&
metadata.getBoolean(FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY)
@@ -597,8 +620,10 @@ object FileSourceConstantMetadataStructField {
* usually appended to the output and not generated per row.
*/
object FileSourceConstantMetadataAttribute {
def unapply(attr: AttributeReference): Option[AttributeReference] =
if (FileSourceConstantMetadataStructField.isValid(attr.metadata)) Some(attr) else None
def unapply(attr: AttributeReference): Option[AttributeReference] = {
val valid = FileSourceConstantMetadataStructField.isValid(attr.dataType, attr.metadata)
if (valid) Some(attr) else None
}
}

/**
@@ -626,24 +651,27 @@ object FileSourceGeneratedMetadataStructField {
name: String,
internalName: String,
dataType: DataType,
nullable: Boolean = true): StructField =
StructField(name, dataType, nullable, metadata(name, internalName))
nullable: Boolean = true): StructField = {
val dt = FileSourceMetadataAttribute.validateType(dataType)
StructField(name, dt, nullable, metadata(name, internalName))
}

def unapply(field: StructField): Option[(StructField, String)] =
getInternalNameIfValid(field.metadata).map(field -> _)
getInternalNameIfValid(field.dataType, field.metadata).map(field -> _)

def metadata(name: String, internalName: String): Metadata = new MetadataBuilder()
.withMetadata(FileSourceMetadataAttribute.metadata(name))
.putString(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY, internalName)
.build()

def isValid(metadata: Metadata): Boolean = {
def isValid(dataType: DataType, metadata: Metadata): Boolean = {
FileSourceMetadataAttribute.isSupportedType(dataType) &&
FileSourceMetadataAttribute.isValid(metadata) &&
metadata.contains(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY)
}

def getInternalNameIfValid(metadata: Metadata): Option[String] = {
if (isValid(metadata)) {
def getInternalNameIfValid(dataType: DataType, metadata: Metadata): Option[String] = {
if (isValid(dataType, metadata)) {
Some(metadata.getString(FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY))
} else None
}
@@ -654,6 +682,9 @@ object FileSourceGeneratedMetadataStructField {
* the `metadata` struct that maps to some internal column the scanner returns.
*/
object FileSourceGeneratedMetadataAttribute {
def unapply(attr: AttributeReference): Option[(AttributeReference, String)] =
FileSourceGeneratedMetadataStructField.getInternalNameIfValid(attr.metadata).map(attr -> _)
def unapply(attr: AttributeReference): Option[(AttributeReference, String)] = {
FileSourceGeneratedMetadataStructField
.getInternalNameIfValid(attr.dataType, attr.metadata)
.map(attr -> _)
}
}
@@ -60,7 +60,7 @@ object PhysicalDataType {
def ordering(dt: DataType): Ordering[Any] = apply(dt).ordering.asInstanceOf[Ordering[Any]]
}

trait PhysicalPrimitiveType
sealed trait PhysicalPrimitiveType

class PhysicalBinaryType() extends PhysicalDataType {
private[sql] val ordering =
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition
import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource}
import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat => ParquetSource, ParquetRowIndexUtil}
import org.apache.spark.sql.execution.datasources.v2.PushedDownOperators
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.execution.vectorized.ConstantColumnVector
@@ -703,7 +703,8 @@ case class FileSourceScanExec(
relation.sparkSession, relation.options, filePath) &&
// SPARK-39634: Allow file splitting in combination with row index generation once
// the fix for PARQUET-2161 is available.
!RowIndexUtil.isNeededForSchema(requiredSchema)
(!relation.fileFormat.isInstanceOf[ParquetSource]
|| !ParquetRowIndexUtil.isNeededForSchema(requiredSchema))
PartitionedFileUtil.splitFiles(
sparkSession = relation.sparkSession,
file = file,
@@ -27,7 +27,7 @@ import org.apache.spark.sql.execution.datasources._
object PartitionedFileUtil {
def splitFiles(
sparkSession: SparkSession,
file: FileStatus,
file: FileStatusWithMetadata,
filePath: Path,
isSplitable: Boolean,
maxSplitBytes: Long,
@@ -36,28 +36,29 @@ object PartitionedFileUtil {
(0L until file.getLen by maxSplitBytes).map { offset =>
val remaining = file.getLen - offset
val size = if (remaining > maxSplitBytes) maxSplitBytes else remaining
val hosts = getBlockHosts(getBlockLocations(file), offset, size)
val hosts = getBlockHosts(getBlockLocations(file.fileStatus), offset, size)
PartitionedFile(partitionValues, SparkPath.fromPath(filePath), offset, size, hosts,
file.getModificationTime, file.getLen)
file.getModificationTime, file.getLen, file.metadata)
}
} else {
Seq(getPartitionedFile(file, filePath, partitionValues))
}
}
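
Illustrative sketch (not part of this PR): the split arithmetic above on concrete numbers, REPL-style — a 300 MiB file with a 128 MiB maxSplitBytes yields three splits, the last carrying the 44 MiB remainder.

```scala
// Same offset/size computation as splitFiles, outside Spark, for illustration only.
val fileLen       = 300L * 1024 * 1024   // 300 MiB
val maxSplitBytes = 128L * 1024 * 1024   // 128 MiB

val splits = (0L until fileLen by maxSplitBytes).map { offset =>
  val remaining = fileLen - offset
  (offset, if (remaining > maxSplitBytes) maxSplitBytes else remaining)
}
// splits == Vector((0, 128 MiB), (128 MiB, 128 MiB), (256 MiB, 44 MiB));
// each split later becomes one PartitionedFile.
```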

def getPartitionedFile(
file: FileStatus,
file: FileStatusWithMetadata,
filePath: Path,
partitionValues: InternalRow): PartitionedFile = {
val hosts = getBlockHosts(getBlockLocations(file), 0, file.getLen)
val hosts = getBlockHosts(getBlockLocations(file.fileStatus), 0, file.getLen)
PartitionedFile(partitionValues, SparkPath.fromPath(filePath), 0, file.getLen, hosts,
file.getModificationTime, file.getLen)
file.getModificationTime, file.getLen, file.metadata)
}

private def getBlockLocations(file: FileStatus): Array[BlockLocation] = file match {
case f: LocatedFileStatus => f.getBlockLocations
case f => Array.empty[BlockLocation]
}

// Given locations of all blocks of a single file, `blockLocations`, and an `(offset, length)`
// pair that represents a segment of the same file, find out the block that contains the largest
// fraction the segment, and returns location hosts of that block. If no such block can be found,
@@ -27,10 +27,9 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.{DataType, LongType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String


@@ -165,6 +164,17 @@ trait FileFormat {
}
}

/**
* Create a file metadata struct column containing fields supported by the given file format.
*/
def createFileMetadataCol(): AttributeReference = {
// Strip out the fields' metadata to avoid exposing it to the user. [[FileSourceStrategy]]
// avoids confusion by mapping back to [[metadataSchemaFields]].
[Review thread]

Member:
Nit .. but in these regular comments, we could just use backticks. [[...]] is the syntax for Scaladoc (not for the comments).

Contributor Author:
I personally find the brackets more readable (and my editor likes them better than backticks as well). Is there a rule against using them in normal comments?

val fields = metadataSchemaFields
.map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation)
FileSourceMetadataAttribute(FileFormat.METADATA_NAME, StructType(fields), nullable = false)
}

/**
* Returns whether this format supports the given [[DataType]] in read/write path.
* By default all data types are supported.
Expand All @@ -176,6 +186,23 @@ trait FileFormat {
* By default all field name is supported.
*/
def supportFieldName(name: String): Boolean = true

/**
* All fields the file format's _metadata struct defines.
*
* Each field's metadata should define [[METADATA_COL_ATTR_KEY]],
* [[FILE_SOURCE_METADATA_COL_ATTR_KEY]], and either
* [[FILE_SOURCE_CONSTANT_METADATA_COL_ATTR_KEY]] or
* [[FILE_SOURCE_GENERATED_METADATA_COL_ATTR_KEY]] as appropriate.
*
* Constant attributes will be extracted automatically from
* [[PartitionedFile.extraConstantMetadataColumnValues]], while generated metadata columns always
* map to some hidden/internal column the underlying reader provides.
*
* NOTE: It is not possible to change the semantics of the base metadata fields by overriding this
* method. Technically, a file format could choose to suppress them, but that is not recommended.
*/
def metadataSchemaFields: Seq[StructField] = FileFormat.BASE_METADATA_FIELDS
[Review thread]

Contributor:
I'm wondering if we should have 2 APIs:

    def constantMetadataColumns: Seq[StructField]
    def generatedMetadataColumns: Seq[StructField]

Then Spark can add the metadata fields, which means less work for the implementations.

Contributor:
Thinking about it more, how can a file source define custom constant metadata columns? The file listing logic is shared by all file sources and I can't think of a way to customize it for certain file sources.

Contributor Author:
It needs a custom FileIndex to go with the FileFormat (see the unit test for an example).

Contributor:
Got it. How about my first comment? Or do we expect the implementations to properly separate constant and generated metadata columns by using those util objects?

Contributor Author:
Could you elaborate on what we gain by splitting out two lists? For generated columns, in particular, we must use the helper object because the user should specify the physical column name to use.

Contributor:
I'm trying to make it easier for third-party file sources to implement the new functions. The fewer internal details we expose through the API, the more API stability we have.

Contributor:
But we don't have a choice here. The implementation needs to specify the physical column name, and we must expose these details.

Contributor Author:
At least the surface is pretty minimal (nobody needs to know the specific metadata tags that get used): instead of saying

    StructField(name, dataType, nullable)

they pick one of:

    FileSourceConstantMetadataStructField(name, dataType, nullable)
    FileSourceGeneratedMetadataStructField(name, internalName, dataType, nullable)

}
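
Illustrative sketch (not part of this PR) of the extension pattern discussed in the thread above: a hypothetical third-party format adds one constant and one generated field on top of the base set. The class name, the field names, and the `_tmp_row_group` internal column are invented, and only the metadata-related override is shown.

```scala
import org.apache.spark.sql.catalyst.expressions.{FileSourceConstantMetadataStructField, FileSourceGeneratedMetadataStructField}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{IntegerType, StructField}

class CustomArchiveFileFormat extends ParquetFileFormat {
  override def metadataSchemaFields: Seq[StructField] =
    super.metadataSchemaFields ++ Seq(
      // Constant per file: the value is supplied by a matching FileIndex through
      // the per-file metadata map (see the FileIndex sketch further below).
      FileSourceConstantMetadataStructField("archive_version", IntegerType),
      // Generated per row: the reader must emit an internal column named "_tmp_row_group",
      // which Spark then maps back into the _metadata struct.
      FileSourceGeneratedMetadataStructField("row_group", "_tmp_row_group", IntegerType))
}
```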

object FileFormat {
@@ -192,15 +219,6 @@

val FILE_MODIFICATION_TIME = "file_modification_time"

val ROW_INDEX = "row_index"

// A name for a temporary column that holds row indexes computed by the file format reader
// until they can be placed in the _metadata struct.
val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"

val ROW_INDEX_FIELD = FileSourceGeneratedMetadataStructField(
ROW_INDEX, ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = false)

val METADATA_NAME = "_metadata"

/**
@@ -223,27 +241,6 @@
FileSourceConstantMetadataStructField(FILE_BLOCK_LENGTH, LongType, nullable = false),
FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false))

/**
* Supported metadata fields of the given [[FileFormat]].
*/
def metadataSchemaFields(fileFormat: FileFormat): Seq[StructField] = fileFormat match {
case _: ParquetFileFormat =>
BASE_METADATA_FIELDS :+ ROW_INDEX_FIELD
case _ =>
BASE_METADATA_FIELDS
}

/**
* Create a file metadata struct column containing fields supported by the given [[FileFormat]].
*/
def createFileMetadataCol(fileFormat: FileFormat): AttributeReference = {
// Strip out the fields' metadata to avoid exposing it to the user. [[FileSourceStrategy]]
// avoids confusion by mapping back to [[metadataSchemaFields]].
val fields = metadataSchemaFields(fileFormat)
.map(FileSourceMetadataAttribute.cleanupFileSourceMetadataInformation)
FileSourceMetadataAttribute(FileFormat.METADATA_NAME, StructType(fields))
}

// create an internal row given required metadata fields and file information
def createMetadataInternalRow(
fieldNames: Seq[String],
@@ -253,7 +250,7 @@
// We are not aware of `FILE_BLOCK_START` and `FILE_BLOCK_LENGTH` before splitting files
assert(!fieldNames.contains(FILE_BLOCK_START) && !fieldNames.contains(FILE_BLOCK_LENGTH))
updateMetadataInternalRow(new GenericInternalRow(fieldNames.length), fieldNames,
filePath, fileSize, 0L, fileSize, fileModificationTime)
filePath, fileSize, 0L, fileSize, fileModificationTime, Map.empty)
}

// update an internal row given required metadata fields and file information
@@ -264,9 +261,11 @@
fileSize: Long,
fileBlockStart: Long,
fileBlockLength: Long,
fileModificationTime: Long): InternalRow = {
fileModificationTime: Long,
otherConstantMetadataColumnValues: Map[String, Any]): InternalRow = {
[Review thread]

Contributor:
How is otherConstantMetadataColumnValues generated? FileFormat doesn't have an API for it.

Contributor Author:
See the unit tests -- FileIndex.listFiles is responsible for providing it as part of the PartitionDirectory it creates for each file.

fieldNames.zipWithIndex.foreach { case (name, i) =>
name match {
// NOTE: The base metadata fields are hard-wired here and cannot be overridden.
case FILE_PATH => row.update(i, UTF8String.fromString(filePath.toString))
case FILE_NAME => row.update(i, UTF8String.fromString(filePath.getName))
case FILE_SIZE => row.update(i, fileSize)
Expand All @@ -276,10 +275,13 @@ object FileFormat {
// the modificationTime from the file is in millisecond,
// while internally, the TimestampType `file_modification_time` is stored in microsecond
row.update(i, fileModificationTime * 1000L)
case ROW_INDEX =>
// Do nothing. Only the metadata fields that have identical values for each row of the
// file are set by this function, while fields that have different values (such as row
// index) are set separately.
case other =>
// Other metadata columns use the file-provided value (if any). Automatically convert raw
// values (including nulls) to literals as a courtesy.
Literal(otherConstantMetadataColumnValues.get(other).orNull) match {
case Literal(null, _) => row.setNullAt(i)
case literal => row.update(i, literal.value)
}
}
}
row
@@ -23,11 +23,30 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StructType

/**
* A file status augmented with optional metadata, which tasks and file readers can use however they
* see fit. For example, a custom [[FileIndex]] and [[FileFormat]] working together could expose
* this extra metadata as file-constant fields of the file source metadata column.
*/
case class FileStatusWithMetadata(fileStatus: FileStatus, metadata: Map[String, Any] = Map.empty) {
[Review thread]

Contributor:
Let's think more about the API design. I think it's too fragile to use Any in the API without a well-defined rule for what values are actually allowed.

I'd suggest using Map[String, Literal]. Then we can remove def isSupportedType, as all types can be supported.

Contributor Author:
See my TODO above... we may need to consider supporting value-producing functions, to allow full pruning in cases where the value is somehow expensive to compute. Requiring Literal would block that (and AFAIK only Any could capture both Literal and () => Literal).

The FILE_PATH case that calls Path.toString, and the call sites of PartitionedFile, are a small example of that possibility that got me thinking -- what if, instead of passing length, path, etc. as arguments, we just passed the actual file status and used the extractors on it? Probably doesn't make sense to actually do that for the hard-wired cases, though.

Contributor Author:
I do like the idea of supporting Literal as one of the supported cases -- it simplifies the type checking a bit, in that the "supported" primitive types are merely those for which the implementation will automatically create the Literal wrapper as a courtesy (similar to string vs. UTF8String).

Contributor Author:
Update: I remember now another reason why I had added isSupportedDataType -- ConstantColumnVector (needed by FileScanRDD...createMetadataColumnVector below) supports a limited subset of types and relies on type-specific getters and setters. Even if I wrote the (complex recursive) code to handle structs, maps, and arrays... we still wouldn't have complete coverage for all types.

Do we know for certain that ConstantColumnVector supports all types that can ever be encountered during vectorized execution? If not, we must keep the isSupportedDataType method I introduced, regardless of whether we choose to add support for metadata fields with complex types in this PR.

Contributor Author:
Update: ConstantColumnVector looks like an incompletely implemented API... it "supports" array/map/struct on the surface (e.g. ConstantColumnVectorSuite has superficial tests for it), but ColumnVectorUtils.populate doesn't actually handle them, and ColumnVectorUtilsSuite.scala has negative tests to verify that they cannot be used in practice.

As far as I can tell, the class really only supports data types that can be used as partition columns.

Contributor Author:
Updated the doc comment here to explain that file-source metadata fields are only one possible usage for the extra file metadata (which is conceptually at a deeper layer than catalyst and Literal).

Also updated the isSupportedType doc comment to explain why not all types are supported.

Relevant implementation details:

  1. It would take a lot of work to support all data types, regardless of whether we use Literal vs. Any.
  2. We anyway end up wrapping the provided value in a call to Literal(_), because doing so simplifies null handling by making null-because-missing equivalent to null-because-null. At that point, we get wrapping of primitive values "for free" if we happen to pass Any instead.

// Wrapper methods to improve source compatibility in code that still expects a [[FileStatus]].
def getPath: Path = fileStatus.getPath
def getLen: Long = fileStatus.getLen
def getModificationTime: Long = fileStatus.getModificationTime
def isDirectory: Boolean = fileStatus.isDirectory
}

/**
* A collection of data files from a partitioned relation, along with the partition values in the
* form of an [[InternalRow]].
*/
case class PartitionDirectory(values: InternalRow, files: Seq[FileStatus])
case class PartitionDirectory(values: InternalRow, files: Seq[FileStatusWithMetadata])

object PartitionDirectory {
// For backward compat with code that does not know about extra file metadata
def apply(values: InternalRow, files: Array[FileStatus]): PartitionDirectory =
PartitionDirectory(values, files.map(FileStatusWithMetadata(_)))
}
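
Illustrative sketch (not part of this PR) of how a custom FileIndex.listFiles implementation could attach per-file constant metadata, following the pattern the review threads describe. The wrapper object, the `archive_version` key, and `versionFor` are invented; the key must match the name of a FileSourceConstantMetadataStructField declared by the matching FileFormat.

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileStatusWithMetadata, PartitionDirectory}

object ConstantMetadataExample {
  // Builds the PartitionDirectory a custom FileIndex.listFiles would return for one partition.
  def toPartitionDirectory(
      partitionValues: InternalRow,
      statuses: Seq[FileStatus]): PartitionDirectory = {
    val filesWithMetadata = statuses.map { status =>
      // Values are raw JVM objects; FileFormat.updateMetadataInternalRow wraps them in
      // Literal(_), and a missing key simply yields null for that metadata field.
      FileStatusWithMetadata(status, Map("archive_version" -> versionFor(status)))
    }
    PartitionDirectory(partitionValues, filesWithMetadata)
  }

  // Hypothetical rule deriving the per-file constant value from the file itself.
  private def versionFor(status: FileStatus): Int =
    if (status.getPath.getName.endsWith(".v2")) 2 else 1
}
```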

/**
* An interface for objects capable of enumerating the root paths of a relation as well as the