Skip to content

Commit

Permalink
Parquet: Verify 32-bit CRC checksum when decoding pages
Browse files Browse the repository at this point in the history
  • Loading branch information
xmakro committed Aug 22, 2024
1 parent 2795b94 commit 4cd942a
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 46 deletions.
155 changes: 109 additions & 46 deletions parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,73 +16,126 @@
# under the License.

[package]
name = "parquet"
version = { workspace = true }
license = { workspace = true }
authors = { workspace = true }
description = "Apache Parquet implementation in Rust"
edition = { workspace = true }
homepage = { workspace = true }
repository = { workspace = true }
authors = { workspace = true }
keywords = ["arrow", "parquet", "hadoop"]
license = { workspace = true }
name = "parquet"
readme = "README.md"
edition = { workspace = true }
repository = { workspace = true }
rust-version = "1.70.0"
version = { workspace = true }

[target.'cfg(target_arch = "wasm32")'.dependencies]
ahash = { version = "0.8", default-features = false, features = ["compile-time-rng"] }
ahash = { version = "0.8", default-features = false, features = [
"compile-time-rng",
] }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
ahash = { version = "0.8", default-features = false, features = ["runtime-rng"] }
ahash = { version = "0.8", default-features = false, features = [
"runtime-rng",
] }

[dependencies]
arrow-array = { workspace = true, optional = true }
arrow-buffer = { workspace = true, optional = true }
arrow-cast = { workspace = true, optional = true }
arrow-csv = { workspace = true, optional = true }
arrow-data = { workspace = true, optional = true }
arrow-ipc = { workspace = true, optional = true }
arrow-schema = { workspace = true, optional = true }
arrow-select = { workspace = true, optional = true }
arrow-ipc = { workspace = true, optional = true }
# Intentionally not a path dependency as object_store is released separately
object_store = { version = "0.11.0", default-features = false, optional = true }

base64 = { version = "0.22", default-features = false, features = [
"std",
], optional = true }
brotli = { version = "6.0", default-features = false, features = [
"std",
], optional = true }
bytes = { version = "1.1", default-features = false, features = ["std"] }
thrift = { version = "0.17", default-features = false }
snap = { version = "1.0", default-features = false, optional = true }
brotli = { version = "6.0", default-features = false, features = ["std"], optional = true }
flate2 = { version = "1.0", default-features = false, features = ["rust_backend"], optional = true }
lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true }
zstd = { version = "0.13", optional = true, default-features = false }
chrono = { workspace = true }
clap = { version = "4.1", default-features = false, features = [
"std",
"derive",
"env",
"help",
"error-context",
"usage",
], optional = true }
crc32fast = { version = "1.4", optional = true, default-features = false }
flate2 = { version = "1.0", default-features = false, features = [
"rust_backend",
], optional = true }
futures = { version = "0.3", default-features = false, features = [
"std",
], optional = true }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
hashbrown = { version = "0.14", default-features = false }
lz4_flex = { version = "0.11", default-features = false, features = [
"std",
"frame",
], optional = true }
num = { version = "0.4", default-features = false }
num-bigint = { version = "0.4", default-features = false }
base64 = { version = "0.22", default-features = false, features = ["std", ], optional = true }
clap = { version = "4.1", default-features = false, features = ["std", "derive", "env", "help", "error-context", "usage"], optional = true }
serde = { version = "1.0", default-features = false, features = ["derive"], optional = true }
serde_json = { version = "1.0", default-features = false, features = ["std"], optional = true }
paste = { version = "1.0" }
seq-macro = { version = "0.3", default-features = false }
futures = { version = "0.3", default-features = false, features = ["std"], optional = true }
tokio = { version = "1.0", optional = true, default-features = false, features = ["macros", "rt", "io-util"] }
hashbrown = { version = "0.14", default-features = false }
serde = { version = "1.0", default-features = false, features = [
"derive",
], optional = true }
serde_json = { version = "1.0", default-features = false, features = [
"std",
], optional = true }
snap = { version = "1.0", default-features = false, optional = true }
sysinfo = { version = "0.31.2", optional = true, default-features = false, features = [
"system",
] }
thrift = { version = "0.17", default-features = false }
tokio = { version = "1.0", optional = true, default-features = false, features = [
"macros",
"rt",
"io-util",
] }
twox-hash = { version = "1.6", default-features = false }
paste = { version = "1.0" }
half = { version = "2.1", default-features = false, features = ["num-traits"] }
sysinfo = { version = "0.31.2", optional = true, default-features = false, features = ["system"] }
zstd = { version = "0.13", optional = true, default-features = false }

