Skip to content

Commit

Permalink
feat: cache HTTP GET requests
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Dec 11, 2024
1 parent 72558af commit ee20887
Show file tree
Hide file tree
Showing 3 changed files with 256 additions and 5 deletions.
223 changes: 218 additions & 5 deletions src/net/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ use http_body_util::BodyExt;
use hyper_util::rt::TokioIo;
use mime::Mime;
use serde::Serialize;
use tokio::fs;

use crate::blob::BlobObject;
use crate::context::Context;
use crate::net::proxy::ProxyConfig;
use crate::net::session::SessionStream;
use crate::net::tls::wrap_rustls;
use crate::tools::{create_id, time};

/// HTTP(S) GET response.
#[derive(Debug)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Response {
/// Response body.
pub blob: Vec<u8>,
Expand Down Expand Up @@ -90,9 +93,127 @@ where
Ok(sender)
}

/// Converts the URL to expiration timestamp.
fn http_url_cache_expires(url: &str, mimetype: Option<&str>) -> i64 {
let now = time();
if url.ends_with(".xdc") {
// WebXDCs expire in 5 weeks.
now + 3600 * 24 * 35
} else if mimetype.is_some_and(|s| s.starts_with("image/")) {
// Cache images for 1 day.
now + 3600 * 24
} else {
// Cache everything else for 1 hour.
now + 3600
}
}

/// Places the binary into HTTP cache.
async fn http_cache_put(context: &Context, url: &str, response: &Response) -> Result<()> {
let blob = BlobObject::create(
context,
&format!("http_cache_{}", create_id()),
response.blob.as_slice(),
)
.await?;

context
.sql
.insert(
"INSERT OR IGNORE INTO http_cache (url, expires, blobname, mimetype, encoding)
VALUES (?, ?, ?, ?, ?)",
(
url,
http_url_cache_expires(url, response.mimetype.as_deref()),
blob.as_name(),
response.mimetype.as_deref().unwrap_or_default(),
response.encoding.as_deref().unwrap_or_default(),
),
)
.await?;

Ok(())
}

/// Retrieves the binary from HTTP cache.
async fn http_cache_get(context: &Context, url: &str) -> Result<Option<Response>> {
let Some((blob_name, mimetype, encoding)) = context
.sql
.query_row_optional(
"SELECT blobname, mimetype, encoding
FROM http_cache WHERE url=? AND expires > ?",
(url, time()),
|row| {
let blob_name: String = row.get(0)?;
let mimetype: Option<String> = Some(row.get(1)?).filter(|s: &String| !s.is_empty());
let encoding: Option<String> = Some(row.get(2)?).filter(|s: &String| !s.is_empty());
Ok((blob_name, mimetype, encoding))
},
)
.await?
else {
return Ok(None);
};

let blob_object = BlobObject::from_name(context, blob_name)?;
let blob_abs_path = blob_object.to_abs_path();
let blob = match fs::read(blob_abs_path)
.await
.with_context(|| format!("Failed to read blob for {url:?} cache entry."))
{
Ok(blob) => blob,
Err(err) => {
// This should not happen, but user may go into the blobdir and remove files,
// antivirus may delete the file or there may be a bug in housekeeping.
warn!(context, "{err:?}.");
return Ok(None);
}
};

let expires = http_url_cache_expires(url, mimetype.as_deref());
let response = Response {
blob,
mimetype,
encoding,
};

// Update expiration timestamp.
context
.sql
.execute(
"UPDATE http_cache SET expires=? WHERE url=?",
(expires, url),
)
.await?;

Ok(Some(response))
}

/// Removes expired cache entries.
pub(crate) async fn http_cache_cleanup(context: &Context) -> Result<()> {
// Remove cache entries that are already expired
// or entries that will not expire in a year
// to make sure we don't have invalid timestamps that are way forward in the future.
context
.sql
.execute(
"DELETE FROM http_cache
WHERE ?1 > expires OR expires > ?1 + 31536000",
(time(),),
)
.await?;
Ok(())
}

/// Retrieves the binary contents of URL using HTTP GET request.
pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
let mut url = url.to_string();
pub async fn read_url_blob(context: &Context, original_url: &str) -> Result<Response> {
if let Some(response) = http_cache_get(context, original_url).await? {
info!(context, "Returning {original_url:?} from cache.");
return Ok(response);
}

info!(context, "Not found {original_url:?} in cache.");
let mut url = original_url.to_string();

// Follow up to 10 http-redirects
for _i in 0..10 {
Expand Down Expand Up @@ -139,11 +260,14 @@ pub async fn read_url_blob(context: &Context, url: &str) -> Result<Response> {
});
let body = response.collect().await?.to_bytes();
let blob: Vec<u8> = body.to_vec();
return Ok(Response {
let response = Response {
blob,
mimetype,
encoding,
});
};
info!(context, "Inserting {original_url:?} into cache.");
http_cache_put(context, &url, &response).await?;
return Ok(response);
}

