Skip to content

Commit

Permalink
Merge pull request #21 from drahnr/bernhard-acc-cycle
Browse files Browse the repository at this point in the history
allow configurable accumulation_cycle
  • Loading branch information
omarabid authored Mar 9, 2022
2 parents 37fa587 + 44fc28d commit 9127303
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 117 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ libc = "^0.2.66"
[dev-dependencies]
tokio = { version = "1.13", features = ["full"] }
pretty_env_logger = "0.4.0"
assert_matches = "1"

[profile.dev]
opt-level=0
Expand Down
9 changes: 4 additions & 5 deletions examples/internals-timer.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
extern crate pyroscope;

use std::sync::mpsc::channel;
use std::sync::mpsc::{Receiver, Sender};
use std::sync::mpsc;

use pyroscope::timer::Timer;

fn main() {
// Initialize the Timer
let mut timer = Timer::default().initialize().unwrap();
let mut timer = Timer::initialize(std::time::Duration::from_secs(10)).unwrap();

// Create a streaming channel
let (tx, rx): (Sender<u64>, Receiver<u64>) = channel();
let (tx, rx) = mpsc::channel();

let (tx2, rx2): (Sender<u64>, Receiver<u64>) = channel();
let (tx2, rx2) = mpsc::channel();

// Attach tx to Timer
timer.attach_listener(tx).unwrap();
Expand Down
11 changes: 9 additions & 2 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ pub enum PyroscopeError {
AdHoc(String),

#[error("{msg}: {source:?}")]
Compat{ msg: String, #[source] source: Box<dyn std::error::Error + Send + Sync + 'static> },
Compat {
msg: String,
#[source]
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},

#[error(transparent)]
Reqwest(#[from] reqwest::Error),
Expand All @@ -37,7 +41,10 @@ impl PyroscopeError {
}

/// Create a new instance of PyroscopeError with source
pub fn new_with_source<E>(msg: &str, source: E) -> Self where E: std::error::Error + Send + Sync + 'static {
pub fn new_with_source<E>(msg: &str, source: E) -> Self
where
E: std::error::Error + Send + Sync + 'static,
{
PyroscopeError::Compat {
msg: msg.to_string(),
source: Box::new(source),
Expand Down
111 changes: 82 additions & 29 deletions src/pyroscope.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::{
collections::HashMap,
sync::{
mpsc::{channel, Receiver, Sender},
mpsc::{self, Sender},
Arc, Condvar, Mutex,
},
thread::JoinHandle,
time::Duration,
};

use crate::{
backends::{pprof::Pprof, Backend},
error::Result,
session::{Session, SessionManager, SessionSignal},
timer::Timer,
timer::{Timer, TimerSignal},
};

const LOG_TAG: &str = "Pyroscope::Agent";
Expand All @@ -32,6 +33,8 @@ pub struct PyroscopeConfig {
pub tags: HashMap<String, String>,
/// Sample rate used in Hz
pub sample_rate: i32,
/// How long to accumulate data for before sending it upstream
pub accumulation_cycle: Duration,
// TODO
// log_level
// auth_token
Expand All @@ -52,6 +55,7 @@ impl PyroscopeConfig {
application_name: application_name.as_ref().to_owned(),
tags: HashMap::new(),
sample_rate: 100i32,
accumulation_cycle: Duration::from_secs(10),
}
}

Expand All @@ -69,6 +73,27 @@ impl PyroscopeConfig {
}
}

/// Override the accumulation cycle.
///
/// # Example
///
/// ```
/// use std::time::Duration;
/// use pyroscope::pyroscope::PyroscopeConfig;
/// # use pyroscope::PyroscopeError;
/// # fn main() -> Result<(), PyroscopeError> {
/// let config = PyroscopeConfig::new("http://localhost:8080", "my-app")
/// .accumulation_cycle(Duration::from_millis(4587));
/// # Ok(())
/// # }
/// ```
pub fn accumulation_cycle(self, accumulation_cycle: Duration) -> Self {
Self {
accumulation_cycle,
..self
}
}

/// Set the tags
/// # Example
/// ```ignore
Expand Down Expand Up @@ -147,7 +172,7 @@ impl PyroscopeAgentBuilder {
/// # Example
/// ```ignore
/// let builder = PyroscopeAgentBuilder::new("http://localhost:8080", "my-app")
/// .sample_rate(99)
/// .sample_rate(113)
/// .build()
/// ?;
/// ```
Expand All @@ -158,6 +183,29 @@ impl PyroscopeAgentBuilder {
}
}

/// Set the accumulation cycle. Default value is 10 seconds.
///
/// # Example
///
/// ```
/// use pyroscope::pyroscope::PyroscopeAgentBuilder;
/// use std::time::Duration;
/// # use pyroscope::PyroscopeError;
/// # fn main() -> Result<(), PyroscopeError> {
///
/// let builder = PyroscopeAgentBuilder::new("http://localhost:8080", "my-app")
/// .accumulation_cycle(Duration::from_secs(3))
/// .build()?;
/// # Ok(())
/// # }
/// ```
pub fn accumulation_cycle(self, acc_cycle: impl Into<Duration>) -> Self {
Self {
config: self.config.accumulation_cycle(acc_cycle.into()),
..self
}
}

/// Set tags. Default is empty.
/// # Example
/// ```ignore
Expand All @@ -181,7 +229,7 @@ impl PyroscopeAgentBuilder {
log::trace!(target: LOG_TAG, "Backend initialized");

// Start Timer
let timer = Timer::default().initialize()?;
let timer = Timer::initialize(self.config.accumulation_cycle.clone())?;
log::trace!(target: LOG_TAG, "Timer initialized");

// Start the SessionManager
Expand All @@ -206,7 +254,7 @@ impl PyroscopeAgentBuilder {
pub struct PyroscopeAgent {
timer: Timer,
session_manager: SessionManager,
tx: Option<Sender<u64>>,
tx: Option<Sender<TimerSignal>>,
handle: Option<JoinHandle<Result<()>>>,
running: Arc<(Mutex<bool>, Condvar)>,

Expand Down Expand Up @@ -298,7 +346,7 @@ impl PyroscopeAgent {
*running = true;
drop(running);

let (tx, rx): (Sender<u64>, Receiver<u64>) = channel();
let (tx, rx) = mpsc::channel();
self.timer.attach_listener(tx.clone())?;
self.tx = Some(tx);

Expand All @@ -309,28 +357,31 @@ impl PyroscopeAgent {
self.handle = Some(std::thread::spawn(move || {
log::trace!(target: LOG_TAG, "Main Thread started");

while let Ok(until) = rx.recv() {
log::trace!(target: LOG_TAG, "Sending session {}", until);

// Generate report from backend
let report = backend.lock()?.report()?;

// Send new Session to SessionManager
stx.send(SessionSignal::Session(Session::new(
until,
config.clone(),
report,
)?))?;

if until == 0 {
log::trace!(target: LOG_TAG, "Session Killed");

let (lock, cvar) = &*pair;
let mut running = lock.lock()?;
*running = false;
cvar.notify_one();

return Ok(());
while let Ok(signal) = rx.recv() {
match signal {
TimerSignal::NextSnapshot(until) => {
log::trace!(target: LOG_TAG, "Sending session {}", until);

// Generate report from backend
let report = backend.lock()?.report()?;

// Send new Session to SessionManager
stx.send(SessionSignal::Session(Session::new(
until,
config.clone(),
report,
)?))?
}
TimerSignal::Terminate => {
log::trace!(target: LOG_TAG, "Session Killed");

let (lock, cvar) = &*pair;
let mut running = lock.lock()?;
*running = false;
cvar.notify_one();

return Ok(());
}
}
}
Ok(())
Expand All @@ -351,7 +402,9 @@ impl PyroscopeAgent {
log::debug!(target: LOG_TAG, "Stopping");
// get tx and send termination signal
if let Some(sender) = self.tx.take() {
sender.send(0)?;
// best effort
let _ = sender.send(TimerSignal::NextSnapshot(0));
sender.send(TimerSignal::Terminate)?;
} else {
log::error!("PyroscopeAgent - Missing sender")
}
Expand Down
15 changes: 11 additions & 4 deletions src/session.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::{
sync::mpsc::{sync_channel, Receiver, SyncSender},
thread,
thread::JoinHandle,
thread::{self, JoinHandle},
time::Duration,
};

use crate::pyroscope::PyroscopeConfig;
Expand Down Expand Up @@ -84,7 +84,9 @@ impl SessionManager {
pub struct Session {
pub config: PyroscopeConfig,
pub report: Vec<u8>,
// unix time
pub from: u64,
// unix time
pub until: u64,
}

Expand Down Expand Up @@ -122,7 +124,12 @@ impl Session {
/// session.send()?;
/// ```
pub fn send(self) -> Result<()> {
log::info!(target: LOG_TAG, "Sending Session: {} - {}", self.from, self.until);
log::info!(
target: LOG_TAG,
"Sending Session: {} - {}",
self.from,
self.until
);

// Check if the report is empty
if self.report.is_empty() {
Expand Down Expand Up @@ -154,7 +161,7 @@ impl Session {
("spyName", "pyroscope-rs"),
])
.body(self.report)
.timeout(std::time::Duration::from_secs(10))
.timeout(Duration::from_secs(10))
.send()?;

Ok(())
Expand Down
Loading

0 comments on commit 9127303

Please sign in to comment.