Skip to content

Commit

Permalink
use rayon instead of tokio for processing user commands (#231)
Browse files Browse the repository at this point in the history
since all commands are blocking the thread, it's more optimal to
spawn a thread pool with `rayon` for processing user commands
  • Loading branch information
ayrat555 authored Jul 27, 2022
1 parent 35ac909 commit 067ddd1
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 76 deletions.
145 changes: 83 additions & 62 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ rss = "2"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
sha2 = "0.10"
tokio = { version = "1", features = ["full"] }
typed-builder = "0.10"
url = "2"
rayon = "1.5"

[dev-dependencies]
mockito = "0.31"
25 changes: 15 additions & 10 deletions src/bot/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,38 @@ use diesel::r2d2;
use diesel::PgConnection;
use frankenstein::Update;
use frankenstein::UpdateContent;
use tokio::time;
use std::thread;

pub struct Handler {}

impl Handler {
pub async fn start() {
pub fn start() {
let mut api = Api::default();
let connection_pool = db::create_connection_pool(Config::commands_db_pool_number());
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(Config::commands_db_pool_number() as usize)
.build()
.unwrap();

log::info!("Starting the El Monitorro bot");

let mut interval = time::interval(std::time::Duration::from_secs(1));
let interval = std::time::Duration::from_secs(1);

loop {
while let Some(update) = api.next_update() {
tokio::spawn(Self::process_message_or_channel_post(
connection_pool.clone(),
api.clone(),
update,
));
let db_pool = connection_pool.clone();
let tg_api = api.clone();

thread_pool.spawn(move || {
Self::process_message_or_channel_post(db_pool.clone(), tg_api.clone(), update)
});
}

interval.tick().await;
thread::sleep(interval);
}
}

async fn process_message_or_channel_post(
fn process_message_or_channel_post(
db_pool: r2d2::Pool<r2d2::ConnectionManager<PgConnection>>,
api: Api,
update: Update,
Expand Down
5 changes: 2 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ use el_monitorro::bot;
use el_monitorro::config::Config;
use fang::Queue;

#[tokio::main]
async fn main() {
fn main() {
dotenv().ok();
env_logger::init();

Expand All @@ -18,5 +17,5 @@ async fn main() {

el_monitorro::start_scheduler(&queue);

bot::handler::Handler::start().await;
bot::handler::Handler::start();
}

0 comments on commit 067ddd1

Please sign in to comment.