Skip to content

Commit

Permalink
Make the capacity, max payload size and eviction interval configurable
Browse files Browse the repository at this point in the history
This does not add the config bits, but does add the plumbing to set it
from the Python size
  • Loading branch information
sandhose committed Apr 15, 2024
1 parent d4dec65 commit 2db9345
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 33 deletions.
76 changes: 44 additions & 32 deletions rust/src/rendezvous/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ use crate::{

mod session;

const MAX_CONTENT_LENGTH: u64 = 1024 * 100;
const CAPACITY: usize = 100;

// n.b. Because OPTIONS requests are handled by the Python code, we don't need to set Access-Control-Allow-Headers.
fn prepare_headers(headers: &mut HeaderMap, session: &Session) {
headers.typed_insert(AccessControlAllowOrigin::ANY);
Expand All @@ -53,38 +50,42 @@ fn prepare_headers(headers: &mut HeaderMap, session: &Session) {
headers.typed_insert(session.last_modified());
}

fn check_input_headers(headers: &HeaderMap) -> PyResult<Mime> {
let ContentLength(content_length) = headers.typed_get_required()?;

if content_length > MAX_CONTENT_LENGTH {
return Err(SynapseError::new(
StatusCode::PAYLOAD_TOO_LARGE,
"Payload too large".to_owned(),
"M_TOO_LARGE",
None,
None,
));
}

let content_type: ContentType = headers.typed_get_required()?;

Ok(content_type.into())
}

#[pyclass]
struct RendezvousHandler {
base: Uri,
clock: PyObject,
sessions: BTreeMap<Ulid, Session>,
capacity: usize,
max_content_length: u64,
}

impl RendezvousHandler {
fn evict(&mut self, now: SystemTime, max_entries: usize) {
/// Check the input headers of a request which sets data for a session, and return the content type.
fn check_input_headers(&self, headers: &HeaderMap) -> PyResult<Mime> {
let ContentLength(content_length) = headers.typed_get_required()?;

if content_length > self.max_content_length {
return Err(SynapseError::new(
StatusCode::PAYLOAD_TOO_LARGE,
"Payload too large".to_owned(),
"M_TOO_LARGE",
None,
None,
));
}

let content_type: ContentType = headers.typed_get_required()?;

Ok(content_type.into())
}

/// Evict expired sessions and remove the oldest sessions until we're under the capacity.
fn evict(&mut self, now: SystemTime) {
// First remove all the entries which expired
self.sessions.retain(|_, session| !session.expired(now));

// Then we remove the oldest entires until we're under the limit
while self.sessions.len() > max_entries {
while self.sessions.len() > self.capacity {
self.sessions.pop_first();
}
}
Expand All @@ -93,7 +94,14 @@ impl RendezvousHandler {
#[pymethods]
impl RendezvousHandler {
#[new]
fn new(py: Python<'_>, homeserver: &PyAny) -> PyResult<Py<Self>> {
#[pyo3(signature = (homeserver, /, capacity=100, max_content_length=1024*1024, eviction_interval=60*1000))]
fn new(
py: Python<'_>,
homeserver: &PyAny,
capacity: usize,
max_content_length: u64,
eviction_interval: u64,
) -> PyResult<Py<Self>> {
let base: String = homeserver
.getattr("config")?
.getattr("server")?
Expand All @@ -112,13 +120,17 @@ impl RendezvousHandler {
base,
clock,
sessions: BTreeMap::new(),
capacity,
max_content_length,
},
)?;

let evict = self_.getattr(py, "_evict")?;
homeserver
.call_method0("get_clock")?
.call_method("looping_call", (evict, 500), None)?;
homeserver.call_method0("get_clock")?.call_method(
"looping_call",
(evict, eviction_interval),
None,
)?;

Ok(self_)
}
Expand All @@ -127,23 +139,23 @@ impl RendezvousHandler {
let clock = self.clock.as_ref(py);
let now: u64 = clock.call_method0("time_msec")?.extract()?;
let now = SystemTime::UNIX_EPOCH + Duration::from_millis(now);
self.evict(now, CAPACITY);
self.evict(now);

Ok(())
}

fn handle_post(&mut self, py: Python<'_>, twisted_request: &PyAny) -> PyResult<()> {
let request = http_request_from_twisted(twisted_request)?;

let content_type = check_input_headers(request.headers())?;
let content_type = self.check_input_headers(request.headers())?;

let clock = self.clock.as_ref(py);
let now: u64 = clock.call_method0("time_msec")?.extract()?;
let now = SystemTime::UNIX_EPOCH + Duration::from_millis(now);

// We trigger an immediate eviction if we're at 2x the capacity
if self.sessions.len() >= CAPACITY * 2 {
self.evict(now, CAPACITY);
if self.sessions.len() >= self.capacity * 2 {
self.evict(now);
}

// Generate a new ULID for the session from the current time.
Expand Down Expand Up @@ -210,7 +222,7 @@ impl RendezvousHandler {
fn handle_put(&mut self, py: Python<'_>, twisted_request: &PyAny, id: &str) -> PyResult<()> {
let request = http_request_from_twisted(twisted_request)?;

let content_type = check_input_headers(request.headers())?;
let content_type = self.check_input_headers(request.headers())?;

let if_match: IfMatch = request.headers().typed_get_required()?;

Expand Down
9 changes: 8 additions & 1 deletion synapse/synapse_rust/rendezvous.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,14 @@ from twisted.web.iweb import IRequest
from synapse.server import HomeServer

class RendezvousHandler:
def __init__(self, homeserver: HomeServer) -> None: ...
def __init__(
self,
homeserver: HomeServer,
/,
capacity: int = 100,
max_content_length: int = 1024 * 1024,
eviction_interval: int = 60 * 1000,
) -> None: ...
def handle_post(self, request: IRequest) -> None: ...
def handle_get(self, request: IRequest, session_id: str) -> None: ...
def handle_put(self, request: IRequest, session_id: str) -> None: ...
Expand Down

0 comments on commit 2db9345

Please sign in to comment.