From 62b51bb041b151593cc634ca30114d356243ceff Mon Sep 17 00:00:00 2001 From: ymgyt Date: Tue, 9 Jul 2024 17:50:29 +0900 Subject: [PATCH] feat(term): limit the number of concurrent executions of job futures --- crates/synd_term/src/application/in_flight.rs | 13 +- crates/synd_term/src/application/mod.rs | 53 ++++---- crates/synd_term/src/command.rs | 3 + crates/synd_term/src/job.rs | 113 +++++++++++++++++- crates/synd_term/src/types/github.rs | 18 ++- crates/synd_term/src/ui/components/status.rs | 7 +- 6 files changed, 168 insertions(+), 39 deletions(-) diff --git a/crates/synd_term/src/application/in_flight.rs b/crates/synd_term/src/application/in_flight.rs index c80a940f..6a55b92e 100644 --- a/crates/synd_term/src/application/in_flight.rs +++ b/crates/synd_term/src/application/in_flight.rs @@ -7,18 +7,19 @@ use std::{ use tokio::time::{Instant, Sleep}; -use crate::types::github::NotificationId; +use crate::types::github::{IssueId, NotificationId, PullRequestId}; pub type RequestSequence = u64; #[derive(Clone, Copy, PartialEq, Eq)] -pub enum RequestId { +pub(crate) enum RequestId { DeviceFlowDeviceAuthorize, DeviceFlowPollAccessToken, FetchEntries, FetchSubscription, FetchGithubNotifications { page: u8 }, - FetchGithubSubject, + FetchGithubIssue { id: IssueId }, + FetchGithubPullRequest { id: PullRequestId }, SubscribeFeed, UnsubscribeFeed, MarkGithubNotificationAsDone { id: NotificationId }, @@ -53,7 +54,7 @@ impl InFlight { } } - pub fn recent_in_flight(&self) -> Option { + pub(crate) fn recent_in_flight(&self) -> Option { self.in_flights .iter() .max_by_key(|(_, entry)| entry.start) @@ -78,7 +79,7 @@ impl InFlight { self.throbber_step } - pub fn add(&mut self, request_id: RequestId) -> RequestSequence { + pub(crate) fn add(&mut self, request_id: RequestId) -> RequestSequence { let seq = self.next_request_sequence(); self.in_flights.insert( seq, @@ -93,7 +94,7 @@ impl InFlight { seq } - pub fn remove(&mut self, seq: RequestSequence) -> Option { + pub(crate) fn remove(&mut self, seq: RequestSequence) -> Option { let req_id = self.in_flights.remove(&seq).map(|entry| entry.request_id); if self.in_flights.is_empty() { diff --git a/crates/synd_term/src/application/mod.rs b/crates/synd_term/src/application/mod.rs index aede3237..e7cc991a 100644 --- a/crates/synd_term/src/application/mod.rs +++ b/crates/synd_term/src/application/mod.rs @@ -1,6 +1,7 @@ use std::{ collections::VecDeque, future, + num::NonZero, ops::{ControlFlow, Sub}, pin::Pin, sync::Arc, @@ -143,7 +144,8 @@ impl Application { terminal, client, github_client, - jobs: Jobs::new(), + // The secondary rate limit of the GitHub API is 100 concurrent requests, so we have set it to 90. + jobs: Jobs::new(NonZero::new(90).unwrap()), components: Components::new(&config.features), interactor: interactor.unwrap_or_else(Interactor::new), authenticator: authenticator.unwrap_or_else(Authenticator::new), @@ -255,7 +257,7 @@ impl Application { fn initial_fetch(&mut self) { tracing::info!("Initial fetch"); - self.jobs.futures.push( + self.jobs.push( future::ready(Ok(Command::FetchEntries { after: None, first: self.config.entries_per_pagination, @@ -264,7 +266,7 @@ impl Application { ); if self.config.features.enable_github_notification { if let Some(fetch) = self.components.gh_notifications.fetch_next_if_needed() { - self.jobs.futures.push(future::ready(Ok(fetch)).boxed()); + self.jobs.push(future::ready(Ok(fetch)).boxed()); } } } @@ -314,10 +316,7 @@ impl Application { Some(event) = input.next() => { self.handle_terminal_event(event) } - Some(command) = self.jobs.futures.next() => { - Some(command.unwrap()) - } - Some(command) = self.jobs.scheduled.next() => { + Some(command) = self.jobs.next() => { Some(command.unwrap()) } () = self.in_flight.throbber_timer() => { @@ -924,7 +923,7 @@ impl Application { .boxed(), }; - self.jobs.futures.push(fut); + self.jobs.push(fut); } fn prompt_feed_edition(&mut self) { @@ -951,7 +950,7 @@ impl Application { .boxed(), }; - self.jobs.futures.push(fut); + self.jobs.push(fut); } fn subscribe_feed(&mut self, input: SubscribeFeedInput) { @@ -969,7 +968,7 @@ impl Application { } } .boxed(); - self.jobs.futures.push(fut); + self.jobs.push(fut); } fn unsubscribe_feed(&mut self, url: FeedUrl) { @@ -985,7 +984,7 @@ impl Application { } } .boxed(); - self.jobs.futures.push(fut); + self.jobs.push(fut); } fn mark_gh_notification_as_done(&mut self, all: bool) { @@ -1025,7 +1024,7 @@ impl Application { } } .boxed(); - self.jobs.futures.push(fut); + self.jobs.push(fut); } } @@ -1053,7 +1052,7 @@ impl Application { } } .boxed(); - self.jobs.futures.push(fut); + self.jobs.push(fut); } } @@ -1111,7 +1110,7 @@ impl Application { } } .boxed(); - self.jobs.futures.push(fut); + self.jobs.push(fut); } #[tracing::instrument(skip(self))] @@ -1134,7 +1133,7 @@ impl Application { } } .boxed(); - self.jobs.futures.push(fut); + self.jobs.push(fut); } #[tracing::instrument(skip(self))] @@ -1162,10 +1161,10 @@ impl Application { } } .boxed(); - self.jobs.futures.push(fut); + self.jobs.push(fut); } - #[tracing::instrument(skip(self))] + #[tracing::instrument(skip_all)] fn fetch_gh_notification_details(&mut self, contexts: Vec) { let client = self .github_client @@ -1173,11 +1172,13 @@ impl Application { .expect("Github client not found, this is a BUG"); for context in contexts { - let request_seq = self.in_flight.add(RequestId::FetchGithubSubject); let client = client.clone(); let fut = match context { Either::Left(issue) => { + let request_seq = self + .in_flight + .add(RequestId::FetchGithubIssue { id: issue.id }); let notification_id = issue.notification_id; async move { match client.fetch_issue(issue).await { @@ -1197,7 +1198,11 @@ impl Application { .boxed() } Either::Right(pull_request) => { + let request_seq = self.in_flight.add(RequestId::FetchGithubPullRequest { + id: pull_request.id, + }); let notification_id = pull_request.notification_id; + async move { match client.fetch_pull_request(pull_request).await { Ok(pull_request) => Ok(Command::HandleApiResponse { @@ -1216,7 +1221,7 @@ impl Application { .boxed() } }; - self.jobs.futures.push(fut); + self.jobs.push(fut); } } } @@ -1241,7 +1246,7 @@ impl Application { } } .boxed(); - self.jobs.futures.push(fut); + self.jobs.push(fut); } fn handle_device_flow_authorization_response( @@ -1274,7 +1279,7 @@ impl Application { } .boxed(); - self.jobs.futures.push(fut); + self.jobs.push(fut); } fn complete_device_authroize_flow(&mut self, cred: Verified) { @@ -1312,7 +1317,7 @@ impl Application { } } .boxed(); - self.jobs.scheduled.push(fut); + self.jobs.push_background(fut); } } } @@ -1369,7 +1374,7 @@ impl Application { } } .boxed(); - self.jobs.futures.push(fut); + self.jobs.push(fut); } fn inform_latest_release(&self) { @@ -1423,7 +1428,7 @@ impl Application { // the assertion timing by waiting until jobs are empty. // However the future of refreshing the id token sleeps until it expires and remains in the jobs for long time // Therefore, we ignore scheduled jobs - if self.jobs.futures.is_empty() { + if self.jobs.is_empty() { break; } } diff --git a/crates/synd_term/src/command.rs b/crates/synd_term/src/command.rs index 0bc9b268..3c92832d 100644 --- a/crates/synd_term/src/command.rs +++ b/crates/synd_term/src/command.rs @@ -204,6 +204,9 @@ impl Display for Command { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Command::HandleApiResponse { response, .. } => response.fmt(f), + Command::FetchGhNotificationDetails { .. } => f + .debug_struct("FetchGhNotificationDetails") + .finish_non_exhaustive(), _ => write!(f, "{self:?}"), } } diff --git a/crates/synd_term/src/job.rs b/crates/synd_term/src/job.rs index ac978a0f..c9ddfcf5 100644 --- a/crates/synd_term/src/job.rs +++ b/crates/synd_term/src/job.rs @@ -1,19 +1,122 @@ -use futures_util::{future::BoxFuture, stream::FuturesUnordered}; +use std::{collections::VecDeque, num::NonZero}; + +use futures_util::{future::BoxFuture, stream::FuturesUnordered, StreamExt as _}; use crate::command::Command; pub(crate) type JobFuture = BoxFuture<'static, anyhow::Result>; pub(crate) struct Jobs { - pub futures: FuturesUnordered, - pub scheduled: FuturesUnordered, + futures: FuturesUnordered, + background: FuturesUnordered, + delay_queue: VecDeque, + concurrent_limit: NonZero, } impl Jobs { - pub fn new() -> Self { + pub fn new(concurrent_limit: NonZero) -> Self { Self { futures: FuturesUnordered::new(), - scheduled: FuturesUnordered::new(), + background: FuturesUnordered::new(), + delay_queue: VecDeque::new(), + concurrent_limit, + } + } + + pub(crate) fn push(&mut self, job: JobFuture) { + self.delay_queue.push_back(job); + + while self.concurrent_limit.get() > self.futures.len() { + let Some(job) = self.delay_queue.pop_front() else { + break; + }; + + self.futures.push(job); + } + + tracing::trace!( + "Job delay_queue: {} futures: {}", + self.delay_queue.len(), + self.futures.len() + ); + } + + pub(crate) fn push_background(&mut self, job: JobFuture) { + self.background.push(job); + } + + pub(crate) async fn next(&mut self) -> Option> { + debug_assert!(self.concurrent_limit.get() >= self.futures.len()); + + tokio::select! { + result = self.futures.next() => { + match result { + Some(result) => { + if let Some(job) = self.delay_queue.pop_front() { + self.futures.push(job); + } + Some(result) + } + None => None, + } + } + result = self.background.next() => result, + } + } + + #[cfg(feature = "integration")] + pub(crate) fn is_empty(&self) -> bool { + self.futures.is_empty() + } +} + +#[cfg(test)] +mod tests { + use futures_util::FutureExt as _; + + use super::*; + use std::future; + + #[tokio::test] + async fn respect_concurrent_limit() { + let mut job = Jobs::new(NonZero::new(2).unwrap()); + + for _ in 0..3 { + job.push(future::ready(Ok(Command::Nop)).boxed()); + } + + assert_eq!(job.futures.len(), 2); + assert_eq!(job.delay_queue.len(), 1); + + let mut count = 0; + loop { + if let Some(result) = job.next().await { + assert!(matches!(result, Ok(Command::Nop))); + count += 1; + } + if count == 3 { + break; + } + } + } + + #[tokio::test] + async fn background_job() { + let mut job = Jobs::new(NonZero::new(2).unwrap()); + + job.push(future::ready(Ok(Command::Nop)).boxed()); + job.push(future::ready(Ok(Command::Nop)).boxed()); + job.push_background(future::ready(Ok(Command::Nop)).boxed()); + + let mut count = 0; + loop { + if let Some(result) = job.next().await { + assert!(matches!(result, Ok(Command::Nop))); + count += 1; + } + if count == 3 { + break; + } } } } diff --git a/crates/synd_term/src/types/github.rs b/crates/synd_term/src/types/github.rs index 69cec18a..370c0f0e 100644 --- a/crates/synd_term/src/types/github.rs +++ b/crates/synd_term/src/types/github.rs @@ -1,4 +1,4 @@ -use std::{ops::Deref, str::FromStr}; +use std::{fmt::Display, ops::Deref, str::FromStr}; use either::Either; use octocrab::models::{self, activity::Subject}; @@ -21,9 +21,15 @@ pub(crate) type ThreadId = octocrab::models::ThreadId; pub(crate) type NotificationId = octocrab::models::NotificationId; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) struct IssueId(i64); +impl Display for IssueId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + impl IssueId { pub(crate) fn into_inner(self) -> i64 { self.0 @@ -38,9 +44,15 @@ impl Deref for IssueId { } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub(crate) struct PullRequestId(i64); +impl Display for PullRequestId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + impl PullRequestId { pub(crate) fn into_inner(self) -> i64 { self.0 diff --git a/crates/synd_term/src/ui/components/status.rs b/crates/synd_term/src/ui/components/status.rs index c4015b74..dc1d750e 100644 --- a/crates/synd_term/src/ui/components/status.rs +++ b/crates/synd_term/src/ui/components/status.rs @@ -109,7 +109,12 @@ impl StatusLine { RequestId::FetchGithubNotifications { page } => { Cow::Owned(format!("Fetch github notifications(page: {page})...")) } - RequestId::FetchGithubSubject => Cow::Borrowed("Fetch github subject..."), + RequestId::FetchGithubIssue { id } => { + Cow::Owned(format!("Fetch github issue(#{id})...")) + } + RequestId::FetchGithubPullRequest { id } => { + Cow::Owned(format!("Fetch github pull request(#{id})...")) + } RequestId::SubscribeFeed => Cow::Borrowed("Subscribe feed..."), RequestId::UnsubscribeFeed => Cow::Borrowed("Unsubscribe feed..."), RequestId::MarkGithubNotificationAsDone { id } => {