Skip to content
This repository has been archived by the owner on Dec 21, 2024. It is now read-only.

Commit

Permalink
chore: move task logic to cli (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
NathanFlurry committed Sep 1, 2024
1 parent 3c5acb0 commit c014583
Show file tree
Hide file tree
Showing 50 changed files with 572 additions and 497 deletions.
11 changes: 4 additions & 7 deletions rivet-cli/src/commands/backend/dev.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
use clap::Parser;
use std::process::ExitCode;
use toolchain::{
tasks::{backend_dev, RunConfig},
util::task::run_task,
};
use toolchain::tasks::backend_dev;

use crate::util::task::{run_task, TaskOutputStyle};

#[derive(Parser)]
pub struct Opts {}

impl Opts {
pub async fn execute(&self) -> ExitCode {
let run_config = RunConfig::default();

match run_task::<backend_dev::Task>(
run_config,
TaskOutputStyle::PlainNoResult,
backend_dev::Input {
port: 6420,
cwd: std::env::current_dir()
Expand Down
31 changes: 15 additions & 16 deletions rivet-cli/src/commands/config/edit.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use clap::Parser;
use std::fs;
use std::process::ExitCode;
use toolchain::{
tasks::{get_settings_paths, RunConfig},
util::task::run_task,
};
use toolchain::tasks::get_settings_paths;

use crate::util::task::{run_task, TaskOutputStyle};

#[derive(Parser)]
pub struct Opts {
Expand All @@ -20,19 +19,19 @@ enum SubCommand {

impl Opts {
pub async fn execute(&self) -> ExitCode {
let run_config = RunConfig::default();

// Get settings paths
let settings_paths =
match run_task::<get_settings_paths::Task>(run_config, get_settings_paths::Input {})
.await
{
Ok(output) => output,
Err(e) => {
eprintln!("Error getting settings paths: {}", e);
return ExitCode::FAILURE;
}
};
let settings_paths = match run_task::<get_settings_paths::Task>(
TaskOutputStyle::None,
get_settings_paths::Input {},
)
.await
{
Ok(output) => output,
Err(e) => {
eprintln!("Error getting settings paths: {}", e);
return ExitCode::FAILURE;
}
};

let path = match self.subcommand {
SubCommand::User => settings_paths.user_path,
Expand Down
18 changes: 6 additions & 12 deletions rivet-cli/src/commands/deploy.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use clap::Parser;
use std::process::ExitCode;
use toolchain::{
tasks::{deploy, get_bootstrap_data, RunConfig},
util::task::run_task,
};
use toolchain::tasks::{deploy, get_bootstrap_data};

use crate::util::task::{run_task, TaskOutputStyle};

#[derive(Parser)]
pub struct Opts {
Expand All @@ -18,10 +17,8 @@ pub struct Opts {

impl Opts {
pub async fn execute(&self) -> ExitCode {
let run_config = RunConfig::default();

let bootstrap_data = match run_task::<get_bootstrap_data::Task>(
run_config.clone(),
TaskOutputStyle::None,
get_bootstrap_data::Input {},
)
.await
Expand Down Expand Up @@ -53,7 +50,7 @@ impl Opts {
};

match run_task::<deploy::Task>(
run_config,
TaskOutputStyle::Plain,
deploy::Input {
cwd: std::env::current_dir()
.unwrap_or_default()
Expand All @@ -67,10 +64,7 @@ impl Opts {
)
.await
{
Ok(_) => {
println!("Deployment completed successfully.");
ExitCode::SUCCESS
}
Ok(_) => ExitCode::SUCCESS,
Err(e) => {
eprintln!("Error during deployment: {e}");
ExitCode::FAILURE
Expand Down
20 changes: 10 additions & 10 deletions rivet-cli/src/commands/login.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use clap::Parser;
use std::process::ExitCode;
use toolchain::{
tasks::{check_login_state, start_device_link, wait_for_login, RunConfig},
util::task::run_task,
};
use toolchain::tasks::{check_login_state, start_device_link, wait_for_login};

use crate::util::task::{run_task, TaskOutputStyle};

#[derive(Parser)]
pub struct Opts {
Expand All @@ -13,11 +12,12 @@ pub struct Opts {

impl Opts {
pub async fn execute(&self) -> ExitCode {
let run_config = RunConfig::default();

// Check if linked
match run_task::<check_login_state::Task>(run_config.clone(), check_login_state::Input {})
.await
match run_task::<check_login_state::Task>(
TaskOutputStyle::None,
check_login_state::Input {},
)
.await
{
Ok(output) => {
if output.logged_in {
Expand All @@ -33,7 +33,7 @@ impl Opts {

// Start device link
let device_link_output = match run_task::<start_device_link::Task>(
run_config.clone(),
TaskOutputStyle::None,
start_device_link::Input {
api_endpoint: self.api_endpoint.clone(),
},
Expand All @@ -50,7 +50,7 @@ impl Opts {

// Wait for finish
match run_task::<wait_for_login::Task>(
run_config.clone(),
TaskOutputStyle::None,
wait_for_login::Input {
api_endpoint: self.api_endpoint.clone(),
device_link_token: device_link_output.device_link_token,
Expand Down
11 changes: 4 additions & 7 deletions rivet-cli/src/commands/logout.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
use clap::Parser;
use std::process::ExitCode;
use toolchain::{
tasks::{unlink, RunConfig},
util::task::run_task,
};
use toolchain::tasks::unlink;

use crate::util::task::{run_task, TaskOutputStyle};

#[derive(Parser)]
pub struct Opts {}

impl Opts {
pub async fn execute(&self) -> ExitCode {
let run_config = RunConfig::default();

match run_task::<unlink::Task>(run_config.clone(), unlink::Input {}).await {
match run_task::<unlink::Task>(TaskOutputStyle::None, unlink::Input {}).await {
Ok(_) => {
eprintln!("Logged out");
ExitCode::SUCCESS
Expand Down
23 changes: 12 additions & 11 deletions rivet-cli/src/commands/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ impl SubCommand {

#[derive(Parser)]
pub struct RunOpts {
#[clap(long)]
run_config: String,
#[clap(long)]
name: String,
#[clap(long)]
Expand All @@ -27,20 +25,23 @@ pub struct RunOpts {

impl RunOpts {
pub async fn execute(&self) -> ExitCode {
match serde_json::from_str(&self.run_config) {
Ok(run_config) => {
let result =
toolchain::tasks::run_task_json(run_config, &self.name, &self.input).await;

if result.success {
let result = crate::util::task::run_task_json(
crate::util::task::TaskOutputStyle::Json,
&self.name,
&self.input,
)
.await;
match result {
Ok(res) => {
if res.success {
ExitCode::SUCCESS
} else {
ExitCode::FAILURE
}
}
Err(e) => {
eprintln!("Error parsing run_config: {}", e);
ExitCode::from(2)
Err(err) => {
eprintln!("error running task: {err:?}");
ExitCode::FAILURE
}
}
}
Expand Down
1 change: 1 addition & 0 deletions rivet-cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod commands;
pub mod util;

use clap::Parser;
use std::process::ExitCode;
Expand Down
1 change: 1 addition & 0 deletions rivet-cli/src/util/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod task;
146 changes: 146 additions & 0 deletions rivet-cli/src/util/task.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use global_error::*;
use std::io::Write;
use tokio::{
sync::{broadcast, mpsc},
task::block_in_place,
};
use toolchain::util::task::{self, TaskEvent};

pub async fn run_task<T>(output_style: TaskOutputStyle, input: T::Input) -> GlobalResult<T::Output>
where
T: task::Task,
{
let (run_config, handles) = task::RunConfig::build();

// Spawn aborter
tokio::spawn(abort_handler(handles.abort_tx, handles.shutdown_rx));

// Spawn event handler
let event_join_handle = tokio::spawn(event_handler(handles.event_rx, output_style));

// Run task
let result = task::run_task::<T>(run_config, input).await;

// Wait for logger to shut down
match event_join_handle.await {
Ok(_) => {}
Err(err) => eprintln!("error waiting for event handle: {err:?}"),
};

result
}

pub async fn run_task_json(
output_style: TaskOutputStyle,
name: &str,
input_json: &str,
) -> GlobalResult<task::RunTaskJsonOutput> {
let (run_config, handles) = task::RunConfig::build();

// Spawn aborter
tokio::spawn(abort_handler(handles.abort_tx, handles.shutdown_rx));

// Spawn event handler
let event_join_handle = tokio::spawn(event_handler(handles.event_rx, output_style));

// Run task
let result = toolchain::tasks::run_task_json(run_config, name, input_json).await;

// Wait for logger to shut down
match event_join_handle.await {
Ok(_) => {}
Err(err) => eprintln!("error waiting for event handle: {err:?}"),
};

Ok(result)
}

/// Handles aborting the task.
async fn abort_handler(abort_tx: mpsc::Sender<()>, mut shutdown_rx: broadcast::Receiver<()>) {
tokio::select! {
result = tokio::signal::ctrl_c() => {
match result {
Ok(_) => {}
Err(err) => {
eprintln!("error waiting for ctrl c: {err:?}");
}
}

// Abort task
let _ = abort_tx.send(()).await;
}
_ = shutdown_rx.recv() => {
// Stop waiting
}
}
}

/// Handles output from the task.
async fn event_handler(
mut event_rx: mpsc::UnboundedReceiver<TaskEvent>,
output_style: TaskOutputStyle,
) {
let mut stdout = std::io::stdout();
let mut stderr = std::io::stdout();
while let Some(event) = event_rx.recv().await {
block_in_place(|| {
print_event(&mut stdout, &mut stderr, &event, output_style);
});
}
}

/// Prints an event depending on the output style.
fn print_event(
stdout: &mut impl Write,
stderr: &mut impl Write,
event: &TaskEvent,
output_style: TaskOutputStyle,
) {
match output_style {
TaskOutputStyle::None => {}
TaskOutputStyle::Json => {
if let Err(err) = serde_json::to_writer(&mut *stdout, event) {
eprintln!("failed to serialize output: {err:?}");
}
writeln!(stdout).unwrap();
}
TaskOutputStyle::Plain => match event {
TaskEvent::Log(x) => {
if let Err(err) = writeln!(stderr, "{x}") {
eprintln!("failed to write output: {err:?}");
}
}
TaskEvent::Result { result } => {
if let Err(err) = writeln!(stdout, "{}", serde_json::to_string(&result).unwrap()) {
eprintln!("failed to serialize output: {err:?}");
}
}
},
TaskOutputStyle::PlainNoResult => match event {
TaskEvent::Log(x) => {
if let Err(err) = writeln!(stderr, "{x}") {
eprintln!("failed to write output: {err:?}");
}
}
TaskEvent::Result { .. } => {}
},
}
}

#[derive(Copy, Clone)]
pub enum TaskOutputStyle {
/// Does not output anything.
None,
/// Writes all events to stdout in JSON.
Json,
/// Writes logs to stderr and result to stdout.
Plain,
/// Writes logs to stderr but does not return result.
PlainNoResult,
}

impl Default for TaskOutputStyle {
fn default() -> Self {
Self::Plain
}
}
2 changes: 1 addition & 1 deletion rivet-toolchain-ffi/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ pub fn run_task(run_config: String, name: String, input_json: String) {
block_on(
async move { toolchain::tasks::run_task_json(run_config, &name_inner, &input_json).await },
BlockOnOpts {
multithreaded: task_config.prefer_multithreaded,
multithreaded: false,
},
);
}
Expand Down
Loading

0 comments on commit c014583

Please sign in to comment.