diff --git a/Cargo.lock b/Cargo.lock index 74deb2d..e498033 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -56,8 +56,6 @@ version = "0.2.3" dependencies = [ "bson", "byteorder", - "maybe-async", - "tokio", ] [[package]] @@ -554,14 +552,12 @@ dependencies = [ ] [[package]] -name = "maybe-async" -version = "0.2.10" +name = "matchers" +version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cf92c10c7e361d6b99666ec1c6f9805b0bea2c3bd8c78dc6fe98ac5bd78db11" +checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" dependencies = [ - "proc-macro2", - "quote", - "syn", + "regex-automata 0.1.10", ] [[package]] @@ -598,11 +594,12 @@ dependencies = [ "async-bson", "bson", "byteorder", + "bytes", "lazy_static", - "maybe-async", "prometheus", "tokio", "tracing", + "tracing-test", ] [[package]] @@ -629,7 +626,17 @@ dependencies = [ "tracing", "tracing-appender", "tracing-futures", - "tracing-subscriber", + "tracing-subscriber 0.2.25", +] + +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", ] [[package]] @@ -707,6 +714,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "percent-encoding" version = "2.3.1" @@ -1178,7 +1191,7 @@ checksum = "9965507e507f12c8901432a33e31131222abac31edd90cabbcf85cf544b7127a" dependencies = [ "chrono", "crossbeam-channel", - "tracing-subscriber", + "tracing-subscriber 0.2.25", ] [[package]] @@ -1223,6 +1236,17 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -1242,7 +1266,7 @@ dependencies = [ "ansi_term", "chrono", "lazy_static", - "matchers", + "matchers 0.0.1", "regex", "serde", "serde_json", @@ -1251,10 +1275,49 @@ dependencies = [ "thread_local", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.1.4", "tracing-serde", ] +[[package]] +name = "tracing-subscriber" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" +dependencies = [ + "matchers 0.1.0", + "nu-ansi-term", + "once_cell", + "regex", + "sharded-slab", + "smallvec", + "thread_local", + "tracing", + "tracing-core", + "tracing-log 0.2.0", +] + +[[package]] +name = "tracing-test" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" +dependencies = [ + "tracing-core", + "tracing-subscriber 0.3.18", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index f166c5b..306570d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,10 +6,5 @@ 
members = [
     "async-bson",
 ]
-# WASM library depends on sync version of async-bson and mongo-protocol. This
-# conflicts with the proxy, so exclude it from the workspace to avoid build
-# errors.
-exclude = [ "envoy-wasm" ]
-
 [profile.release]
 debug = false
diff --git a/async-bson/Cargo.toml b/async-bson/Cargo.toml
index 69cc5fe..1a244d2 100644
--- a/async-bson/Cargo.toml
+++ b/async-bson/Cargo.toml
@@ -4,14 +4,8 @@ version = "0.2.3"
 authors = ["mpihlak "]
 edition = "2018"
 
-[features]
-default = [ "tokio/io-util", "tokio/macros", "tokio/rt-multi-thread" ]
-is_sync = [ "maybe-async/is_sync", "byteorder" ]
-
 [dependencies]
-maybe-async = { version = "0.2" }
-tokio = { version = "1.34", default-features = false, optional = true }
-byteorder = { version = "1.3", optional = true }
+byteorder = { version = "1.3" }
 
 [dev-dependencies]
 bson = '1.1'
diff --git a/async-bson/src/lib.rs b/async-bson/src/lib.rs
index aad728f..fb66125 100644
--- a/async-bson/src/lib.rs
+++ b/async-bson/src/lib.rs
@@ -1,42 +1,26 @@
-//! An asynchronous BSON parser that only parses explicitly specified subset of fields.
+//! A very basic BSON parser that only parses an explicitly specified subset of fields.
 //! Useful for extracting a handful of fields from a larger document.
 //!
 //! It works by having the caller initialize a `DocumentParser`, specifying the fields
 //! to be extracted. Then calling `parse_document` with a stream the parser goes through the input,
-//! extracting the specified elements and ignoring the rest. The input can be an async stream
-//! or a buffer. In case of an async stream the parser simply yields the task when no more input
-//! is available. If a buffer is passed, it expects the whole message to be there.
+//! extracting the specified elements and ignoring the rest.
 //!
-//! The emphasis on **asynchronous** -- this is a streaming parser that does not require the whole
-//! BSON to be loaded into memory. As such it uses less memory than a conventional parser, but
-//! does use extra CPU for pulling the bytes from a stream.
-//!
-//! The parser can be also used in synchronous mode by specifying `is_sync` feature. This is
-//! useful in when we want to have a really fast parser and don't care about the memory overhead
-//! of buffering the whole message. This is also useful in cases when don't want (or can't) use
-//! an async runtime (i.e. in a WASM sandbox).
-//!
-//! ```text
-//! [dependencies]
-//! async-bson = { version = "0.2", features = ["is_sync"] }
-//! ```
-//!
-//! The default is to use the asynchronous parser.
+//! The original motivation was a streaming parser that works on a byte stream without
+//! requiring the whole buffer to be passed in. Conceptually it is still a streaming
+//! parser; only the async IO support has been removed.
 //!
 //! # Example:
 //!
 //! ```
 //! use async_bson::{DocumentParser, Document};
 //!
-//! #[tokio::main]
-//!
-//! async fn main() {
+//! fn main() {
 //!     // This is our BSON "stream"
 //!     let buf = b"\x16\x00\x00\x00\x02hello\x00\x06\x00\x00\x00world\x00\x00";
 //!
 //!     // Parse the value of /hello, storing the value under "foo"
 //!     let parser = DocumentParser::builder().match_exact("/hello", "foo");
-//!     let doc = parser.parse_document(&buf[..]).await.unwrap();
+//!     let doc = parser.parse_document(&buf[..]).unwrap();
 //!
 //!     assert_eq!("world", doc.get_str("foo").unwrap());
 //!
}
@@ -48,32 +32,10 @@ use std::io::{Error, ErrorKind};
 use std::io::{Cursor, Result};
 use std::collections::{HashMap, HashSet};
 
-#[cfg(not(feature="is_sync"))]
-use {
-    tokio::io::{AsyncReadExt, AsyncBufReadExt},
-    std::future::Future,
-    std::pin::Pin,
-};
-
-#[cfg(not(feature="is_sync"))]
-type ParserResult<'a> = Pin<Box<dyn Future<Output = Result<()>> + 'a + Send>>;
-#[cfg(not(feature="is_sync"))]
-fn pin_maybe<T>(v: T) -> Pin<Box<T>> { Box::pin(v) }
-
-#[cfg(feature="is_sync")]
 type ParserResult<'a> = Result<()>;
-#[cfg(feature="is_sync")]
-fn pin_maybe<T>(v: T) -> T { v }
-
-#[cfg(not(feature="is_sync"))]
-pub trait DocumentReader: AsyncReadExt+AsyncBufReadExt+Unpin+Send {}
-#[cfg(not(feature="is_sync"))]
-impl<T> DocumentReader for T where T: AsyncReadExt+AsyncBufReadExt+Unpin+Send {}
-#[cfg(feature="is_sync")]
 use byteorder::{LittleEndian, ReadBytesExt};
-#[cfg(feature="is_sync")]
 pub trait DocumentReader: std::io::BufRead {
     fn read_i32_le(&mut self) -> Result<i32> {
         self.read_i32::<LittleEndian>()
@@ -92,7 +54,6 @@ pub trait DocumentReader: std::io::BufRead {
     }
 }
 
-#[cfg(feature="is_sync")]
 impl<T> DocumentReader for T where T: std::io::BufRead {}
 
 /// Async parser that extracts BSON fields into a Document.
@@ -136,7 +97,7 @@ impl Matcher {
     }
 }
 
-/// Parse a BSON document from an async reader.
+/// Parse a subset of a BSON document from a reader.
 ///
 /// The parser is initialized with a set of matching patterns that specify which elements to
 /// extract from the stream. During parsing it matches those patterns against the BSON stream and
@@ -272,17 +233,15 @@ impl<'a> DocumentParser<'a> {
     /// Collect a new document from byte stream.
     /// Only the elements specified with matching patterns are collected, the
     /// rest is simply discarded.
-    #[maybe_async::maybe_async]
-    pub async fn parse_document<'b, R: DocumentReader>(
+    pub fn parse_document<R: DocumentReader>(
         &self,
         rdr: R,
    ) -> Result<Document> {
-        self.parse_document_keep_bytes(rdr, self.keep_bytes).await
+        self.parse_document_keep_bytes(rdr, self.keep_bytes)
    }

    /// Collect a new document from a byte stream, with additional options.
-    #[maybe_async::maybe_async]
-    pub async fn parse_document_keep_bytes<'b, R: DocumentReader>(
+    pub fn parse_document_keep_bytes<R: DocumentReader>(
         &self,
         mut rdr: R,
         keep_bytes: bool,
@@ -291,7 +250,7 @@ impl<'a> DocumentParser<'a> {
         let starting_prefix = "";
         let starting_matcher = self.get_matcher(starting_prefix);
 
-        let document_size = rdr.read_i32_le().await?;
+        let document_size = rdr.read_i32_le()?;
 
         if keep_bytes || self.keep_bytes {
             let length_bytes = document_size.to_le_bytes();
 
             // Put the length back so that the caller has the whole BSON
             buf[..length_bytes.len()].copy_from_slice(&length_bytes);
-            rdr.read_exact(&mut buf[4..]).await?;
+            rdr.read_exact(&mut buf[4..])?;
 
             // Use a Cursor to detect partial parses
             let mut cur = Cursor::new(&buf[..]);
             cur.set_position(4);
-            self.parse_internal(&mut cur, starting_prefix, 0, starting_matcher, &mut doc).await?;
+            self.parse_internal(&mut cur, starting_prefix, 0, starting_matcher, &mut doc)?;
 
             let remaining_bytes = document_size as u64 - cur.position();
             if remaining_bytes > 0 {
@@ -314,7 +273,7 @@ impl<'a> DocumentParser<'a> {
 
             doc.raw_bytes = Some(buf);
         } else {
-            self.parse_internal(&mut rdr, starting_prefix, 0, starting_matcher, &mut doc).await?;
+            self.parse_internal(&mut rdr, starting_prefix, 0, starting_matcher, &mut doc)?;
         }
 
         Ok(doc)
     }
@@ -363,7 +322,6 @@ impl<'a> DocumentParser<'a> {
         self.match_prefixes.contains(prefix)
     }
 
-    #[maybe_async::maybe_async]
     fn parse_internal<'x, R: DocumentReader + 'x>(
         &'x self,
         mut rdr: &'x mut R,
@@ -373,192 +331,190 @@ impl<'a> DocumentParser<'a> {
         doc: &'x mut Document,
     ) -> ParserResult<'x> {
 
-        pin_maybe(async move {
-            let mut position = position;
+        let mut position = position;
 
-            loop {
-                position += 1;
+        loop {
+            position += 1;
 
-                let elem_type = rdr.read_u8().await?;
+            let elem_type = rdr.read_u8()?;
 
-                if elem_type == 0x00 {
-                    break;
-                }
-
-                let elem_name = read_cstring(&mut rdr).await?;
-                let prefix_name = format!("{}/{}", prefix, elem_name);
+            if elem_type == 0x00 {
+                break;
+            }
 
-                // We have 2 matchers - one that matches elements by prefix and position
-                // and another that matches the exact element name. Note: that when we
-                // recurse the exact matcher becomes the prefix matcher, thus we just
-                // pass it along to avoid a lookup.
-                let exact_matcher = self.get_matcher(&prefix_name);
+            let elem_name = read_cstring(&mut rdr)?;
+            let prefix_name = format!("{}/{}", prefix, elem_name);
 
-                let mut want_this_value = false;
+            // We have 2 matchers - one that matches elements by prefix and position
+            // and another that matches the exact element name. Note that when we
+            // recurse the exact matcher becomes the prefix matcher, thus we just
+            // pass it along to avoid a lookup.
+            let exact_matcher = self.get_matcher(&prefix_name);
 
-                // Match for array length and element name. This will not use the matcher
-                // for the current element but instead need to use the matcher for its
-                // parent.
-                if let Some(matcher) = prefix_matcher {
-                    if let Some(ref label) = matcher.match_array_len {
-                        doc.insert(label.clone(), BsonValue::Int32(position as i32));
-                    }
+            let mut want_this_value = false;
 
-                    if let Some((ref label, pos)) = matcher.match_name_at_pos {
-                        if pos == position {
-                            doc.insert(label.clone(), BsonValue::String(elem_name.to_string()));
-                        }
-                    }
+            // Match for array length and element name. This will not use the matcher
+            // for the current element but instead needs to use the matcher for its
+            // parent.
+ if let Some(matcher) = prefix_matcher { + if let Some(ref label) = matcher.match_array_len { + doc.insert(label.clone(), BsonValue::Int32(position as i32)); + } - if matcher.match_value_at_pos.is_some() { - // Yes we want the value, by position - want_this_value = true; + if let Some((ref label, pos)) = matcher.match_name_at_pos { + if pos == position { + doc.insert(label.clone(), BsonValue::String(elem_name.to_string())); } } - if let Some(matcher) = exact_matcher { - // Yes, we want the value - want_this_value = want_this_value - || matcher.match_exact.is_some() || matcher.match_array_len.is_some(); + if matcher.match_value_at_pos.is_some() { + // Yes we want the value, by position + want_this_value = true; } + } - let elem_value = match elem_type { - 0x01 => { - // A float - let mut buf = [0_u8; 8]; - rdr.read_exact(&mut buf).await?; - BsonValue::Float(f64::from_le_bytes(buf)) - } - 0x02 => { - // String - let str_len = rdr.read_i32_le().await?; - if want_this_value { - BsonValue::String(read_string_with_len(&mut rdr, str_len as usize).await?) - } else { - skip_bytes(&mut rdr, str_len as usize).await?; - BsonValue::None - } - } - 0x03 | 0x04 => { - // Embedded document or an array. Both are represented as a document. - // We only go through the trouble of parsing this if the field selector - // wants the document value or some element within it. - let doc_len = rdr.read_i32_le().await?; - - if want_this_value || self.want_prefix(&prefix_name) { - self.parse_internal(rdr, &prefix_name, 0, exact_matcher, doc).await?; - BsonValue::Placeholder("") - } else { - skip_bytes(&mut rdr, doc_len as usize - 4).await?; - BsonValue::None - } - } - 0x05 => { - // Binary data - let len = rdr.read_i32_le().await?; - skip_bytes(&mut rdr, (len + 1) as usize).await?; - BsonValue::Placeholder("") - } - 0x06 => { - // Undefined value. Deprecated. - BsonValue::None - } - 0x07 => { - let mut bytes = [0_u8; 12]; - rdr.read_exact(&mut bytes).await?; - BsonValue::ObjectId(bytes) - } - 0x08 => { - // Boolean - let val = rdr.read_u8().await?; - BsonValue::Boolean(val != 0x00) - } - 0x09 => { - // UTC Datetime - skip_bytes(&mut rdr, 8).await?; - BsonValue::Placeholder("") - } - 0x0A => { - // Null value + if let Some(matcher) = exact_matcher { + // Yes, we want the value + want_this_value = want_this_value + || matcher.match_exact.is_some() || matcher.match_array_len.is_some(); + } + + let elem_value = match elem_type { + 0x01 => { + // A float + let mut buf = [0_u8; 8]; + rdr.read_exact(&mut buf)?; + BsonValue::Float(f64::from_le_bytes(buf)) + } + 0x02 => { + // String + let str_len = rdr.read_i32_le()?; + if want_this_value { + BsonValue::String(read_string_with_len(&mut rdr, str_len as usize)?) + } else { + skip_bytes(&mut rdr, str_len as usize)?; BsonValue::None } - 0x0B => { - // Regular expression - let _regx = read_cstring(&mut rdr).await?; - let _opts = read_cstring(&mut rdr).await?; - BsonValue::Placeholder("") - } - 0x0C => { - // DBPointer. Deprecated. - let len = rdr.read_i32_le().await?; - skip_bytes(&mut rdr, (len + 12) as usize).await?; + } + 0x03 | 0x04 => { + // Embedded document or an array. Both are represented as a document. + // We only go through the trouble of parsing this if the field selector + // wants the document value or some element within it. 
+                    let doc_len = rdr.read_i32_le()?;
+
+                    if want_this_value || self.want_prefix(&prefix_name) {
+                        self.parse_internal(rdr, &prefix_name, 0, exact_matcher, doc)?;
+                        BsonValue::Placeholder("")
+                    } else {
+                        skip_bytes(&mut rdr, doc_len as usize - 4)?;
                         BsonValue::None
                     }
-                    0x0D => {
-                        // Javascript code
-                        skip_read_len(&mut rdr).await?;
-                        BsonValue::Placeholder("")
-                    }
-                    0x0E => {
-                        // Symbol. Deprecated.
-                        skip_read_len(&mut rdr).await?;
-                        BsonValue::Placeholder("")
-                    }
-                    0x0F => {
-                        // Code w/ scope
-                        skip_read_len(&mut rdr).await?;
-                        BsonValue::Placeholder("")
-                    }
-                    0x10 => {
-                        // Int32
-                        BsonValue::Int32(rdr.read_i32_le().await?)
-                    }
-                    0x11 => {
-                        // Timestamp
-                        skip_bytes(&mut rdr, 8).await?;
-                        BsonValue::Placeholder("")
-                    }
-                    0x12 => {
-                        // Int64
-                        BsonValue::Int64(rdr.read_i64_le().await?)
-                    }
-                    0x13 => {
-                        // Decimal128
-                        skip_bytes(&mut rdr, 16).await?;
-                        BsonValue::Placeholder("")
-                    }
-                    0xFF => {
-                        // Min key.
-                        BsonValue::Placeholder("")
-                    }
-                    0x7F => {
-                        // Min key.
-                        BsonValue::Placeholder("")
-                    }
-                    other => {
-                        return Err(Error::new(
-                            ErrorKind::Other,
-                            format!("BSON: unrecognized type: 0x{:02x}", other),
-                        ));
-                    }
-                };
+                }
+                0x05 => {
+                    // Binary data
+                    let len = rdr.read_i32_le()?;
+                    skip_bytes(&mut rdr, (len + 1) as usize)?;
+                    BsonValue::Placeholder("")
+                }
+                0x06 => {
+                    // Undefined value. Deprecated.
+                    BsonValue::None
+                }
+                0x07 => {
+                    let mut bytes = [0_u8; 12];
+                    rdr.read_exact(&mut bytes)?;
+                    BsonValue::ObjectId(bytes)
+                }
+                0x08 => {
+                    // Boolean
+                    let val = rdr.read_u8()?;
+                    BsonValue::Boolean(val != 0x00)
+                }
+                0x09 => {
+                    // UTC Datetime
+                    skip_bytes(&mut rdr, 8)?;
+                    BsonValue::Placeholder("")
+                }
+                0x0A => {
+                    // Null value
+                    BsonValue::None
+                }
+                0x0B => {
+                    // Regular expression
+                    let _regx = read_cstring(&mut rdr)?;
+                    let _opts = read_cstring(&mut rdr)?;
+                    BsonValue::Placeholder("")
+                }
+                0x0C => {
+                    // DBPointer. Deprecated.
+                    let len = rdr.read_i32_le()?;
+                    skip_bytes(&mut rdr, (len + 12) as usize)?;
+                    BsonValue::None
+                }
+                0x0D => {
+                    // Javascript code
+                    skip_read_len(&mut rdr)?;
+                    BsonValue::Placeholder("")
+                }
+                0x0E => {
+                    // Symbol. Deprecated.
+                    skip_read_len(&mut rdr)?;
+                    BsonValue::Placeholder("")
+                }
+                0x0F => {
+                    // Code w/ scope
+                    skip_read_len(&mut rdr)?;
+                    BsonValue::Placeholder("")
+                }
+                0x10 => {
+                    // Int32
+                    BsonValue::Int32(rdr.read_i32_le()?)
+                }
+                0x11 => {
+                    // Timestamp
+                    skip_bytes(&mut rdr, 8)?;
+                    BsonValue::Placeholder("")
+                }
+                0x12 => {
+                    // Int64
+                    BsonValue::Int64(rdr.read_i64_le()?)
+                }
+                0x13 => {
+                    // Decimal128
+                    skip_bytes(&mut rdr, 16)?;
+                    BsonValue::Placeholder("")
+                }
+                0xFF => {
+                    // Min key.
+                    BsonValue::Placeholder("")
+                }
+                0x7F => {
+                    // Max key.
+                    BsonValue::Placeholder("")
+                }
+                other => {
+                    return Err(Error::new(
+                        ErrorKind::Other,
+                        format!("BSON: unrecognized type: 0x{:02x}", other),
+                    ));
+                }
+            };
 
-            if let Some(matcher) = prefix_matcher {
-                if let Some((ref label, pos)) = matcher.match_value_at_pos {
-                    if pos == position {
-                        doc.insert(label.clone(), elem_value.clone());
-                    }
+            if let Some(matcher) = prefix_matcher {
+                if let Some((ref label, pos)) = matcher.match_value_at_pos {
+                    if pos == position {
+                        doc.insert(label.clone(), elem_value.clone());
                 }
             }
+            }
 
-            if let Some(matcher) = exact_matcher {
-                if let Some(ref label) = matcher.match_exact {
-                    doc.insert(label.clone(), elem_value);
-                }
+            if let Some(matcher) = exact_matcher {
+                if let Some(ref label) = matcher.match_exact {
+                    doc.insert(label.clone(), elem_value);
             }
         }
 
-            Ok(())
-        })
+        }
+        Ok(())
     }
 }
@@ -688,27 +644,24 @@ impl Document {
     }
 }
 
-#[maybe_async::maybe_async]
-async fn skip_bytes<T: DocumentReader>(rdr: &mut T, bytes_to_skip: usize) -> Result<usize> {
+fn skip_bytes<T: DocumentReader>(rdr: &mut T, bytes_to_skip: usize) -> Result<usize> {
     let mut buf = vec![0u8; bytes_to_skip];
     let bytes_read = buf.len();
-    rdr.read_exact(&mut buf).await?;
+    rdr.read_exact(&mut buf)?;
     Ok(bytes_read)
 }
 
-#[maybe_async::maybe_async]
-async fn skip_read_len<T: DocumentReader>(rdr: &mut T) -> Result<usize> {
-    let str_len = rdr.read_i32_le().await?;
-    skip_bytes(rdr, str_len as usize).await
+fn skip_read_len<T: DocumentReader>(rdr: &mut T) -> Result<usize> {
+    let str_len = rdr.read_i32_le()?;
+    skip_bytes(rdr, str_len as usize)
 }
 
-/// Read a null terminated string from async stream.
-#[maybe_async::maybe_async]
-pub async fn read_cstring<R: DocumentReader>(rdr: &mut R) -> Result<String> {
+/// Read a null terminated string from a stream.
+pub fn read_cstring<R: DocumentReader>(rdr: &mut R) -> Result<String> {
     let mut bytes = Vec::new();
-    rdr.read_until(0, &mut bytes).await?;
+    rdr.read_until(0, &mut bytes)?;
     let _ = bytes.pop(); // Drop the trailing zero
 
     if let Ok(res) = String::from_utf8(bytes) {
@@ -718,10 +671,9 @@ pub fn read_cstring<R: DocumentReader>(rdr: &mut R) -> Result<String> {
     Err(Error::new(ErrorKind::Other, "cstring conversion error"))
 }
 
-#[maybe_async::maybe_async]
-async fn read_string_with_len<R: DocumentReader>(rdr: &mut R, str_len: usize) -> Result<String> {
+fn read_string_with_len<R: DocumentReader>(rdr: &mut R, str_len: usize) -> Result<String> {
     let mut buf = vec![0u8; str_len];
-    rdr.read_exact(&mut buf).await?;
+    rdr.read_exact(&mut buf)?;
 
     // Remove the trailing null, we won't need it
     let _ = buf.pop();
@@ -739,8 +691,8 @@ mod tests {
 
     use super::*;
     use bson::doc;
 
-    #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))]
-    async fn test_parse_bson() {
+    #[test]
+    fn test_parse_bson() {
 
         let doc = doc! {
             "first": "foo",
@@ -776,7 +728,7 @@ mod tests {
             .match_value_at("/deeply/nested/array", 1, "array_first")
             .match_exact("/nested/monkey/name", "monkey");
 
-        let doc = parser.parse_document(&buf[..]).await.unwrap();
+        let doc = parser.parse_document(&buf[..]).unwrap();
 
         assert_eq!("first", doc.get_str("first_elem_name").unwrap());
         assert_eq!("foo", doc.get_str("first_elem_value").unwrap());
@@ -790,8 +742,8 @@ mod tests {
         assert_eq!(9, doc.len());
     }
 
-    #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))]
-    async fn test_multiple_docs() {
+    #[test]
+    fn test_multiple_docs() {
 
         let mut buf = Vec::new();
         let doc = doc!
{ @@ -810,18 +762,18 @@ mod tests { for keep_bytes in vec![true, false] { let mut cursor = Cursor::new(&buf[..]); - let doc = parser.parse_document_keep_bytes(&mut cursor, keep_bytes).await.unwrap(); + let doc = parser.parse_document_keep_bytes(&mut cursor, keep_bytes).unwrap(); assert_eq!(1, doc.get_i32("foo").unwrap()); - let doc = parser.parse_document_keep_bytes(&mut cursor, keep_bytes).await.unwrap(); + let doc = parser.parse_document_keep_bytes(&mut cursor, keep_bytes).unwrap(); assert_eq!(2, doc.get_i32("bar").unwrap()); assert_eq!(buf.len(), cursor.position() as usize); } } - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] - async fn test_nested_array() { + #[test] + fn test_nested_array() { let doc = doc! { "f": doc! { "array": [ @@ -840,22 +792,22 @@ mod tests { .match_exact("/f/array/0/foo", "b") .match_exact("/f/array/2/baz", "c"); - let doc = parser.parse_document(&buf[..]).await.unwrap(); + let doc = parser.parse_document(&buf[..]).unwrap(); assert_eq!(3, doc.get_i32("a").unwrap()); assert_eq!(42, doc.get_i32("b").unwrap()); assert_eq!(44, doc.get_i32("c").unwrap()); } - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] - async fn test_keep_bytes() { + #[test] + fn test_keep_bytes() { let buf = b"\x16\x00\x00\x00\x02hello\x00\x06\x00\x00\x00world\x00\x00"; let parser = DocumentParser::builder() .match_exact("/hello", "foo") .keep_bytes(true); - let doc = parser.parse_document(&buf[..]).await.unwrap(); + let doc = parser.parse_document(&buf[..]).unwrap(); assert_eq!(buf, doc.get_raw_bytes().unwrap().as_slice()); } @@ -865,9 +817,9 @@ mod tests { // This is an expensive benchmark, ignore this by default // Run with: time cargo test -- --ignored - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] + #[test] #[ignore] - async fn benchmark_parser() { + fn benchmark_parser() { const NUM_ITERATIONS: i32 = 100_000; let doc = doc! { @@ -893,18 +845,18 @@ mod tests { println!("Parsing a {} byte document {} times.", buf.len(), NUM_ITERATIONS); for _ in 1..NUM_ITERATIONS { - let _ = parser.parse_document(&buf[..]).await.unwrap(); + let _ = parser.parse_document(&buf[..]).unwrap(); } } - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] - async fn test_read_cstring() { + #[test] + fn test_read_cstring() { let buf = b"kala\0"; - let res = read_cstring(&mut Cursor::new(&buf[..])).await.unwrap(); + let res = read_cstring(&mut Cursor::new(&buf[..])).unwrap(); assert_eq!(res, "kala"); let buf = b"\0"; - let res = read_cstring(&mut Cursor::new(&buf[..])).await.unwrap(); + let res = read_cstring(&mut Cursor::new(&buf[..])).unwrap(); assert_eq!(res, ""); } } diff --git a/envoy-wasm/.cargo/config.toml b/envoy-wasm/.cargo/config.toml deleted file mode 100644 index c0509e6..0000000 --- a/envoy-wasm/.cargo/config.toml +++ /dev/null @@ -1,2 +0,0 @@ -[build] -#target = "wasm32-unknown-unknown" diff --git a/envoy-wasm/.gitignore b/envoy-wasm/.gitignore deleted file mode 100644 index 2f7896d..0000000 --- a/envoy-wasm/.gitignore +++ /dev/null @@ -1 +0,0 @@ -target/ diff --git a/envoy-wasm/Cargo.lock b/envoy-wasm/Cargo.lock deleted file mode 100644 index b8ac0c9..0000000 --- a/envoy-wasm/Cargo.lock +++ /dev/null @@ -1,705 +0,0 @@ -# This file is automatically @generated by Cargo. -# It is not intended for manual editing. 
-version = 3 - -[[package]] -name = "ahash" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6789e291be47ace86a60303502173d84af8327e3627ecf334356ee0f87a164c" - -[[package]] -name = "aho-corasick" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67fc08ce920c31afb70f013dcce1bfc3a3195de6a228474e45e1f145b36f8d04" -dependencies = [ - "memchr", -] - -[[package]] -name = "ansi_term" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52a9bb7ec0cf484c551830a7ce27bd20d67eac647e1befb56b0be4ee39a55d2" -dependencies = [ - "winapi", -] - -[[package]] -name = "async-bson" -version = "0.2.3" -dependencies = [ - "byteorder", - "maybe-async", - "tokio", -] - -[[package]] -name = "autocfg" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" - -[[package]] -name = "base64" -version = "0.12.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" - -[[package]] -name = "bson" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c11f16001d679cb13d14b2c93c7d0fa13bb484a87c34a6c4c39707ad936499b5" -dependencies = [ - "base64", - "chrono", - "hex", - "lazy_static", - "linked-hash-map", - "rand", - "serde", - "serde_json", -] - -[[package]] -name = "byteorder" -version = "1.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" - -[[package]] -name = "bytes" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db" - -[[package]] -name = "cc" -version = "1.0.65" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95752358c8f7552394baf48cd82695b345628ad3f170d607de3ca03b8dacca15" - -[[package]] -name = "cfg-if" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" - -[[package]] -name = "cfg-if" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" - -[[package]] -name = "chrono" -version = "0.4.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "670ad68c9088c2a963aaa298cb369688cf3f9465ce5e2d4ca10e6e0098a1ce73" -dependencies = [ - "libc", - "num-integer", - "num-traits", - "time", - "winapi", -] - -[[package]] -name = "envoy-wasm" -version = "0.1.0" -dependencies = [ - "bson", - "log", - "mongo-protocol", - "proxy-wasm", - "regex", - "tracing", - "tracing-subscriber", -] - -[[package]] -name = "fnv" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" - -[[package]] -name = "generator" -version = "0.6.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8cdc09201b2e8ca1b19290cf7e65de2246b8e91fb6874279722189c4de7b94dc" -dependencies = [ - "cc", - "libc", - "log", - "rustc_version", - "winapi", -] - -[[package]] -name = "getrandom" -version = "0.1.15" -source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "fc587bc0ec293155d5bfa6b9891ec18a1e330c234f896ea47fbada4cadbe47e6" -dependencies = [ - "cfg-if 0.1.10", - "libc", - "wasi 0.9.0+wasi-snapshot-preview1", -] - -[[package]] -name = "hashbrown" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" -dependencies = [ - "ahash", -] - -[[package]] -name = "hermit-abi" -version = "0.1.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62b467343b94ba476dcb2500d242dadbb39557df889310ac77c5d99100aaac33" -dependencies = [ - "libc", -] - -[[package]] -name = "hex" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35" - -[[package]] -name = "indexmap" -version = "1.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55e2e4c765aa53a0424761bf9f41aa7a6ac1efa87238f59560640e27fca028f2" -dependencies = [ - "autocfg", - "hashbrown", -] - -[[package]] -name = "itoa" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc6f3ad7b9d11a0c00842ff8de1b60ee58661048eb8049ed33c73594f359d7e6" - -[[package]] -name = "lazy_static" -version = "1.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" - -[[package]] -name = "libc" -version = "0.2.80" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614" - -[[package]] -name = "linked-hash-map" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8dd5a6d5999d9907cda8ed67bbd137d3af8085216c2ac62de5be860bd41f304a" - -[[package]] -name = "log" -version = "0.4.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b" -dependencies = [ - "cfg-if 0.1.10", -] - -[[package]] -name = "loom" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a0e8460f2f2121162705187214720353c517b97bdfb3494c0b1e33d83ebe4bed" -dependencies = [ - "cfg-if 0.1.10", - "generator", - "scoped-tls", - "serde", - "serde_json", -] - -[[package]] -name = "matchers" -version = "0.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f099785f7595cc4b4553a174ce30dd7589ef93391ff414dbb67f62392b9e0ce1" -dependencies = [ - "regex-automata", -] - -[[package]] -name = "maybe-async" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd1afac51d14f8056cd544c83239b961c464e0a98c2ca65353195df93e636a20" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "memchr" -version = "2.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" - -[[package]] -name = "mongo-protocol" -version = "0.1.0" -dependencies = [ - "async-bson", - "bson", - "byteorder", - "lazy_static", - "maybe-async", - "prometheus", - "tokio", - "tracing", -] - -[[package]] -name = "num-integer" -version = "0.1.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" -dependencies = [ - "autocfg", - "num-traits", -] - -[[package]] -name 
= "num-traits" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" -dependencies = [ - "autocfg", -] - -[[package]] -name = "num_cpus" -version = "1.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19e64526ebdee182341572e50e9ad03965aa510cd94427a4549448f285e957a1" -dependencies = [ - "hermit-abi", - "libc", -] - -[[package]] -name = "once_cell" -version = "1.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e82dad04139b71a90c080c8463fe0dc7902db5192d939bd0950f074d014339e1" - -[[package]] -name = "pin-project-lite" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c" - -[[package]] -name = "ppv-lite86" -version = "0.2.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac74c624d6b2d21f425f752262f42188365d7b8ff1aff74c82e45136510a4857" - -[[package]] -name = "proc-macro2" -version = "1.0.24" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71" -dependencies = [ - "unicode-xid", -] - -[[package]] -name = "prometheus" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5567486d5778e2c6455b1b90ff1c558f29e751fc018130fa182e15828e728af1" -dependencies = [ - "cfg-if 0.1.10", - "fnv", - "lazy_static", - "protobuf", - "quick-error", - "spin", -] - -[[package]] -name = "protobuf" -version = "2.18.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da78e04bc0e40f36df43ecc6575e4f4b180e8156c4efd73f13d5619479b05696" - -[[package]] -name = "proxy-wasm" -version = "0.1.2" -source = "git+https://github.com/proxy-wasm/proxy-wasm-rust-sdk#4cb2db7e398fb8166f49fcc899ef5ede261005d1" -dependencies = [ - "hashbrown", - "log", -] - -[[package]] -name = "quick-error" -version = "1.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" - -[[package]] -name = "quote" -version = "1.0.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" -dependencies = [ - "proc-macro2", -] - -[[package]] -name = "rand" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" -dependencies = [ - "getrandom", - "libc", - "rand_chacha", - "rand_core", - "rand_hc", -] - -[[package]] -name = "rand_chacha" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" -dependencies = [ - "ppv-lite86", - "rand_core", -] - -[[package]] -name = "rand_core" -version = "0.5.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" -dependencies = [ - "getrandom", -] - -[[package]] -name = "rand_hc" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" -dependencies = [ - "rand_core", -] - -[[package]] -name = "regex" -version = "1.8.1" -source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "af83e617f331cc6ae2da5443c602dfa5af81e517212d9d611a5b3ba1777b5370" -dependencies = [ - "aho-corasick", - "memchr", - "regex-syntax 0.7.1", -] - -[[package]] -name = "regex-automata" -version = "0.1.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ae1ded71d66a4a97f5e961fd0cb25a5f366a42a41570d16a763a69c092c26ae4" -dependencies = [ - "byteorder", - "regex-syntax 0.6.21", -] - -[[package]] -name = "regex-syntax" -version = "0.6.21" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189" - -[[package]] -name = "regex-syntax" -version = "0.7.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a5996294f19bd3aae0453a862ad728f60e6600695733dd5df01da90c54363a3c" - -[[package]] -name = "rustc_version" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" -dependencies = [ - "semver", -] - -[[package]] -name = "ryu" -version = "1.0.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" - -[[package]] -name = "scoped-tls" -version = "1.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea6a9290e3c9cf0f18145ef7ffa62d68ee0bf5fcd651017e586dc7fd5da448c2" - -[[package]] -name = "semver" -version = "0.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" -dependencies = [ - "semver-parser", -] - -[[package]] -name = "semver-parser" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" - -[[package]] -name = "serde" -version = "1.0.117" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b88fa983de7720629c9387e9f517353ed404164b1e482c970a90c1a4aaf7dc1a" -dependencies = [ - "serde_derive", -] - -[[package]] -name = "serde_derive" -version = "1.0.117" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "serde_json" -version = "1.0.59" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcac07dbffa1c65e7f816ab9eba78eb142c6d44410f4eeba1e26e4f5dfa56b95" -dependencies = [ - "indexmap", - "itoa", - "ryu", - "serde", -] - -[[package]] -name = "sharded-slab" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b4921be914e16899a80adefb821f8ddb7974e3f1250223575a44ed994882127" -dependencies = [ - "lazy_static", - "loom", -] - -[[package]] -name = "smallvec" -version = "1.9.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1" - -[[package]] -name = "spin" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" - -[[package]] -name = "syn" -version = "1.0.67" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6498a9efc342871f91cc2d0d694c674368b4ceb40f62b65a7a08c3792935e702" 
-dependencies = [ - "proc-macro2", - "quote", - "unicode-xid", -] - -[[package]] -name = "thread_local" -version = "1.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5516c27b78311c50bf42c071425c560ac799b11c30b31f87e3081965fe5e0180" -dependencies = [ - "once_cell", -] - -[[package]] -name = "time" -version = "0.1.44" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" -dependencies = [ - "libc", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi", -] - -[[package]] -name = "tokio" -version = "1.19.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c51a52ed6686dd62c320f9b89299e9dfb46f730c7a48e635c19f21d116cb1439" -dependencies = [ - "bytes", - "memchr", - "num_cpus", - "once_cell", - "pin-project-lite", - "tokio-macros", -] - -[[package]] -name = "tokio-macros" -version = "1.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "tracing" -version = "0.1.22" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f47026cdc4080c07e49b37087de021820269d996f581aac150ef9e5583eefe3" -dependencies = [ - "cfg-if 1.0.0", - "pin-project-lite", - "tracing-attributes", - "tracing-core", -] - -[[package]] -name = "tracing-attributes" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80e0ccfc3378da0cce270c946b676a376943f5cd16aeba64568e7939806f4ada" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] -name = "tracing-core" -version = "0.1.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f50de3927f93d202783f4513cda820ab47ef17f624b03c096e86ef00c67e6b5f" -dependencies = [ - "lazy_static", -] - -[[package]] -name = "tracing-log" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e0f8c7178e13481ff6765bd169b33e8d554c5d2bbede5e32c356194be02b9b9" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - -[[package]] -name = "tracing-serde" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fb65ea441fbb84f9f6748fd496cf7f63ec9af5bca94dd86456978d055e8eb28b" -dependencies = [ - "serde", - "tracing-core", -] - -[[package]] -name = "tracing-subscriber" -version = "0.2.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1fa8f0c8f4c594e4fc9debc1990deab13238077271ba84dd853d54902ee3401" -dependencies = [ - "ansi_term", - "chrono", - "lazy_static", - "matchers", - "regex", - "serde", - "serde_json", - "sharded-slab", - "smallvec", - "thread_local", - "tracing", - "tracing-core", - "tracing-log", - "tracing-serde", -] - -[[package]] -name = "unicode-xid" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" - -[[package]] -name = "wasi" -version = "0.9.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" - -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - -[[package]] -name = "winapi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" -dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", -] - -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" diff --git a/envoy-wasm/Cargo.toml b/envoy-wasm/Cargo.toml deleted file mode 100644 index ac95ab7..0000000 --- a/envoy-wasm/Cargo.toml +++ /dev/null @@ -1,19 +0,0 @@ -[package] -name = "envoy-wasm" -version = "0.1.0" -authors = ["mpihlak "] -edition = "2018" - -[lib] -crate-type = ["cdylib"] - -[dependencies] -log = "0.4" -bson = "1.1" -tracing = "0.1" -proxy-wasm = { git = "https://github.com/proxy-wasm/proxy-wasm-rust-sdk" } -mongo-protocol = { path="../mongo-protocol", features = ["is_sync"] } -regex = "1.8.1" - -[dev-dependencies] -tracing-subscriber = "0.2" diff --git a/envoy-wasm/README.md b/envoy-wasm/README.md deleted file mode 100644 index 0c2df06..0000000 --- a/envoy-wasm/README.md +++ /dev/null @@ -1,50 +0,0 @@ -# Mongoproxy as an Envoy WASM filter -This is a skeleton of a WASM implementation of Mongoproxy. Currently it doesn't -do anything useful other than illustrating how one would add a WASM filter to -Envoy. - -Compile with: `cargo build --target=wasm32-unknown-unknown --release` -Add WASM target if needed: `rustup target add wasm32-unknown-unknown` - -There's an example Envoy config in [testing](testing) folder. - -Useful links: -* https://github.com/proxy-wasm/spec/tree/master/abi-versions/vNEXT -* https://github.com/proxy-wasm/proxy-wasm-rust-sdk - -## Metrics -The metrics support in proxy-wasm Rust SDK is not stable as of yet, so we pull the latest -`proxy-wasm` crate directly from Github. - -Defining metrics is relatively straightforward, however with labels it gets a little bit more -trickier. Envoy expects labels to be part of the metric name and extracts the label names and -values using regex. So, to get `mongoproxy{app="foo"}` we would define metric as `mongoproxy.app.foo` and -provide a custom metrics definition to extract the `app` and `foo` as label name and value respectively. - -Alternatively we could maybe try and use the existing Envoy Mongo proxy -[metric labels](https://github.com/envoyproxy/envoy/blob/master/source/common/config/well_known_names.cc). - -## Performance of the WASM filter -So how does the WASM filter do, compared to native Envoy `tcp_proxy` or native -Mongoproxy? In short, it's not too bad. - -Ran a simple benchmark that populates a collection with 1000 documents and -queries all documents from that collection 1000 times. Obviously smaller values are better. - -``` -Direct MongoDb, no proxies : 9.16 sec -Envoy tcp_proxy : 9.75 sec -Envoy dummy WASM filter : 9.69 sec (hah, got lucky here) -Envoy WASM MongoDb decoder : 12.46 sec -Mongoproxy, v0.5.4x : 9.63 sec -``` - -Native Mongoproxy and Envoy with just `tcp_proxy` are pretty much on par. Also -the dummy WASM filter that just does `get_downstream_data()` doesn't have any -noticeable overhead. 
However as soon as we start actually decoding the MongoDb
-protocol does the overhead become noticeable - roughly 28%. Still, in many cases
-this is not a too high price to pay for metrics.
-
-**Note**: Mongoproxy v0.5.3 had considerably worse results initially (up to 17
-seconds and a lot of variability). This was traced back to using a too small
-buffer for proxying the bytes.
diff --git a/envoy-wasm/src/codec.rs b/envoy-wasm/src/codec.rs
deleted file mode 100644
index 1d916ae..0000000
--- a/envoy-wasm/src/codec.rs
+++ /dev/null
@@ -1,305 +0,0 @@
-use std::error::Error;
-
-use mongo_protocol::{MsgHeader, MongoMessage};
-use tracing::{trace};
-
-
-#[derive(Default)]
-pub struct MongoProtocolDecoder {
-    header: Option<MsgHeader>,
-    want_bytes: usize,
-    message_buf: Vec<u8>,
-    message_buf_pos: usize,
-}
-
-impl MongoProtocolDecoder {
-
-    pub fn new() -> MongoProtocolDecoder {
-        MongoProtocolDecoder {
-            header: None,
-            want_bytes: mongo_protocol::HEADER_LENGTH,
-            message_buf: Vec::new(),
-            message_buf_pos: 0,
-        }
-    }
-
-    // Parse the buffer and return the parsed objects.
-    //
-    // decode_messages expects that it is being fed chunks of bytes from the incoming data stream.
-    // Returns a Vec of assembled messages, empty if there's isn't enough data.
-    //
-    // It maintains an internal buffer and does not expect to see the same bytes twice.
-    //
-    // Since MongoDb may send multiple messages in one go, we need to try and consume all the
-    // messages from the decoder buffer. Otherwise we might leave some unparsed messages in the
-    // buffer and mess up the response/request sequence.
-    //
-    // The first message we always want to see is the MongoDb message header. This header in turn
-    // contains the length of the message that follows. So we try to read message length worth of
-    // bytes and parse the message. Once the message is parsed we expect a header again and the
-    // process repeats.
-    //
-    // TODO: There's a lot of unnecessary copying here. We should try and get rid of this.
-    // For instance we could just take ownership of the Vec that get_stream_data
-    // gives us.
-    //
-    pub fn decode_messages(&mut self, buf: &[u8]) -> Result<Vec<(MsgHeader, MongoMessage)>, Box<dyn Error>> {
-        self.message_buf.extend(buf);
-
-        let mut result = Vec::new();
-        let mut work_buf = &self.message_buf[self.message_buf_pos..];
-
-        while self.want_bytes > 0 && work_buf.len() >= self.want_bytes {
-            trace!("want {} bytes, have {} in buffer.", self.want_bytes, work_buf.len());
-
-            // Since we entered the loop we have either a header or a message body.
-            // Make a note of the next packet starts and consume the bytes.
-            self.message_buf_pos += self.want_bytes;
-
-            if self.header.is_none() {
-                let hdr = MsgHeader::from_reader(&work_buf[..self.want_bytes])?;
-                assert!(hdr.message_length >= mongo_protocol::HEADER_LENGTH);
-                trace!("obtained header: {:?}", hdr);
-
-                self.want_bytes = hdr.message_length - mongo_protocol::HEADER_LENGTH;
-                self.header = Some(hdr);
-            } else {
-                let hdr = self.header.take().unwrap();
-                let msg = MongoMessage::extract_message(
-                    hdr.op_code,
-                    &work_buf[..self.want_bytes],
-                    false,
-                    false,
-                    (hdr.message_length - mongo_protocol::HEADER_LENGTH) as u64)?;
-                trace!("obtained message: {:?}", msg);
-                result.push((hdr, msg));
-
-                // We got the payload, time to ask for a header again
-                self.want_bytes = mongo_protocol::HEADER_LENGTH;
-            }
-
-            // Advance the message buf to the unprocessed bytes
-            work_buf = &self.message_buf[self.message_buf_pos..];
-        }
-
-        if work_buf.is_empty() {
-            trace!("want more data");
-            self.message_buf_pos = 0;
-            self.message_buf.clear();
-        }
-
-        Ok(result)
-    }
-}
-
-#[cfg(test)]
-mod tests {
-
-    use super::*;
-    use bson::doc;
-    use mongo_protocol::{MsgOpMsg};
-    use tracing_subscriber::{FmtSubscriber, EnvFilter};
-    use std::sync::Once;
-
-    static INIT: Once = Once::new();
-
-    fn init_tracing() {
-        INIT.call_once(|| {
-            let subscriber = FmtSubscriber::builder()
-                .with_max_level(tracing::Level::TRACE)
-                .with_env_filter(EnvFilter::from_default_env())
-                .finish();
-            tracing::subscriber::set_global_default(subscriber).unwrap();
-        });
-    }
-
-    fn create_header(request_id: u32, response_to: u32, len: usize) -> MsgHeader {
-        MsgHeader {
-            message_length: len + mongo_protocol::HEADER_LENGTH,
-            request_id,
-            response_to,
-            op_code: 2013,
-        }
-    }
-
-    fn create_message(key: &str, val: &str) -> Vec<u8> {
-        let msg = MsgOpMsg{ flag_bits: 0, documents: Vec::new(), section_bytes: Vec::new() };
-
-        let doc = doc!
{ - key.to_owned(): val.to_owned(), - }; - - let mut doc_buf = Vec::new(); - doc.to_writer(&mut doc_buf).unwrap(); - - let mut buf = Vec::new(); - msg.write(&mut buf, &doc_buf).unwrap(); - buf - } - - #[test] - fn test_parse_buffer_header() { - init_tracing(); - - let hdr = create_header(1234, 5678, 0); - let mut buf = [0 as u8; mongo_protocol::HEADER_LENGTH]; - hdr.write(&mut buf[..]).unwrap(); - - let mut decoder = MongoProtocolDecoder::new(); - let result = decoder.decode_messages(&buf.to_vec()).unwrap(); - - assert_eq!(result.len(), 0); - assert!(decoder.header.is_some()); - assert_eq!(decoder.want_bytes, 0); - } - - #[test] - fn test_parse_msg() { - init_tracing(); - - let msg_buf = create_message("insert", "foo"); - let hdr = create_header(1234, 5678, msg_buf.len()); - let mut buf = Vec::new(); - - hdr.write(&mut buf).unwrap(); - buf.extend(msg_buf); - - let mut decoder = MongoProtocolDecoder::new(); - let result = decoder.decode_messages(&buf).unwrap(); - assert_eq!(result.len(), 1); - - match result.iter().next().unwrap() { - (h, MongoMessage::Msg(m)) => { - assert_eq!(m.documents.len(), 1); - assert_eq!(h.request_id, 1234); - assert_eq!(h.response_to, 5678); - assert_eq!(m.documents[0].get_str("op").unwrap(), "insert"); - assert_eq!(m.documents[0].get_str("op_value").unwrap(), "foo"); - }, - other => panic!("Instead of MsgOpMsg, got this: {:?}", other), - } - - assert!(decoder.header.is_none()); - assert_eq!(decoder.want_bytes, mongo_protocol::HEADER_LENGTH); - } - - #[test] - fn test_parse_partial_msg_sequence() { - init_tracing(); - - let mut decoder = MongoProtocolDecoder::new(); - let mut buf = Vec::new(); - - let first_msg_buf = create_message("insert", "foo"); - let hdr = create_header(1234, 5678, first_msg_buf.len()); - - // Write the header of the first message and try parse. This must parse - // the header but return nothing because it doesn't have a message body yet. - hdr.write(&mut buf).unwrap(); - let result = decoder.decode_messages(&buf).unwrap(); - if result.len() == 0 { - let hdr = decoder.header.as_ref().unwrap(); - assert_eq!(hdr.request_id, 1234); - assert_eq!(hdr.response_to, 5678); - } else { - panic!("wasn't expecting to parse anything but a header: {:?}", result); - } - - // Now "write" the remainder of the first message - buf = first_msg_buf.to_vec(); - - // And construct and write the header of the second message. NB! We don't write - // the second message body just yet, because we want to verify that the decoder - // doesn't get confused. - - let second_msg_buf = create_message("delete", "bar"); - let hdr = create_header(5678, 1234, second_msg_buf.len()); - hdr.write(&mut buf).unwrap(); - - // Now the decoder must return the parsed first message. It also should have - // started to parse the bytes for the header of the second message. - match decoder.decode_messages(&buf).unwrap().iter().next().unwrap() { - (_, MongoMessage::Msg(m)) => { - assert_eq!(m.documents.len(), 1); - assert_eq!(m.documents[0].get_str("op").unwrap(), "insert"); - assert_eq!(m.documents[0].get_str("op_value").unwrap(), "foo"); - }, - other => panic!("Couldn't parse the first message, got something else: {:?}", other), - } - - // Now, the next call with empty buffer must parse the second message header - // but not return the message itself. 
-        match decoder.decode_messages(&[]).unwrap().is_empty() {
-            true => {
-                assert!(decoder.header.is_some());
-                let hdr = decoder.header.as_ref().unwrap();
-                assert_eq!(hdr.request_id, 5678);
-                assert_eq!(hdr.response_to, 1234);
-            }
-            false => panic!("Expected nothing, got something"),
-        }
-
-        // Finally write the seconds message body and expect to parse the full message.
-        // Also check that the header matches the second message.
-        buf = second_msg_buf.to_vec();
-        match decoder.decode_messages(&buf).unwrap().iter().next().unwrap() {
-            (h, MongoMessage::Msg(m)) => {
-                assert_eq!(m.documents.len(), 1);
-                assert_eq!(h.request_id, 5678);
-                assert_eq!(h.response_to, 1234);
-                assert_eq!(m.documents[0].get_str("op").unwrap(), "delete");
-                assert_eq!(m.documents[0].get_str("op_value").unwrap(), "bar");
-            },
-            other => panic!("Instead of MsgOpMsg, got this: {:?}", other),
-        }
-
-        assert!(decoder.header.is_none());
-        assert_eq!(decoder.want_bytes, mongo_protocol::HEADER_LENGTH);
-    }
-
-    #[test]
-    fn test_parse_complete_msg_sequence() {
-        init_tracing();
-
-        let mut decoder = MongoProtocolDecoder::new();
-        let mut buf = Vec::new();
-
-        // Write the first message
-        let msg_buf = create_message("insert", "foo");
-        let hdr = create_header(1234, 5678, msg_buf.len());
-        hdr.write(&mut buf).unwrap();
-        buf.extend(&msg_buf);
-
-        // Now write the second message
-        let msg_buf = create_message("delete", "bar");
-        let hdr = create_header(5678, 1234, msg_buf.len());
-        hdr.write(&mut buf).unwrap();
-        buf.extend(&msg_buf);
-
-        // Parse and validate the messages
-        let result = decoder.decode_messages(&buf).unwrap();
-        assert_eq!(result.len(), 2);
-
-        let mut it = result.iter();
-
-        match it.next().unwrap() {
-            (_, MongoMessage::Msg(m)) => {
-                assert_eq!(m.documents.len(), 1);
-                assert_eq!(m.documents[0].get_str("op").unwrap(), "insert");
-                assert_eq!(m.documents[0].get_str("op_value").unwrap(), "foo");
-            },
-            other => panic!("Couldn't parse the first message, got something else: {:?}", other),
-        }
-
-        // Now, parse and validate the second message.
-        match it.next().unwrap() {
-            (_, MongoMessage::Msg(m)) => {
-                assert_eq!(m.documents.len(), 1);
-                assert_eq!(m.documents[0].get_str("op").unwrap(), "delete");
-                assert_eq!(m.documents[0].get_str("op_value").unwrap(), "bar");
-            },
-            other => panic!("Couldn't parse the second message, got something else: {:?}", other),
-        }
-    }
-
-}
diff --git a/envoy-wasm/src/lib.rs b/envoy-wasm/src/lib.rs
deleted file mode 100644
index 02f1cb2..0000000
--- a/envoy-wasm/src/lib.rs
+++ /dev/null
@@ -1,115 +0,0 @@
-mod codec;
-
-use log::{info, warn, error};
-
-use proxy_wasm::{self, traits::{Context, StreamContext}};
-use proxy_wasm::types::{LogLevel, Action, MetricType, PeerType};
-use proxy_wasm::hostcalls::{define_metric, increment_metric};
-
-use mongo_protocol::{MsgHeader, MongoMessage};
-
-use codec::MongoProtocolDecoder;
-
-struct MongoDbFilter {
-    context_id: u32,
-    root_context_id: u32,
-    decoder: MongoProtocolDecoder,
-    filter_active: bool,
-    counter: u32,
-}
-
-impl Context for MongoDbFilter {}
-
-
-#[no_mangle]
-pub fn _start() {
-    proxy_wasm::set_log_level(LogLevel::Trace);
-    proxy_wasm::set_stream_context(|context_id, root_context_id| -> Box<dyn StreamContext> {
-        info!("_start for context {}", context_id);
-
-        // Envoy defines the following tag extraction patterns for MongoDb. We could roll our
-        // own, or possibly use these to bootstrap:
-        //
-        // mongo.[<stat_prefix>.]collection.[<collection>.]callsite.(<callsite>.)query.<base_stat>
-        // mongo.[<stat_prefix>.]collection.(<collection>.)query.<base_stat>
-        // mongo.[<stat_prefix>.]cmd.(<cmd>.)<base_stat>
- // mongo.(<stat_prefix>.)* - - let counter = define_metric( - MetricType::Counter, - &format!("mongo.mongoproxy.total_queries"), - ).unwrap(); - - Box::new(MongoDbFilter{ - context_id, - root_context_id, - decoder: MongoProtocolDecoder::new(), - filter_active: true, - counter, - }) - }); -} - -impl MongoDbFilter { - - fn get_messages(&mut self, data: Vec<u8>) -> Vec<(MsgHeader, MongoMessage)> { - match self.decoder.decode_messages(&data) { - Ok(message_list) => { - message_list - }, - Err(e) => { - error!("Unable to decode Mongo protocol: {}\nStopping.", e); - self.filter_active = false; - vec![] - } - } - } -} - -impl StreamContext for MongoDbFilter { - - fn on_new_connection(&mut self) -> Action { - info!("ctx {}: new connection: root={}", self.context_id, self.root_context_id); - Action::Continue - } - - // When we receive something from the "client" - fn on_downstream_data(&mut self, data_size: usize, _end_of_stream: bool) -> Action { - if let Some(data) = self.get_downstream_data(0, data_size) { - for (hdr, msg) in self.get_messages(data) { - //info!("From downstream:\nhdr: {:?}\nmsg: {:?}\n", hdr, msg); - if let Err(e) = increment_metric(self.counter, 1) { - warn!("Metric inc error for {}: {:?}", self.counter, e); - } - } - } else { - info!("ctx {}: no data :(", self.context_id); - } - - Action::Continue - } - - fn on_downstream_close(&mut self, _peer_type: PeerType) { - info!("ctx {}: Downstream closed", self.context_id); - } - - // When we receive something from the "server" - fn on_upstream_data(&mut self, data_size: usize, _end_of_stream: bool) -> Action { - if let Some(data) = self.get_upstream_data(0, data_size) { - for (hdr, msg) in self.get_messages(data) { - //info!("From upstream:\nhdr: {:?}\nmsg: {:?}\n", hdr, msg); - } - } else { - info!("ctx {}: no data :(", self.context_id); - } - Action::Continue - } - - fn on_upstream_close(&mut self, _peer_type: PeerType) { - info!("ctx {}: Upstream connection closed", self.context_id); - } - - fn on_log(&mut self) { - info!("ctx {}: on_log called", self.context_id); - } -} diff --git a/envoy-wasm/testing/docker-compose.yaml b/envoy-wasm/testing/docker-compose.yaml deleted file mode 100644 index 7f5f0b6..0000000 --- a/envoy-wasm/testing/docker-compose.yaml +++ /dev/null @@ -1,36 +0,0 @@ -version: "3" -services: - envoy: - container_name: envoy - image: istio/proxyv2:1.5.0 - entrypoint: - - /usr/local/bin/envoy - - -c - - /etc/envoy.yaml - - -l - - info - - --service-cluster - - proxy - ports: - - "8001:8001" - - "27111:27111" - volumes: - - ${PWD}/envoy.yaml:/etc/envoy.yaml - - ${PWD}/../target/wasm32-unknown-unknown/release/envoy_wasm.wasm:/usr/local/lib/mongoproxy.wasm - mongoproxy: - container_name: mongoproxy - image: mongoproxy:latest - environment: - - RUST_BACKTRACE=1 - - RUST_LOG=info - command: ./mongoproxy --proxy 27112:mongodb:27017 - ports: - - "9898:9898" - - "27112:27112" - mongodb: - container_name: mongo - image: mongo - logging: - driver: none - ports: - - "27017:27017" diff --git a/envoy-wasm/testing/envoy.yaml b/envoy-wasm/testing/envoy.yaml deleted file mode 100644 index 4ec7ef1..0000000 --- a/envoy-wasm/testing/envoy.yaml +++ /dev/null @@ -1,41 +0,0 @@ -static_resources: - listeners: - - address: - socket_address: - address: 0.0.0.0 - port_value: 27111 - filter_chains: - - filters: - - name: envoy.filters.network.wasm - config: - config: - name: "mongodb_stats" - vm_config: - runtime: "envoy.wasm.runtime.v8" - code: - local: - filename: "/usr/local/lib/mongoproxy.wasm" - allow_precompiled: true - - name: envoy.tcp_proxy -
config: - stat_prefix: ingress_tcp - cluster: mongo - access_log: - - name: envoy.file_access_log - config: - path: /dev/stdout - clusters: - - name: mongo - connect_timeout: 0.25s - type: strict_dns - lb_policy: round_robin - hosts: - - socket_address: - address: mongodb - port_value: 27017 -admin: - access_log_path: "/dev/null" - address: - socket_address: - address: 0.0.0.0 - port_value: 8001 diff --git a/mongo-protocol/Cargo.toml b/mongo-protocol/Cargo.toml index c32fd5e..700684b 100644 --- a/mongo-protocol/Cargo.toml +++ b/mongo-protocol/Cargo.toml @@ -6,7 +6,6 @@ edition = "2018" [features] default = [ "tokio/io-util", "tokio/macros" ] -is_sync = [ "maybe-async/is_sync", "async-bson/is_sync" ] [dependencies] tracing = "0.1" @@ -16,4 +15,5 @@ bson = "1.1" byteorder = "1.3" tokio = { version = "1.34", features = ["macros", "io-util"], optional = true } async-bson = { path = "../async-bson" } -maybe-async = { version = "0.2" } +bytes = "1.7.1" +tracing-test = "0.2.5" diff --git a/mongo-protocol/src/lib.rs b/mongo-protocol/src/lib.rs index 687a6b0..a4a90ba 100644 --- a/mongo-protocol/src/lib.rs +++ b/mongo-protocol/src/lib.rs @@ -1,22 +1,14 @@ -use std::fmt; +use std::{fmt, io::Cursor}; +use tokio::io::AsyncReadExt; use tracing::{error, warn, info, debug}; -use byteorder::{LittleEndian, WriteBytesExt}; +use byteorder::{LittleEndian, WriteBytesExt, ReadBytesExt}; use async_bson::{DocumentParser, DocumentReader, Document, read_cstring}; use lazy_static::lazy_static; -#[cfg(feature="is_sync")] -use { - std::io::{self, Cursor}, - byteorder::ReadBytesExt, -}; - -#[cfg(not(feature="is_sync"))] -use tokio::io::{self, AsyncReadExt}; - use std::io::{Result, Write, Error, ErrorKind}; #[macro_use] extern crate prometheus; -use prometheus::CounterVec; +use prometheus::{CounterVec, GaugeVec}; pub const HEADER_LENGTH: usize = 16; @@ -70,7 +62,21 @@ lazy_static! 
{ register_counter_vec!( "mongoproxy_message_parse_error_count_total", "Message body parse errors", - &["error"]).unwrap(); + &["source", "error"]).unwrap(); + + static ref READ_BUFFER_SIZE: GaugeVec = + register_gauge_vec!( + "mongoproxy_read_buffer_size", + "Current size of the read buffer for client or server reads", + &["source"] + ).unwrap(); + + static ref READ_BUFFER_CAPACITY: GaugeVec = + register_gauge_vec!( + "mongoproxy_read_buffer_capacity", + "Current capacity of the read buffer for client or server reads", + &["source"] + ).unwrap(); } @@ -117,54 +123,131 @@ impl fmt::Display for MongoMessage { } } -impl MongoMessage { +#[derive(Debug)] +pub enum ProxyError { + IoError(std::io::Error), + InvalidHeader, + PartialData, + ParserFailed, + EOF, +} - #[maybe_async::maybe_async] - pub async fn from_reader( - mut rdr: impl DocumentReader, - log_mongo_messages: bool, - collect_tracing_data: bool, - ) -> Result<(MsgHeader, MongoMessage)> { - let hdr = MsgHeader::from_reader(&mut rdr).await?; +impl fmt::Display for ProxyError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "ProxyError: {:?}", self) + } +} - if hdr.message_length < HEADER_LENGTH { - return Err(Error::new(ErrorKind::Other, "Invalid MongoDb header")); +impl std::error::Error for ProxyError {} + +pub struct MongoMessageProxy { + log_mongo_messages: bool, + collect_tracing_data: bool, + buf: bytes::BytesMut, +} + +impl MongoMessageProxy { + + pub fn new(read_buffer_size: usize, log_mongo_messages: bool, collect_tracing_data: bool) -> Self { + Self { + log_mongo_messages, + collect_tracing_data, + buf: bytes::BytesMut::with_capacity(read_buffer_size), + } + } + + // Read a MongoDb message from one stream and write it to another. Return the parsed message header + // and body. + pub async fn proxy_mongo_message<R, W>( + &mut self, + read_source: &str, + reader: &mut R, + writer: &mut W, + ) -> std::result::Result<(MsgHeader, MongoMessage), ProxyError> + where + R: tokio::io::AsyncRead+tokio::io::AsyncReadExt+Unpin, + W: tokio::io::AsyncWrite+tokio::io::AsyncWriteExt+Unpin, + { + self.buf.clear(); + while self.buf.len() < HEADER_LENGTH { + let remaining_bytes = (HEADER_LENGTH - self.buf.len()) as u64; + let len = reader.take(remaining_bytes).read_buf(&mut self.buf) + .await + .map_err(ProxyError::IoError)?; + debug!("Read {len} of the required header bytes."); + + if len == 0 { + debug!("Out of data when reading header, have {len} bytes"); + return Err(ProxyError::EOF) + } } + let hdr = MsgHeader::from_reader(&self.buf[..]) + .await + .map_err(ProxyError::IoError)?; debug!("have header: {}", hdr); - // Take only as much as promised in the header. - let message_length = (hdr.message_length - HEADER_LENGTH) as u64; - let mut rdr = rdr.take(message_length); + if hdr.message_length < HEADER_LENGTH { + // TODO: Increase a counter for invalid header + warn!("Invalid message length in header: {}", hdr.message_length); + return Err(ProxyError::InvalidHeader); + } OPCODE_COUNTER.with_label_values(&[&hdr.op_code.to_string()]).inc(); - let msg = match MongoMessage::extract_message( - hdr.op_code, - &mut rdr, - log_mongo_messages, - collect_tracing_data, - message_length).await { - Ok(msg) => msg, + writer.write_all(&self.buf) + .await + .map_err(ProxyError::IoError)?; + + // Collect the message length worth of bytes. Don't take any more than + // is needed so that we can safely clear the buffer here without losing + // any unprocessed bytes.
+ let message_length = hdr.message_length - HEADER_LENGTH; + self.buf.clear(); + while self.buf.len() < message_length { + let remaining_bytes = (message_length - self.buf.len()) as u64; + let len = reader.take(remaining_bytes).read_buf(&mut self.buf) + .await + .map_err(ProxyError::IoError)?; + debug!("Read {len} of the required {message_length} bytes."); + + if len == 0 { + warn!("Out of data when reading message. Have {len} of {message_length}"); + return Err(ProxyError::PartialData) + } + } + + writer.write_all(&self.buf) + .await + .map_err(ProxyError::IoError)?; + + READ_BUFFER_SIZE.with_label_values(&[read_source]).set(self.buf.len() as f64); + READ_BUFFER_CAPACITY.with_label_values(&[read_source]).set(self.buf.capacity() as f64); + + match MongoMessage::extract_message( + hdr.op_code, + &mut &self.buf[..], + self.log_mongo_messages, + self.collect_tracing_data, + message_length as u64).await + { + Ok(msg) => { + Ok((hdr, msg)) + }, Err(e) => { error!("Failed to parse MongoDb {} message: {}", hdr.op_code, e); - MESSAGE_PARSE_ERRORS_COUNTER.with_label_values(&[&e.to_string()]).inc(); - return Err(e); + MESSAGE_PARSE_ERRORS_COUNTER.with_label_values(&[read_source, &e.to_string()]).inc(); + Err(ProxyError::ParserFailed) } - }; - - let len = io::copy(&mut rdr, &mut io::sink()).await?; - if len > 0 { - warn!("partial message, sinking {} bytes.", len); } - - Ok((hdr, msg)) } +} + +impl MongoMessage { // Extract a message or return an error // // Note that this assumes that it is safe to read everything from the reader. - #[maybe_async::maybe_async] pub async fn extract_message( op: u32, mut rdr: impl DocumentReader, @@ -220,12 +303,11 @@ impl MsgHeader { } } - #[maybe_async::maybe_async] pub async fn from_reader(mut rdr: impl DocumentReader) -> Result<MsgHeader> { - let message_length = rdr.read_u32_le().await? as usize; - let request_id = rdr.read_u32_le().await?; - let response_to = rdr.read_u32_le().await?; - let op_code = rdr.read_u32_le().await?; + let message_length = rdr.read_u32_le()? as usize; + let request_id = rdr.read_u32_le()?; + let response_to = rdr.read_u32_le()?; + let op_code = rdr.read_u32_le()?; Ok(MsgHeader{message_length, request_id, response_to, op_code}) } @@ -277,7 +359,6 @@ impl MsgOpMsg { // Construct a new OP_MSG message from a reader. This requires the length of the whole // message to be passed in to simplify handling of the optional checksum. - #[maybe_async::maybe_async] pub async fn from_reader<T: DocumentReader>( rdr: &mut T, log_mongo_messages: bool, @@ -285,7 +366,7 @@ impl MsgOpMsg { message_length: u64, ) -> Result<MsgOpMsg> { - let flag_bits = rdr.read_u32_le().await?; + let flag_bits = rdr.read_u32_le()?; debug!("flag_bits={:04x}", flag_bits); let body_length = if flag_bits & 1 == 1 { @@ -294,22 +375,15 @@ impl MsgOpMsg { message_length - 4 // Subtract just the flags }; - #[cfg(not(feature="is_sync"))] - let msg = { - let mut rdr = rdr.take(body_length); - - MsgOpMsg::read_body(&mut rdr, flag_bits, log_mongo_messages, collect_tracing_data).await? - }; - #[cfg(feature="is_sync")] let msg = { let mut buf = vec![0u8; body_length as usize]; - rdr.read_exact(&mut buf[..]).await?; + rdr.read_exact(&mut buf[..])?; MsgOpMsg::read_body(&mut &buf[..], flag_bits, log_mongo_messages, collect_tracing_data).await? }; if flag_bits & 1 == 1 { - let _checksum = rdr.read_u32_le().await?; + let _checksum = rdr.read_u32_le()?; } Ok(msg) @@ -329,7 +403,6 @@ impl MsgOpMsg { // take from the `rdr` and later try to use it. I'm sure there's an idiomatic way around // this but I just haven't found it.
// - #[maybe_async::maybe_async] async fn read_body( mut rdr: impl DocumentReader, flag_bits: u32, @@ -342,9 +415,9 @@ impl MsgOpMsg { let mut rdr = &mut rdr; loop { - let kind = match rdr.read_u8().await { + let kind = match rdr.read_u8() { Ok(r) => r, - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { + Err(e) if e.kind() == tokio::io::ErrorKind::UnexpectedEof => { // This is OK if we've already read at least one doc. In fact // we're relying on reaching an EOF here to determine when to // stop parsing. @@ -370,8 +443,8 @@ impl MsgOpMsg { &mut section_bytes, ).await?; } else if kind == 1 { - let section_size = rdr.read_u32_le().await? as usize; - let seq_id = read_cstring(&mut rdr).await?; + let section_size = rdr.read_u32_le()? as usize; + let seq_id = read_cstring(&mut rdr)?; debug!("kind=1: section_size={}, seq_id={}", section_size, seq_id); // Section size includes the size of the seq_id cstring and the length bytes, but @@ -380,22 +453,9 @@ impl MsgOpMsg { let section_size = section_size - seq_id.len() - 1 - 4; // Consume all the documents in the section, but no more. - #[cfg(not(feature="is_sync"))] - { - let mut section_rdr = rdr.take(section_size as u64); - while MsgOpMsg::process_section_document( - &mut section_rdr, - log_mongo_messages, - collect_tracing_data, - &mut documents, - &mut section_bytes - ).await? {} - } - - #[cfg(feature="is_sync")] { let mut buf = vec![0u8; section_size]; - rdr.read_exact(&mut buf[..]).await?; + rdr.read_exact(&mut buf[..])?; let mut cur = Cursor::new(buf); @@ -406,8 +466,7 @@ impl MsgOpMsg { &mut documents, &mut section_bytes ).await? {} - } - } else { + } } else { warn!("unrecognized kind={}", kind); break; } @@ -416,7 +475,6 @@ impl MsgOpMsg { Ok(MsgOpMsg{flag_bits, documents, section_bytes}) } - #[maybe_async::maybe_async] async fn process_section_document( mut rdr: impl DocumentReader, log_mongo_messages: bool, @@ -427,7 +485,7 @@ { let doc = MONGO_DOC_PARSER.parse_document_keep_bytes( &mut rdr, - log_mongo_messages | collect_tracing_data).await; + log_mongo_messages | collect_tracing_data); match doc { Ok(doc) => { debug!("doc: {}", doc); @@ -451,7 +509,7 @@ documents.push(doc); Ok(true) }, - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => { + Err(e) if e.kind() == tokio::io::ErrorKind::UnexpectedEof => { Ok(false) }, Err(e) => { @@ -500,13 +558,12 @@ impl fmt::Display for MsgOpQuery { impl MsgOpQuery { - #[maybe_async::maybe_async] pub async fn from_reader(mut rdr: impl DocumentReader, log_mongo_messages: bool) -> Result<MsgOpQuery> { - let flags = rdr.read_u32_le().await?; - let full_collection_name = read_cstring(&mut rdr).await?; - let number_to_skip = rdr.read_i32_le().await?; - let number_to_return = rdr.read_i32_le().await?; - let query = MONGO_DOC_PARSER.parse_document_keep_bytes(&mut rdr, log_mongo_messages).await?; + let flags = rdr.read_u32_le()?; + let full_collection_name = read_cstring(&mut rdr)?; + let number_to_skip = rdr.read_i32_le()?; + let number_to_return = rdr.read_i32_le()?; + let query = MONGO_DOC_PARSER.parse_document_keep_bytes(&mut rdr, log_mongo_messages)?; if log_mongo_messages { if let Some(bytes) = query.get_raw_bytes() { @@ -538,12 +595,11 @@ impl fmt::Display for MsgOpGetMore { impl MsgOpGetMore { - #[maybe_async::maybe_async] pub async fn from_reader(mut rdr: impl DocumentReader) -> Result<MsgOpGetMore> { - let _zero = rdr.read_i32_le().await?; - let full_collection_name = read_cstring(&mut rdr).await?; - let number_to_return = rdr.read_i32_le().await?; - let cursor_id =
rdr.read_i64_le().await?; + let _zero = rdr.read_i32_le()?; + let full_collection_name = read_cstring(&mut rdr)?; + let number_to_return = rdr.read_i32_le()?; + let cursor_id = rdr.read_i64_le()?; Ok(MsgOpGetMore{full_collection_name, number_to_return, cursor_id}) } @@ -566,13 +622,12 @@ impl fmt::Display for MsgOpUpdate { impl MsgOpUpdate { - #[maybe_async::maybe_async] pub async fn from_reader(mut rdr: impl DocumentReader) -> Result<MsgOpUpdate> { - let _zero = rdr.read_u32_le().await?; - let full_collection_name = read_cstring(&mut rdr).await?; - let flags = rdr.read_u32_le().await?; - let selector = MONGO_DOC_PARSER.parse_document(&mut rdr).await?; - let update = MONGO_DOC_PARSER.parse_document(&mut rdr).await?; + let _zero = rdr.read_u32_le()?; + let full_collection_name = read_cstring(&mut rdr)?; + let flags = rdr.read_u32_le()?; + let selector = MONGO_DOC_PARSER.parse_document(&mut rdr)?; + let update = MONGO_DOC_PARSER.parse_document(&mut rdr)?; Ok(MsgOpUpdate{flags, full_collection_name, selector, update}) } @@ -594,12 +649,11 @@ impl fmt::Display for MsgOpDelete { impl MsgOpDelete { - #[maybe_async::maybe_async] pub async fn from_reader(mut rdr: impl DocumentReader) -> Result<MsgOpDelete> { - let _zero = rdr.read_u32_le().await?; - let full_collection_name = read_cstring(&mut rdr).await?; - let flags = rdr.read_u32_le().await?; - let selector = MONGO_DOC_PARSER.parse_document(&mut rdr).await?; + let _zero = rdr.read_u32_le()?; + let full_collection_name = read_cstring(&mut rdr)?; + let flags = rdr.read_u32_le()?; + let selector = MONGO_DOC_PARSER.parse_document(&mut rdr)?; Ok(MsgOpDelete{flags, full_collection_name, selector}) } @@ -619,10 +673,9 @@ impl fmt::Display for MsgOpInsert { } impl MsgOpInsert { - #[maybe_async::maybe_async] pub async fn from_reader(mut rdr: impl DocumentReader) -> Result<MsgOpInsert> { - let flags = rdr.read_u32_le().await?; - let full_collection_name = read_cstring(&mut rdr).await?; + let flags = rdr.read_u32_le()?; + let full_collection_name = read_cstring(&mut rdr)?; // There's also a list of documents in the message, but we ignore it.
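All of the from_reader conversions above share one shape: byteorder's ReadBytesExt now does the reads synchronously from the in-memory buffer, while the signatures stay async so the call sites keep their .await. A minimal self-contained sketch of the same little-endian read/write pattern over a plain byte slice (the helper names are illustrative, not part of this diff; the real code goes through the DocumentReader trait):

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use std::io::Result;

// Four little-endian u32 fields, 16 bytes on the wire: the same layout
// that MsgHeader::from_reader consumes.
fn write_header_bytes(buf: &mut Vec<u8>, fields: [u32; 4]) -> Result<()> {
    for f in fields {
        buf.write_u32::<LittleEndian>(f)?;
    }
    Ok(())
}

fn read_header_bytes(mut rdr: &[u8]) -> Result<[u32; 4]> {
    // Reading from a &[u8] advances the slice, much like the parser's cursor.
    let mut fields = [0u32; 4];
    for f in fields.iter_mut() {
        *f = rdr.read_u32::<LittleEndian>()?;
    }
    Ok(fields)
}

fn main() -> Result<()> {
    let mut buf = Vec::new();
    // message_length, request_id, response_to, op_code (2013 is OP_MSG)
    write_header_bytes(&mut buf, [16, 1234, 0, 2013])?;
    assert_eq!([16, 1234, 0, 2013], read_header_bytes(&buf)?);
    Ok(())
}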
@@ -658,15 +711,14 @@ impl ResponseDocuments for MsgOpReply { impl MsgOpReply { - #[maybe_async::maybe_async] pub async fn from_reader(mut rdr: impl DocumentReader, log_mongo_messages: bool) -> Result<MsgOpReply> { - let flags = rdr.read_u32_le().await?; - let cursor_id = rdr.read_u64_le().await?; - let starting_from = rdr.read_u32_le().await?; - let number_returned = rdr.read_u32_le().await?; + let flags = rdr.read_u32_le()?; + let cursor_id = rdr.read_u64_le()?; + let starting_from = rdr.read_u32_le()?; + let number_returned = rdr.read_u32_le()?; let mut documents = Vec::new(); - while let Ok(doc) = MONGO_DOC_PARSER.parse_document_keep_bytes(&mut rdr, log_mongo_messages).await { + while let Ok(doc) = MONGO_DOC_PARSER.parse_document_keep_bytes(&mut rdr, log_mongo_messages) { documents.push(doc); } @@ -702,14 +754,13 @@ impl fmt::Display for MsgOpCompressed { impl MsgOpCompressed { - #[maybe_async::maybe_async] pub async fn from_reader(mut rdr: impl DocumentReader) -> Result<MsgOpCompressed> { - let original_op = rdr.read_i32_le().await?; - let uncompressed_size = rdr.read_i32_le().await?; - let compressor_id = rdr.read_u8().await?; + let original_op = rdr.read_i32_le()?; + let uncompressed_size = rdr.read_i32_le()?; + let compressor_id = rdr.read_u8()?; // Ignore the actual compressed message, we are not going to use it - let _len = io::copy(&mut rdr, &mut io::sink()).await?; + let _len = std::io::copy(&mut rdr, &mut std::io::sink())?; Ok(MsgOpCompressed{original_op, uncompressed_size, compressor_id}) } @@ -746,10 +797,10 @@ pub fn debug_fmt(buf: &[u8]) -> String { mod tests { use super::*; - use byteorder::{LittleEndian, WriteBytesExt}; use bson::doc; + use tracing_test::traced_test; - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] + #[tokio::test] async fn test_parse_header() { let hdr = MsgHeader { message_length: 100, @@ -805,7 +856,7 @@ mod tests { doc.to_writer(&mut buf).unwrap(); } - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] + #[tokio::test] async fn test_parse_op_msg() { let mut buf = Vec::new(); msgop_to_buf(0, &mut buf); @@ -822,7 +873,7 @@ mod tests { assert_eq!("b0", msg.documents[4].get_str("op").unwrap()); } - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] + #[tokio::test] async fn test_parse_op_msg_raw() { // This is an OP_MSG that contains a "delete" operation with payloads of type 0 and 1 // (total of 2 BSON documents).
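For reference while reading the raw bytes in this test: read_body above expects a kind-0 section to be a single BSON document, and a kind-1 section to be a u32 section size, a NUL-terminated sequence id, then back-to-back BSON documents. A hedged helper for building a kind-1 section by hand (illustrative only, not part of this diff):

fn write_kind1_section(buf: &mut Vec<u8>, seq_id: &str, docs: &[Vec<u8>]) {
    use byteorder::{LittleEndian, WriteBytesExt};
    buf.push(1u8); // section kind
    let docs_len: usize = docs.iter().map(|d| d.len()).sum();
    // The size field counts itself (4 bytes) and the seq_id cstring, but not
    // the kind byte, matching `section_size - seq_id.len() - 1 - 4` in read_body.
    let section_size = 4 + seq_id.len() + 1 + docs_len;
    buf.write_u32::<LittleEndian>(section_size as u32).unwrap();
    buf.extend_from_slice(seq_id.as_bytes());
    buf.push(0u8); // cstring terminator
    for d in docs {
        buf.extend_from_slice(d);
    }
}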
@@ -854,7 +905,7 @@ mod tests { assert_eq!(0x69, msg.section_bytes[1].len()); } - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] + #[tokio::test] async fn test_checksummed_message() { let buf = vec![ 0x01, 0x00, 0x00, 0x00, 0x00, 0x65, 0x00, 0x00, 0x00, 0x10, 0x6c, 0x69, 0x73, 0x74, 0x44, 0x61, @@ -878,7 +929,7 @@ mod tests { } } - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] + #[tokio::test] async fn test_parse_op_query() { let mut buf = Vec::new(); buf.write_i32::<LittleEndian>(0i32).unwrap(); // flag bits @@ -897,7 +948,7 @@ mod tests { assert_eq!("ok", msg.query.get_str("comment").unwrap()); } - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] + #[tokio::test] async fn test_parse_op_getmore() { let mut buf = Vec::new(); buf.write_i32::<LittleEndian>(0).unwrap(); // zero @@ -911,7 +962,7 @@ mod tests { assert_eq!(123456, msg.cursor_id); } - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] + #[tokio::test] async fn test_parse_op_update() { let mut buf = Vec::new(); buf.write_i32::<LittleEndian>(0).unwrap(); // zero @@ -929,7 +980,7 @@ mod tests { assert_eq!("b", msg.update.get_str("op").unwrap()); } - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] + #[tokio::test] async fn test_parse_op_delete() { let mut buf = Vec::new(); buf.write_i32::<LittleEndian>(0).unwrap(); // zero @@ -944,7 +995,7 @@ mod tests { assert_eq!("a", msg.selector.get_str("op").unwrap()); } - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] + #[tokio::test] async fn test_parse_op_insert() { let mut buf = Vec::new(); buf.write_i32::<LittleEndian>(123).unwrap(); // flags @@ -955,7 +1006,7 @@ mod tests { assert_eq!("tribbles", msg.full_collection_name); } - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] + #[tokio::test] async fn test_parse_op_reply() { let mut buf = Vec::new(); buf.write_i32::<LittleEndian>(123).unwrap(); // flags @@ -977,8 +1028,9 @@ mod tests { } // Test parsing multiple back to back messages - #[maybe_async::test(feature="is_sync", async(not(feature="is_sync"), tokio::test))] - async fn test_parse_multiple_message() { + #[traced_test] + #[tokio::test] + async fn test_parse_multiple_messages() { let mut buf = Vec::new(); for i in 0..3u32 { @@ -996,12 +1048,22 @@ buf.extend(&msg_buf); } + let buflen = buf.len(); + let input = buf.clone(); let mut cur = std::io::Cursor::new(buf); + let mut out = Vec::new(); let mut messages = Vec::new(); - while let Ok((_, msg)) = MongoMessage::from_reader(&mut cur, false, false).await { + let mut message_proxy = MongoMessageProxy::new(1024, false, true); + + while let Ok((_, msg)) = message_proxy.proxy_mongo_message("client", &mut cur, &mut out).await { messages.push(msg); } + // Expect all bytes to have been proxied + assert_eq!(buflen, out.len()); + assert_eq!(input, out); + + // Expect all messages to have been parsed assert_eq!(3, messages.len()); for (i, msg) in messages.iter().enumerate() { match msg { diff --git a/proxy/Cargo.toml b/proxy/Cargo.toml index f87e36a..eee0272 100644 --- a/proxy/Cargo.toml +++ b/proxy/Cargo.toml @@ -12,7 +12,7 @@ lazy_static = '1.4' clap = '2.33' libc = '0.2' bson = '1.1' -tokio = { version = "1.34", features = ["rt", "net", "macros", "io-util", "sync" ] } +tokio = { version = "1.34", features = ["rt-multi-thread", "net", "macros", "io-util", "sync" ] } tokio-util = { version = '0.6', features = ["io"] } tokio-stream = '0.1' bytes = '1.0' diff --git
a/proxy/src/main.rs b/proxy/src/main.rs index 1e728b0..cfdb466 100644 --- a/proxy/src/main.rs +++ b/proxy/src/main.rs @@ -3,15 +3,12 @@ use std::net::{SocketAddr,ToSocketAddrs}; use std::io; use std::str; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::tcp::{OwnedReadHalf, OwnedWriteHalf}; use tokio::net::{TcpListener,TcpStream}; -use tokio::net::tcp::{OwnedReadHalf,OwnedWriteHalf}; -use tokio::sync::mpsc; +use tokio::sync::mpsc::{self, Sender}; use tokio::task::JoinSet; -use tokio_stream::wrappers::ReceiverStream; -use tokio_util::io::StreamReader; -use prometheus::{Counter,CounterVec,HistogramVec,Encoder,TextEncoder}; +use prometheus::{CounterVec,HistogramVec,Encoder,TextEncoder}; use clap::{Arg, App, crate_version}; use tracing::{info, warn, error, debug, info_span, Instrument, Level}; use tracing_subscriber::{FmtSubscriber, EnvFilter}; @@ -27,11 +24,10 @@ use hyper::{ use mongoproxy::jaeger_tracing; use mongoproxy::dstaddr; use mongoproxy::appconfig::AppConfig; -use mongoproxy::tracker::MongoStatsTracker; +use mongoproxy::tracker::{MongoStatsTracker, TrackerMessage}; -use mongo_protocol::MongoMessage; +use mongo_protocol::{MongoMessage, MongoMessageProxy, MsgHeader, ProxyError}; -type BufBytes = Result<bytes::BytesMut, io::Error>; const JAEGER_ADDR: &str = "127.0.0.1:6831"; const ADMIN_PORT: &str = "9898"; @@ -77,11 +73,6 @@ lazy_static! { "mongoproxy_server_connect_time_seconds", "Time it takes to look up and connect to a server", &["server_addr"]).unwrap(); - - static ref TRACKER_CHANNEL_ERRORS_TOTAL: Counter = - register_counter!( - "mongoproxy_tracker_channel_errors_total", - "Total number of errors from sending bytes to tracker channel").unwrap(); } #[tokio::main] @@ -270,131 +261,80 @@ async fn handle_connection(server_addr: &str, client_stream: TcpStream, app: App server_stream.set_nodelay(true)?; // Start the tracker to parse and track MongoDb messages from the input stream. This works by - // having the proxy tasks send a copy of the bytes over a channel and process that channel - // as a stream of bytes, extracting MongoDb messages and tracking the metrics from there. + // having the proxy tasks send a copy of the parsed Mongo message over a channel. - let (client_tx, client_rx): (mpsc::Sender<BufBytes>, mpsc::Receiver<BufBytes>) = mpsc::channel(MAX_CHANNEL_EVENTS); - let (server_tx, server_rx): (mpsc::Sender<BufBytes>, mpsc::Receiver<BufBytes>) = mpsc::channel(MAX_CHANNEL_EVENTS); + let (client_tracker, tracker_rx): (mpsc::Sender<TrackerMessage>, mpsc::Receiver<TrackerMessage>) = mpsc::channel(MAX_CHANNEL_EVENTS); + let server_tracker = client_tracker.clone(); let mut task_set = JoinSet::new(); + let mut tracker = MongoStatsTracker::new( + &client_addr, + &server_addr.to_string(), + server_addr, + app, + tracker_rx, + ); + task_set.spawn(async move { - let tracker = MongoStatsTracker::new( - &client_addr, - &server_addr.to_string(), - server_addr, - app, - ); - - track_mongo_messages(client_rx, server_rx, log_mongo_messages, tracing_enabled, tracker).await?; - Ok::<(), io::Error>(()) + tracker.run_message_loop().await; + debug!("Tracker done"); }.instrument(info_span!("tracker"))); - let (mut read_client, mut write_client) = client_stream.into_split(); - let (mut read_server, mut write_server) = server_stream.into_split(); + let (read_client, write_client) = client_stream.into_split(); + let (read_server, write_server) = server_stream.into_split(); + // Read messages from client and pass to server and client tracker.
+ let client_proxy = MongoMessageProxy::new(READ_BUFFER_SIZE, log_mongo_messages, tracing_enabled); task_set.spawn(async move { - proxy_bytes(&mut read_client, &mut write_server, client_tx).await?; - Ok::<(), io::Error>(()) + proxy_loop(true, client_proxy, read_client, write_server, client_tracker).await; + debug!("Client proxy done"); }.instrument(info_span!("client proxy"))); + // Read messages from the server and pass to client and server tracker. + // Note that it is possible for the server to send messages on its own so this needs to be + // a separate task. + let server_proxy = MongoMessageProxy::new(READ_BUFFER_SIZE, log_mongo_messages, tracing_enabled); task_set.spawn(async move { - proxy_bytes(&mut read_server, &mut write_client, server_tx).await?; - Ok::<(), io::Error>(()) + proxy_loop(false, server_proxy, read_server, write_client, server_tracker).await; + debug!("Server proxy done"); }.instrument(info_span!("server proxy"))); while let Some(res) = task_set.join_next().await { if let Err(e) = res { - warn!("task completed with error, closing connection: {e}"); + warn!("Task completed with error, shutting down this connection: {e}"); task_set.shutdown().await; - info!("all tasks finished."); + info!("all tasks shut down"); return Err(Box::new(e)); } } + debug!("All tasks finished"); Ok(()) } -// Move bytes between sockets, forking the byte stream into a mpsc channel -// for processing. Another channel is used to notify the other tracker of -// failures. -async fn proxy_bytes( - read_from: &mut OwnedReadHalf, - write_to: &mut OwnedWriteHalf, - tracker_channel: mpsc::Sender<BufBytes>, -) -> Result<(), io::Error> +async fn proxy_loop( + is_client: bool, + mut proxy: MongoMessageProxy, + mut read_from: OwnedReadHalf, + mut write_to: OwnedWriteHalf, + tracker: Sender<(bool, MsgHeader, MongoMessage)>, + ) { - let mut tracker_ok = true; - + let read_source = if is_client { "client" } else { "server" }; loop { - let mut buf = bytes::BytesMut::with_capacity(READ_BUFFER_SIZE); - let len = read_from.read_buf(&mut buf).await?; - - if len > 0 { - write_to.write_all(&buf[0..len]).await?; - - if tracker_ok { - if let Err(e) = tracker_channel.try_send(Ok(buf)) { - error!("error sending to tracker, stop: {}", e); - TRACKER_CHANNEL_ERRORS_TOTAL.inc(); - tracker_ok = false; + match proxy.proxy_mongo_message(read_source, &mut read_from, &mut write_to).await { + Ok((hdr, msg)) => { + if let Err(e) = tracker.send((is_client, hdr, msg)).await { + warn!("error sending message to tracker: {e}"); + break; } - } - } else { - // EOF on read, exit normally - return Ok(()) - } - } -} - -// Process the mpsc channel as a byte stream, parsing MongoDb messages -// and sending them off to a tracker. -// -// We assume here that client always speaks first, followed by a response from the server, then the -// client goes again and then the server, and so on. This makes it easy to reason about things, and -// responses are never processed before the request. However this is prone to break if Mongo -// changes this behavior in the protocol.
-// -async fn track_mongo_messages( - client_rx: mpsc::Receiver<BufBytes>, - server_rx: mpsc::Receiver<BufBytes>, - log_mongo_messages: bool, - collect_tracing_data: bool, - mut tracker: MongoStatsTracker, -) -> Result<(), io::Error> -{ - let mut client_stream = StreamReader::new(ReceiverStream::new(client_rx)); - let mut server_stream = StreamReader::new(ReceiverStream::new(server_rx)); - - loop { - match MongoMessage::from_reader( - &mut client_stream, - log_mongo_messages, - collect_tracing_data) - .instrument(info_span!("client")) - .await - { - Ok((hdr, msg)) => tracker.track_client_request(&hdr, &msg), - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(()), - Err(e) => { - error!("Client stream processing error: {}", e); - return Err(e); - } - } - - match MongoMessage::from_reader( - &mut server_stream, - log_mongo_messages, - collect_tracing_data) - .instrument(info_span!("server")) - .await - { - Ok((hdr, msg)) => tracker.track_server_response(hdr, msg), - Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => return Ok(()), + }, + Err(ProxyError::EOF) => break, Err(e) => { - error!("Server stream processing failed: {}", e); - return Err(e); - } + warn!("error processing {read_source} message: {e}"); + break; + }, } } } diff --git a/proxy/src/tracker.rs b/proxy/src/tracker.rs index c6093fc..5fd18c0 100644 --- a/proxy/src/tracker.rs +++ b/proxy/src/tracker.rs @@ -1,4 +1,5 @@ use mongo_protocol::{MsgHeader,MongoMessage,MsgOpMsg,ResponseDocuments}; +use tokio::sync::mpsc::Receiver; use crate::jaeger_tracing; use crate::appconfig::AppConfig; @@ -6,7 +7,7 @@ use std::time::Instant; use std::collections::{HashMap, HashSet}; use tracing::{debug, warn, info_span}; -use prometheus::{Counter, CounterVec, HistogramVec, Gauge}; +use prometheus::{CounterVec, HistogramVec, Gauge}; use opentelemetry::trace::{Tracer, SpanKind, TraceContextExt}; use opentelemetry::trace::Span as _Span; @@ -92,10 +93,11 @@ lazy_static! { "Number of non-ok server responses", OP_LABELS).unwrap(); - static ref SERVER_RESPONSE_REQUEST_MISMATCH: Counter = - register_counter!( + static ref SERVER_RESPONSE_REQUEST_MISMATCH: CounterVec = + register_counter_vec!( "mongoproxy_server_response_request_mismatch_total", - "Number of times mongoproxy was unable to match server response to a client request" + "Number of times mongoproxy was unable to match server response to a client request", + &["response_to"] ).unwrap(); static ref CLIENT_BYTES_SENT_TOTAL: CounterVec = @@ -130,6 +132,7 @@ lazy_static! { // a risk of collision if there are multiple databases on the same server. pub type CursorTraceMapper = HashMap<(std::net::SocketAddr,i64), opentelemetry::Context>; +pub type TrackerMessage = (bool, MsgHeader, MongoMessage); // Stripped down version of the client request. We need this mostly for timing // stats and metric labels. @@ -220,20 +223,20 @@ impl ClientRequest { // is *not* actually in the full_collection_name, but needs to be obtained from the payload // query.
There too are multiple options (op_value or collection) let pos = m.full_collection_name.find('.').unwrap_or(m.full_collection_name.len()); - db = m.full_collection_name[..pos].to_owned(); + m.full_collection_name[..pos].clone_into(&mut db); if let Some(val) = m.query.get_str("collection") { - coll = val.to_owned(); + val.clone_into(&mut coll); } else if let Some(val) = m.query.get_str("op_value") { - coll = val.to_owned(); + val.clone_into(&mut coll); } }, MongoMessage::GetMore(m) => { op = String::from("getMore"); if let Some(pos) = m.full_collection_name.find('.') { let (_db, _coll) = m.full_collection_name.split_at(pos); - db = _db.to_owned(); - coll = _coll[1..].to_owned(); + _db.clone_into(&mut db); + _coll[1..].clone_into(&mut coll); } }, // There is no response to OP_INSERT, DELETE, UPDATE so don't bother @@ -368,9 +371,11 @@ pub struct MongoStatsTracker { client_application: String, client_username: String, client_request_hdr: Option<(ClientRequest, MsgHeader)>, + server_request_hdr: Option<MsgHeader>, replicaset: String, server_host: String, app: AppConfig, + tracker_rx: Receiver<TrackerMessage>, } impl Drop for MongoStatsTracker { @@ -384,20 +389,35 @@ impl Drop for MongoStatsTracker { } impl MongoStatsTracker{ - pub fn new(client_addr: &str, - server_addr: &str, - server_addr_sa: std::net::SocketAddr, - app: AppConfig) -> Self { + pub fn new( + client_addr: &str, + server_addr: &str, + server_addr_sa: std::net::SocketAddr, + app: AppConfig, + tracker_rx: Receiver<TrackerMessage>, + ) -> Self { MongoStatsTracker { client_addr: client_addr.to_string(), server_addr: server_addr.to_string(), server_addr_sa, client_request_hdr: None, + server_request_hdr: None, client_application: String::from(""), client_username: String::from(""), replicaset: String::from(""), server_host: String::from(""), app, + tracker_rx, + } + } + + pub async fn run_message_loop(&mut self) { + while let Some((from_client, hdr, msg)) = self.tracker_rx.recv().await { + if from_client { + self.track_client_request(&hdr, &msg); + } else { + self.track_server_response(&hdr, &msg); + } } } @@ -440,7 +460,7 @@ impl MongoStatsTracker{ if op == "isMaster" || op == "ismaster" || op == "hello" { if self.client_application.is_empty() { if let Some(app_name) = doc.get_str("app_name") { - self.client_application = app_name.to_owned(); + app_name.clone_into(&mut self.client_application); APP_CONNECTION_COUNT_TOTAL .with_label_values(&[&self.client_application]) .inc(); @@ -448,7 +468,7 @@ impl MongoStatsTracker{ } if self.client_username.is_empty() { if let Some(username) = doc.get_str("username") { - self.client_username = username.to_owned(); + username.clone_into(&mut self.client_username); USER_CONNECTION_COUNT_TOTAL .with_label_values(&[&self.client_username]) .inc(); @@ -494,7 +514,7 @@ impl MongoStatsTracker{ ] } - pub fn track_server_response(&mut self, hdr: MsgHeader, msg: MongoMessage) { + pub fn track_server_response(&mut self, hdr: &MsgHeader, msg: &MongoMessage) { CLIENT_BYTES_RECV_TOTAL.with_label_values(&[&self.client_addr]).inc_by(hdr.message_length as f64); let span = info_span!("track_server_response"); @@ -507,17 +527,25 @@ impl MongoStatsTracker{ if let Some((mut client_request, req_hdr)) = self.client_request_hdr.take() { if hdr.response_to != req_hdr.request_id { - // And if this starts to happen, then we need to go back to the HashMap of requests ...
warn!("Server response to {} does not match client request {}", hdr.response_to, req_hdr.request_id); - SERVER_RESPONSE_REQUEST_MISMATCH.inc(); + SERVER_RESPONSE_REQUEST_MISMATCH.with_label_values(&["client"]).inc(); } else { - self.observe_server_response_to(&hdr, &msg, &mut client_request); + self.observe_server_response_to(hdr, msg, &mut client_request); + } + } else if let Some(req_hdr) = self.server_request_hdr.take() { + // Some server responses are linked to previous server responses. For example there + // can be multiple "helloOk" responses to a single "hello" message. + // TODO: Handle these for realz. + if hdr.response_to != req_hdr.request_id { + warn!("Server response {} does not match the prior server message {}", hdr.response_to, req_hdr.request_id); + SERVER_RESPONSE_REQUEST_MISMATCH.with_label_values(&["server"]).inc(); } } else { warn!("No client request found for {:?}", hdr); - SERVER_RESPONSE_REQUEST_MISMATCH.inc(); } + + self.server_request_hdr = Some(hdr.clone()); } fn observe_server_response_to( @@ -694,10 +722,10 @@ impl MongoStatsTracker{ if let Some(op) = doc.get_str("op") { if (op == "hosts") || (op == "helloOk") || (op == "topologyVersion") { if let Some(replicaset) = doc.get_str("replicaset") { - self.replicaset = replicaset.to_owned(); + replicaset.clone_into(&mut self.replicaset); } if let Some(server_host) = doc.get_str("server_host") { - self.server_host = server_host.to_owned(); + server_host.clone_into(&mut self.server_host); } } } diff --git a/testing/tracing_client.py b/testing/tracing_client.py index f8615a3..104bcca 100644 --- a/testing/tracing_client.py +++ b/testing/tracing_client.py @@ -24,7 +24,7 @@ tracer = config.initialize_tracer() - con = pymongo.MongoClient("mongodb://localhost:27113/?appName=tracing_client") + con = pymongo.MongoClient("mongodb://localhost:27111/?appName=tracing_client") coll = con['test']['kittens'] def span_as_text(span): @@ -46,7 +46,7 @@ def span_as_text(span): for i in range(110): # Insert does not actually take a $comment so we just skip # it's not that interesting op anyway. - coll.insert({"name": "Purry", "number": i }) + coll.insert_one({"name": "Purry", "number": i }) print("Fetching exactly 101") with tracer.start_span('Fetching 101', root_span) as span: