Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use PeriodicThread for sampling #113

Merged
merged 9 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ext/vernier/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@
# Probe for rb_profile_thread_frames, looking for its declaration in
# ruby/debug.h.
have_func("rb_profile_thread_frames", "ruby/debug.h")

# Probe for pthread functions that are not available everywhere; the C++
# sources guard their use with HAVE_PTHREAD_SETNAME_NP and
# HAVE_PTHREAD_CONDATTR_SETCLOCK.
have_func("pthread_setname_np")
have_func("pthread_condattr_setclock")

# Generate the Makefile for the vernier/vernier extension.
create_makefile("vernier/vernier")
61 changes: 4 additions & 57 deletions ext/vernier/memory.cc
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#include <atomic>
#include <mutex>
#include <stdio.h>
#include <unistd.h>
#include <vector>

#include "vernier.hh"
#include "timestamp.hh"
#include "periodic_thread.hh"

#if defined(__APPLE__)

Expand Down Expand Up @@ -60,62 +60,6 @@ static VALUE rb_memory_rss(VALUE self) {
return ULL2NUM(memory_rss());
}

// Base class that calls run_iteration() repeatedly on a dedicated pthread,
// pacing the calls `interval` apart. Subclasses supply run_iteration().
//
// stop() only clears the running flag; it does not wait for the worker to
// finish, so an iteration may still be in flight when stop() returns.
class PeriodicThread {
    // Explicitly initialised: before C++20 a default-constructed std::atomic
    // holds an indeterminate value, and start()/stop() read it before any
    // write happens.
    std::atomic<bool> running{false};
    pthread_t pthread{};
    TimeStamp interval;

  public:
    // The default sampling period is 10 milliseconds.
    PeriodicThread() : interval(TimeStamp::from_milliseconds(10)) {
    }

    // Replaces the period used for scheduling subsequent iterations.
    void set_interval(TimeStamp timestamp) {
        interval = timestamp;
    }

    // pthread entry point; `arg` is the PeriodicThread passed to
    // pthread_create() by start().
    static void *thread_entrypoint(void *arg) {
        static_cast<PeriodicThread *>(arg)->run();
        return NULL;
    }

    // Worker loop: run one iteration, then sleep until the next scheduled
    // tick, until the running flag is cleared.
    void run() {
        TimeStamp next_sample_schedule = TimeStamp::Now();
        while (running) {
            run_iteration();

            // Taken after the iteration so that an overlong iteration is
            // seen by the catch-up check below.
            TimeStamp sample_complete = TimeStamp::Now();

            next_sample_schedule += interval;

            // Behind schedule: restart the cadence from "now" instead of
            // firing a burst of back-to-back iterations.
            if (next_sample_schedule < sample_complete) {
                next_sample_schedule = sample_complete + interval;
            }

            TimeStamp::SleepUntil(next_sample_schedule);
        }
    }

    // One unit of periodic work, executed on the worker thread.
    virtual void run_iteration() = 0;

    // Spawns the worker thread. Does nothing if already running.
    // A pthread_create failure is reported through rb_bug().
    void start() {
        if (running) return;

        running = true;

        int ret = pthread_create(&pthread, NULL, &thread_entrypoint, this);
        if (ret != 0) {
            perror("pthread_create");
            rb_bug("VERNIER: pthread_create failed");
        }
    }

    // Asks the worker loop to exit after its current sleep/iteration.
    void stop() {
        if (!running) return;

        running = false;
    }
};

