diff --git a/CHANGELOG.md b/CHANGELOG.md
index 7799a1f3..01d77736 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -8,6 +8,12 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
 
 - Spark app to diff files or tables and write result back to file or table. (#160)
 
+### Changed
+
+- Added null value count to `parquetBlockColumns` and `parquet_block_columns`. (#162)
+- Changed data type of the column name in `parquetBlockColumns` and `parquet_block_columns` to array of strings.
+  Cast it to string to get the earlier behaviour (string column name). (#162)
+
 ## [2.6.0] - 2023-04-11
 
 ### Added
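For downstream users, a minimal migration sketch of the cast mentioned in the changelog entry above (the path is a placeholder; a SparkSession `spark` and the `uk.co.gresearch.spark.parquet` implicits are assumed):

```scala
import org.apache.spark.sql.functions.col
import uk.co.gresearch.spark.parquet._

// Cast the new array-of-strings column name back to a single string;
// for a one-part path this reproduces the former values, e.g. Array(id) -> "[id]".
val blockColumns = spark.read.parquetBlockColumns("/path/to/parquet")
  .withColumn("column", col("column").cast("string"))
```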
diff --git a/PARQUET.md b/PARQUET.md
index 79f71405..f4207677 100644
--- a/PARQUET.md
+++ b/PARQUET.md
@@ -109,32 +109,33 @@ spark.read.parquetBlockColumns("/path/to/parquet").show()
 spark.read.parquet_block_columns("/path/to/parquet").show()
 ```
 ```
-+-------------+-----+------+------+-------------------+-------------------+--------------------+------------------+-----------+---------------+-----------------+------+
-|     filename|block|column| codec|               type|          encodings|            minValue|          maxValue|columnStart|compressedBytes|uncompressedBytes|values|
-+-------------+-----+------+------+-------------------+-------------------+--------------------+------------------+-----------+---------------+-----------------+------+
-|file1.parquet|    1|  [id]|SNAPPY|  required int64 id|[BIT_PACKED, PLAIN]|                   0|                99|          4|            437|              826|   100|
-|file1.parquet|    1| [val]|SNAPPY|required double val|[BIT_PACKED, PLAIN]|0.005067503372006343|0.9973357672164814|        441|            831|              826|   100|
-|file2.parquet|    1|  [id]|SNAPPY|  required int64 id|[BIT_PACKED, PLAIN]|                 100|               199|          4|            438|              825|   100|
-|file2.parquet|    1| [val]|SNAPPY|required double val|[BIT_PACKED, PLAIN]|0.010617521596503865| 0.999189783846449|        442|            831|              826|   100|
-|file2.parquet|    2|  [id]|SNAPPY|  required int64 id|[BIT_PACKED, PLAIN]|                 200|               299|       1273|            440|              826|   100|
-|file2.parquet|    2| [val]|SNAPPY|required double val|[BIT_PACKED, PLAIN]|0.011277044401634018| 0.970525681750662|       1713|            830|              825|   100|
-+-------------+-----+------+------+-------------------+-------------------+--------------------+------------------+-----------+---------------+-----------------+------+
-```
-
-|column            |type  |description                                            |
-|:-----------------|:----:|:------------------------------------------------------|
-|filename          |string|The Parquet file name                                   |
-|block             |int   |Block / RowGroup number starting at 1                   |
-|column            |string|Block / RowGroup column name                            |
-|codec             |string|The coded used to compress the block column values     |
-|type              |string|The data type of the block column                       |
-|encodings         |string|Encodings of the block column                           |
-|minValue          |string|Minimum value of this column in this block              |
-|maxValue          |string|Maximum value of this column in this block              |
-|columnStart       |long  |Start position of the block column in the Parquet file  |
-|compressedBytes   |long  |Number of compressed bytes of this block column         |
-|uncompressedBytes |long  |Number of uncompressed bytes of this block column       |
-|valueCount        |long  |Number of values in this block column                   |
++-------------+-----+------+------+-------------------+-------------------+--------------------+------------------+-----------+---------------+-----------------+------+-----+
+|     filename|block|column| codec|               type|          encodings|            minValue|          maxValue|columnStart|compressedBytes|uncompressedBytes|values|nulls|
++-------------+-----+------+------+-------------------+-------------------+--------------------+------------------+-----------+---------------+-----------------+------+-----+
+|file1.parquet|    1|  [id]|SNAPPY|  required int64 id|[BIT_PACKED, PLAIN]|                   0|                99|          4|            437|              826|   100|    0|
+|file1.parquet|    1| [val]|SNAPPY|required double val|[BIT_PACKED, PLAIN]|0.005067503372006343|0.9973357672164814|        441|            831|              826|   100|    0|
+|file2.parquet|    1|  [id]|SNAPPY|  required int64 id|[BIT_PACKED, PLAIN]|                 100|               199|          4|            438|              825|   100|    0|
+|file2.parquet|    1| [val]|SNAPPY|required double val|[BIT_PACKED, PLAIN]|0.010617521596503865| 0.999189783846449|        442|            831|              826|   100|    0|
+|file2.parquet|    2|  [id]|SNAPPY|  required int64 id|[BIT_PACKED, PLAIN]|                 200|               299|       1273|            440|              826|   100|    0|
+|file2.parquet|    2| [val]|SNAPPY|required double val|[BIT_PACKED, PLAIN]|0.011277044401634018| 0.970525681750662|       1713|            830|              825|   100|    0|
++-------------+-----+------+------+-------------------+-------------------+--------------------+------------------+-----------+---------------+-----------------+------+-----+
+```
+
+|column            |type         |description                                            |
+|:-----------------|:-----------:|:-------------------------------------------------------|
+|filename          |string       |The Parquet file name                                   |
+|block             |int          |Block / RowGroup number starting at 1                   |
+|column            |array<string>|Block / RowGroup column name                            |
+|codec             |string       |The codec used to compress the block column values      |
+|type              |string       |The data type of the block column                       |
+|encodings         |array<string>|Encodings of the block column                           |
+|minValue          |string       |Minimum value of this column in this block              |
+|maxValue          |string       |Maximum value of this column in this block              |
+|columnStart       |long         |Start position of the block column in the Parquet file  |
+|compressedBytes   |long         |Number of compressed bytes of this block column         |
+|uncompressedBytes |long         |Number of uncompressed bytes of this block column       |
+|values            |long         |Number of values in this block column                   |
+|nulls             |long         |Number of null values in this block column              |
 
 ## Parquet partition metadata
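A hedged usage sketch, not part of the diff, showing what the new `nulls` column enables: summing `nulls` and `values` per column path gives a quick null-count overview across all blocks of all files. The path and SparkSession `spark` are assumptions.

```scala
import org.apache.spark.sql.functions.{col, concat_ws, sum}
import uk.co.gresearch.spark.parquet._

// Aggregate the per-block-column metadata: total and null value counts
// per column path, summed across all blocks of all files.
spark.read.parquetBlockColumns("/path/to/parquet")
  .withColumn("column", concat_ws(".", col("column")))  // Array(a, b) -> "a.b"
  .groupBy(col("column"))
  .agg(sum(col("values")).as("values"), sum(col("nulls")).as("nulls"))
  .show()
```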
diff --git a/python/gresearch/spark/parquet/__init__.py b/python/gresearch/spark/parquet/__init__.py
index af1fe597..7e3f5990 100644
--- a/python/gresearch/spark/parquet/__init__.py
+++ b/python/gresearch/spark/parquet/__init__.py
@@ -72,17 +72,18 @@ def parquet_block_columns(self: DataFrameReader, *paths: str) -> DataFrame:
     This provides the following per-block-column information:
     - filename (string): The file name
     - block (int): Block / RowGroup number starting at 1
-    - column (string): Block / RowGroup column name
+    - column (array<string>): Block / RowGroup column name
     - codec (string): The codec used to compress the block column values
     - type (string): The data type of the block column
-    - encodings (string): Encodings of the block column
+    - encodings (array<string>): Encodings of the block column
     - minValue (string): Minimum value of this column in this block
     - maxValue (string): Maximum value of this column in this block
     - columnStart (long): Start position of the block column in the Parquet file
     - compressedBytes (long): Number of compressed bytes of this block column
     - uncompressedBytes (long): Number of uncompressed bytes of this block column
     - values (long): Number of values in this block column
-
+    - nulls (long): Number of null values in this block column
+
     :param self: a Spark DataFrameReader
     :param paths: one or more paths to Parquet files or directories
     :return: dataframe with Parquet metadata
diff --git a/src/main/scala/uk/co/gresearch/spark/parquet/package.scala b/src/main/scala/uk/co/gresearch/spark/parquet/package.scala
index 3e2004db..e75fd8c4 100644
--- a/src/main/scala/uk/co/gresearch/spark/parquet/package.scala
+++ b/src/main/scala/uk/co/gresearch/spark/parquet/package.scala
@@ -22,10 +22,10 @@ import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.metadata.BlockMetaData
 import org.apache.parquet.hadoop.{Footer, ParquetFileReader}
 import org.apache.spark.sql.execution.datasources.FilePartition
-import org.apache.spark.sql.functions.spark_partition_id
 import org.apache.spark.sql.{DataFrame, DataFrameReader, Encoder, Encoders}
 
 import scala.collection.JavaConverters.collectionAsScalaIterableConverter
+import scala.collection.convert.ImplicitConversions.`iterable AsScalaIterable`
 
 package object parquet {
   private implicit val intEncoder: Encoder[Int] = Encoders.scalaInt
@@ -159,7 +159,7 @@ package object parquet {
         (
           footer.getFile.toString,
           idx + 1,
-          column.getPath.toString,
+          column.getPath.toSeq,
           column.getCodec.toString,
           column.getPrimitiveType.toString,
           column.getEncodings.asScala.toSeq.map(_.toString).sorted,
@@ -169,11 +169,12 @@ package object parquet {
           column.getTotalSize,
           column.getTotalUncompressedSize,
           column.getValueCount,
+          column.getStatistics.getNumNulls,
         )
       }
     }
-  }.toDF("filename", "block", "column", "codec", "type", "encodings", "minValue", "maxValue", "columnStart", "compressedBytes", "uncompressedBytes", "values")
+  }.toDF("filename", "block", "column", "codec", "type", "encodings", "minValue", "maxValue", "columnStart", "compressedBytes", "uncompressedBytes", "values", "nulls")
 }
 
 /**
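Side note on the new import: `column.getPath` returns Parquet's `ColumnPath`, which is a `java.lang.Iterable[String]` rather than a Scala collection, so `.toSeq` only compiles via the implicit `iterable AsScalaIterable` conversion added above. A sketch of an equivalent explicit spelling, using the `JavaConverters` style already imported in this file:

```scala
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData
import scala.collection.JavaConverters.iterableAsScalaIterableConverter

// ColumnPath implements java.lang.Iterable[String]; asScala converts it
// explicitly, so the implicit conversion import would not be needed.
def columnPathParts(column: ColumnChunkMetaData): Seq[String] =
  column.getPath.asScala.toSeq
```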
"[BIT_PACKED, PLAIN]", "0.005067503372006343", "0.9973357672164814", 441, 831, 826, 100, 0), + Row("file2.parquet", 1, "[id]", "SNAPPY", "required int64 id", "[BIT_PACKED, PLAIN]", "100", "199", 4, 438, 825, 100, 0), + Row("file2.parquet", 1, "[val]", "SNAPPY", "required double val", "[BIT_PACKED, PLAIN]", "0.010617521596503865", "0.999189783846449", 442, 831, 826, 100, 0), + Row("file2.parquet", 2, "[id]", "SNAPPY", "required int64 id", "[BIT_PACKED, PLAIN]", "200", "299", 1273, 440, 826, 100, 0), + Row("file2.parquet", 2, "[val]", "SNAPPY", "required double val", "[BIT_PACKED, PLAIN]", "0.011277044401634018", "0.970525681750662", 1713, 830, 825, 100, 0), ), - (df: DataFrame) => df.withColumn("encodings", $"encodings".cast(StringType)) + (df: DataFrame) => df + .withColumn("column", $"column".cast(StringType)) + .withColumn("encodings", $"encodings".cast(StringType)) ) }