-
Notifications
You must be signed in to change notification settings - Fork 6
/
parser_async.rs
107 lines (90 loc) · 2.95 KB
/
parser_async.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use std::io::{Cursor, Read};
use super::*;
use crate::compression::Compressor;
use crate::error;
use crate::frame::frame_response::ResponseBody;
use crate::frame::FromCursor;
use crate::types::data_serialization_types::decode_timeuuid;
use crate::types::{from_bytes, from_u16_bytes, CStringList, UUID_LEN};
macro_rules! proceed_if_filled {
($res: expr, $expected_len: expr) => {
if $expected_len > $res? {
return Ok(None);
}
};
}
pub fn parse_frame_async<E, C>(
cursor: &mut C,
compressor: &dyn Compressor<CompressorError = E>,
) -> error::Result<Option<Frame>>
where
E: std::error::Error,
C: Read,
{
let mut version_bytes = [0; Version::BYTE_LENGTH];
let mut flag_bytes = [0; Flag::BYTE_LENGTH];
let mut opcode_bytes = [0; Opcode::BYTE_LENGTH];
let mut stream_bytes = [0; STREAM_LEN];
let mut length_bytes = [0; LENGTH_LEN];
// NOTE: order of reads matters
proceed_if_filled!(cursor.read(&mut version_bytes), Version::BYTE_LENGTH);
proceed_if_filled!(cursor.read(&mut flag_bytes), Flag::BYTE_LENGTH);
proceed_if_filled!(cursor.read(&mut stream_bytes), STREAM_LEN);
proceed_if_filled!(cursor.read(&mut opcode_bytes), Opcode::BYTE_LENGTH);
proceed_if_filled!(cursor.read(&mut length_bytes), LENGTH_LEN);
let version = Version::from(version_bytes.to_vec());
let flags = Flag::get_collection(flag_bytes[0]);
let stream = from_u16_bytes(&stream_bytes);
let opcode = Opcode::from(opcode_bytes[0]);
let length = from_bytes(&length_bytes) as usize;
let mut body_bytes = Vec::with_capacity(length);
unsafe {
body_bytes.set_len(length);
}
proceed_if_filled!(cursor.read(&mut body_bytes), length);
let full_body = if flags.iter().any(|flag| flag == &Flag::Compression) {
compressor
.decode(body_bytes)
.map_err(|err| error::Error::from(err.description()))?
} else {
body_bytes
};
// Use cursor to get tracing id, warnings and actual body
let mut body_cursor = Cursor::new(full_body.as_slice());
let tracing_id = if flags.iter().any(|flag| flag == &Flag::Tracing) {
let mut tracing_bytes = Vec::with_capacity(UUID_LEN);
unsafe {
tracing_bytes.set_len(UUID_LEN);
}
body_cursor.read_exact(&mut tracing_bytes)?;
decode_timeuuid(tracing_bytes.as_slice()).ok()
} else {
None
};
let warnings = if flags.iter().any(|flag| flag == &Flag::Warning) {
CStringList::from_cursor(&mut body_cursor)?.into_plain()
} else {
vec![]
};
let mut body = vec![];
body_cursor.read_to_end(&mut body)?;
let frame = Frame {
version: version,
flags: flags,
opcode: opcode,
stream: stream,
body: body,
tracing_id: tracing_id,
warnings: warnings,
};
Ok(Some(frame))
}
pub fn convert_frame_into_result(frame: Frame) -> error::Result<Frame> {
match frame.opcode {
Opcode::Error => frame.get_body().and_then(|err| match err {
ResponseBody::Error(err) => Err(error::Error::Server(err)),
_ => unreachable!(),
}),
_ => Ok(frame),
}
}