class MemoryTracker : public PeriodicThread {
public:
struct Record {
Expand All @@ -125,6 +69,9 @@ class MemoryTracker : public PeriodicThread {
std::vector<Record> results;
std::mutex mutex;

MemoryTracker() : PeriodicThread(TimeStamp::from_milliseconds(10)) {
}

void run_iteration() {
record();
}
Expand Down
141 changes: 141 additions & 0 deletions ext/vernier/periodic_thread.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
#include "ruby.h"

#include <errno.h>
#include <pthread.h>

#include <atomic>
#include "timestamp.hh"

#ifdef __APPLE__

#include <mach/mach.h>
#include <mach/mach_time.h>
#include <pthread.h>

// Applies the Mach THREAD_TIME_CONSTRAINT_POLICY to the given thread.
// https://developer.apple.com/library/archive/technotes/tn2169/_index.html
//
// pthread: handle of the thread whose scheduling policy is changed.
//
// If thread_policy_set() fails, the Mach error is printed and the process
// exits with status 1.
inline void upgrade_thread_priority(pthread_t pthread) {
    mach_timebase_info_data_t timebase_info;
    mach_timebase_info(&timebase_info);

    // Conversion factor from milliseconds to Mach absolute-time units:
    // nanoseconds per millisecond scaled by denom/numer of the timebase.
    const uint64_t NANOS_PER_MSEC = 1000000ULL;
    const double clock2abs = ((double)timebase_info.denom / (double)timebase_info.numer) * NANOS_PER_MSEC;

    thread_time_constraint_policy_data_t policy;
    policy.period = 0;

    // FIXME: I really don't know what these value should be
    policy.computation = (uint32_t)(5 * clock2abs); // 5 ms of work
    policy.constraint = (uint32_t)(10 * clock2abs);
    policy.preemptible = FALSE;

    // Target the thread that was passed in rather than unconditionally the
    // calling thread, so the parameter is actually honoured.
    int kr = thread_policy_set(pthread_mach_thread_np(pthread),
                               THREAD_TIME_CONSTRAINT_POLICY,
                               (thread_policy_t)&policy,
                               THREAD_TIME_CONSTRAINT_POLICY_COUNT);

    if (kr != KERN_SUCCESS) {
        mach_error("thread_policy_set:", kr);
        exit(1);
    }
}
#else
// Non-Apple builds: nothing to adjust, so this is a no-op that leaves the
// thread's scheduling untouched.
inline void upgrade_thread_priority(pthread_t pthread) {
    (void)pthread; // intentionally unused
}
#endif

// Base class that calls run_iteration() repeatedly on a dedicated pthread,
// pacing the calls `interval` apart. Subclasses supply run_iteration().
//
// start() and stop() serialise on running_mutex. stop() wakes the worker out
// of its timed sleep through running_cv and joins it before returning.
class PeriodicThread {
    pthread_t pthread{};
    TimeStamp interval;

    // `running` is only written while running_mutex is held; running_cv is
    // broadcast by stop() to cut the worker's sleep short.
    pthread_mutex_t running_mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t running_cv;
    std::atomic_bool running;

  public:
    // interval: period between the scheduled starts of two iterations.
    PeriodicThread(TimeStamp interval) : interval(interval), running(false) {
        pthread_condattr_t attr;
        pthread_condattr_init(&attr);
#if HAVE_PTHREAD_CONDATTR_SETCLOCK
        // Wait against the monotonic clock, matching TimeStamp::Now() as
        // used in run().
        pthread_condattr_setclock(&attr, CLOCK_MONOTONIC);
#endif
        pthread_cond_init(&running_cv, &attr);
        // The attribute object is no longer needed once the condition
        // variable has been initialised from it.
        pthread_condattr_destroy(&attr);
    }

    // Replaces the period used for scheduling subsequent iterations.
    void set_interval(TimeStamp timestamp) {
        interval = timestamp;
    }

    // pthread entry point; `arg` is the PeriodicThread passed to
    // pthread_create() by start().
    static void *thread_entrypoint(void *arg) {
        upgrade_thread_priority(pthread_self());

        static_cast<PeriodicThread *>(arg)->run();
        return NULL;
    }

    // Worker loop: run one iteration, then sleep on running_cv until the
    // next scheduled tick or until stop() clears `running`.
    void run() {
#if HAVE_PTHREAD_SETNAME_NP
#ifdef __APPLE__
        pthread_setname_np("Vernier profiler");
#else
        pthread_setname_np(pthread_self(), "Vernier profiler");
#endif
#endif

        TimeStamp next_sample_schedule = TimeStamp::Now();
        bool done = false;
        while (!done) {
            run_iteration();

            // Taken after the iteration so that an overlong iteration is
            // seen by the catch-up check below.
            TimeStamp sample_complete = TimeStamp::Now();

            next_sample_schedule += interval;

            // Behind schedule: restart the cadence from "now" instead of
            // firing a burst of back-to-back iterations.
            if (next_sample_schedule < sample_complete) {
                next_sample_schedule = sample_complete + interval;
            }

            pthread_mutex_lock(&running_mutex);
            if (running) {
#if HAVE_PTHREAD_CONDATTR_SETCLOCK
                struct timespec next_sample_ts = next_sample_schedule.timespec();
#else
                // The condition variable waits on the realtime clock here,
                // so shift the monotonic deadline by the current offset
                // between the two clocks.
                auto offset = TimeStamp::NowRealtime() - TimeStamp::Now();
                struct timespec next_sample_ts = (next_sample_schedule + offset).timespec();
#endif
                int ret;
                do {
                    ret = pthread_cond_timedwait(&running_cv, &running_mutex, &next_sample_ts);
                    // A return of 0 while still running is a spurious
                    // wakeup: keep waiting for the same absolute deadline.
                    // ETIMEDOUT (or any other error) ends the wait.
                } while (running && (ret == 0 || ret == EINTR));
            }
            done = !running;
            pthread_mutex_unlock(&running_mutex);
        }
    }

    // One unit of periodic work, executed on the worker thread.
    virtual void run_iteration() = 0;

    // Spawns the worker thread. Does nothing if already running.
    // A pthread_create failure is reported through rb_bug().
    void start() {
        pthread_mutex_lock(&running_mutex);
        if (!running) {
            running = true;

            int ret = pthread_create(&pthread, NULL, &thread_entrypoint, this);
            if (ret != 0) {
                perror("pthread_create");
                rb_bug("VERNIER: pthread_create failed");
            }
        }
        pthread_mutex_unlock(&running_mutex);
    }

    // Signals the worker to exit and, if it was running, joins it.
    void stop() {
        pthread_mutex_lock(&running_mutex);
        bool was_running = running;
        if (running) {
            running = false;
            pthread_cond_broadcast(&running_cv);
        }
        pthread_mutex_unlock(&running_mutex);
        // Join outside the lock: the worker needs running_mutex to observe
        // the flag change and leave its loop.
        if (was_running)
            pthread_join(pthread, NULL);
        // Value-initialise instead of assigning 0 so this also compiles
        // where pthread_t is not an arithmetic type.
        pthread = pthread_t{};
    }
};

72 changes: 72 additions & 0 deletions ext/vernier/signal_safe_semaphore.hh
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#ifndef SIGNAL_SAFE_SEMAPHORE_HH
#define SIGNAL_SAFE_SEMAPHORE_HH

#if defined(__APPLE__)
/* macOS */
#include <dispatch/dispatch.h>
#elif defined(__FreeBSD__)
/* FreeBSD */
#include <pthread_np.h>
#include <semaphore.h>
#else
/* Linux */
#include <semaphore.h>
#include <sys/syscall.h> /* for SYS_gettid */
#endif

// A basic semaphore: a libdispatch semaphore on macOS, a POSIX sem_t
// (sem_timedwait/sem_post) elsewhere.
// post() is guaranteed to be async-signal-safe
//
// The destructor releases the underlying semaphore unconditionally, so
// instances must not be copied.
class SignalSafeSemaphore {
#ifdef __APPLE__
    dispatch_semaphore_t sem;
#else
    sem_t sem;
#endif

  public:

    // value: initial count of the semaphore.
    SignalSafeSemaphore(unsigned int value = 0) {
#ifdef __APPLE__
        sem = dispatch_semaphore_create(value);
#else
        sem_init(&sem, 0, value);
#endif
    }

    ~SignalSafeSemaphore() {
#ifdef __APPLE__
        dispatch_release(sem);
#else
        sem_destroy(&sem);
#endif
    }

    // Blocks until the semaphore can be decremented. On non-Apple builds
    // the wait is capped at 5 seconds and any failure is raised via rb_bug().
    void wait() {
#ifdef __APPLE__
        dispatch_semaphore_wait(sem, DISPATCH_TIME_FOREVER);
#else
        // Use sem_timedwait so that we get a crash instead of a deadlock for
        // easier debugging
        struct timespec ts = (TimeStamp::NowRealtime() + TimeStamp::from_seconds(5)).timespec();

        int ret;
        do {
            ret = sem_timedwait(&sem, &ts);
        } while (ret && errno == EINTR);
        if (ret != 0) {
            // Capture errno before anything else can overwrite it.
            const int err = errno;
            if (err == ETIMEDOUT) {
                rb_bug("VERNIER: sem_timedwait waited over 5 seconds");
            }
            // Any other failure is not a timeout; report it as what it is.
            rb_bug("VERNIER: sem_timedwait failed (errno %d)", err);
        }
#endif
    }

    // Increments the semaphore, releasing one waiter if any.
    void post() {
#ifdef __APPLE__
        dispatch_semaphore_signal(sem);
#else
        sem_post(&sem);
#endif
    }
};

#endif
6 changes: 6 additions & 0 deletions ext/vernier/timestamp.hh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ class TimeStamp {
return TimeStamp(ts.tv_sec * nanoseconds_per_second + ts.tv_nsec);
}

// Reads CLOCK_REALTIME and returns it as a TimeStamp built from the total
// nanosecond count (seconds scaled by nanoseconds_per_second, plus tv_nsec).
static TimeStamp NowRealtime() {
    struct timespec realtime;
    clock_gettime(CLOCK_REALTIME, &realtime);
    const auto whole_seconds_ns = realtime.tv_sec * nanoseconds_per_second;
    return TimeStamp(whole_seconds_ns + realtime.tv_nsec);
}

// Returns the TimeStamp constructed from a raw value of 0.
static TimeStamp Zero() {
    const TimeStamp zero(0);
    return zero;
}
Expand Down
Loading
Loading