From 6ffae14fe63c0c75eae70c9725be7b063966c022 Mon Sep 17 00:00:00 2001 From: Pavel Ivanov Date: Wed, 11 Sep 2024 07:39:26 +0200 Subject: [PATCH] fix: fixed scanning after maximum segment size exceeded --- Cargo.lock | 2 +- Cargo.toml | 8 ++++---- src/app.rs | 45 +++++++++++++++++++++++++++++++++++++++++++++ src/scanning.rs | 27 ++++++++++++++++++++++++++- 4 files changed, 76 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 61ebf694..6357d487 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -824,7 +824,7 @@ checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" [[package]] name = "hl" -version = "0.29.8" +version = "0.29.9-alpha.1" dependencies = [ "bincode", "byte-strings", diff --git a/Cargo.toml b/Cargo.toml index 084814b9..08c9c20a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ members = [".", "crate/encstr"] [workspace.package] repository = "https://github.com/pamburus/hl" authors = ["Pavel Ivanov "] -version = "0.29.8" +version = "0.29.9-alpha.1" edition = "2021" license = "MIT" @@ -21,7 +21,7 @@ license.workspace = true version.workspace = true [build-dependencies] -capnpc = "0.19" +capnpc = "0" hex = "0" serde = { version = "1", features = ["derive"] } serde_json = { version = "1", features = ["raw_value"] } @@ -30,8 +30,8 @@ sha2 = "0" [dependencies] bincode = "1" bytefmt = "0" -capnp = "0.19" -chrono = { version = "0.4", default-features = false, features = ["clock", "serde", "std"] } +capnp = "0" +chrono = { version = "0", default-features = false, features = ["clock", "serde", "std"] } chrono-tz = { version = "0", features = ["serde"] } clap = { version = "4", features = ["wrap_help", "derive", "env", "string"] } clap_complete = "4" diff --git a/src/app.rs b/src/app.rs index 307a9f6f..bd3bfc50 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1340,6 +1340,51 @@ mod tests { ); } + #[test] + fn test_incomplete_segment() { + let input = input(concat!( + "level=debug time=2024-01-25T19:10:20.435369+01:00 msg=m1 a.b.c=10 a.b.d=20 a.c.b=11\n", + "level=debug time=2024-01-25T19:10:21.764733+01:00 msg=m2 x=2\n" + )); + + let mut output = Vec::new(); + let app = App::new(Options { + buffer_size: NonZeroUsize::new(32).unwrap(), + max_message_size: NonZeroUsize::new(64).unwrap(), + ..options() + }); + app.run(vec![input], &mut output).unwrap(); + assert_eq!( + std::str::from_utf8(&output).unwrap(), + concat!( + "level=debug time=2024-01-25T19:10:20.435369+01:00 msg=m1 a.b.c=10 a.b.d=20 a.c.b=11\n", + "2024-01-25 18:10:21.764 |DBG| m2 x=2\n", + ) + ); + } + + #[test] + fn test_incomplete_segment_sorted() { + let data = concat!( + "level=debug time=2024-01-25T19:10:20.435369+01:00 msg=m1 a.b.c=10 a.b.d=20 a.c.b=11\n", + "level=debug time=2024-01-25T19:10:21.764733+01:00 msg=m2 x=2\n", + ); + let input = input(data); + + let mut output = Vec::new(); + let app = App::new(Options { + buffer_size: NonZeroUsize::new(16).unwrap(), + max_message_size: NonZeroUsize::new(64).unwrap(), + sort: true, + ..options() + }); + app.run(vec![input], &mut output).unwrap(); + assert_eq!( + std::str::from_utf8(&output).unwrap(), + "2024-01-25 18:10:21.764 |DBG| m2 x=2\n" + ); + } + #[test] fn test_issue_288_t1() { let input = input(concat!( diff --git a/src/scanning.rs b/src/scanning.rs index e2d4f032..476fbdb0 100644 --- a/src/scanning.rs +++ b/src/scanning.rs @@ -812,6 +812,9 @@ impl<'a, 'b, D: Delimit> Iterator for ScannerJumboIter<'a, 'b, D> { if self.can_complete() { return self.complete(); } + if placement == PartialPlacement::Last { + break; + } } next @ Some(_) => { self.next = next; @@ -824,7 +827,7 @@ impl<'a, 'b, D: Delimit> Iterator for ScannerJumboIter<'a, 'b, D> { break; } }; - if total > self.max_segment_size { + if total >= self.max_segment_size { break; } } @@ -1045,6 +1048,28 @@ mod tests { ) } + #[test] + fn test_jumbo_smart_new_line_2() { + let sf = Arc::new(SegmentBufFactory::new(3)); + let scanner = Scanner::new(sf.clone(), SmartNewLine); + let mut data = std::io::Cursor::new(b"test token\r\neof\r\n"); + let tokens = scanner + .items(&mut data) + .with_max_segment_size(9) + .collect::>>() + .unwrap(); + assert_eq!( + tokens, + vec![ + Segment::Incomplete(b"tes".into(), PartialPlacement::First), + Segment::Incomplete(b"t t".into(), PartialPlacement::Next), + Segment::Incomplete(b"oke".into(), PartialPlacement::Next), + Segment::Incomplete(b"n\r\n".into(), PartialPlacement::Last), + Segment::Complete(b"eof\r\n".into()), + ] + ) + } + #[test] fn test_buf_factory_recycle() { let factory = BufFactory::new(10);