Skip to content

Commit

Permalink
Start documenting the project (#96)
Browse files Browse the repository at this point in the history
* Start documenting the project

* Add AsyncQueue implementation

* Documenting Async runnable

* By default value in uniq function

* Fix errors and warnings async runnable docs

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* format something

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* uniq documentation

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_queue.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

* Update src/asynk/async_runnable.rs

Co-authored-by: Ayrat Badykov <ayratin555@gmail.com>

Co-authored-by: pxp9 <pepe.marquezromero@gmail.com>
Co-authored-by: Pmarquez <48651252+pxp9@users.noreply.github.com>
  • Loading branch information
3 people authored Oct 18, 2022
1 parent cfc3c46 commit 9b92a4a
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 3 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

Background task processing library for Rust. It uses Postgres DB as a task queue.

## Features
## Key Features

Here are some of the fang's key features:

Expand Down
37 changes: 37 additions & 0 deletions src/asynk/async_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,39 +113,59 @@ impl From<cron::error::Error> for AsyncQueueError {
}
}

/// This trait defines operations for an asynchronous queue.
/// The trait can be implemented for different storage backends.
/// For now, the trait is only implemented for PostgreSQL. More backends are planned to be implemented in the future.
#[async_trait]
pub trait AsyncQueueable: Send {
/// This method should retrieve one task of the `task_type` type. If `task_type` is `None` it will try to
/// fetch a task of the type `common`. After fetching it should update the state of the task to
/// `FangTaskState::InProgress`.
///
async fn fetch_and_touch_task(
&mut self,
task_type: Option<String>,
) -> Result<Option<Task>, AsyncQueueError>;

/// Enqueue a task to the queue, The task will be executed as soon as possible by the worker of the same type
/// created by an AsyncWorkerPool.
async fn insert_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;

/// The method will remove all tasks from the queue
async fn remove_all_tasks(&mut self) -> Result<u64, AsyncQueueError>;

/// Remove all tasks that are scheduled in the future.
async fn remove_all_scheduled_tasks(&mut self) -> Result<u64, AsyncQueueError>;

/// Remove a task by its id.
async fn remove_task(&mut self, id: Uuid) -> Result<u64, AsyncQueueError>;

/// Remove a task by its metadata (struct fields values)
async fn remove_task_by_metadata(
&mut self,
task: &dyn AsyncRunnable,
) -> Result<u64, AsyncQueueError>;

/// Removes all tasks that have the specified `task_type`.
async fn remove_tasks_type(&mut self, task_type: &str) -> Result<u64, AsyncQueueError>;

/// Retrieve a task from storage by its `id`.
async fn find_task_by_id(&mut self, id: Uuid) -> Result<Task, AsyncQueueError>;

/// Update the state field of the specified task
/// See the `FangTaskState` enum for possible states.
async fn update_task_state(
&mut self,
task: Task,
state: FangTaskState,
) -> Result<Task, AsyncQueueError>;

/// Update the state of a task to `FangTaskState::Failed` and set an error_message.
async fn fail_task(&mut self, task: Task, error_message: &str)
-> Result<Task, AsyncQueueError>;

/// Schedule a task.
async fn schedule_task(&mut self, task: &dyn AsyncRunnable) -> Result<Task, AsyncQueueError>;

async fn schedule_retry(
Expand All @@ -156,6 +176,19 @@ pub trait AsyncQueueable: Send {
) -> Result<Task, AsyncQueueError>;
}

/// An async queue that can be used to enqueue tasks.
/// It uses a PostgreSQL storage. It must be connected to perform any operation.
/// To connect an `AsyncQueue` to PostgreSQL database call the `connect` method.
/// A Queue can be created with the TypedBuilder.
///
/// ```rust
/// let mut queue = AsyncQueue::builder()
/// .uri("postgres://postgres:postgres@localhost/fang")
/// .max_pool_size(max_pool_size)
/// .build();
/// ```
///
#[derive(TypedBuilder, Debug, Clone)]
pub struct AsyncQueue<Tls>
where
Expand Down Expand Up @@ -344,13 +377,16 @@ where
<Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
<<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
/// Check if the connection with db is established
pub fn check_if_connection(&self) -> Result<(), AsyncQueueError> {
if self.connected {
Ok(())
} else {
Err(AsyncQueueError::NotConnectedError)
}
}

/// Connect to the db if not connected
pub async fn connect(&mut self, tls: Tls) -> Result<(), AsyncQueueError> {
let manager = PostgresConnectionManager::new_from_stringlike(self.uri.clone(), tls)?;

Expand All @@ -363,6 +399,7 @@ where
self.connected = true;
Ok(())
}

async fn remove_all_tasks_query(
transaction: &mut Transaction<'_>,
) -> Result<u64, AsyncQueueError> {
Expand Down
28 changes: 28 additions & 0 deletions src/asynk/async_runnable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,55 @@ impl From<SerdeError> for FangError {
}
}

/// Implement this trait to run your custom tasks.
#[typetag::serde(tag = "type")]
#[async_trait]
pub trait AsyncRunnable: Send + Sync {
/// Execute the task. This method should define its logic
async fn run(&self, client: &mut dyn AsyncQueueable) -> Result<(), FangError>;

/// Define the type of the task.
/// The `common` task type is used by default
fn task_type(&self) -> String {
COMMON_TYPE.to_string()
}

/// If set to true, no new tasks with the same metadata will be inserted
/// By default it is set to false.
fn uniq(&self) -> bool {
false
}

/// This method defines if a task is periodic or it should be executed once in the future.
///
/// Be careful it works only with the UTC timezone.
///
///
/// Example:
///
///
/**
```rust
fn cron(&self) -> Option<Scheduled> {
let expression = "0/20 * * * Aug-Sep * 2022/1";
Some(Scheduled::CronPattern(expression.to_string()))
}
```
*/

/// In order to schedule a task once, use the `Scheduled::ScheduleOnce` enum variant.
fn cron(&self) -> Option<Scheduled> {
None
}

/// Define the maximum number of retries the task will be retried.
/// By default the number of retries is 20.
fn max_retries(&self) -> i32 {
RETRIES_NUMBER
}

/// Define the backoff mode
/// By default, it is exponential, 2^(attempt)
fn backoff(&self, attempt: u32) -> u32 {
u32::pow(2, attempt)
}
Expand Down
2 changes: 1 addition & 1 deletion src/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
pub mod error;
mod error;
pub mod fang_task_state;
pub mod queue;
pub mod runnable;
Expand Down
9 changes: 9 additions & 0 deletions src/blocking/fang_task_state.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
/// Possible states of the task
#[derive(diesel_derive_enum::DbEnum, Debug, Eq, PartialEq, Clone)]
#[DieselTypePath = "crate::schema::sql_types::FangTaskState"]
pub enum FangTaskState {
/// The task is ready to be executed
New,
/// The task is being executing.
///
/// The task may stay in this state forever
/// if an unexpected error happened
InProgress,
/// The task failed
Failed,
/// The task finished successfully
Finished,
/// The task is being retried. It means it failed but it's scheduled to be executed again
Retried,
}
37 changes: 36 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,80 @@

use std::time::Duration;
use thiserror::Error;
use typed_builder::TypedBuilder;

/// Represents a schedule for scheduled tasks.
///
/// It's used in the [`AsyncRunnable::cron`] and [`Runnable::cron`]
#[derive(Debug, Clone)]
pub enum Scheduled {
/// A cron pattern for a periodic task
///
/// For example, `Scheduled::CronPattern("0/20 * * * * * *")`
CronPattern(String),
/// A datetime for a scheduled task that will be executed once
///
/// For example, `Scheduled::ScheduleOnce(chrono::Utc::now() + std::time::Duration::seconds(7i64))`
ScheduleOnce(DateTime<Utc>),
}

/// List of error types that can occur while working with cron schedules.
#[derive(Debug, Error)]
pub enum CronError {
/// A problem occured during cron schedule parsing.
#[error(transparent)]
LibraryError(#[from] cron::error::Error),
/// [`Scheduled`] enum variant is not provided
#[error("You have to implement method `cron()` in your AsyncRunnable")]
TaskNotSchedulableError,
/// The next execution can not be determined using the current [`Scheduled::CronPattern`]
#[error("No timestamps match with this cron pattern")]
NoTimestampsError,
}

/// All possible options for retaining tasks in the db after their execution.
///
/// The default mode is [`RetentionMode::RemoveAll`]
#[derive(Clone, Debug)]
pub enum RetentionMode {
/// Keep all tasks
KeepAll,
/// Remove all tasks
RemoveAll,
/// Remove only successfully finished tasks
RemoveFinished,
}

impl Default for RetentionMode {
fn default() -> Self {
RetentionMode::RemoveAll
}
}

#[derive(Clone, Debug)]
/// Configuration parameters for putting workers to sleep
/// while they don't have any tasks to execute
#[derive(Clone, Debug, TypedBuilder)]
pub struct SleepParams {
/// the current sleep period
pub sleep_period: Duration,
/// the maximum period a worker is allowed to sleep.
/// After this value is reached, `sleep_period` is not increased anymore
pub max_sleep_period: Duration,
/// the initial value of the `sleep_period`
pub min_sleep_period: Duration,
/// the step that `sleep_period` is increased by on every iteration
pub sleep_step: Duration,
}

impl SleepParams {
/// Reset the `sleep_period` if `sleep_period` > `min_sleep_period`
pub fn maybe_reset_sleep_period(&mut self) {
if self.sleep_period != self.min_sleep_period {
self.sleep_period = self.min_sleep_period;
}
}

/// Increase the `sleep_period` by the `sleep_step` if the `max_sleep_period` is not reached
pub fn maybe_increase_sleep_period(&mut self) {
if self.sleep_period < self.max_sleep_period {
self.sleep_period += self.sleep_step;
Expand All @@ -63,11 +94,14 @@ impl Default for SleepParams {
}
}

/// An error that can happen during executing of tasks
#[derive(Debug)]
pub struct FangError {
/// A description of an error
pub description: String,
}

#[doc(hidden)]
#[cfg(feature = "blocking")]
extern crate diesel;

Expand All @@ -94,6 +128,7 @@ pub use chrono::Utc;

#[cfg(feature = "blocking")]
pub mod blocking;

#[cfg(feature = "blocking")]
pub use blocking::*;

Expand Down

0 comments on commit 9b92a4a

Please sign in to comment.