Skip to content

Commit

Permalink
feat(term): limit the number of concurrent executions of job futures
Browse files Browse the repository at this point in the history
  • Loading branch information
ymgyt committed Jul 9, 2024
1 parent f1739fe commit 62b51bb
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 39 deletions.
13 changes: 7 additions & 6 deletions crates/synd_term/src/application/in_flight.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ use std::{

use tokio::time::{Instant, Sleep};

use crate::types::github::NotificationId;
use crate::types::github::{IssueId, NotificationId, PullRequestId};

pub type RequestSequence = u64;

#[derive(Clone, Copy, PartialEq, Eq)]
pub enum RequestId {
pub(crate) enum RequestId {
DeviceFlowDeviceAuthorize,
DeviceFlowPollAccessToken,
FetchEntries,
FetchSubscription,
FetchGithubNotifications { page: u8 },
FetchGithubSubject,
FetchGithubIssue { id: IssueId },
FetchGithubPullRequest { id: PullRequestId },
SubscribeFeed,
UnsubscribeFeed,
MarkGithubNotificationAsDone { id: NotificationId },
Expand Down Expand Up @@ -53,7 +54,7 @@ impl InFlight {
}
}

pub fn recent_in_flight(&self) -> Option<RequestId> {
pub(crate) fn recent_in_flight(&self) -> Option<RequestId> {
self.in_flights
.iter()
.max_by_key(|(_, entry)| entry.start)
Expand All @@ -78,7 +79,7 @@ impl InFlight {
self.throbber_step
}

pub fn add(&mut self, request_id: RequestId) -> RequestSequence {
pub(crate) fn add(&mut self, request_id: RequestId) -> RequestSequence {
let seq = self.next_request_sequence();
self.in_flights.insert(
seq,
Expand All @@ -93,7 +94,7 @@ impl InFlight {
seq
}

pub fn remove(&mut self, seq: RequestSequence) -> Option<RequestId> {
pub(crate) fn remove(&mut self, seq: RequestSequence) -> Option<RequestId> {
let req_id = self.in_flights.remove(&seq).map(|entry| entry.request_id);

if self.in_flights.is_empty() {
Expand Down
53 changes: 29 additions & 24 deletions crates/synd_term/src/application/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{
collections::VecDeque,
future,
num::NonZero,
ops::{ControlFlow, Sub},
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -143,7 +144,8 @@ impl Application {
terminal,
client,
github_client,
jobs: Jobs::new(),
// The secondary rate limit of the GitHub API is 100 concurrent requests, so we have set it to 90.
jobs: Jobs::new(NonZero::new(90).unwrap()),
components: Components::new(&config.features),
interactor: interactor.unwrap_or_else(Interactor::new),
authenticator: authenticator.unwrap_or_else(Authenticator::new),
Expand Down Expand Up @@ -255,7 +257,7 @@ impl Application {

fn initial_fetch(&mut self) {
tracing::info!("Initial fetch");
self.jobs.futures.push(
self.jobs.push(
future::ready(Ok(Command::FetchEntries {
after: None,
first: self.config.entries_per_pagination,
Expand All @@ -264,7 +266,7 @@ impl Application {
);
if self.config.features.enable_github_notification {
if let Some(fetch) = self.components.gh_notifications.fetch_next_if_needed() {
self.jobs.futures.push(future::ready(Ok(fetch)).boxed());
self.jobs.push(future::ready(Ok(fetch)).boxed());
}
}
}
Expand Down Expand Up @@ -314,10 +316,7 @@ impl Application {
Some(event) = input.next() => {
self.handle_terminal_event(event)
}
Some(command) = self.jobs.futures.next() => {
Some(command.unwrap())
}
Some(command) = self.jobs.scheduled.next() => {
Some(command) = self.jobs.next() => {
Some(command.unwrap())
}
() = self.in_flight.throbber_timer() => {
Expand Down Expand Up @@ -924,7 +923,7 @@ impl Application {
.boxed(),
};

self.jobs.futures.push(fut);
self.jobs.push(fut);
}

fn prompt_feed_edition(&mut self) {
Expand All @@ -951,7 +950,7 @@ impl Application {
.boxed(),
};

self.jobs.futures.push(fut);
self.jobs.push(fut);
}

fn subscribe_feed(&mut self, input: SubscribeFeedInput) {
Expand All @@ -969,7 +968,7 @@ impl Application {
}
}
.boxed();
self.jobs.futures.push(fut);
self.jobs.push(fut);
}

fn unsubscribe_feed(&mut self, url: FeedUrl) {
Expand All @@ -985,7 +984,7 @@ impl Application {
}
}
.boxed();
self.jobs.futures.push(fut);
self.jobs.push(fut);
}

fn mark_gh_notification_as_done(&mut self, all: bool) {
Expand Down Expand Up @@ -1025,7 +1024,7 @@ impl Application {
}
}
.boxed();
self.jobs.futures.push(fut);
self.jobs.push(fut);
}
}

Expand Down Expand Up @@ -1053,7 +1052,7 @@ impl Application {
}
}
.boxed();
self.jobs.futures.push(fut);
self.jobs.push(fut);
}
}

Expand Down Expand Up @@ -1111,7 +1110,7 @@ impl Application {
}
}
.boxed();
self.jobs.futures.push(fut);
self.jobs.push(fut);
}

#[tracing::instrument(skip(self))]
Expand All @@ -1134,7 +1133,7 @@ impl Application {
}
}
.boxed();
self.jobs.futures.push(fut);
self.jobs.push(fut);
}

#[tracing::instrument(skip(self))]
Expand Down Expand Up @@ -1162,22 +1161,24 @@ impl Application {
}
}
.boxed();
self.jobs.futures.push(fut);
self.jobs.push(fut);
}

#[tracing::instrument(skip(self))]
#[tracing::instrument(skip_all)]
fn fetch_gh_notification_details(&mut self, contexts: Vec<IssueOrPullRequest>) {
let client = self
.github_client
.clone()
.expect("Github client not found, this is a BUG");

for context in contexts {
let request_seq = self.in_flight.add(RequestId::FetchGithubSubject);
let client = client.clone();

let fut = match context {
Either::Left(issue) => {
let request_seq = self
.in_flight
.add(RequestId::FetchGithubIssue { id: issue.id });
let notification_id = issue.notification_id;
async move {
match client.fetch_issue(issue).await {
Expand All @@ -1197,7 +1198,11 @@ impl Application {
.boxed()
}
Either::Right(pull_request) => {
let request_seq = self.in_flight.add(RequestId::FetchGithubPullRequest {
id: pull_request.id,
});
let notification_id = pull_request.notification_id;

async move {
match client.fetch_pull_request(pull_request).await {
Ok(pull_request) => Ok(Command::HandleApiResponse {
Expand All @@ -1216,7 +1221,7 @@ impl Application {
.boxed()
}
};
self.jobs.futures.push(fut);
self.jobs.push(fut);
}
}
}
Expand All @@ -1241,7 +1246,7 @@ impl Application {
}
}
.boxed();
self.jobs.futures.push(fut);
self.jobs.push(fut);
}

fn handle_device_flow_authorization_response(
Expand Down Expand Up @@ -1274,7 +1279,7 @@ impl Application {
}
.boxed();

self.jobs.futures.push(fut);
self.jobs.push(fut);
}

fn complete_device_authroize_flow(&mut self, cred: Verified<Credential>) {
Expand Down Expand Up @@ -1312,7 +1317,7 @@ impl Application {
}
}
.boxed();
self.jobs.scheduled.push(fut);
self.jobs.push_background(fut);
}
}
}
Expand Down Expand Up @@ -1369,7 +1374,7 @@ impl Application {
}
}
.boxed();
self.jobs.futures.push(fut);
self.jobs.push(fut);
}

fn inform_latest_release(&self) {
Expand Down Expand Up @@ -1423,7 +1428,7 @@ impl Application {
// the assertion timing by waiting until jobs are empty.
// However the future of refreshing the id token sleeps until it expires and remains in the jobs for long time
// Therefore, we ignore scheduled jobs
if self.jobs.futures.is_empty() {
if self.jobs.is_empty() {
break;
}
}
Expand Down
3 changes: 3 additions & 0 deletions crates/synd_term/src/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,9 @@ impl Display for Command {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Command::HandleApiResponse { response, .. } => response.fmt(f),
Command::FetchGhNotificationDetails { .. } => f
.debug_struct("FetchGhNotificationDetails")
.finish_non_exhaustive(),
_ => write!(f, "{self:?}"),
}
}
Expand Down
Loading

0 comments on commit 62b51bb

Please sign in to comment.