Skip to content

Commit

Permalink
adding stats functions
Browse files Browse the repository at this point in the history
  • Loading branch information
kevloui committed Aug 3, 2020
1 parent c2f0e8f commit 26c9d5e
Showing 1 changed file with 48 additions and 51 deletions.
99 changes: 48 additions & 51 deletions amadeus-core/src/par_sink/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,16 @@ use educe::Educe;
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;

use super::{
folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink
};
use super::{folder_par_sink, FolderSync, FolderSyncReducer, ParallelPipe, ParallelSink};

#[derive(new)]
#[must_use]
pub struct Mean<P> {
pipe: P,
pipe: P,
}

impl_par_dist! {
impl<P: ParallelPipe<Item, Output = f64>, Item> ParallelSink<Item> for Mean<P> {
impl<P: ParallelPipe<Item, Output = f64>, Item> ParallelSink<Item> for Mean<P> {
folder_par_sink!(
MeanFolder<StepA>,
MeanFolder<StepB>,
Expand All @@ -32,69 +30,68 @@ impl_par_dist! {
#[serde(bound = "")]

pub struct MeanFolder<Step> {
marker: PhantomData<fn() -> Step>,
marker: PhantomData<fn() -> Step>,
}

pub struct StepA;
pub struct StepB;

#[derive(Serialize, Deserialize, new)]
pub struct State {
#[new(default)]
mean: f64,
#[new(default)]
correction: f64,
#[new(default)]
count: u64,
#[new(default)]
mean: f64,
#[new(default)]
correction: f64,
#[new(default)]
count: u64,
}

impl FolderSync<f64> for MeanFolder<StepA> {
type State = State;
type Done = f64;

#[inline(always)]
fn zero(&mut self) -> Self::State {
State::new()
}

#[inline(always)]
fn push(&mut self, state: &mut Self::State, item: f64) {
state.count += 1;
let f = (item - state.mean) / (state.count as f64);
let y = f - state.correction;
let t = state.mean + y;
state.correction = (t - state.mean) - y;
state.mean = t;
}

#[inline(always)]
fn done(&mut self, state: Self::State) -> Self::Done {
state.mean
}
}
type State = State;
type Done = f64;

#[inline(always)]
fn zero(&mut self) -> Self::State {
State::new()
}

#[inline(always)]
fn push(&mut self, state: &mut Self::State, item: f64) {
state.count += 1;
let f = (item - state.mean) / (state.count as f64);
let y = f - state.correction;
let t = state.mean + y;
state.correction = (t - state.mean) - y;
state.mean = t;
}

#[inline(always)]
fn done(&mut self, state: Self::State) -> Self::Done {
state.mean
}
}

impl FolderSync<State> for MeanFolder<StepB> {
type State = State;
type Done = f64;
type State = State;
type Done = f64;

#[inline(always)]
fn zero(&mut self) -> Self::State {
State::new()
}
#[inline(always)]
fn zero(&mut self) -> Self::State {
State::new()
}

#[inline(always)]
#[inline(always)]
fn push(&mut self, state: &mut Self::State, item: State) {
state.correction = todo!();
state.mean = ((state.mean * state.count as f64) + (item.mean * item.count as f64)) / ((state.count + item.count) as f64);
state.count += item.count;
}

#[inline(always)]
fn done(&mut self, state: Self::State) -> Self::Done {
state.mean
}
state.correction = ((state.correction * state.count as f64)
+ (item.correction * item.count as f64))
/ ((state.count + item.count) as f64);
state.mean = ((state.mean * state.count as f64) + (item.mean * item.count as f64))
/ ((state.count + item.count) as f64);
state.count += item.count;
}

#[inline(always)]
fn done(&mut self, state: Self::State) -> Self::Done {
state.mean
}
}

0 comments on commit 26c9d5e

Please sign in to comment.