Skip to content
This repository has been archived by the owner on Jan 14, 2022. It is now read-only.

Commit

Permalink
Stream events via a watch channel (#128)
Browse files Browse the repository at this point in the history
This squashed commit makes a fairly significant structural change to significantly reduce Flodgatt's CPU usage.

Flodgatt connects to Redis in a single (green) thread, and then creates a new thread to handle each WebSocket/SSE connection. Previously, each thread was responsible for polling the Redis thread to determine whether it had a message relevant to the connected client. I initially selected this structure both because it was simple and because it minimized memory overhead – no messages are sent to a particular thread unless they are relevant to the client connected to the thread. However, I recently ran some load tests that show this approach to have unacceptable CPU costs when 300+ clients are simultaneously connected.

Accordingly, Flodgatt now uses a different structure: the main Redis thread now announces each incoming message via a watch channel connected to every client thread, and each client thread filters out irrelevant messages. In theory, this could lead to slightly higher memory use, but tests I have run so far have not found a measurable increase. On the other hand, Flodgatt's CPU use is now an order of magnitude lower in tests I've run.

This approach does run a (very slight) risk of dropping messages under extremely heavy load: because a watch channel only stores the most recent message transmitted, if Flodgatt adds a second message before the thread can read the first message, the first message will be overwritten and never transmitted. This seems unlikely to happen in practice, and we can avoid the issue entirely by changing to a broadcast channel when we upgrade to the most recent Tokio version (see #75).
  • Loading branch information
codesections authored Apr 9, 2020
1 parent fa8b695 commit 1657113
Show file tree
Hide file tree
Showing 14 changed files with 261 additions and 375 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "flodgatt"
description = "A blazingly fast drop-in replacement for the Mastodon streaming api server"
version = "0.7.1"
version = "0.8.0"
authors = ["Daniel Long Sockwell <daniel@codesections.com", "Julian Laubstein <contact@julianlaubstein.de>"]
edition = "2018"

Expand Down Expand Up @@ -43,8 +43,9 @@ stub_status = []
production = []

[profile.release]
lto = "fat"
panic = "abort"
codegen-units = 1
#lto = "fat"
#panic = "abort"
#codegen-units = 1



3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
//! most important settings for performance control the frequency with which the `ClientAgent`
//! polls the `Receiver` and the frequency with which the `Receiver` polls Redis.
//!
#![allow(clippy::try_err, clippy::match_bool)]

pub mod config;
pub mod err;
pub mod messages;
Expand Down
86 changes: 63 additions & 23 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use flodgatt::{
config::{DeploymentConfig, EnvVar, PostgresConfig, RedisConfig},
parse_client_request::{PgPool, Subscription},
redis_to_client_stream::{ClientAgent, EventStream, Receiver},
messages::Event,
parse_client_request::{PgPool, Subscription, Timeline},
redis_to_client_stream::{Receiver, SseStream, WsStream},
};
use std::{env, fs, net::SocketAddr, os::unix::fs::PermissionsExt};
use tokio::net::UnixListener;
use tokio::{
net::UnixListener,
sync::{mpsc, watch},
};
use warp::{http::StatusCode, path, ws::Ws2, Filter, Rejection};

fn main() {
Expand All @@ -23,8 +27,10 @@ fn main() {
let cfg = DeploymentConfig::from_env(env_vars);

let pg_pool = PgPool::new(postgres_cfg);

let receiver = Receiver::try_from(redis_cfg)
let (event_tx, event_rx) = watch::channel((Timeline::empty(), Event::Ping));
let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
let poll_freq = *redis_cfg.polling_interval;
let receiver = Receiver::try_from(redis_cfg, event_tx, cmd_rx)
.unwrap_or_else(|e| {
log::error!("{}\nFlodgatt shutting down...", e);
std::process::exit(1);
Expand All @@ -34,38 +40,57 @@ fn main() {

// Server Sent Events
let sse_receiver = receiver.clone();
let (sse_interval, whitelist_mode) = (*cfg.sse_interval, *cfg.whitelist_mode);
let (sse_rx, sse_cmd_tx) = (event_rx.clone(), cmd_tx.clone());
let whitelist_mode = *cfg.whitelist_mode;
let sse_routes = Subscription::from_sse_query(pg_pool.clone(), whitelist_mode)
.and(warp::sse())
.map(
move |subscription: Subscription, sse_connection_to_client: warp::sse::Sse| {
log::info!("Incoming SSE request for {:?}", subscription.timeline);
let mut client_agent = ClientAgent::new(sse_receiver.clone(), &subscription);
client_agent.subscribe();

{
let mut receiver = sse_receiver.lock().expect("TODO");
receiver.subscribe(&subscription).unwrap_or_else(|e| {
log::error!("Could not subscribe to the Redis channel: {}", e)
});
}
let cmd_tx = sse_cmd_tx.clone();
let sse_rx = sse_rx.clone();
// self.sse.reply(
// warp::sse::keep_alive()
// .interval(Duration::from_secs(30))
// .text("thump".to_string())
// .stream(event_stream),
// )
// send the updates through the SSE connection
EventStream::send_to_sse(client_agent, sse_connection_to_client, sse_interval)
SseStream::send_events(sse_connection_to_client, cmd_tx, subscription, sse_rx)
},
)
.with(warp::reply::with::header("Connection", "keep-alive"));

// WebSocket
let ws_receiver = receiver.clone();
let (ws_update_interval, whitelist_mode) = (*cfg.ws_interval, *cfg.whitelist_mode);
let whitelist_mode = *cfg.whitelist_mode;
let ws_routes = Subscription::from_ws_request(pg_pool, whitelist_mode)
.and(warp::ws::ws2())
.map(move |subscription: Subscription, ws: Ws2| {
log::info!("Incoming websocket request for {:?}", subscription.timeline);
let mut client_agent = ClientAgent::new(ws_receiver.clone(), &subscription);
client_agent.subscribe();
{
let mut receiver = ws_receiver.lock().expect("TODO");
receiver.subscribe(&subscription).unwrap_or_else(|e| {
log::error!("Could not subscribe to the Redis channel: {}", e)
});
}
let cmd_tx = cmd_tx.clone();
let ws_rx = event_rx.clone();
let token = subscription
.clone()
.access_token
.unwrap_or_else(String::new);

// send the updates through the WS connection
// (along with the User's access_token which is sent for security)
// send the updates through the WS connection (along with the access_token, for security)
(
ws.on_upgrade(move |s| {
EventStream::send_to_ws(s, client_agent, ws_update_interval)
}),
subscription.access_token.unwrap_or_else(String::new),
ws.on_upgrade(move |ws| WsStream::new(ws, cmd_tx, subscription).send_events(ws_rx)),
token,
)
})
.map(|(reply, token)| warp::reply::with_header(reply, "sec-websocket-protocol", token));
Expand All @@ -77,14 +102,12 @@ fn main() {

#[cfg(feature = "stub_status")]
let status_endpoints = {
let (r1, r2, r3) = (receiver.clone(), receiver.clone(), receiver.clone());
let (r1, r3) = (receiver.clone(), receiver.clone());
warp::path!("api" / "v1" / "streaming" / "health")
.map(|| "OK")
.or(warp::path!("api" / "v1" / "streaming" / "status")
.and(warp::path::end())
.map(move || r1.lock().expect("TODO").count_connections()))
.or(warp::path!("api" / "v1" / "streaming" / "status" / "queue")
.map(move || r2.lock().expect("TODO").queue_length()))
.or(
warp::path!("api" / "v1" / "streaming" / "status" / "per_timeline")
.map(move || r3.lock().expect("TODO").list_connections()),
Expand Down Expand Up @@ -119,7 +142,24 @@ fn main() {
)
.run_incoming(incoming);
} else {
use futures::{future::lazy, stream::Stream as _Stream};
use std::time::Instant;

let server_addr = SocketAddr::new(*cfg.address, *cfg.port);
warp::serve(ws_routes.or(sse_routes).with(cors).or(status_endpoints)).run(server_addr);

tokio::run(lazy(move || {
let receiver = receiver.clone();
warp::spawn(lazy(move || {
tokio::timer::Interval::new(Instant::now(), poll_freq)
.map_err(|e| log::error!("{}", e))
.for_each(move |_| {
let receiver = receiver.clone();
receiver.lock().expect("TODO").poll_broadcast();
Ok(())
})
}));

warp::serve(ws_routes.or(sse_routes).with(cors).or(status_endpoints)).bind(server_addr)
}));
};
}
11 changes: 5 additions & 6 deletions src/messages/event/checked_event/status/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ pub struct Status {
poll: Option<Poll>,
card: Option<Card>,
language: Option<String>,

text: Option<String>,
// ↓↓↓ Only for authorized users
favourited: Option<bool>,
Expand Down Expand Up @@ -85,16 +86,14 @@ impl Status {
pub fn involves_any(&self, blocks: &Blocks) -> bool {
const ALLOW: bool = false;
const REJECT: bool = true;

let Blocks {
blocked_users,
blocking_users,
blocked_domains,
} = blocks;
let user_id = &self.account.id.parse().expect("TODO");

if !self.calculate_involved_users().is_disjoint(blocked_users) {
REJECT
} else if blocking_users.contains(&self.account.id.parse().expect("TODO")) {
if blocking_users.contains(user_id) || self.involves(blocked_users) {
REJECT
} else {
let full_username = &self.account.acct;
Expand All @@ -105,7 +104,7 @@ impl Status {
}
}

fn calculate_involved_users(&self) -> HashSet<i64> {
fn involves(&self, blocked_users: &HashSet<i64>) -> bool {
// TODO replace vvvv with error handling
let err = |_| log_fatal!("Could not process an `id` field in {:?}", &self);

Expand All @@ -126,6 +125,6 @@ impl Status {
if let Some(boosted_status) = self.reblog.clone() {
involved_users.insert(boosted_status.account.id.parse().unwrap_or_else(err));
}
involved_users
!involved_users.is_disjoint(blocked_users)
}
}
19 changes: 9 additions & 10 deletions src/messages/event/dynamic_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl DynamicEvent {
match self.payload["language"].as_str() {
Some(toot_language) if allowed_langs.contains(toot_language) => ALLOW,
None => ALLOW, // If toot language is unknown, toot is always allowed
Some(empty) if empty == &String::new() => ALLOW,
Some(empty) if empty == String::new() => ALLOW,
Some(_toot_language) => REJECT,
}
}
Expand All @@ -45,12 +45,10 @@ impl DynamicEvent {
blocked_domains,
} = blocks;

let user_id = self.payload["account"]["id"].as_str().expect("TODO");
let id = self.payload["account"]["id"].as_str().expect("TODO");
let username = self.payload["account"]["acct"].as_str().expect("TODO");

if !self.calculate_involved_users().is_disjoint(blocked_users) {
REJECT
} else if blocking_users.contains(&user_id.parse().expect("TODO")) {
if self.involves(blocked_users) || blocking_users.contains(&id.parse().expect("TODO")) {
REJECT
} else {
let full_username = &username;
Expand All @@ -60,9 +58,11 @@ impl DynamicEvent {
}
}
}
fn calculate_involved_users(&self) -> HashSet<i64> {

// involved_users = mentioned_users + author + replied-to user + boosted user
fn involves(&self, blocked_users: &HashSet<i64>) -> bool {
// mentions
let mentions = self.payload["mentions"].as_array().expect("TODO");
// involved_users = mentioned_users + author + replied-to user + boosted user
let mut involved_users: HashSet<i64> = mentions
.iter()
.map(|mention| mention["id"].as_str().expect("TODO").parse().expect("TODO"))
Expand All @@ -73,16 +73,15 @@ impl DynamicEvent {
involved_users.insert(author_id.parse::<i64>().expect("TODO"));
// replied-to user
let replied_to_user = self.payload["in_reply_to_account_id"].as_str();
if let Some(user_id) = replied_to_user.clone() {
if let Some(user_id) = replied_to_user {
involved_users.insert(user_id.parse().expect("TODO"));
}
// boosted user

let id_of_boosted_user = self.payload["reblog"]["account"]["id"]
.as_str()
.expect("TODO");
involved_users.insert(id_of_boosted_user.parse().expect("TODO"));

involved_users
!involved_users.is_disjoint(blocked_users)
}
}
3 changes: 3 additions & 0 deletions src/messages/event/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::string::String;
pub enum Event {
TypeSafe(CheckedEvent),
Dynamic(DynamicEvent),
Ping,
}

impl Event {
Expand All @@ -37,6 +38,7 @@ impl Event {
CheckedEvent::FiltersChanged => "filters_changed",
},
Self::Dynamic(dyn_event) => &dyn_event.event,
Self::Ping => panic!("event_name() called on EventNotReady"),
})
}

Expand All @@ -54,6 +56,7 @@ impl Event {
FiltersChanged => None,
},
Self::Dynamic(dyn_event) => Some(dyn_event.payload.to_string()),
Self::Ping => panic!("payload() called on EventNotReady"),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/parse_client_request/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ impl Timeline {
};

use {Content::*, Reach::*, Stream::*};
Ok(match &timeline.split(":").collect::<Vec<&str>>()[..] {
Ok(match &timeline.split(':').collect::<Vec<&str>>()[..] {
["public"] => Timeline(Public, Federated, All),
["public", "local"] => Timeline(Public, Local, All),
["public", "media"] => Timeline(Public, Federated, Media),
Expand Down
Loading

0 comments on commit 1657113

Please sign in to comment.