Err(anyhow!("Followed 10 redirections"))
Expand Down Expand Up @@ -241,3 +365,92 @@ pub(crate) async fn post_form<T: Serialize + ?Sized>(
let bytes = response.collect().await?.to_bytes();
Ok(bytes)
}

#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;

use crate::sql::housekeeping;
use crate::test_utils::TestContext;
use crate::tools::SystemTime;

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_http_cache() -> Result<()> {
let t = &TestContext::new().await;

assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);

let html_response = Response {
blob: b"<!DOCTYPE html> ...".to_vec(),
mimetype: Some("text/html".to_string()),
encoding: None,
};

let xdc_response = Response {
blob: b"PK...".to_vec(),
mimetype: Some("application/octet-stream".to_string()),
encoding: None,
};
let xdc_editor_url = "https://apps.testrun.org/webxdc-editor-v3.2.0.xdc";
let xdc_pixel_url = "https://apps.testrun.org/webxdc-pixel-v2.xdc";

http_cache_put(t, "https://webxdc.org/", &html_response).await?;

assert_eq!(http_cache_get(t, xdc_editor_url).await?, None);
assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);
assert_eq!(
http_cache_get(t, "https://webxdc.org/").await?,
Some(html_response.clone())
);

http_cache_put(t, xdc_editor_url, &xdc_response).await?;
assert_eq!(
http_cache_get(t, xdc_editor_url).await?,
Some(xdc_response.clone())
);

assert_eq!(
http_cache_get(t, "https://webxdc.org/").await?,
Some(html_response.clone())
);

// HTML expires after 1 hour, but .xdc does not.
SystemTime::shift(Duration::from_secs(3600 + 100));
assert_eq!(http_cache_get(t, "https://webxdc.org/").await?, None);
assert_eq!(
http_cache_get(t, xdc_editor_url).await?,
Some(xdc_response.clone())
);

// 35 days later pixel .xdc expires because we did not request it for 35 days and 1 hour.
// But editor is still there because we did not request it for just 35 days.
SystemTime::shift(Duration::from_secs(3600 * 24 * 35 - 100));

// Run housekeeping to test that it does not delete the blob too early.
housekeeping(t).await?;

assert_eq!(
http_cache_get(t, xdc_editor_url).await?,
Some(xdc_response.clone())
);
assert_eq!(http_cache_get(t, xdc_pixel_url).await?, None);

// Test that if the file is accidentally removed from the blobdir,
// there is no error when trying to load the cache entry.
for entry in std::fs::read_dir(t.get_blobdir())? {
let entry = entry.unwrap();
let path = entry.path();
std::fs::remove_file(path).expect("Failed to remove blob");
}

assert_eq!(
http_cache_get(t, xdc_editor_url)
.await
.context("Failed to get no cache response")?,
None
);

Ok(())
}
}
23 changes: 23 additions & 0 deletions src/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::location::delete_orphaned_poi_locations;
use crate::log::LogExt;
use crate::message::{Message, MsgId};
use crate::net::dns::prune_dns_cache;
use crate::net::http::http_cache_cleanup;
use crate::net::prune_connection_history;
use crate::param::{Param, Params};
use crate::peerstate::Peerstate;
Expand Down Expand Up @@ -720,6 +721,12 @@ pub async fn housekeeping(context: &Context) -> Result<()> {
warn!(context, "Can't set config: {e:#}.");
}

http_cache_cleanup(context)
.await
.context("Failed to cleanup HTTP cache")
.log_err(context)
.ok();

if let Err(err) = remove_unused_files(context).await {
warn!(
context,
Expand Down Expand Up @@ -846,6 +853,22 @@ pub async fn remove_unused_files(context: &Context) -> Result<()> {
.await
.context("housekeeping: failed to SELECT value FROM config")?;

context
.sql
.query_map(
"SELECT blobname FROM http_cache",
(),
|row| row.get::<_, String>(0),
|rows| {
for row in rows {
maybe_add_file(&mut files_in_use, &row?);
}
Ok(())
},
)
.await
.context("Failed to SELECT blobname FROM http_cache")?;

info!(context, "{} files in use.", files_in_use.len());
/* go through directories and delete unused files */
let blobdir = context.get_blobdir();
Expand Down
15 changes: 15 additions & 0 deletions src/sql/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,21 @@ CREATE INDEX msgs_status_updates_index2 ON msgs_status_updates (uid);
.await?;
}

inc_and_check(&mut migration_version, 125)?;
if dbversion < migration_version {
sql.execute_migration(
"CREATE TABLE http_cache (
url TEXT PRIMARY KEY,
expires INTEGER NOT NULL, -- When the cache entry is considered expired, timestamp in seconds.
blobname TEXT NOT NULL,
mimetype TEXT NOT NULL DEFAULT '', -- MIME type extracted from Content-Type header.
encoding TEXT NOT NULL DEFAULT '' -- Encoding from Content-Type header.
) STRICT",
migration_version,
)
.await?;
}

let new_version = sql
.get_raw_config_int(VERSION_CFG)
.await?
Expand Down

0 comments on commit ee20887

Please sign in to comment.