[dev-dependencies]
arrow = { workspace = true, features = [
"ipc",
"test_utils",
"prettyprint",
"json",
] }
base64 = { version = "0.22", default-features = false, features = ["std"] }
brotli = { version = "6.0", default-features = false, features = ["std"] }
criterion = { version = "0.5", default-features = false }
flate2 = { version = "1.0", default-features = false, features = [
"rust_backend",
] }
lz4_flex = { version = "0.11", default-features = false, features = [
"std",
"frame",
] }
object_store = { version = "0.11.0", default-features = false, features = [
"azure",
] }
rand = { version = "0.8", default-features = false, features = [
"std",
"std_rng",
] }
serde_json = { version = "1.0", features = ["std"], default-features = false }
snap = { version = "1.0", default-features = false }
tempfile = { version = "3.0", default-features = false }
brotli = { version = "6.0", default-features = false, features = ["std"] }
flate2 = { version = "1.0", default-features = false, features = ["rust_backend"] }
lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"] }
tokio = { version = "1.0", default-features = false, features = [
"macros",
"rt",
"io-util",
"fs",
] }
zstd = { version = "0.13", default-features = false }
serde_json = { version = "1.0", features = ["std"], default-features = false }
arrow = { workspace = true, features = ["ipc", "test_utils", "prettyprint", "json"] }
tokio = { version = "1.0", default-features = false, features = ["macros", "rt", "io-util", "fs"] }
rand = { version = "0.8", default-features = false, features = ["std", "std_rng"] }
object_store = { version = "0.11.0", default-features = false, features = ["azure"] }

# TODO: temporary to fix parquet wasm build
# upstream issue: https://github.com/gyscos/zstd-rs/issues/269
Expand All @@ -100,7 +153,16 @@ default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
# Enable lz4
lz4 = ["lz4_flex"]
# Enable arrow reader/writer APIs
arrow = ["base64", "arrow-array", "arrow-buffer", "arrow-cast", "arrow-data", "arrow-schema", "arrow-select", "arrow-ipc"]
arrow = [
"base64",
"arrow-array",
"arrow-buffer",
"arrow-cast",
"arrow-data",
"arrow-schema",
"arrow-select",
"arrow-ipc",
]
# Enable CLI tools
cli = ["json", "base64", "clap", "arrow-csv", "serde"]
# Enable JSON APIs
Expand All @@ -117,35 +179,37 @@ object_store = ["dep:object_store", "async"]
zstd = ["dep:zstd", "zstd-sys"]
# Display memory in example/write_parquet.rs
sysinfo = ["dep:sysinfo"]
# Verify 32-bit CRC checksum when decoding parquet pages
crc = ["crc32fast"]

[[example]]
name = "read_parquet"
required-features = ["arrow"]
path = "./examples/read_parquet.rs"
required-features = ["arrow"]

[[example]]
name = "write_parquet"
required-features = ["cli", "sysinfo"]
path = "./examples/write_parquet.rs"
required-features = ["cli", "sysinfo"]

[[example]]
name = "async_read_parquet"
required-features = ["arrow", "async"]
path = "./examples/async_read_parquet.rs"
required-features = ["arrow", "async"]

[[example]]
name = "read_with_rowgroup"
required-features = ["arrow", "async"]
path = "./examples/read_with_rowgroup.rs"
required-features = ["arrow", "async"]

[[test]]
name = "arrow_writer_layout"
required-features = ["arrow"]

[[test]]
name = "arrow_reader"
required-features = ["arrow"]
path = "./tests/arrow_reader/mod.rs"
required-features = ["arrow"]

[[bin]]
name = "parquet-read"
Expand Down Expand Up @@ -184,34 +248,33 @@ name = "parquet-index"
required-features = ["cli"]

[[bench]]
harness = false
name = "arrow_writer"
required-features = ["arrow"]
harness = false

[[bench]]
harness = false
name = "arrow_reader"
required-features = ["arrow", "test_common", "experimental"]
harness = false

[[bench]]
harness = false
name = "arrow_statistics"
required-features = ["arrow"]
harness = false


[[bench]]
harness = false
name = "compression"
required-features = ["experimental", "default"]
harness = false

[[bench]]
harness = false
name = "encoding"
required-features = ["experimental", "default"]
harness = false

[[bench]]
name = "metadata"
harness = false
name = "metadata"

[lib]
bench = false
2 changes: 2 additions & 0 deletions parquet/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub enum ParquetError {
/// Returned when reading into arrow or writing from arrow.
ArrowError(String),
IndexOutOfBound(usize, usize),
Crc32Mismatch,
/// An external error variant
External(Box<dyn Error + Send + Sync>),
}
Expand All @@ -60,6 +61,7 @@ impl std::fmt::Display for ParquetError {
ParquetError::IndexOutOfBound(index, ref bound) => {
write!(fmt, "Index {index} out of bound: {bound}")
}
ParquetError::Crc32Mismatch => write!(fmt, "Parquet Page crc32 mismatch"),
ParquetError::External(e) => write!(fmt, "External: {e}"),
}
}
Expand Down
9 changes: 9 additions & 0 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,15 @@ pub(crate) fn decode_page(
physical_type: Type,
decompressor: Option<&mut Box<dyn Codec>>,
) -> Result<Page> {
// Verify the 32-bit CRC checksum of the page
#[cfg(feature = "crc")]
if let Some(expected_crc) = page_header.crc {
let crc = crc32fast::hash(&buffer);
if crc != expected_crc as u32 {
return Err(ParquetError::Crc32Mismatch);
}
}

// When processing data page v2, depending on enabled compression for the
// page, we should account for uncompressed data ('offset') of
// repetition and definition levels.
Expand Down

0 comments on commit 4cd942a

Please sign in to comment.