From 87dda324270fb29f9648a9ca7d820674f4923590 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 11 Dec 2024 13:49:04 +0000 Subject: [PATCH 1/5] feat: introduce `PuffinFileFooterReader` --- src/puffin/src/error.rs | 9 +- src/puffin/src/file_format/reader.rs | 1 + src/puffin/src/file_format/reader/footer.rs | 122 +++++++++++++++++++- src/puffin/src/tests.rs | 38 +++++- 4 files changed, 166 insertions(+), 4 deletions(-) diff --git a/src/puffin/src/error.rs b/src/puffin/src/error.rs index 57aec44d1fb8..0044129f4f01 100644 --- a/src/puffin/src/error.rs +++ b/src/puffin/src/error.rs @@ -165,6 +165,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Invalid puffin footer"))] + InvalidPuffinFooter { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display( "Unexpected puffin file size, min: {}, actual: {}", min_file_size, @@ -286,7 +292,8 @@ impl ErrorExt for Error { | BlobNotFound { .. } | BlobIndexOutOfBound { .. } | FileKeyNotMatch { .. } - | WalkDir { .. } => StatusCode::Unexpected, + | WalkDir { .. } + | InvalidPuffinFooter { .. } => StatusCode::Unexpected, UnsupportedCompression { .. } | UnsupportedDecompression { .. } => { StatusCode::Unsupported diff --git a/src/puffin/src/file_format/reader.rs b/src/puffin/src/file_format/reader.rs index 3f48bf4b105e..779098567efe 100644 --- a/src/puffin/src/file_format/reader.rs +++ b/src/puffin/src/file_format/reader.rs @@ -21,6 +21,7 @@ use common_base::range_read::RangeReader; use crate::blob_metadata::BlobMetadata; use crate::error::Result; pub use crate::file_format::reader::file::PuffinFileReader; +pub use crate::file_format::reader::footer::PuffinFileFooterReader; use crate::file_metadata::FileMetadata; /// `SyncReader` defines a synchronous reader for puffin data. diff --git a/src/puffin/src/file_format/reader/footer.rs b/src/puffin/src/file_format/reader/footer.rs index aa764fd32a21..63c1cd34dd14 100644 --- a/src/puffin/src/file_format/reader/footer.rs +++ b/src/puffin/src/file_format/reader/footer.rs @@ -19,8 +19,8 @@ use snafu::{ensure, ResultExt}; use crate::error::{ BytesToIntegerSnafu, DeserializeJsonSnafu, InvalidBlobAreaEndSnafu, InvalidBlobOffsetSnafu, - Lz4DecompressionSnafu, MagicNotMatchedSnafu, ParseStageNotMatchSnafu, ReadSnafu, Result, - SeekSnafu, UnexpectedFooterPayloadSizeSnafu, + InvalidPuffinFooterSnafu, Lz4DecompressionSnafu, MagicNotMatchedSnafu, ParseStageNotMatchSnafu, + ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu, }; use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE}; use crate::file_metadata::FileMetadata; @@ -305,3 +305,121 @@ impl StageParser { self.file_size - MAGIC_SIZE * 2 - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - self.payload_size } } + +/// Reader for the footer of a Puffin data file +/// +/// The footer has a specific layout that needs to be read and parsed to +/// extract metadata about the file, which is encapsulated in the [`FileMetadata`] type. +/// +/// This reader supports prefetching, allowing for more efficient reading +/// of the footer by fetching additional data ahead of time. +/// +/// ```text +/// Footer layout: HeadMagic Payload PayloadSize Flags FootMagic +/// [4] [?] [4] [4] [4] +/// ``` +pub struct PuffinFileFooterReader { + /// The source of the puffin file + source: R, + /// The content length of the puffin file + file_size: u64, + /// The prefetch footer size + prefetch_size: Option, +} + +impl<'a, R: RangeReader + 'a> PuffinFileFooterReader { + pub fn new(source: R, content_len: u64) -> Self { + Self { + source, + file_size: content_len, + prefetch_size: None, + } + } + + fn prefetch_size(&self) -> u64 { + self.prefetch_size.unwrap_or(MIN_FILE_SIZE) + } + + pub fn with_prefetch_size(mut self, prefetch_size: u64) -> Self { + self.prefetch_size = Some(prefetch_size.max(MIN_FILE_SIZE)); + self + } + + pub async fn metadata(&'a mut self) -> Result { + // Note: prefetch > content_len is allowed, since we're using saturating_sub. + let footer_start = self.file_size.saturating_sub(self.prefetch_size()); + let suffix = self + .source + .read(footer_start..self.file_size) + .await + .context(ReadSnafu)?; + let suffix_len = suffix.len(); + + // check the magic + let magic = Self::read_tailing_four_bytes(&suffix)?; + ensure!(magic == MAGIC, MagicNotMatchedSnafu); + + let flags = self.decode_flags(&suffix[..suffix_len - MAGIC_SIZE as usize])?; + let length = self.decode_payload_size( + &suffix[..suffix_len - MAGIC_SIZE as usize - FLAGS_SIZE as usize], + )?; + let footer_size = PAYLOAD_SIZE_SIZE + FLAGS_SIZE + MAGIC_SIZE; + + // Did not fetch the entire file metadata in the initial read, need to make a second request. + if length > suffix_len as u64 - footer_size { + let metadata_start = self.file_size - length - footer_size; + let meta = self + .source + .read(metadata_start..self.file_size - footer_size) + .await + .context(ReadSnafu)?; + self.parse_payload(&flags, &meta) + } else { + let metadata_start = self.file_size - length - footer_size - footer_start; + let meta = &suffix[metadata_start as usize..suffix_len - footer_size as usize]; + self.parse_payload(&flags, meta) + } + } + + fn parse_payload(&self, flags: &Flags, bytes: &[u8]) -> Result { + if flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4) { + let decoder = lz4_flex::frame::FrameDecoder::new(Cursor::new(bytes)); + let res = serde_json::from_reader(decoder).context(Lz4DecompressionSnafu)?; + Ok(res) + } else { + serde_json::from_slice(bytes).context(DeserializeJsonSnafu) + } + } + + fn read_tailing_four_bytes(suffix: &[u8]) -> Result<[u8; 4]> { + let suffix_len = suffix.len(); + ensure!(suffix_len >= 4, InvalidPuffinFooterSnafu); + let mut bytes = [0; 4]; + bytes.copy_from_slice(&suffix[suffix_len - 4..suffix_len]); + + Ok(bytes) + } + + fn decode_flags(&self, suffix: &[u8]) -> Result { + let flags = u32::from_le_bytes(Self::read_tailing_four_bytes(suffix)?); + Ok(Flags::from_bits_truncate(flags)) + } + + fn decode_payload_size(&self, suffix: &[u8]) -> Result { + let payload_size = i32::from_le_bytes(Self::read_tailing_four_bytes(suffix)?); + + ensure!( + payload_size >= 0, + UnexpectedFooterPayloadSizeSnafu { size: payload_size } + ); + let payload_size = payload_size as u64; + ensure!( + payload_size <= self.file_size - MIN_FILE_SIZE, + UnexpectedFooterPayloadSizeSnafu { + size: self.file_size as i32 + } + ); + + Ok(payload_size) + } +} diff --git a/src/puffin/src/tests.rs b/src/puffin/src/tests.rs index a152d4124bd6..279371b74f30 100644 --- a/src/puffin/src/tests.rs +++ b/src/puffin/src/tests.rs @@ -20,8 +20,11 @@ use std::vec; use common_base::range_read::{FileReader, RangeReader}; use futures::io::Cursor as AsyncCursor; -use crate::file_format::reader::{AsyncReader, PuffinFileReader, SyncReader}; +use crate::file_format::reader::{ + AsyncReader, PuffinFileFooterReader, PuffinFileReader, SyncReader, +}; use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter, SyncWriter}; +use crate::file_metadata::FileMetadata; #[test] fn test_read_empty_puffin_sync() { @@ -45,6 +48,39 @@ async fn test_read_empty_puffin_async() { assert_eq!(metadata.blobs.len(), 0); } +async fn test_read_puffin_file_metadata( + path: &str, + file_size: u64, + expeccted_metadata: FileMetadata, +) { + for prefetch_size in [0, file_size / 2, file_size, file_size + 10] { + let reader = FileReader::new(path).await.unwrap(); + let mut footer_reader = PuffinFileFooterReader::new(reader, file_size); + if prefetch_size > 0 { + footer_reader = footer_reader.with_prefetch_size(prefetch_size); + } + let metadata = footer_reader.metadata().await.unwrap(); + assert_eq!(metadata.properties, expeccted_metadata.properties,); + assert_eq!(metadata.blobs, expeccted_metadata.blobs); + } +} + +#[tokio::test] +async fn test_read_puffin_file_metadata_async() { + let paths = vec![ + "src/tests/resources/empty-puffin-uncompressed.puffin", + "src/tests/resources/sample-metric-data-uncompressed.puffin", + ]; + for path in paths { + let mut reader = FileReader::new(path).await.unwrap(); + let file_size = reader.metadata().await.unwrap().content_length; + let mut reader = PuffinFileReader::new(reader); + let metadata = reader.metadata().await.unwrap(); + + test_read_puffin_file_metadata(path, file_size, metadata).await; + } +} + #[test] fn test_sample_metric_data_puffin_sync() { let path = "src/tests/resources/sample-metric-data-uncompressed.puffin"; From 980be0cf54ec4b7808a479d425d0f5b4906edee4 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 11 Dec 2024 16:59:18 +0000 Subject: [PATCH 2/5] refactor: remove `SyncReader` trait and impl --- src/puffin/src/file_format/reader.rs | 13 -- src/puffin/src/file_format/reader/file.rs | 56 +------ src/puffin/src/file_format/reader/footer.rs | 28 +--- src/puffin/src/tests.rs | 156 +------------------- 4 files changed, 6 insertions(+), 247 deletions(-) diff --git a/src/puffin/src/file_format/reader.rs b/src/puffin/src/file_format/reader.rs index 779098567efe..162d7116a578 100644 --- a/src/puffin/src/file_format/reader.rs +++ b/src/puffin/src/file_format/reader.rs @@ -24,19 +24,6 @@ pub use crate::file_format::reader::file::PuffinFileReader; pub use crate::file_format::reader::footer::PuffinFileFooterReader; use crate::file_metadata::FileMetadata; -/// `SyncReader` defines a synchronous reader for puffin data. -pub trait SyncReader<'a> { - type Reader: std::io::Read + std::io::Seek; - - /// Fetches the FileMetadata. - fn metadata(&'a mut self) -> Result; - - /// Reads particular blob data based on given metadata. - /// - /// Data read from the reader is compressed leaving the caller to decompress the data. - fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result; -} - /// `AsyncReader` defines an asynchronous reader for puffin data. #[async_trait] pub trait AsyncReader<'a> { diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs index 3736ed5d2d8d..3fba48bdb32c 100644 --- a/src/puffin/src/file_format/reader/file.rs +++ b/src/puffin/src/file_format/reader/file.rs @@ -12,19 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::{self, SeekFrom}; - use async_trait::async_trait; use common_base::range_read::RangeReader; use snafu::{ensure, ResultExt}; use crate::blob_metadata::BlobMetadata; -use crate::error::{ - MagicNotMatchedSnafu, ReadSnafu, Result, SeekSnafu, UnexpectedPuffinFileSizeSnafu, - UnsupportedDecompressionSnafu, -}; +use crate::error::{MagicNotMatchedSnafu, ReadSnafu, Result, UnexpectedPuffinFileSizeSnafu}; use crate::file_format::reader::footer::FooterParser; -use crate::file_format::reader::{AsyncReader, SyncReader}; +use crate::file_format::reader::AsyncReader; use crate::file_format::{MAGIC, MAGIC_SIZE, MIN_FILE_SIZE}; use crate::file_metadata::FileMetadata; use crate::partial_reader::PartialReader; @@ -72,45 +67,6 @@ impl PuffinFileReader { } } -impl<'a, R: io::Read + io::Seek + 'a> SyncReader<'a> for PuffinFileReader { - type Reader = PartialReader<&'a mut R>; - - fn metadata(&mut self) -> Result { - if let Some(metadata) = &self.metadata { - return Ok(metadata.clone()); - } - - // check the magic - let mut magic = [0; MAGIC_SIZE as usize]; - self.source.read_exact(&mut magic).context(ReadSnafu)?; - ensure!(magic == MAGIC, MagicNotMatchedSnafu); - - let file_size = self.get_file_size_sync()?; - - // parse the footer - let metadata = FooterParser::new(&mut self.source, file_size).parse_sync()?; - self.metadata = Some(metadata.clone()); - Ok(metadata) - } - - fn blob_reader(&'a mut self, blob_metadata: &BlobMetadata) -> Result { - // TODO(zhongzc): support decompression - let compression = blob_metadata.compression_codec.as_ref(); - ensure!( - compression.is_none(), - UnsupportedDecompressionSnafu { - decompression: compression.unwrap().to_string() - } - ); - - Ok(PartialReader::new( - &mut self.source, - blob_metadata.offset as _, - blob_metadata.length as _, - )) - } -} - #[async_trait] impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader { type Reader = PartialReader<&'a mut R>; @@ -143,14 +99,6 @@ impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader { } } -impl PuffinFileReader { - fn get_file_size_sync(&mut self) -> Result { - let file_size = self.source.seek(SeekFrom::End(0)).context(SeekSnafu)?; - Self::validate_file_size(file_size)?; - Ok(file_size) - } -} - impl PuffinFileReader { async fn get_file_size_async(&mut self) -> Result { let file_size = self diff --git a/src/puffin/src/file_format/reader/footer.rs b/src/puffin/src/file_format/reader/footer.rs index 63c1cd34dd14..95d33251e30f 100644 --- a/src/puffin/src/file_format/reader/footer.rs +++ b/src/puffin/src/file_format/reader/footer.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::io::{self, Cursor, SeekFrom}; +use std::io::Cursor; use common_base::range_read::RangeReader; use snafu::{ensure, ResultExt}; @@ -20,7 +20,7 @@ use snafu::{ensure, ResultExt}; use crate::error::{ BytesToIntegerSnafu, DeserializeJsonSnafu, InvalidBlobAreaEndSnafu, InvalidBlobOffsetSnafu, InvalidPuffinFooterSnafu, Lz4DecompressionSnafu, MagicNotMatchedSnafu, ParseStageNotMatchSnafu, - ReadSnafu, Result, SeekSnafu, UnexpectedFooterPayloadSizeSnafu, + ReadSnafu, Result, UnexpectedFooterPayloadSizeSnafu, }; use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE}; use crate::file_metadata::FileMetadata; @@ -48,30 +48,6 @@ impl FooterParser { } } -impl FooterParser { - /// Parses the footer from the IO source in a synchronous manner. - pub fn parse_sync(&mut self) -> Result { - let mut parser = StageParser::new(self.file_size); - - let mut buf = vec![]; - while let Some(byte_to_read) = parser.next_to_read() { - self.source - .seek(SeekFrom::Start(byte_to_read.offset)) - .context(SeekSnafu)?; - let size = byte_to_read.size as usize; - - buf.resize(size, 0); - let buf = &mut buf[..size]; - - self.source.read_exact(buf).context(ReadSnafu)?; - - parser.consume_bytes(buf)?; - } - - parser.finish() - } -} - impl FooterParser { /// Parses the footer from the IO source in a asynchronous manner. pub async fn parse_async(&mut self) -> Result { diff --git a/src/puffin/src/tests.rs b/src/puffin/src/tests.rs index 279371b74f30..a3bb48587924 100644 --- a/src/puffin/src/tests.rs +++ b/src/puffin/src/tests.rs @@ -13,30 +13,15 @@ // limitations under the License. use std::collections::HashMap; -use std::fs::File; -use std::io::{Cursor, Read}; use std::vec; use common_base::range_read::{FileReader, RangeReader}; use futures::io::Cursor as AsyncCursor; -use crate::file_format::reader::{ - AsyncReader, PuffinFileFooterReader, PuffinFileReader, SyncReader, -}; -use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter, SyncWriter}; +use crate::file_format::reader::{AsyncReader, PuffinFileFooterReader, PuffinFileReader}; +use crate::file_format::writer::{AsyncWriter, Blob, PuffinFileWriter}; use crate::file_metadata::FileMetadata; -#[test] -fn test_read_empty_puffin_sync() { - let path = "src/tests/resources/empty-puffin-uncompressed.puffin"; - - let file = File::open(path).unwrap(); - let mut reader = PuffinFileReader::new(file); - let metadata = reader.metadata().unwrap(); - assert_eq!(metadata.properties.len(), 0); - assert_eq!(metadata.blobs.len(), 0); -} - #[tokio::test] async fn test_read_empty_puffin_async() { let path = "src/tests/resources/empty-puffin-uncompressed.puffin"; @@ -81,41 +66,6 @@ async fn test_read_puffin_file_metadata_async() { } } -#[test] -fn test_sample_metric_data_puffin_sync() { - let path = "src/tests/resources/sample-metric-data-uncompressed.puffin"; - - let file = File::open(path).unwrap(); - let mut reader = PuffinFileReader::new(file); - let metadata = reader.metadata().unwrap(); - - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); - - assert_eq!(metadata.blobs.len(), 2); - assert_eq!(metadata.blobs[0].blob_type, "some-blob"); - assert_eq!(metadata.blobs[0].offset, 4); - assert_eq!(metadata.blobs[0].length, 9); - - assert_eq!(metadata.blobs[1].blob_type, "some-other-blob"); - assert_eq!(metadata.blobs[1].offset, 13); - assert_eq!(metadata.blobs[1].length, 83); - - let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap(); - let mut buf = String::new(); - some_blob.read_to_string(&mut buf).unwrap(); - assert_eq!(buf, "abcdefghi"); - - let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap(); - let mut buf = Vec::new(); - some_other_blob.read_to_end(&mut buf).unwrap(); - let expected = include_bytes!("tests/resources/sample-metric-data.blob"); - assert_eq!(buf, expected); -} - #[tokio::test] async fn test_sample_metric_data_puffin_async() { let path = "src/tests/resources/sample-metric-data-uncompressed.puffin"; @@ -149,38 +99,6 @@ async fn test_sample_metric_data_puffin_async() { assert_eq!(buf, expected); } -#[test] -fn test_writer_reader_with_empty_sync() { - fn test_writer_reader_with_empty_sync(footer_compressed: bool) { - let mut buf = Cursor::new(vec![]); - - let mut writer = PuffinFileWriter::new(&mut buf); - writer.set_properties(HashMap::from([( - "created-by".to_string(), - "Test 1234".to_string(), - )])); - - writer.set_footer_lz4_compressed(footer_compressed); - let written_bytes = writer.finish().unwrap(); - assert!(written_bytes > 0); - - let mut buf = Cursor::new(buf.into_inner()); - let mut reader = PuffinFileReader::new(&mut buf); - let metadata = reader.metadata().unwrap(); - - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); - - assert_eq!(metadata.blobs.len(), 0); - } - - test_writer_reader_with_empty_sync(false); - test_writer_reader_with_empty_sync(true); -} - #[tokio::test] async fn test_writer_reader_empty_async() { async fn test_writer_reader_empty_async(footer_compressed: bool) { @@ -212,76 +130,6 @@ async fn test_writer_reader_empty_async() { test_writer_reader_empty_async(true).await; } -#[test] -fn test_writer_reader_sync() { - fn test_writer_reader_sync(footer_compressed: bool) { - let mut buf = Cursor::new(vec![]); - - let mut writer = PuffinFileWriter::new(&mut buf); - - let blob1 = "abcdefghi"; - writer - .add_blob(Blob { - compressed_data: Cursor::new(&blob1), - blob_type: "some-blob".to_string(), - properties: Default::default(), - compression_codec: None, - }) - .unwrap(); - - let blob2 = include_bytes!("tests/resources/sample-metric-data.blob"); - writer - .add_blob(Blob { - compressed_data: Cursor::new(&blob2), - blob_type: "some-other-blob".to_string(), - properties: Default::default(), - compression_codec: None, - }) - .unwrap(); - - writer.set_properties(HashMap::from([( - "created-by".to_string(), - "Test 1234".to_string(), - )])); - - writer.set_footer_lz4_compressed(footer_compressed); - let written_bytes = writer.finish().unwrap(); - assert!(written_bytes > 0); - - let mut buf = Cursor::new(buf.into_inner()); - let mut reader = PuffinFileReader::new(&mut buf); - let metadata = reader.metadata().unwrap(); - - assert_eq!(metadata.properties.len(), 1); - assert_eq!( - metadata.properties.get("created-by"), - Some(&"Test 1234".to_string()) - ); - - assert_eq!(metadata.blobs.len(), 2); - assert_eq!(metadata.blobs[0].blob_type, "some-blob"); - assert_eq!(metadata.blobs[0].offset, 4); - assert_eq!(metadata.blobs[0].length, 9); - - assert_eq!(metadata.blobs[1].blob_type, "some-other-blob"); - assert_eq!(metadata.blobs[1].offset, 13); - assert_eq!(metadata.blobs[1].length, 83); - - let mut some_blob = reader.blob_reader(&metadata.blobs[0]).unwrap(); - let mut buf = String::new(); - some_blob.read_to_string(&mut buf).unwrap(); - assert_eq!(buf, blob1); - - let mut some_other_blob = reader.blob_reader(&metadata.blobs[1]).unwrap(); - let mut buf = Vec::new(); - some_other_blob.read_to_end(&mut buf).unwrap(); - assert_eq!(buf, blob2); - } - - test_writer_reader_sync(false); - test_writer_reader_sync(true); -} - #[tokio::test] async fn test_writer_reader_async() { async fn test_writer_reader_async(footer_compressed: bool) { From 8575b0e5c52d319f291514b83165d5f16dfbe105 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 11 Dec 2024 17:08:27 +0000 Subject: [PATCH 3/5] refactor: replace `FooterParser` with `PuffinFileFooterReader` --- src/puffin/src/file_format/reader/file.rs | 21 +- src/puffin/src/file_format/reader/footer.rs | 261 +------------------- 2 files changed, 10 insertions(+), 272 deletions(-) diff --git a/src/puffin/src/file_format/reader/file.rs b/src/puffin/src/file_format/reader/file.rs index 3fba48bdb32c..31e8e10bc4d5 100644 --- a/src/puffin/src/file_format/reader/file.rs +++ b/src/puffin/src/file_format/reader/file.rs @@ -17,10 +17,10 @@ use common_base::range_read::RangeReader; use snafu::{ensure, ResultExt}; use crate::blob_metadata::BlobMetadata; -use crate::error::{MagicNotMatchedSnafu, ReadSnafu, Result, UnexpectedPuffinFileSizeSnafu}; -use crate::file_format::reader::footer::FooterParser; -use crate::file_format::reader::AsyncReader; -use crate::file_format::{MAGIC, MAGIC_SIZE, MIN_FILE_SIZE}; +use crate::error::{ReadSnafu, Result, UnexpectedPuffinFileSizeSnafu}; +use crate::file_format::reader::footer::DEFAULT_PREFETCH_SIZE; +use crate::file_format::reader::{AsyncReader, PuffinFileFooterReader}; +use crate::file_format::MIN_FILE_SIZE; use crate::file_metadata::FileMetadata; use crate::partial_reader::PartialReader; @@ -75,17 +75,10 @@ impl<'a, R: RangeReader + 'a> AsyncReader<'a> for PuffinFileReader { if let Some(metadata) = &self.metadata { return Ok(metadata.clone()); } - - // check the magic - let magic = self.source.read(0..MAGIC_SIZE).await.context(ReadSnafu)?; - ensure!(*magic == MAGIC, MagicNotMatchedSnafu); - let file_size = self.get_file_size_async().await?; - - // parse the footer - let metadata = FooterParser::new(&mut self.source, file_size) - .parse_async() - .await?; + let mut reader = PuffinFileFooterReader::new(&mut self.source, file_size) + .with_prefetch_size(DEFAULT_PREFETCH_SIZE); + let metadata = reader.metadata().await?; self.metadata = Some(metadata.clone()); Ok(metadata) } diff --git a/src/puffin/src/file_format/reader/footer.rs b/src/puffin/src/file_format/reader/footer.rs index 95d33251e30f..d0cd1e8ed4f0 100644 --- a/src/puffin/src/file_format/reader/footer.rs +++ b/src/puffin/src/file_format/reader/footer.rs @@ -18,269 +18,14 @@ use common_base::range_read::RangeReader; use snafu::{ensure, ResultExt}; use crate::error::{ - BytesToIntegerSnafu, DeserializeJsonSnafu, InvalidBlobAreaEndSnafu, InvalidBlobOffsetSnafu, - InvalidPuffinFooterSnafu, Lz4DecompressionSnafu, MagicNotMatchedSnafu, ParseStageNotMatchSnafu, + DeserializeJsonSnafu, InvalidPuffinFooterSnafu, Lz4DecompressionSnafu, MagicNotMatchedSnafu, ReadSnafu, Result, UnexpectedFooterPayloadSizeSnafu, }; use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE}; use crate::file_metadata::FileMetadata; -/// Parser for the footer of a Puffin data file -/// -/// The footer has a specific layout that needs to be read and parsed to -/// extract metadata about the file, which is encapsulated in the [`FileMetadata`] type. -/// -/// ```text -/// Footer layout: HeadMagic Payload PayloadSize Flags FootMagic -/// [4] [?] [4] [4] [4] -/// ``` -pub struct FooterParser { - // The underlying IO source - source: R, - - // The size of the file, used for calculating offsets to read from - file_size: u64, -} - -impl FooterParser { - pub fn new(source: R, file_size: u64) -> Self { - Self { source, file_size } - } -} - -impl FooterParser { - /// Parses the footer from the IO source in a asynchronous manner. - pub async fn parse_async(&mut self) -> Result { - let mut parser = StageParser::new(self.file_size); - - let mut buf = vec![]; - while let Some(byte_to_read) = parser.next_to_read() { - buf.clear(); - let range = byte_to_read.offset..byte_to_read.offset + byte_to_read.size; - self.source - .read_into(range, &mut buf) - .await - .context(ReadSnafu)?; - parser.consume_bytes(&buf)?; - } - - parser.finish() - } -} - -/// The internal stages of parsing the footer. -/// This enum allows the StageParser to keep track of which part -/// of the footer needs to be parsed next. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -enum ParseStage { - FootMagic, - Flags, - PayloadSize, - Payload, - HeadMagic, - Done, -} - -/// Manages the parsing process of the file's footer. -struct StageParser { - /// Current stage in the parsing sequence of the footer. - stage: ParseStage, - - /// Total file size; used for calculating offsets to read from. - file_size: u64, - - /// Flags from the footer, set when the `Flags` field is parsed. - flags: Flags, - - /// Size of the footer's payload, set when the `PayloadSize` is parsed. - payload_size: u64, - - /// Metadata from the footer's payload, set when the `Payload` is parsed. - metadata: Option, -} - -/// Represents a read operation that needs to be performed, including the -/// offset from the start of the file and the number of bytes to read. -struct BytesToRead { - offset: u64, - size: u64, -} - -impl StageParser { - fn new(file_size: u64) -> Self { - Self { - stage: ParseStage::FootMagic, - file_size, - payload_size: 0, - flags: Flags::empty(), - metadata: None, - } - } - - /// Determines the next segment of bytes to read based on the current parsing stage. - /// This method returns information like the offset and size of the next read, - /// or None if parsing is complete. - fn next_to_read(&self) -> Option { - if self.stage == ParseStage::Done { - return None; - } - - let btr = match self.stage { - ParseStage::FootMagic => BytesToRead { - offset: self.foot_magic_offset(), - size: MAGIC_SIZE, - }, - ParseStage::Flags => BytesToRead { - offset: self.flags_offset(), - size: FLAGS_SIZE, - }, - ParseStage::PayloadSize => BytesToRead { - offset: self.payload_size_offset(), - size: PAYLOAD_SIZE_SIZE, - }, - ParseStage::Payload => BytesToRead { - offset: self.payload_offset(), - size: self.payload_size, - }, - ParseStage::HeadMagic => BytesToRead { - offset: self.head_magic_offset(), - size: MAGIC_SIZE, - }, - ParseStage::Done => unreachable!(), - }; - - Some(btr) - } - - /// Processes the bytes that have been read according to the current parsing stage - /// and advances the parsing stage. It ensures the correct sequence of bytes is - /// encountered and stores the necessary information in the `StageParser`. - fn consume_bytes(&mut self, bytes: &[u8]) -> Result<()> { - match self.stage { - ParseStage::FootMagic => { - ensure!(bytes == MAGIC, MagicNotMatchedSnafu); - self.stage = ParseStage::Flags; - } - ParseStage::Flags => { - self.flags = Self::parse_flags(bytes)?; - self.stage = ParseStage::PayloadSize; - } - ParseStage::PayloadSize => { - self.payload_size = Self::parse_payload_size(bytes)?; - self.validate_payload_size()?; - self.stage = ParseStage::Payload; - } - ParseStage::Payload => { - self.metadata = Some(self.parse_payload(bytes)?); - self.validate_metadata()?; - self.stage = ParseStage::HeadMagic; - } - ParseStage::HeadMagic => { - ensure!(bytes == MAGIC, MagicNotMatchedSnafu); - self.stage = ParseStage::Done; - } - ParseStage::Done => unreachable!(), - } - - Ok(()) - } - - /// Finalizes the parsing process, ensuring all stages are complete, and returns - /// the parsed `FileMetadata`. It converts the raw footer payload into structured data. - fn finish(self) -> Result { - ensure!( - self.stage == ParseStage::Done, - ParseStageNotMatchSnafu { - expected: format!("{:?}", ParseStage::Done), - actual: format!("{:?}", self.stage), - } - ); - - Ok(self.metadata.unwrap()) - } - - fn parse_flags(bytes: &[u8]) -> Result { - let n = u32::from_le_bytes(bytes.try_into().context(BytesToIntegerSnafu)?); - Ok(Flags::from_bits_truncate(n)) - } - - fn parse_payload_size(bytes: &[u8]) -> Result { - let n = i32::from_le_bytes(bytes.try_into().context(BytesToIntegerSnafu)?); - ensure!(n >= 0, UnexpectedFooterPayloadSizeSnafu { size: n }); - Ok(n as u64) - } - - fn validate_payload_size(&self) -> Result<()> { - ensure!( - self.payload_size <= self.file_size - MIN_FILE_SIZE, - UnexpectedFooterPayloadSizeSnafu { - size: self.payload_size as i32 - } - ); - Ok(()) - } - - fn parse_payload(&self, bytes: &[u8]) -> Result { - if self.flags.contains(Flags::FOOTER_PAYLOAD_COMPRESSED_LZ4) { - let decoder = lz4_flex::frame::FrameDecoder::new(Cursor::new(bytes)); - let res = serde_json::from_reader(decoder).context(Lz4DecompressionSnafu)?; - Ok(res) - } else { - serde_json::from_slice(bytes).context(DeserializeJsonSnafu) - } - } - - fn validate_metadata(&self) -> Result<()> { - let metadata = self.metadata.as_ref().expect("metadata is not set"); - - let mut next_blob_offset = MAGIC_SIZE; - // check blob offsets - for blob in &metadata.blobs { - ensure!( - blob.offset as u64 == next_blob_offset, - InvalidBlobOffsetSnafu { - offset: blob.offset - } - ); - next_blob_offset += blob.length as u64; - } - - let blob_area_end = metadata - .blobs - .last() - .map_or(MAGIC_SIZE, |b| (b.offset + b.length) as u64); - ensure!( - blob_area_end == self.head_magic_offset(), - InvalidBlobAreaEndSnafu { - offset: blob_area_end - } - ); - - Ok(()) - } - - fn foot_magic_offset(&self) -> u64 { - self.file_size - MAGIC_SIZE - } - - fn flags_offset(&self) -> u64 { - self.file_size - MAGIC_SIZE - FLAGS_SIZE - } - - fn payload_size_offset(&self) -> u64 { - self.file_size - MAGIC_SIZE - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - } - - fn payload_offset(&self) -> u64 { - // `validate_payload_size` ensures that this subtraction will not overflow - self.file_size - MAGIC_SIZE - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - self.payload_size - } - - fn head_magic_offset(&self) -> u64 { - // `validate_payload_size` ensures that this subtraction will not overflow - self.file_size - MAGIC_SIZE * 2 - FLAGS_SIZE - PAYLOAD_SIZE_SIZE - self.payload_size - } -} +/// The default prefetch size for the footer reader. +pub const DEFAULT_PREFETCH_SIZE: u64 = 1024; // 1KiB /// Reader for the footer of a Puffin data file /// From 7d17eb02e76b6a6915619ec69b7f2538dcf8a838 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Wed, 11 Dec 2024 17:14:57 +0000 Subject: [PATCH 4/5] chore: remove unused errors --- src/index/src/inverted_index/error.rs | 11 +------ src/puffin/src/error.rs | 45 +-------------------------- 2 files changed, 2 insertions(+), 54 deletions(-) diff --git a/src/index/src/inverted_index/error.rs b/src/index/src/inverted_index/error.rs index 07a42b8b8767..49816e63c463 100644 --- a/src/index/src/inverted_index/error.rs +++ b/src/index/src/inverted_index/error.rs @@ -26,14 +26,6 @@ use crate::inverted_index::search::predicate::Predicate; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { - #[snafu(display("Failed to seek"))] - Seek { - #[snafu(source)] - error: IoError, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to read"))] Read { #[snafu(source)] @@ -215,8 +207,7 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*; match self { - Seek { .. } - | Read { .. } + Read { .. } | Write { .. } | Flush { .. } | Close { .. } diff --git a/src/puffin/src/error.rs b/src/puffin/src/error.rs index 0044129f4f01..634ede5b1364 100644 --- a/src/puffin/src/error.rs +++ b/src/puffin/src/error.rs @@ -25,14 +25,6 @@ use snafu::{Location, Snafu}; #[snafu(visibility(pub))] #[stack_trace_debug] pub enum Error { - #[snafu(display("Failed to seek"))] - Seek { - #[snafu(source)] - error: IoError, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to read"))] Read { #[snafu(source)] @@ -119,14 +111,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Failed to convert bytes to integer"))] - BytesToInteger { - #[snafu(source)] - error: std::array::TryFromSliceError, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Unsupported decompression: {}", decompression))] UnsupportedDecompression { decompression: String, @@ -150,14 +134,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Parse stage not match, expected: {}, actual: {}", expected, actual))] - ParseStageNotMatch { - expected: String, - actual: String, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Unexpected footer payload size: {}", size))] UnexpectedFooterPayloadSize { size: i32, @@ -183,20 +159,6 @@ pub enum Error { location: Location, }, - #[snafu(display("Invalid blob offset: {}, location: {:?}", offset, location))] - InvalidBlobOffset { - offset: i64, - #[snafu(implicit)] - location: Location, - }, - - #[snafu(display("Invalid blob area end: {}, location: {:?}", offset, location))] - InvalidBlobAreaEnd { - offset: u64, - #[snafu(implicit)] - location: Location, - }, - #[snafu(display("Failed to compress lz4"))] Lz4Compression { #[snafu(source)] @@ -268,8 +230,7 @@ impl ErrorExt for Error { fn status_code(&self) -> StatusCode { use Error::*; match self { - Seek { .. } - | Read { .. } + Read { .. } | MagicNotMatched { .. } | DeserializeJson { .. } | Write { .. } @@ -281,12 +242,8 @@ impl ErrorExt for Error { | Remove { .. } | Rename { .. } | SerializeJson { .. } - | BytesToInteger { .. } - | ParseStageNotMatch { .. } | UnexpectedFooterPayloadSize { .. } | UnexpectedPuffinFileSize { .. } - | InvalidBlobOffset { .. } - | InvalidBlobAreaEnd { .. } | Lz4Compression { .. } | Lz4Decompression { .. } | BlobNotFound { .. } From 9e482d014a19ab2e65b6f992d59d0bfbc2637bc9 Mon Sep 17 00:00:00 2001 From: WenyXu Date: Thu, 12 Dec 2024 03:30:36 +0000 Subject: [PATCH 5/5] chore: apply suggestions from CR --- src/puffin/src/error.rs | 9 ++++++++- src/puffin/src/file_format/reader/footer.rs | 4 +++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/src/puffin/src/error.rs b/src/puffin/src/error.rs index 634ede5b1364..d9247fefea3a 100644 --- a/src/puffin/src/error.rs +++ b/src/puffin/src/error.rs @@ -224,6 +224,12 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Puffin file size too small"))] + PuffinFileSizeTooSmall { + #[snafu(implicit)] + location: Location, + }, } impl ErrorExt for Error { @@ -250,7 +256,8 @@ impl ErrorExt for Error { | BlobIndexOutOfBound { .. } | FileKeyNotMatch { .. } | WalkDir { .. } - | InvalidPuffinFooter { .. } => StatusCode::Unexpected, + | InvalidPuffinFooter { .. } + | PuffinFileSizeTooSmall { .. } => StatusCode::Unexpected, UnsupportedCompression { .. } | UnsupportedDecompression { .. } => { StatusCode::Unsupported diff --git a/src/puffin/src/file_format/reader/footer.rs b/src/puffin/src/file_format/reader/footer.rs index d0cd1e8ed4f0..0ec5f50f7e7e 100644 --- a/src/puffin/src/file_format/reader/footer.rs +++ b/src/puffin/src/file_format/reader/footer.rs @@ -19,7 +19,7 @@ use snafu::{ensure, ResultExt}; use crate::error::{ DeserializeJsonSnafu, InvalidPuffinFooterSnafu, Lz4DecompressionSnafu, MagicNotMatchedSnafu, - ReadSnafu, Result, UnexpectedFooterPayloadSizeSnafu, + PuffinFileSizeTooSmallSnafu, ReadSnafu, Result, UnexpectedFooterPayloadSizeSnafu, }; use crate::file_format::{Flags, FLAGS_SIZE, MAGIC, MAGIC_SIZE, MIN_FILE_SIZE, PAYLOAD_SIZE_SIZE}; use crate::file_metadata::FileMetadata; @@ -67,6 +67,8 @@ impl<'a, R: RangeReader + 'a> PuffinFileFooterReader { } pub async fn metadata(&'a mut self) -> Result { + ensure!(self.file_size >= MIN_FILE_SIZE, PuffinFileSizeTooSmallSnafu); + // Note: prefetch > content_len is allowed, since we're using saturating_sub. let footer_start = self.file_size.saturating_sub(self.prefetch_size()); let suffix = self