Fix too-small request sizing after making a large request. (flatgeobuf#376)

* Fix too-small request sizing after making a large request.

Background:

To minimize the number of requests, nearby features are grouped into
batches that can be fetched in a single request. While processing the
first feature of a batch, we fetch all the data we need for the entire
batch.

However, because our HTTP library holds the entire requested range in
memory, there was logic to cap the maximum size of that HTTP request to
keep memory usage reasonable.
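
As a rough sketch (hypothetical, simplified code: `batch_ranges` and its
types stand in for the real `FeatureBatch` machinery), the grouping idea
looks like this:

use std::ops::Range;

/// Group sorted, non-overlapping byte ranges into batches, tolerating up to
/// `combine_request_threshold` wasted bytes between neighboring features.
fn batch_ranges(ranges: Vec<Range<u64>>, combine_request_threshold: u64) -> Vec<Vec<Range<u64>>> {
    let mut batches: Vec<Vec<Range<u64>>> = vec![];
    for range in ranges {
        let extends_batch = batches
            .last()
            .and_then(|batch| batch.last())
            .map_or(false, |prev| range.start - prev.end <= combine_request_threshold);
        if extends_batch {
            // Waste the gap bytes to avoid an extra request.
            batches.last_mut().unwrap().push(range);
        } else {
            // Too big a gap: start a new batch, i.e. a new request.
            batches.push(vec![range]);
        }
    }
    batches
}

Each batch is then fetched with a single request, capped (per the logic
above) so that a huge batch doesn't pull the whole file into memory.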

Bug:

If you ever hit that maximum HTTP request size while fetching a feature
batch, each subsequent feature in that batch required two requests: one
to read the feature's length and another to read the feature's data.

The sizing corrected itself at the start of the next batch, but the
problem recurred whenever another large request was encountered.

Fix:

We now calculate the request size on every request, rather than just
once per batch.
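
In condensed form (a sketch distilled from the diff below, not the
complete code), the change is:

// Before: `min_request_size` was computed once in `FeatureBatch::new`,
// applied on the first fetch, then zeroed:
//
//     client.set_min_req_size(self.min_request_size);
//     // ...
//     self.min_request_size = 0;
//
// So after a capped large request, every remaining feature in the batch
// fell back to two tiny reads (length, then data).
//
// After: the size is recomputed from the *remaining* ranges on every fetch:
//
//     let request_size = self.request_size();
//     client.set_min_req_size(request_size);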

I also added a test client to cover this and, hopefully, any other
request-related bugs in the future.

* fixup! Fix too-small request sizing after making a large request.
michaelkirk authored Aug 9, 2024
1 parent 4b2d108 commit 9348df1
Showing 3 changed files with 181 additions and 34 deletions.
1 change: 1 addition & 0 deletions src/rust/Cargo.toml
@@ -37,6 +37,7 @@ tokio = { version = "1.34.0", features = ["rt-multi-thread"] }
reqwest = { version = "0.11.22", default-features = true }
geo-types = "0.7.12"
yocalhost = "0.3.0"
async-trait = "0.1.81"

[[bench]]
name = "read"
105 changes: 105 additions & 0 deletions src/rust/src/http_reader/mock_http_range_client.rs
@@ -0,0 +1,105 @@
use crate::{HttpFgbReader, Result};
use bytes::Bytes;
use http_range_client::AsyncHttpRangeClient;
use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::ops::Range;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};

impl HttpFgbReader<MockHttpRangeClient> {
    /// NOTE: For debugging expediency, this test class often prefers panics over returning a result.
    pub async fn mock_from_file(
        path: &str,
    ) -> Result<(
        HttpFgbReader<MockHttpRangeClient>,
        Arc<RwLock<RequestStats>>,
    )> {
        trace!("starting: opening http reader, reading header");

        let stats = Arc::new(RwLock::new(RequestStats::new()));
        let http_client = MockHttpRangeClient::new(path, stats.clone());
        let client = http_range_client::AsyncBufferedHttpRangeClient::with(http_client, path);
        Ok((Self::_open(client).await?, stats))
    }
}

/// NOTE: For debugging expediency, this test class often prefers panics over returning a result.
pub(crate) struct MockHttpRangeClient {
    path: PathBuf,
    stats: Arc<RwLock<RequestStats>>,
}

pub(crate) struct RequestStats {
    pub request_count: u64,
    pub bytes_requested: u64,
}

impl RequestStats {
    fn new() -> Self {
        Self {
            request_count: 0,
            bytes_requested: 0,
        }
    }
}

#[async_trait::async_trait]
impl AsyncHttpRangeClient for MockHttpRangeClient {
    async fn get_range(&self, url: &str, range: &str) -> http_range_client::Result<Bytes> {
        assert_eq!(url, self.path.to_str().unwrap());

        /// This is a hack: we need the start and length of the range,
        /// but all we're given is the pre-formatted range header string,
        /// so we parse it back into its components.
        ///
        /// For expediency, this test code panics rather than returning a result.
        fn parse_range_header(range: &str) -> Range<u64> {
            let bytes = range.strip_prefix("bytes=").unwrap();
            let parts: Vec<&str> = bytes.split('-').collect();
            assert!(parts.len() == 2);
            let start = parts[0].parse().expect("should have valid start range");
            let end: u64 = parts[1].parse().expect("should have valid end range");
            // Range headers are *inclusive*
            start..(end + 1)
        }
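
        // e.g. parse_range_header("bytes=10-19") returns 10..20: a ten-byte
        // read, since HTTP Range headers are end-inclusive.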

        let range = parse_range_header(range);
        let request_length = range.end - range.start;

        let mut stats = self
            .stats
            .write()
            .expect("test code does not handle actual concurrency");

        stats.request_count += 1;
        stats.bytes_requested += request_length;

        let mut file_reader = BufReader::new(File::open(&self.path).unwrap());
        file_reader
            .seek(SeekFrom::Start(range.start))
            .expect("unable to seek test reader");
        let mut output = vec![0; request_length as usize];
        file_reader
            .read_exact(&mut output)
            .expect("failed to read from test reader");
        Ok(Bytes::from(output))
    }

    async fn head_response_header(
        &self,
        _url: &str,
        _header: &str,
    ) -> http_range_client::Result<Option<String>> {
        unimplemented!()
    }
}

impl MockHttpRangeClient {
    fn new(path: &str, stats: Arc<RwLock<RequestStats>>) -> Self {
        Self {
            path: path.into(),
            stats,
        }
    }
}
109 changes: 75 additions & 34 deletions src/rust/src/http_reader.rs → src/rust/src/http_reader/mod.rs
@@ -9,8 +9,12 @@ use bytes::{BufMut, Bytes, BytesMut};
use http_range_client::{
    AsyncBufferedHttpRangeClient, AsyncHttpRangeClient, BufferedHttpRangeClient,
};
use std::collections::VecDeque;
use std::ops::Range;

#[cfg(test)]
mod mock_http_range_client;

// The largest request we'll speculatively make.
// If a single huge feature requires more, we'll necessarily exceed this limit.
const DEFAULT_HTTP_FETCH_SIZE: usize = 1_048_576; // 1MB
@@ -286,12 +290,7 @@ impl SelectBbox {

struct FeatureBatch {
    /// The byte location of each feature within the file
    feature_ranges: std::vec::IntoIter<HttpRange>,

    /// When fetching new data, how many bytes should we fetch at once.
    /// It was computed based on the specific feature ranges of the batch
    /// to optimize number of requests vs. wasted bytes vs. resident memory
    min_request_size: usize,
    feature_ranges: VecDeque<HttpRange>,
}

impl FeatureBatch {
@@ -303,15 +302,19 @@ impl FeatureBatch {

        for search_result_item in feature_ranges.into_iter() {
            let Some(latest_batch) = batched_ranges.last_mut() else {
                batched_ranges.push(vec![search_result_item.range]);
                let mut new_batch = VecDeque::new();
                new_batch.push_back(search_result_item.range);
                batched_ranges.push(new_batch);
                continue;
            };

            let previous_item = latest_batch.last().expect("we never push an empty batch");
            let previous_item = latest_batch.back().expect("we never push an empty batch");

            let HttpRange::Range(Range { end: prev_end, .. }) = previous_item else {
                debug_assert!(false, "This shouldn't happen. Only the very last feature is expected to have an unknown length");
                batched_ranges.push(vec![search_result_item.range]);
                let mut new_batch = VecDeque::new();
                new_batch.push_back(search_result_item.range);
                batched_ranges.push(new_batch);
                continue;
            };

@@ -322,10 +325,12 @@ impl FeatureBatch {
                } else {
                    trace!("wasting {wasted_bytes} to avoid an extra request");
                }
                latest_batch.push(search_result_item.range)
                latest_batch.push_back(search_result_item.range)
            } else {
                trace!("creating a new request for batch rather than wasting {wasted_bytes} bytes");
                batched_ranges.push(vec![search_result_item.range]);
                let mut new_batch = VecDeque::new();
                new_batch.push_back(search_result_item.range);
                batched_ranges.push(new_batch);
            }
        }

@@ -334,13 +339,20 @@ impl FeatureBatch {
        Ok(batches)
    }

    fn new(feature_ranges: Vec<HttpRange>) -> Self {
        let first = feature_ranges
            .first()
            .expect("We never create empty batches");
        let last = feature_ranges
            .last()
            .expect("We never create empty batches");
    fn new(feature_ranges: VecDeque<HttpRange>) -> Self {
        Self { feature_ranges }
    }

    /// When fetching new data, how many bytes should we fetch at once.
    /// It is computed from the remaining feature ranges of the batch
    /// to balance number of requests vs. wasted bytes vs. resident memory.
    fn request_size(&self) -> usize {
        let Some(first) = self.feature_ranges.front() else {
            return 0;
        };
        let Some(last) = self.feature_ranges.back() else {
            return 0;
        };

        // `last.length()` should only be None if this batch includes the final feature
        // in the dataset. Since we can't infer its actual length, we'll fetch only
@@ -350,33 +362,22 @@ impl FeatureBatch {

        let covering_range = first.start()..last.start() + last_feature_length;

        let min_request_size = covering_range
        covering_range
            .len()
            // Since it's all held in memory, don't fetch more than DEFAULT_HTTP_FETCH_SIZE at a time
            // unless necessary.
            .min(DEFAULT_HTTP_FETCH_SIZE);

        Self {
            feature_ranges: feature_ranges.into_iter(),
            min_request_size,
        }
            .min(DEFAULT_HTTP_FETCH_SIZE)
    }

    async fn next_buffer<T: AsyncHttpRangeClient>(
        &mut self,
        client: &mut AsyncBufferedHttpRangeClient<T>,
    ) -> Result<Option<Bytes>> {
        client.set_min_req_size(self.min_request_size);
        let Some(feature_range) = self.feature_ranges.next() else {
        let request_size = self.request_size();
        client.set_min_req_size(request_size);
        let Some(feature_range) = self.feature_ranges.pop_front() else {
            return Ok(None);
        };
        // Only set min_request_size for the first request.
        //
        // This should only affect a batch that contains the final feature, otherwise
        // we've calculated `batchSize` to get all the data we need for the batch.
        // For the very final feature in a dataset, we don't know it's length, so we
        // will end up executing an extra request for that batch.
        self.min_request_size = 0;

        let mut pos = feature_range.start();
        let mut feature_buffer = BytesMut::from(client.get_range(pos, 4).await?);
@@ -410,3 +411,43 @@ mod geozero_api {
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::HttpFgbReader;

    #[tokio::test]
    async fn fgb_max_request_size() {
        let (fgb, stats) = HttpFgbReader::mock_from_file("../../test/data/UScounties.fgb")
            .await
            .unwrap();

        {
            // The read guard needs to be in a scoped block, else we won't release the lock and the test
            // will hang when the actual FGB client code tries to update the stats.
            let stats = stats.read().unwrap();
            assert_eq!(stats.request_count, 1);
            // This number might change a little if the test data or logic changes, but it should be in the same ballpark.
            assert_eq!(stats.bytes_requested, 12944);
        }

        // This bbox covers a large swathe of the dataset. The idea is that at least one request should be
        // limited by the max request size `DEFAULT_HTTP_FETCH_SIZE`, but we should still have a reasonable
        // number of requests.
        let mut iter = fgb.select_bbox(-118.0, 42.0, -100.0, 47.0).await.unwrap();

        let mut feature_count = 0;
        while let Some(_feature) = iter.next().await.unwrap() {
            feature_count += 1;
        }
        assert_eq!(feature_count, 169);

        {
            // The read guard needs to be in a scoped block, else we won't release the lock and the test
            // will hang when the actual FGB client code tries to update the stats.
            let stats = stats.read().unwrap();
            // These numbers might change a little if the test data or logic changes, but they should be in the same ballpark.
            assert_eq!(stats.request_count, 5);
            assert_eq!(stats.bytes_requested, 2131152);
        }
    }
}
