diff --git a/README.md b/README.md index 244ac00..e79ec7b 100644 --- a/README.md +++ b/README.md @@ -2,6 +2,8 @@ [![Build](https://github.com/primitivefinance/aika/actions/workflows/build.yaml/badge.svg)](https://github.com/primitivefinance/aika/actions/workflows/build.yaml) [![Rust](https://github.com/primitivefinance/aika/actions/workflows/rust.yaml/badge.svg)](https://github.com/primitivefinance/aika/actions/workflows/rust.yaml) -Discrete event simulator built in Rust 🦀 . Designed to have a similar syntax to SimPy with a particular focus on configurability for complex simulation environments. +Discrete event simulation manager built in Rust 🦀 . Aika is designed to have a similar base syntax to [SimPy](https://gitlab.com/team-simpy/simpy), however, has the addition of a `Manager` to improve focus on large, repetitive, data intensive simulations. This simulator utilizes `generators`, currently an expiremental feature of Rust nightly version 1.71.0. + +![](/assets/aika-clock.png) \ No newline at end of file diff --git a/aika/src/distribution.rs b/aika/src/distribution.rs index 5ba8217..416d32f 100644 --- a/aika/src/distribution.rs +++ b/aika/src/distribution.rs @@ -2,7 +2,7 @@ //! Distributions must enforce a sampling of only positive real numbers, as this describes a time delta moving forward. use rand::Rng; -use rand_distr::{Gamma as GammaDistribution, Poisson as PoissonDistribution}; +use rand_distr::{Gamma as GammaDistribution, Poisson as PoissonDistribution, LogNormal as LogNormalDistribution}; /// The `Distribution` trait allows for the creation of custom distributions to be used in the `ProcessExecution::Stochastic` variant. pub trait Distribution { @@ -47,3 +47,22 @@ impl Distribution for Gamma { rng.sample(self.distribution) } } + +/// The `LogNormal` struct implements the `Distribution` trait for the LogNormal distribution. +pub struct LogNormal { + pub distribution: LogNormalDistribution, +} + +impl LogNormal { + pub fn new(mean: f64, std_dev: f64) -> LogNormal { + Self { + distribution: LogNormalDistribution::new(mean, std_dev).unwrap(), + } + } +} + +impl Distribution for LogNormal { + fn sample(&self, rng: &mut rand::rngs::StdRng) -> f64 { + rng.sample(self.distribution) + } +} \ No newline at end of file diff --git a/aika/src/environment.rs b/aika/src/environment.rs index 8ab9323..7e30560 100644 --- a/aika/src/environment.rs +++ b/aika/src/environment.rs @@ -6,28 +6,30 @@ use rand::SeedableRng; use crate::distribution::Distribution; +use crate::resources::*; + use std::cmp::Reverse; -use std::collections::{BTreeMap, BinaryHeap, HashMap}; +use std::collections::{BinaryHeap, HashMap}; use std::ops::{Generator, GeneratorState}; use std::pin::Pin; /// The type of process accepted by aika. Processes are generators that yields a value of type `T` and returns `()`. -pub type Process = Box + Unpin>; +pub type Process = Box, Yield = T, Return = ()> + Unpin>; /// The type of function describing the event time delta for a given process. It can be constant, deterministic, or stochastic. pub enum ProcessExecution { + /// Process will only execute once + Standard, /// Constant process execution. The process will execute at a constant time delta. - Constant(u64), + Constant(u64, ProcessDuration), /// Deterministic process execution. The process will execute at a time delta given by the function. - Deterministic(fn(u64) -> u64), + Deterministic(fn(u64) -> u64, ProcessDuration), /// Stochastic process execution. The process will execute at a time delta given by the distribution. - Stochastic(Box), + Stochastic(Box, ProcessDuration), } /// The type of process duration. It can be standard, infinite, or finite. pub enum ProcessDuration { - /// Standard process duration. The process will run from the first event time until the simulation is complete. - Standard, /// Infinite process duration. The process will run from the given event time until the simulation is complete. Infinite(u64), /// Finite process duration. The process will run from the given start event time until the given end event time. @@ -38,142 +40,260 @@ pub enum ProcessDuration { pub struct SimProcess { process: Process, time_delta: ProcessExecution, - process_duration: ProcessDuration, } impl SimProcess { fn new( process: Process, time_delta: ProcessExecution, - process_duration: ProcessDuration, ) -> Self { SimProcess { process: process, time_delta: time_delta, - process_duration: process_duration, } } } +pub trait EventYield { + fn output(&self) -> Yield; + fn set(&mut self, output: Yield); +} + +#[derive(Clone)] /// Event struct. Contains information on which process to execute and when. -pub struct Event { +pub struct Event { /// The time at which the event occurs in the chain. pub time: u64, /// The id of the process to execute. pub process_id: usize, + /// Simulation state + pub state: T, } -impl Ord for Event { +impl Ord for Event { fn cmp(&self, other: &Self) -> std::cmp::Ordering { self.time.cmp(&other.time) } } -impl PartialOrd for Event { +impl PartialOrd for Event { fn partial_cmp(&self, other: &Self) -> Option { Some(self.time.cmp(&other.time)) } } -impl PartialEq for Event { +impl PartialEq for Event { fn eq(&self, other: &Self) -> bool { self.time == other.time } } -impl Eq for Event {} +impl Eq for Event {} + +#[derive(Clone)] +pub struct State { + pub state: T, + pub time: u64, +} + +#[derive(Copy, Clone)] +pub enum Yield { + Timeout(u64), + Pause, + AddEvent { + time_delta: u64, + process_id: usize, + }, + RequestResource(usize), + ReleaseResource(usize), + GetContainer(usize), + PutContainer(usize), + GetStore(usize), + PutStore(usize), +} + +impl EventYield for Yield { + fn output(&self) -> Yield { + *self + } + fn set(&mut self, output: Yield) { + *self = output; + } +} /// The main struct of the library. It contains the `processes`, `events`, and `stores` of the /// simulation and keeps track of the current event time. -pub struct Environment { +pub struct Environment { /// The events to be executed. - pub events: BinaryHeap>, + pub events: BinaryHeap>>, + /// Past events. + pub past_events: Vec>, /// The processes and their id. pub processes: HashMap>, + /// The stores of the simulation yield. + pub stores: Vec>, + /// The containers of the simulation yield. + pub containers: Vec>, + /// The resources of the simulation yield. + pub resources: Vec>, /// The current event time. - pub curr_event: u64, + pub time: u64, /// The maximum event time. - pub max_event: u64, - /// The stores of the simulation yield. - pub stores: BTreeMap, + pub stop: u64, /// Seeded random number generator for optional randomness. pub rng: rand::rngs::StdRng, + /// Logging boolean + pub logs: bool, } /// Implementation of the Environment struct. Contains public methods `new`, `add_process`, `run`. -impl Environment { - pub fn new(max_event: u64, seed: u64) -> Self { +impl> Environment { + pub fn new(stop: u64, seed: u64) -> Self { Environment { events: BinaryHeap::new(), + past_events: Vec::new(), processes: HashMap::new(), - curr_event: 0, - max_event: max_event, - stores: BTreeMap::new(), + stores: Vec::new(), + containers: Vec::new(), + resources: Vec::new(), + time: 0, + stop: stop, rng: rand::rngs::StdRng::seed_from_u64(seed), + logs: false, } } /// Add a new process to the simulation environment. pub fn add_process( &mut self, - process: Box + Unpin>, + process: Box, Yield = T, Return = ()> + Unpin>, time_delta: ProcessExecution, - process_duration: ProcessDuration, ) { let id = self.processes.len(); - let process = SimProcess::new(process, time_delta, process_duration); + let process = SimProcess::new(process, time_delta); self.processes.insert(id, process); - self.init_process(id); } - /// Initialize a process by adding its first event to the event queue. Private function to be called in [`run`]. - fn init_process(&mut self, id: usize) { - let process = self.processes.get(&id).unwrap(); - match process.process_duration { - ProcessDuration::Standard => { - self.add_events(id, 0); - } - ProcessDuration::Infinite(start) => { - self.add_events(id, start); - } - ProcessDuration::Finite(start, _end) => { - self.add_events(id, start); - } - } + pub fn create_stores(&mut self, capacity: usize) { + self.stores.push(Stores::new(capacity)); + } + + pub fn create_containers(&mut self, capacity: T, init: T) { + self.containers.push(Containers::new(capacity, init)); + } + + pub fn create_resources(&mut self, capacity: usize) { + self.resources.push(Resources::new(capacity)); } /// Execute the next event in the event queue the store the yield in stores. fn step(&mut self) { let event = self.events.pop().unwrap().0; let process_id = event.process_id; - self.curr_event = event.time; + self.time = event.time; let sim_process = self.processes.get_mut(&process_id).unwrap(); - match sim_process.process_duration { - ProcessDuration::Finite(_start, end) => { - if self.curr_event >= end { - return; - } - } - _ => {} - } let process = Pin::new(&mut sim_process.process); let time_delta: u64; match &sim_process.time_delta { - ProcessExecution::Constant(delta) => { - time_delta = *delta; + ProcessExecution::Standard => { + time_delta = 0; } - ProcessExecution::Deterministic(events_path) => { - time_delta = events_path(self.curr_event); + ProcessExecution::Constant(delta, duration) => { + match duration { + ProcessDuration::Infinite(_) => { + time_delta = *delta; + } + ProcessDuration::Finite(_start, end) => { + if end < &self.time { + return; + } + time_delta = *delta; + } + } } - ProcessExecution::Stochastic(distribution_sample) => { - time_delta = distribution_sample.sample(&mut self.rng).round() as u64; + ProcessExecution::Deterministic(events_path, duration) => { + match duration { + ProcessDuration::Infinite(_) => { + time_delta = events_path(self.time); + } + ProcessDuration::Finite(_start, end) => { + if end < &self.time { + return; + } + time_delta = events_path(self.time); + } + } + } + ProcessExecution::Stochastic(distribution_sample, duration) => { + match duration { + ProcessDuration::Infinite(_) => { + time_delta = distribution_sample.sample(&mut self.rng).round() as u64; + } + ProcessDuration::Finite(_start, end) => { + if end < &self.time { + return; + } + time_delta = distribution_sample.sample(&mut self.rng).round() as u64; + } + } } } - match process.resume(()) { + match process.resume(State { + state: event.state, + time: self.time, + }) { GeneratorState::Yielded(val) => { - self.add_events(process_id, time_delta); - self.stores.insert(self.curr_event, val); - self.curr_event += 1; + match val.output() { + Yield::Timeout(delta) => { + self.add_events(process_id, delta as u64, val.clone()); + }, + Yield::Pause => {}, + Yield::AddEvent { time_delta, process_id } => { + self.add_events(process_id, time_delta, val.clone()); + }, + Yield::RequestResource(r) => { + let resource = self.resources.get_mut(r).unwrap(); + resource.request(Event { + time: self.time, + process_id: process_id, + state: val.clone(), + }).unwrap(); + }, + Yield::ReleaseResource(r) => { + let resource = self.resources.get_mut(r).unwrap(); + resource.release(Event { + time: self.time, + process_id: process_id, + state: val.clone(), + }).unwrap(); + }, + Yield::GetContainer(c) => { + let container = self.containers.get_mut(c).unwrap(); + container.get(val.clone()).unwrap(); + }, + Yield::PutContainer(c) => { + let container = self.containers.get_mut(c).unwrap(); + container.put(val.clone()); + }, + Yield::GetStore(s) => { + let store = self.stores.get_mut(s).unwrap(); + store.get(Event { + time: self.time, + process_id: process_id, + state: val.clone(), + }).unwrap(); + }, + Yield::PutStore(s) => { + let store = self.stores.get_mut(s).unwrap(); + store.put(Event { + time: self.time, + process_id: process_id, + state: val.clone(), + }); + }, + + } + self.add_events(process_id, time_delta, val) } GeneratorState::Complete(_output) => {} } @@ -181,7 +301,7 @@ impl Environment { /// Run the simulation until the maximum event time is reached. pub fn run(&mut self) { - if self.curr_event < self.max_event { + if self.time < self.stop { while !self.events.is_empty() { self.step(); } @@ -191,13 +311,20 @@ impl Environment { } /// Add an event to the event queue. - fn add_events(&mut self, id: usize, time_delta: u64) { - if self.curr_event + time_delta > self.max_event { + pub fn add_events(&mut self, id: usize, time_delta: u64, state: T) { + if self.time + time_delta > self.stop { + return; + } else if time_delta == 0 { return; } self.events.push(Reverse(Event { - time: self.curr_event + time_delta, + time: self.time + time_delta, process_id: id, + state: state, })); } + + pub fn set_logs(&mut self, logs: bool) { + self.logs = logs; + } } diff --git a/aika/src/lib.rs b/aika/src/lib.rs index 3d4cf59..d5bc3a1 100644 --- a/aika/src/lib.rs +++ b/aika/src/lib.rs @@ -3,6 +3,7 @@ pub mod distribution; pub mod environment; pub mod manager; +pub mod resources; #[cfg(test)] mod test { @@ -13,23 +14,30 @@ mod test { fn setup_simple_des() { let seed = 223u64; + impl EventYield for f64 { + fn output(&self) -> Yield { + Yield::Pause + } + fn set(&mut self, output: Yield) { + } + } + let mut env = Environment::new(100, seed); - let process_random = Box::new(move || { - let mut i = 0; + + env.create_containers(1000f64, 300f64); + let process_random = Box::new(move |_| { loop { + let i = env.containers[0].get(1.0).unwrap(); yield i; - i -= 1; + } + }); + let process = Box::new(move |_| { + loop { + env.containers[0].put(1.0); + yield 1.0; } }); // Execution Distribution let gamma = Gamma::new(7.0, 1.0); - - env.add_process( - process_random, - ProcessExecution::Stochastic(Box::new(gamma)), - ProcessDuration::Finite(30, 60), - ); - env.run(); - println!("{:?}", env.stores); } } diff --git a/aika/src/manager.rs b/aika/src/manager.rs index 6efa7c1..27b969c 100644 --- a/aika/src/manager.rs +++ b/aika/src/manager.rs @@ -1,16 +1,14 @@ -use std::collections::BTreeMap; - -use crate::environment::Environment; +use crate::{environment::{Environment, EventYield}, resources::{Stores, Arithmetic}}; /// The `Manager` struct is responsible for running a series of simulations and storing the results. -pub struct Manager { +pub struct Manager { /// The number of simulations to run. pub simulations: Vec>, /// The storage of simulation data - pub stores: Vec>, + pub stores: Vec>>, } -impl Manager { +impl> Manager { /// Create a new `Manager` struct. pub fn new() -> Self { Manager { diff --git a/aika/src/resources.rs b/aika/src/resources.rs new file mode 100644 index 0000000..64419e4 --- /dev/null +++ b/aika/src/resources.rs @@ -0,0 +1,271 @@ +use std::collections::VecDeque; + +use crate::environment::Event; + +#[derive(Clone)] +pub struct Stores { + capacity: usize, + gets: VecDeque>, + puts: VecDeque>, + state: VecDeque>, +} + +impl Stores { + pub fn new(capacity: usize) -> Self { + Stores { + capacity: capacity, + gets: VecDeque::new(), + puts: VecDeque::new(), + state: VecDeque::new(), + } + } + + pub fn get(&mut self, event: Event) -> Result, &'static str> { + let process_id = event.process_id; + let time = event.time; + if let Some(mut event) = self.state.pop_front() { + if let Some(put) = self.puts.pop_front() { + self.state.push_back(put); + }; + event = Event { + time: time, + process_id: process_id, + state: event.state, + }; + Ok(event) + } else { + self.gets.push_back(event); + Err("Cannot get from empty store") + } + } + + pub fn put(&mut self, event: Event) { + if self.state.len() < self.capacity { + self.state.push_back(event); + } else { + self.puts.push_back(event); + } + + } +} + +#[derive(Clone)] +pub struct Resources { + capacity: usize, + left: usize, + queue: VecDeque>, +} + +impl Resources { + pub fn new(capacity: usize) -> Self { + Resources { + capacity: capacity, + left: capacity, + queue: VecDeque::new(), + } + } + + pub fn request(&mut self, event: Event) -> Result, &'static str> { + let process_id = event.process_id; + let time = event.time; + if self.left > 0 { + self.left -= 1; + let event = Event { + time: time, + process_id: process_id, + state: event.state, + }; + Ok(event) + } else { + self.queue.push_back(event); + Err("Cannot request from empty resource") + } + } + + pub fn release(&mut self, event: Event) -> Option> { + let time = event.time; + if let Some(event) = self.queue.pop_front() { + Some(Event { + time: time, + process_id: event.process_id, + state: event.state, + }) + } else { + assert!(self.left < self.capacity); + self.left += 1; + None + } + } +} + +#[derive(Clone)] +pub struct Containers { + container_capacity: T, + init: T, + chain: Vec, +} + +impl> Containers { + pub fn new(capacity: T, init: T) -> Self { + let mut vec = Vec::new(); + vec.push(init); + Containers { + container_capacity: capacity, + init: T::default(), + chain: vec, + } + } + + pub fn get(&mut self, amount: T) -> Result { + let mut total = T::default(); + if amount >= T::default() { + if let Some(value) = self.chain.pop() { + if value >= amount { + total = amount.clone(); + let new_value = value.sub(amount); + self.chain.push(new_value); + } else { + return Err("Cannot have negative reserves"); + } + } + } else { + return Err("Cannot have negative get amount"); + } + Ok(total.clone()) + } + + pub fn put(&mut self, amount:T) { + if let Some(value) = self.chain.pop() { + if value.add(amount.clone()) > self.container_capacity { + self.chain.push(self.container_capacity.clone()); + } else { + self.chain.push(value.add(amount.clone())); + } + } + } +} + +pub trait Arithmetic { + fn add(&self, other: T) -> T; + fn sub(&self, other: T) -> T; +} + +impl Arithmetic for i32 { + fn add(&self, other: i32) -> i32 { + self + other + } + + fn sub(&self, other: i32) -> i32 { + self - other + } +} + +impl Arithmetic for f64 { + fn add(&self, other: f64) -> f64 { + self + other + } + + fn sub(&self, other: f64) -> f64 { + self - other + } +} + +impl Arithmetic for f32 { + fn add(&self, other: f32) -> f32 { + self + other + } + + fn sub(&self, other: f32) -> f32 { + self - other + } +} + +impl Arithmetic for u64 { + fn add(&self, other: u64) -> u64 { + self + other + } + + fn sub(&self, other: u64) -> u64 { + self - other + } +} + +impl Arithmetic for u32 { + fn add(&self, other: u32) -> u32 { + self + other + } + + fn sub(&self, other: u32) -> u32 { + self - other + } +} + +impl Arithmetic for u16 { + fn add(&self, other: u16) -> u16 { + self + other + } + + fn sub(&self, other: u16) -> u16 { + self - other + } +} + +impl Arithmetic for u8 { + fn add(&self, other: u8) -> u8 { + self + other + } + + fn sub(&self, other: u8) -> u8 { + self - other + } +} + +impl Arithmetic for i64 { + fn add(&self, other: i64) -> i64 { + self + other + } + + fn sub(&self, other: i64) -> i64 { + self - other + } +} + +impl Arithmetic for i16 { + fn add(&self, other: i16) -> i16 { + self + other + } + + fn sub(&self, other: i16) -> i16 { + self - other + } +} + +impl Arithmetic for i8 { + fn add(&self, other: i8) -> i8 { + self + other + } + + fn sub(&self, other: i8) -> i8 { + self - other + } +} + +impl Arithmetic for usize { + fn add(&self, other: usize) -> usize { + self + other + } + + fn sub(&self, other: usize) -> usize { + self - other + } +} + +impl Arithmetic for isize { + fn add(&self, other: isize) -> isize { + self + other + } + + fn sub(&self, other: isize) -> isize { + self - other + } +} \ No newline at end of file diff --git a/assets/aika-clock.png b/assets/aika-clock.png new file mode 100644 index 0000000..4f8c37d Binary files /dev/null and b/assets/aika-clock.png differ