Skip to content

Commit

Permalink
fix grpc write problem in big data traffic situation.
Browse files Browse the repository at this point in the history
poll_capacity can return value smaller than the real data,
 resulting a partial write.
  • Loading branch information
e1732a364fed committed Jan 1, 2099
1 parent 169be65 commit e2e8300
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 15 deletions.
61 changes: 47 additions & 14 deletions rucimp/src/map/h2/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,17 @@ pub fn read_uvarint(r: &mut BytesMut) -> (u64, Option<UVariantErr>) {
(x, Some(UVariantErr::OverFlow))
}

pub fn get_real_len(lp: usize) -> usize {
let mut protobuf_header = [0u8; MAX_VARINT_LEN64 + 1];
protobuf_header[0] = 0x0a;

let varuint_size = put_uvarint(&mut protobuf_header[1..], lp);

let ph_len = varuint_size + 1;

5 + ph_len + lp
}

pub fn encode_head(lp: usize, only_head_cap: bool) -> BytesMut {
let mut protobuf_header = [0u8; MAX_VARINT_LEN64 + 1];
protobuf_header[0] = 0x0a;
Expand Down Expand Up @@ -147,7 +158,7 @@ const READ_HEAD_LEN: usize = 6;
pub struct Stream {
recv: RecvStream,
send: SendStream<Bytes>,
buffer: BytesMut,
r_buffer: BytesMut,

next_data_len: usize,

Expand All @@ -170,16 +181,16 @@ impl Stream {
Stream {
recv,
send,
buffer: BytesMut::with_capacity(super::BUFFER_CAP),
r_buffer: BytesMut::with_capacity(super::BUFFER_CAP),
next_data_len: 0,
shutdown_tx,
}
}

fn try_read_next_len(&mut self) -> Result<(), UVariantErr> {
if self.next_data_len == 0 {
self.buffer.advance(READ_HEAD_LEN);
let (len, oe) = read_uvarint(&mut self.buffer);
self.r_buffer.advance(READ_HEAD_LEN);
let (len, oe) = read_uvarint(&mut self.r_buffer);
if let Some(e) = oe {
return Err(e);
}
Expand All @@ -199,13 +210,13 @@ impl AsyncRead for Stream {
loop {
//debug!("loop {} {}", self.next_data_len, self.buffer.len());

if !self.buffer.is_empty() && self.buffer.len() >= self.next_data_len {
if !self.r_buffer.is_empty() && self.r_buffer.len() >= self.next_data_len {
let r = self.try_read_next_len();
if let Err(e) = r {
return Poll::Ready(Err(io_error(e)));
}

let v = vec![buf.remaining(), self.buffer.len(), self.next_data_len];
let v = vec![buf.remaining(), self.r_buffer.len(), self.next_data_len];

let r_len = v.into_iter().min().expect("ok");

Expand All @@ -217,7 +228,7 @@ impl AsyncRead for Stream {
// r_len
// );

let read_data = self.buffer.split_to(r_len);
let read_data = self.r_buffer.split_to(r_len);
buf.put_slice(&read_data);

self.next_data_len -= r_len;
Expand All @@ -226,7 +237,7 @@ impl AsyncRead for Stream {
};
match ready!(self.recv.poll_data(cx)) {
Some(Ok(data)) => {
self.buffer.extend_from_slice(&data);
self.r_buffer.extend_from_slice(&data);

let r = self
.recv
Expand Down Expand Up @@ -257,14 +268,36 @@ impl AsyncWrite for Stream {
buf: &[u8],
) -> Poll<io::Result<usize>> {
let old_len = buf.len();
let real_buf = encode(buf, false);
let realbuf_len = get_real_len(old_len);

self.send.reserve_capacity(real_buf.len());
self.send.reserve_capacity(realbuf_len);
Poll::Ready(match ready!(self.send.poll_capacity(cx)) {
Some(Ok(write_len)) => self.send.send_data(real_buf.into(), false).map_or_else(
|e| Err(Error::new(ErrorKind::BrokenPipe, e)),
|_| Ok(min(old_len, write_len)),
),
Some(Ok(write_len)) => {
if write_len >= realbuf_len {
//实测不能连续使用两次 send_data, 会卡住, 故只能先encode再一次性发送

let b = encode(buf, false);

self.send.send_data(b.into(), false).map_or_else(
|e| Err(Error::new(ErrorKind::BrokenPipe, e)),
|_| Ok(min(old_len, write_len)),
)
} else {
//debug!("write len short {write_len} {realbuf_len}");
//这种情况很常见, 尤其在看4k视频等大流量情况下,在 e的测试中, realbuf_len 常为 8200,
// 而 实得的 write_len 会以各种数值 小于它

let diff = realbuf_len - write_len;
let old_buf_written = old_len - diff;

let to_write_buf = encode(&buf[..old_buf_written], false);

self.send.send_data(to_write_buf.into(), false).map_or_else(
|e| Err(Error::new(ErrorKind::BrokenPipe, e)),
|_| Ok(old_buf_written),
)
}
}
// is_send_streaming returns false
// which indicates the state is
// neither open nor half_close_remote
Expand Down
2 changes: 1 addition & 1 deletion rucimp/src/map/h2/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tracing::debug;
// (MIT)
// https://github.com/zephyrchien/midori/blob/master/src/transport/h2/stream.rs

const BUFFER_CAP: usize = 0x1000; //todo: change this
const BUFFER_CAP: usize = 0x4000; //todo: adjust this

pub struct H2Stream {
recv: RecvStream,
Expand Down

0 comments on commit e2e8300

Please sign in to comment.