Skip to content

Commit

Permalink
refactor(time): add get_time_range
Browse files Browse the repository at this point in the history
  • Loading branch information
omarabid committed Feb 7, 2022
1 parent 3fd4e79 commit a6d4dbc
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 49 deletions.
8 changes: 4 additions & 4 deletions src/pyroscope.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,20 +302,20 @@ impl PyroscopeAgent {
self.handle = Some(std::thread::spawn(move || {
log::trace!("PyroscopeAgent - Main Thread started");

while let Ok(time) = rx.recv() {
log::trace!("PyroscopeAgent - Sending session {}", time);
while let Ok(until) = rx.recv() {
log::trace!("PyroscopeAgent - Sending session {}", until);

// Generate report from backend
let report = backend.lock()?.report()?;

// Send new Session to SessionManager
stx.send(SessionSignal::Session(Session::new(
time,
until,
config.clone(),
report,
)?))?;

if time == 0 {
if until == 0 {
log::trace!("PyroscopeAgent - Session Killed");

let (lock, cvar) = &*pair;
Expand Down
23 changes: 8 additions & 15 deletions src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
};

use crate::pyroscope::PyroscopeConfig;
use crate::utils::get_time_range;
use crate::utils::merge_tags_with_app_name;
use crate::Result;

Expand Down Expand Up @@ -101,26 +102,18 @@ impl Session {
/// let until = 154065120;
/// let session = Session::new(until, config, report).unwrap();
/// ```
pub fn new(mut until: u64, config: PyroscopeConfig, report: Vec<u8>) -> Result<Self> {
pub fn new(until: u64, config: PyroscopeConfig, report: Vec<u8>) -> Result<Self> {
log::info!("Session - Creating Session");
// Session interrupted (0 signal), determine the time
if until == 0 {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();
until = now
.checked_add(10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap())
.unwrap();
}

// Start of the session
let from = until.checked_sub(10u64).unwrap();
// get_time_range should be used with "from". We balance this by reducing
// 10s from the returned range.
let time_range = get_time_range(until)?;

Ok(Self {
config,
report,
from,
until,
from: time_range.from - 10,
until: time_range.until - 10,
})
}

Expand All @@ -134,7 +127,7 @@ impl Session {
/// session.send().unwrap();
/// ```
pub fn send(self) -> Result<()> {
log::info!("Session - Sending Session");
log::info!("Session - Sending Session {} - {}", self.from, self.until);

// Check if the report is empty
if self.report.is_empty() {
Expand Down
15 changes: 5 additions & 10 deletions src/timer/epoll.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// except according to those terms.

use crate::utils::check_err;
use crate::utils::get_time_range;
use crate::Result;

use std::sync::{
Expand Down Expand Up @@ -56,15 +57,13 @@ impl Timer {
// Fire @ 10th sec
Timer::epoll_wait(timer_fd, epoll_fd)?;

// Get current time
let current = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();
// Get the current time range
let from = get_time_range(0)?.from;

// Iterate through Senders
txs.lock()?.iter().for_each(|tx| {
// Send event to attached Sender
if tx.send(current).is_ok() {}
if tx.send(from).is_ok() {}
});
}
}));
Expand All @@ -83,11 +82,7 @@ impl Timer {
let tfd = timerfd_create(clockid, clock_flags)?;

// Get the next event time
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();
let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap();
let first_fire = now + rem;
let first_fire = get_time_range(0)?.until;

// new_value sets the Timer
let mut new_value = libc::itimerspec {
Expand Down
16 changes: 6 additions & 10 deletions src/timer/kqueue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// except according to those terms.

use crate::utils::check_err;
use crate::utils::get_time_range;
use crate::Result;

use std::sync::{
Expand Down Expand Up @@ -57,15 +58,14 @@ impl Timer {
return Ok(());
}


// Get current time
let current = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();
let from = get_time_range(0)?.from;

// Iterate through Senders
txs.lock()?.iter().for_each(|tx| {
// Send event to attached Sender
match tx.send(current) {
match tx.send(from) {
Ok(_) => {}
Err(_) => {}
}
Expand Down Expand Up @@ -107,12 +107,8 @@ impl Timer {

/// Register an initial expiration event
fn register_initial_expiration(kqueue: i32) -> Result<libc::kevent> {
// Get the next event time
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();
let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap();
let first_fire = now + rem;
// Get first event time
let first_time = get_time_range(0)?.until;

let initial_event = libc::kevent {
ident: 1,
Expand Down
14 changes: 4 additions & 10 deletions src/timer/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
// https://www.apache.org/licenses/LICENSE-2.0>. This file may not be copied, modified, or distributed
// except according to those terms.

use crate::utils::get_time_range;
use crate::Result;

use std::sync::{
Expand Down Expand Up @@ -41,13 +42,8 @@ impl Timer {

// Spawn a Thread
let handle = Some(thread::spawn(move || {
// Get the current time
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();

// Calculate number of seconds until 10th second
let rem = 10u64.checked_sub(now.checked_rem(10).unwrap()).unwrap();
// Get remaining time for 10th second fire event
let rem = get_time_range(0)?.rem;

// Sleep for rem seconds
thread::sleep(Duration::from_secs(rem));
Expand All @@ -59,9 +55,7 @@ impl Timer {
}

// Get current time
let current = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs();
let current = get_time_range(0)?.from;

// Iterate through Senders
txs.lock()?.iter().for_each(|tx| {
Expand Down
69 changes: 69 additions & 0 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,72 @@ mod check_err_tests {
assert!(check_err(-1).is_err())
}
}

/// Return the current time in seconds.
pub fn get_current_time_secs() -> Result<u64> {
Ok(std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_secs())
}

#[cfg(test)]
mod get_current_time_secs_tests {
use crate::utils::get_current_time_secs;

#[test]
fn get_current_time_secs_success() {
assert!(get_current_time_secs().is_ok())
}
}

#[derive(Debug, PartialEq)]
pub struct TimeRange {
pub from: u64,
pub until: u64,
pub current: u64,
pub rem: u64,
}

/// Return a range of timestamps in the form [start, end).
/// The range is inclusive of start and exclusive of end.
pub fn get_time_range(timestamp: u64) -> Result<TimeRange> {
// if timestamp is 0, then get the current time
if timestamp == 0 {
return get_time_range(get_current_time_secs()?);
}

// Determine the start and end of the range
Ok(TimeRange {
from: timestamp / 10 * 10,
until: timestamp / 10 * 10 + 10,
current: timestamp,
rem: 10 - (timestamp % 10),
})
}

#[cfg(test)]
mod get_time_range_tests {
use crate::utils::{get_time_range, TimeRange};

#[test]
fn get_time_range_verify() {
assert_eq!(get_time_range(1644194479).unwrap(), TimeRange {
from: 1644194470,
until: 1644194480,
current: 1644194479,
rem: 1,
});
assert_eq!(get_time_range(1644194470).unwrap(), TimeRange {
from: 1644194470,
until: 1644194480,
current: 1644194470,
rem: 10,
});
assert_eq!(get_time_range(1644194476).unwrap(), TimeRange {
from: 1644194470,
until: 1644194480,
current: 1644194476,
rem: 4,
});
}
}

0 comments on commit a6d4dbc

Please sign in to comment.