Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move BufWriter construction into get_output_file #19

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 7 additions & 12 deletions neqo-bin/src/bin/client/http09.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ use neqo_transport::{
use url::Url;

use super::{get_output_file, Args, KeyUpdateState, Res};
use crate::{qlog_new, BUFWRITER_BUFFER_SIZE};
use crate::qlog_new;

pub struct Handler<'a> {
streams: HashMap<StreamId, Option<File>>,
streams: HashMap<StreamId, Option<BufWriter<File>>>,
url_queue: VecDeque<Url>,
all_paths: Vec<PathBuf>,
args: &'a Args,
Expand Down Expand Up @@ -228,7 +228,7 @@ impl<'b> Handler<'b> {
return Ok(fin);
}

if let Some(ref mut out_file) = maybe_out_file {
if let Some(out_file) = maybe_out_file {
out_file.write_all(&data[..sz])?;
} else if !output_read_data {
println!("READ[{stream_id}]: {sz} bytes");
Expand All @@ -246,27 +246,22 @@ impl<'b> Handler<'b> {
}

fn read(&mut self, client: &mut Connection, stream_id: StreamId) -> Res<()> {
let mut maybe_maybe_out_file = self.streams.get_mut(&stream_id);
match &mut maybe_maybe_out_file {
match self.streams.get_mut(&stream_id) {
None => {
println!("Data on unexpected stream: {stream_id}");
return Ok(());
}
Some(maybe_out_file) => {
let mut buf_writer = maybe_out_file
.take()
.map(|file| BufWriter::with_capacity(BUFWRITER_BUFFER_SIZE, file));

let fin_recvd = Self::read_from_stream(
client,
stream_id,
self.args.output_read_data,
&mut buf_writer,
maybe_out_file,
)?;

if fin_recvd {
if buf_writer.is_some() {
buf_writer.take().unwrap().flush()?;
if let Some(mut out_file) = maybe_out_file.take() {
out_file.flush()?;
} else {
println!("<FIN[{stream_id}]>");
}
Expand Down
18 changes: 8 additions & 10 deletions neqo-bin/src/bin/client/http3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use neqo_transport::{
};
use url::Url;

use crate::{get_output_file, qlog_new, Args, KeyUpdateState, Res, BUFWRITER_BUFFER_SIZE};
use crate::{get_output_file, qlog_new, Args, KeyUpdateState, Res};

pub(crate) struct Handler<'a> {
#[allow(
Expand Down Expand Up @@ -255,10 +255,8 @@ impl StreamHandlerType {
match handler_type {
Self::Download => {
let out_file = get_output_file(url, &args.output_dir, all_paths);
let buf_writer =
out_file.map(|file| BufWriter::with_capacity(BUFWRITER_BUFFER_SIZE, file));
client.stream_close_send(client_stream_id).unwrap();
Box::new(DownloadStreamHandler { buf_writer })
Box::new(DownloadStreamHandler { out_file })
}
Self::Upload => Box::new(UploadStreamHandler {
data: vec![42; args.upload_size],
Expand All @@ -271,12 +269,12 @@ impl StreamHandlerType {
}

struct DownloadStreamHandler {
buf_writer: Option<BufWriter<File>>,
out_file: Option<BufWriter<File>>,
}

impl StreamHandler for DownloadStreamHandler {
fn process_header_ready(&mut self, stream_id: StreamId, fin: bool, headers: Vec<Header>) {
if self.buf_writer.is_none() {
if self.out_file.is_none() {
println!("READ HEADERS[{stream_id}]: fin={fin} {headers:?}");
}
}
Expand All @@ -289,9 +287,9 @@ impl StreamHandler for DownloadStreamHandler {
sz: usize,
output_read_data: bool,
) -> Res<bool> {
if let Some(buf_writer) = &mut self.buf_writer {
if let Some(out_file) = &mut self.out_file {
if sz > 0 {
buf_writer.write_all(&data[..sz])?;
out_file.write_all(&data[..sz])?;
}
return Ok(true);
} else if !output_read_data {
Expand All @@ -303,8 +301,8 @@ impl StreamHandler for DownloadStreamHandler {
}

if fin {
if let Some(mut buf_writer) = self.buf_writer.take() {
buf_writer.flush()?;
if let Some(mut out_file) = self.out_file.take() {
out_file.flush()?;
} else {
println!("<FIN[{stream_id}]>");
}
Expand Down
6 changes: 3 additions & 3 deletions neqo-bin/src/bin/client/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::{
collections::{HashMap, VecDeque},
fmt::{self, Display},
fs::{create_dir_all, File, OpenOptions},
io,
io::{self, BufWriter},
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, ToSocketAddrs},
path::PathBuf,
pin::Pin,
Expand Down Expand Up @@ -254,7 +254,7 @@ fn get_output_file(
url: &Url,
output_dir: &Option<PathBuf>,
all_paths: &mut Vec<PathBuf>,
) -> Option<File> {
) -> Option<BufWriter<File>> {
if let Some(ref dir) = output_dir {
let mut out_path = dir.clone();

Expand Down Expand Up @@ -286,7 +286,7 @@ fn get_output_file(
.ok()?;

all_paths.push(out_path);
Some(f)
Some(BufWriter::with_capacity(BUFWRITER_BUFFER_SIZE, f))
} else {
None
}
Expand Down