Add page content field for page level (#1)
This PR adds an option to enable page content when viewing individual pages. When you specify `pageContent = true` in the DataFrame options, the page-level metadata will include a populated `page_content` column.

Make sure to use it only when inspecting individual pages; otherwise it might affect performance.
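
For example, a page-level read with content enabled might look like the following sketch. The data source is referenced by its package name here, which is an assumption; use whatever name the source is registered under in your build, and substitute your own path.

```scala
// Sketch: read page-level metadata and include the raw page bytes.
// The package-name format reference and the input path are assumptions.
val df = spark.read
  .format("com.github.sadikovi.metadata")
  .option("level", "page")          // page-level metadata
  .option("pageContent", "true")    // populate the page_content column
  .load("/path/to/parquet/table")

// page_content holds the page bytes as stored in the file;
// filepath identifies the Parquet file each page belongs to.
df.select("page_content", "filepath").show()
```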
sadikovi authored Apr 2, 2021
1 parent 92d54b3 commit 234fc54
Showing 7 changed files with 112 additions and 21 deletions.
5 changes: 3 additions & 2 deletions README.md
@@ -46,8 +46,9 @@ Supported datasource options:
|------|-------------|---------|
| `source` | Specifies the source of the table: `parquet`, or `file` (any other format) | Automatically inferred from the path
| `level` | Shows level of metadata for the `source`. Values are `file` (file metadata), `rowgroup` (Parquet row group metadata), `column` (Parquet column chunk metadata), `page` (Parquet page metadata). Note that not all of the sources support all levels | `file`
| `maxparts` | Defines the number of partitions to use when reading data. For example, if you have hundreds of thousands of files, you can use this option to read all of the data in 2000 partitions instead | `min(200, files.length)`
| `buffersize` | Sets buffer size in bytes for reading Parquet page level data. This reduces the amount of remote calls to DBFS, S3, WASB, ABFS, etc. It is recommended to use a large value, e.g. 64 MB or 128 MB | `128 MB`, typical row group size
| `maxParts` | Defines the number of partitions to use when reading data. For example, if you have hundreds of thousands of files, you can use this option to read all of the data in 2000 partitions instead | `min(200, files.length)`
| `bufferSize` | Sets buffer size in bytes for reading Parquet page level data. This reduces the amount of remote calls to DBFS, S3, WASB, ABFS, etc. It is recommended to use a large value, e.g. 64 MB or 128 MB | `128 MB`, typical row group size
| `pageContent` | Enables page content for the `page` level; available values are `true` or `false`. It is recommended to enable it only when inspecting a particular set of pages | `false`

DataFrame schema for each level is in
[MetadataLevel.scala](./src/main/scala/com/github/sadikovi/metadata/MetadataLevel.scala).
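
As a rough illustration of the other options in the table above, a read over a large dataset might be tuned like the sketch below. Again, the package-name format reference and the paths are assumptions, and the values are placeholders.

```scala
// Sketch: tune partitioning for many files and buffering for page reads.
// Format reference, paths, and values are assumptions.
val files = spark.read
  .format("com.github.sadikovi.metadata")
  .option("source", "parquet")
  .option("level", "file")
  .option("maxParts", "2000")        // read metadata in 2000 partitions
  .load("/path/to/large/parquet/table")

val pages = spark.read
  .format("com.github.sadikovi.metadata")
  .option("level", "page")
  .option("bufferSize", (64 * 1024 * 1024).toString) // 64 MB buffer for page-level reads
  .load("/path/to/large/parquet/table")
```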
21 changes: 17 additions & 4 deletions src/main/scala/com/github/sadikovi/metadata/DefaultSource.scala
@@ -45,6 +45,13 @@ class DefaultSource
}
log.info(s"Buffer size: $bufferSize")

// Check if page content is enabled
val pageContentEnabled = parameters.get(PAGE_CONTENT_OPT).map(_.toLowerCase) match {
case Some(v) => v.toBoolean
case None => PAGE_CONTENT_DEFAULT
}
log.info(s"Page content enabled: $pageContentEnabled")

// Select file index based on the underlying data source
val fileIndex = inferFileIndex(spark, rootPath, parameters)

@@ -82,10 +89,10 @@

// Verify the combination of source and level
val metadataLevel = inferMetadataLevel(source, level)
log.info(s"Metadata level $metadataLevel")
log.info(s"Metadata level: $metadataLevel")

// Select file format
inferFileFormat(spark, fileIndex, metadataLevel, maxPartitions, bufferSize)
inferFileFormat(spark, fileIndex, metadataLevel, maxPartitions, bufferSize, pageContentEnabled)
}
}

@@ -116,6 +123,10 @@ object DefaultSource {
val LEVEL_PAGE = "page"
val ALL_LEVELS = Seq(LEVEL_FILE, LEVEL_ROW_GROUP, LEVEL_COLUMN, LEVEL_PAGE)

// Enables page content to be returned when page level is selected
val PAGE_CONTENT_OPT = "pagecontent"
val PAGE_CONTENT_DEFAULT = false

/** Infers metadata level */
def inferMetadataLevel(source: String, level: String): MetadataLevel = {
def assertUnsupported(source: String, level: String): MetadataLevel = {
@@ -160,12 +171,14 @@ object DefaultSource {
fileIndex: FileIndex,
level: MetadataLevel,
maxPartitions: Int,
bufferSize: Int): BaseRelation = {
bufferSize: Int,
pageContentEnabled: Boolean): BaseRelation = {
level match {
case FileLevel =>
new FileMetadataFormat(spark, fileIndex, level, maxPartitions)
case ParquetFileLevel | ParquetRowGroupLevel | ParquetColumnLevel | ParquetPageLevel =>
new ParquetMetadataFileFormat(spark, fileIndex, level, maxPartitions, bufferSize)
new ParquetMetadataFileFormat(
spark, fileIndex, level, maxPartitions, bufferSize, pageContentEnabled)
case other =>
throw new IllegalArgumentException(s"No file format for level $other")
}
src/main/scala/com/github/sadikovi/metadata/ParquetMetadataFileFormat.scala
@@ -136,7 +136,8 @@ class ParquetMetadataFileFormat(
fileIndex: FileIndex,
level: MetadataLevel,
maxPartitions: Int,
bufferSize: Int)
bufferSize: Int,
pageContentEnabled: Boolean)
extends MetadataFileFormat(spark, fileIndex, level, maxPartitions) {

require(
@@ -249,6 +250,14 @@ class ParquetMetadataFileFormat(
))
}

val pageContent: Option[Array[Byte]] = if (pageContentEnabled) {
val tmp = new Array[Byte](page.compressedPageSize)
in.readFully(tmp, 0, tmp.length)
Some(tmp)
} else {
None
}

val values = Array(
cols(colIndex).rowGroupId,
cols(colIndex).id,
…
page.definitionEncoding,
page.repetitionEncoding,
statistics,
pageContent,
file.path
)

src/main/scala/com/github/sadikovi/metadata/MetadataLevel.scala
@@ -87,6 +87,7 @@ object ParquetPageLevel extends MetadataLevel {
StructField("min_value_length", IntegerType) ::
StructField("max_value_length", IntegerType) ::
Nil)) ::
StructField("page_content", ArrayType(ByteType)) ::
StructField("filepath", StringType) ::
Nil)
}
src/main/scala/com/github/sadikovi/metadata/RemoteInputStream.scala
@@ -1,6 +1,6 @@
package com.github.sadikovi.metadata

import java.io.{IOException, InputStream}
import java.io.{EOFException, IOException, InputStream}
import org.apache.hadoop.fs.Seekable

/**
@@ -48,6 +48,20 @@ class RemoteInputStream(
bytesRead
}

/** Naive implementation of readFully method */
def readFully(arr: Array[Byte], off: Int, len: Int): Unit = {
var bytesRead = len
var currOffset = off
while (bytesRead > 0) {
val bytes = read(arr, currOffset, bytesRead)
if (bytes < 0) {
throw new EOFException(s"Failed to read bytes at off $off and len $len: $bytes")
}
bytesRead -= bytes
currOffset += bytes
}
}

override def skip(bytes: Long): Long = {
assertOpen()
seek(pos + bytes)
DefaultSourceSuite.scala
@@ -219,18 +219,46 @@ class DefaultSourceSuite extends UnitTestSuite with SparkLocal {
checkAnswer(
df.drop("filepath"),
Seq(
Row(0, 0, 0, "DATA_PAGE", 4L, 64, 1011, 2000, null, 250, "PLAIN", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 8, 8)),
Row(0, 1, 0, "DICTIONARY_PAGE", 1079L, 13, 6, 4, null, 1, "PLAIN_DICTIONARY", null, null, null),
Row(0, 1, 1, "DATA_PAGE", 1098L, 46, 5, 3, null, 250, "PLAIN_DICTIONARY", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 4, 4)),
Row(0, 0, 0, "DATA_PAGE", 4L, 64, 1014, 2000, null, 250, "PLAIN", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 8, 8)),
Row(0, 1, 0, "DICTIONARY_PAGE", 1082L, 13, 6, 4, null, 1, "PLAIN_DICTIONARY", null, null, null),
Row(0, 1, 1, "DATA_PAGE", 1101L, 46, 5, 3, null, 250, "PLAIN_DICTIONARY", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 4, 4)),
Row(0, 0, 0, "DATA_PAGE", 4L, 64, 1017, 2000, null, 250, "PLAIN", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 8, 8)),
Row(0, 1, 0, "DICTIONARY_PAGE", 1085L, 13, 6, 4, null, 1, "PLAIN_DICTIONARY", null, null, null),
Row(0, 1, 1, "DATA_PAGE", 1104L, 46, 5, 3, null, 250, "PLAIN_DICTIONARY", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 4, 4)),
Row(0, 0, 0, "DATA_PAGE", 4L, 64, 1015, 2000, null, 250, "PLAIN", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 8, 8)),
Row(0, 1, 0, "DICTIONARY_PAGE", 1083L, 13, 6, 4, null, 1, "PLAIN_DICTIONARY", null, null, null),
Row(0, 1, 1, "DATA_PAGE", 1102L, 46, 5, 3, null, 250, "PLAIN_DICTIONARY", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 4, 4))
Row(0, 0, 0, "DATA_PAGE", 4L, 64, 1011, 2000, null, 250, "PLAIN", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 8, 8), null),
Row(0, 1, 0, "DICTIONARY_PAGE", 1079L, 13, 6, 4, null, 1, "PLAIN_DICTIONARY", null, null, null, null),
Row(0, 1, 1, "DATA_PAGE", 1098L, 46, 5, 3, null, 250, "PLAIN_DICTIONARY", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 4, 4), null),
Row(0, 0, 0, "DATA_PAGE", 4L, 64, 1014, 2000, null, 250, "PLAIN", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 8, 8), null),
Row(0, 1, 0, "DICTIONARY_PAGE", 1082L, 13, 6, 4, null, 1, "PLAIN_DICTIONARY", null, null, null, null),
Row(0, 1, 1, "DATA_PAGE", 1101L, 46, 5, 3, null, 250, "PLAIN_DICTIONARY", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 4, 4), null),
Row(0, 0, 0, "DATA_PAGE", 4L, 64, 1017, 2000, null, 250, "PLAIN", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 8, 8), null),
Row(0, 1, 0, "DICTIONARY_PAGE", 1085L, 13, 6, 4, null, 1, "PLAIN_DICTIONARY", null, null, null, null),
Row(0, 1, 1, "DATA_PAGE", 1104L, 46, 5, 3, null, 250, "PLAIN_DICTIONARY", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 4, 4), null),
Row(0, 0, 0, "DATA_PAGE", 4L, 64, 1015, 2000, null, 250, "PLAIN", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 8, 8), null),
Row(0, 1, 0, "DICTIONARY_PAGE", 1083L, 13, 6, 4, null, 1, "PLAIN_DICTIONARY", null, null, null, null),
Row(0, 1, 1, "DATA_PAGE", 1102L, 46, 5, 3, null, 250, "PLAIN_DICTIONARY", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 4, 4), null)
)
)
// scalastyle:on

df.select("filepath").as[String].collect.foreach { path =>
assert(path.length > 0)
}
}
}

test("test page level with page content") {
val implicits = spark.implicits
import implicits._

withTempDir { dir =>
Seq((1, "a", true), (2, "b", false)).toDF.coalesce(1).write
.option("compression", "none").parquet(dir + "/range")
val df = readDF.option("level", "page").option("pagecontent", "true").load(dir + "/range")

assert(df.schema === ParquetPageLevel.schema)

// scalastyle:off
checkAnswer(
df.drop("filepath"),
Seq(
Row(0, 0, 0, "DATA_PAGE", 4L, 45, 8, 8, null, 2, "PLAIN", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 4, 4), Array[Byte](1, 0, 0, 0, 2, 0, 0, 0)),
Row(0, 1, 0, "DATA_PAGE", 57L, 27, 16, 16, null, 2, "PLAIN", "RLE", "BIT_PACKED", Row(0L, null, 1, 1), Array[Byte](2, 0, 0, 0, 3, 3, 1, 0, 0, 0, 97, 1, 0, 0, 0, 98)),
Row(0, 2, 0, "DATA_PAGE", 100L, 33, 1, 1, null, 2, "PLAIN", "BIT_PACKED", "BIT_PACKED", Row(0L, null, 1, 1), Array[Byte](1))
)
)
// scalastyle:on
RemoteInputStreamSuite.scala
@@ -1,6 +1,6 @@
package com.github.sadikovi.metadata

import java.io.{ByteArrayInputStream, IOException, InputStream}
import java.io.{ByteArrayInputStream, EOFException, IOException, InputStream}

import org.apache.hadoop.fs.{Path, Seekable}

@@ -130,6 +130,30 @@ class RemoteInputStreamSuite extends UnitTestSuite {
}
}

test("readFully") {
val in = remote(Array[Byte](1, 2, 3, 4, 5, 6, 7), 2)
val res = new Array[Byte](3)

in.readFully(res, 0, res.length)
assert(res === Seq(1, 2, 3))

in.readFully(res, 0, res.length)
assert(res === Seq(4, 5, 6))

in.readFully(res, 0, 1)
assert(res === Seq(7, 5, 6))
}

test("readFully with EOF") {
val in = remote(Array[Byte](1, 2, 3, 4, 5, 6, 7), 2)
val res = new Array[Byte](8)

val err = intercept[EOFException] {
in.readFully(res, 0, res.length)
}
assert(err.getMessage.contains("Failed to read bytes at off 0 and len 8: -1"))
}

test("seek within buffer") {
val in = remote(Array[Byte](1, 2, 3, 4, 5, 6), 4)
assert(in.read() === 1)
