Skip to content

Commit

Permalink
Add async events (#626)
Browse files Browse the repository at this point in the history
* Implement asynchronous timed events

* Add irecv gap time stats
  • Loading branch information
daboehme authored Nov 25, 2024
1 parent b8e4b5e commit 9895b2f
Show file tree
Hide file tree
Showing 13 changed files with 391 additions and 17 deletions.
3 changes: 3 additions & 0 deletions examples/apps/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ project(caliper-examples)
include_directories("../../src/interface/c_fortran")

set(CALIPER_CXX_EXAMPLE_APPS
async_event
cxx-example
cali-memtracking
cali-memtracking-macros
Expand Down Expand Up @@ -53,3 +54,5 @@ target_link_libraries(cali-memtracking
caliper-tools-util)
target_link_libraries(cali-memtracking-macros
caliper-tools-util)
target_link_libraries(async_event
Threads::Threads)
94 changes: 94 additions & 0 deletions examples/apps/async_event.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (c) 2024, Lawrence Livermore National Security, LLC.
// See top-level LICENSE file for details.

// Example for using asynchronous timed events across threads

#include <caliper/cali.h>
#include <caliper/cali-manager.h>

#include <caliper/AsyncEvent.h>

#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>

std::queue<cali::TimedAsyncEvent> q;
std::mutex q_mtx;
std::condition_variable cv;
bool done = false;

void consumer_thread_fn()
{
CALI_CXX_MARK_FUNCTION;

while (true) {
cali::TimedAsyncEvent evt;

CALI_MARK_BEGIN("waiting");
{
std::unique_lock<std::mutex> g(q_mtx);
cv.wait(g, [](){ return !q.empty() || done; });
if (!q.empty()) {
evt = q.front();
q.pop();
} else
return;
}
CALI_MARK_END("waiting");

evt.end();

CALI_CXX_MARK_SCOPE("processing");
std::this_thread::sleep_for(std::chrono::microseconds(200));
}
}

int main(int argc, char* argv[])
{
cali::ConfigManager mgr;
mgr.set_default_parameter("aggregate_across_ranks", "false");

if (argc > 1) {
mgr.add(argv[1]);
if (mgr.error()) {
std::cerr << "ConfigManager: " << mgr.error_msg() << std::endl;
return -1;
}
}

mgr.start();
cali_init(); // initialize Caliper before creating sub-thread

std::thread consumer(consumer_thread_fn);

CALI_MARK_BEGIN("main_thread");

int N = 200;

CALI_MARK_BEGIN("producing");

for (int i = 0; i < N; ++i) {
q_mtx.lock();
q.push(cali::TimedAsyncEvent::begin("queue_wait"));
q_mtx.unlock();
cv.notify_one();
std::this_thread::sleep_for(std::chrono::microseconds(100));
}

CALI_MARK_END("producing");

{
std::lock_guard<std::mutex> g(q_mtx);
done = true;
}

cv.notify_all();

CALI_MARK_BEGIN("waiting");
consumer.join();
CALI_MARK_END("waiting");
CALI_MARK_END("main_thread");

mgr.flush();
}
40 changes: 40 additions & 0 deletions include/caliper/AsyncEvent.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright (c) 2015-2024, Lawrence Livermore National Security, LLC.
// See top-level LICENSE file for details.

/// \file AsyncEvent.h
/// %Caliper C++ interface for asynchronous events

#pragma once
#ifndef CALI_ASYNC_EVENT_H
#define CALI_ASYNC_EVENT_H

#include <chrono>
#include <memory>

namespace cali
{

class Node;

class TimedAsyncEvent
{
using clock = std::chrono::steady_clock;

Node* end_tree_node_;
std::chrono::time_point<clock> start_time_;

explicit TimedAsyncEvent(Node* node) : end_tree_node_ { node }, start_time_ { clock::now() } { }

public:

constexpr TimedAsyncEvent() : end_tree_node_ { nullptr } { }

void end();

static TimedAsyncEvent begin(const char* message);
};

} // namespace cali


#endif
17 changes: 12 additions & 5 deletions include/caliper/Caliper.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,12 @@ class Channel
typedef util::callback<void(Caliper*, Channel*, const Attribute&, const Variant&)> update_cbvec;
typedef util::callback<void(Caliper*, Channel*)> caliper_cbvec;

typedef util::callback<void(Caliper*, Channel*, SnapshotView)> event_cbvec;
typedef util::callback<void(Caliper*, Channel*, SnapshotView, SnapshotBuilder&)> snapshot_cbvec;
typedef util::callback<void(Caliper*, Channel*, SnapshotView, SnapshotView)> process_snapshot_cbvec;
typedef util::callback<void(Caliper*, Channel*, std::vector<Entry>&)> edit_snapshot_cbvec;

typedef util::callback<void(Caliper*, Channel*, SnapshotView, SnapshotFlushFn)> flush_cbvec;
typedef util::callback<void(Caliper*, Channel*, SnapshotView)> write_cbvec;

typedef util::callback<
void(Caliper*, Channel*, const void*, const char*, size_t, size_t, const size_t*, size_t, const Attribute*, const Variant*)>
Expand All @@ -84,6 +84,8 @@ class Channel
update_cbvec pre_set_evt;
/// \brief Invoked on region end, \e before it has been removed from the blackboard.
update_cbvec pre_end_evt;
/// \brief Invoked for asynchronous events
event_cbvec async_event;

/// \brief Invoked when a new thread context is being created.
caliper_cbvec create_thread_evt;
Expand Down Expand Up @@ -115,11 +117,11 @@ class Channel
process_snapshot_cbvec process_snapshot;

/// \brief Invoked before flush.
write_cbvec pre_flush_evt;
event_cbvec pre_flush_evt;
/// \brief Flush all snapshot records.
flush_cbvec flush_evt;
/// \brief Invoked after flush.
write_cbvec post_flush_evt;
event_cbvec post_flush_evt;

/// \brief Modify snapshot records during flush.
edit_snapshot_cbvec postprocess_snapshot;
Expand All @@ -129,7 +131,7 @@ class Channel
/// This is invoked by the Caliper::flush_and_write() API call, and
/// causes output services (e.g., report or recorder) to trigger a
/// flush.
write_cbvec write_output_evt;
event_cbvec write_output_evt;

/// \brief Invoked at a memory region begin.
track_mem_cbvec track_mem_evt;
Expand Down Expand Up @@ -264,7 +266,7 @@ class Caliper : public CaliperMetadataAccessInterface
/// \param attr Attribute key.
void end_with_value_check(const Attribute& attr, const Variant& data);

/// \brief Set attribute:value pair on the process or thread blackboard.
/// \brief Set attribute:value pair on the process or thread blackboard
///
/// Set the given attribute/value pair on the blackboard. Overwrites
/// the previous values of the same attribute.
Expand All @@ -278,6 +280,11 @@ class Caliper : public CaliperMetadataAccessInterface
/// \param data Value to set
void set(const Attribute& attr, const Variant& data);

/// \brief Mark an asynchronous event with the given info.
///
/// This function invokes the async_event callback on all active channels.
void async_event(SnapshotView info);

/// \}
/// \name Memory region tracking (across channels)
/// \{
Expand Down
86 changes: 86 additions & 0 deletions src/caliper/AsyncEvent.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) 2015-2024, Lawrence Livermore National Security, LLC.
// See top-level LICENSE file for details.

#include "caliper/AsyncEvent.h"

#include "caliper/Caliper.h"
#include "caliper/SnapshotRecord.h"

using namespace cali;

namespace
{

Attribute get_async_event_begin_attr(Caliper& c)
{
static std::atomic<Node*> s_attr_node { nullptr };

auto node = s_attr_node.load();
if (node)
return Attribute::make_attribute(node);

Attribute attr = c.create_attribute("async.begin", CALI_TYPE_STRING, CALI_ATTR_SCOPE_THREAD | CALI_ATTR_SKIP_EVENTS);
s_attr_node.store(attr.node());

return attr;
}

Attribute get_async_event_end_attr(Caliper& c)
{
static std::atomic<Node*> s_attr_node { nullptr };

auto node = s_attr_node.load();
if (node)
return Attribute::make_attribute(node);

Attribute attr = c.create_attribute("async.end", CALI_TYPE_STRING, CALI_SCOPE_THREAD | CALI_ATTR_SKIP_EVENTS);
s_attr_node.store(attr.node());

return attr;
}

Attribute get_event_duration_attr(Caliper& c)
{
static std::atomic<Node*> s_attr_node { nullptr };

auto node = s_attr_node.load();
if (node)
return Attribute::make_attribute(node);

Attribute attr = c.create_attribute("event.duration.ns", CALI_TYPE_UINT, CALI_ATTR_ASVALUE | CALI_ATTR_AGGREGATABLE | CALI_ATTR_SKIP_EVENTS);
s_attr_node.store(attr.node());

return attr;
}

Node async_event_root_node { CALI_INV_ID, CALI_INV_ID, Variant() };

}

void TimedAsyncEvent::end()
{
uint64_t nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(clock::now() - start_time_).count();

Caliper c;
Attribute duration_attr = ::get_event_duration_attr(c);
const Entry data[2] = {
{ end_tree_node_ }, { duration_attr, cali_make_variant_from_uint(nsec) }
};

c.async_event(SnapshotView(2, data));
}

TimedAsyncEvent TimedAsyncEvent::begin(const char* message)
{
Caliper c;
Attribute begin_attr = ::get_async_event_begin_attr(c);
Attribute end_attr = ::get_async_event_end_attr(c);

Variant v_msg(message);
Node* begin_node = c.make_tree_entry(begin_attr, v_msg, &::async_event_root_node);
Node* end_node = c.make_tree_entry(end_attr, v_msg, &::async_event_root_node);

c.async_event(SnapshotView(Entry(begin_node)));

return TimedAsyncEvent(end_node);
}
1 change: 1 addition & 0 deletions src/caliper/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
set(CALIPER_RUNTIME_SOURCES
Annotation.cpp
AnnotationBinding.cpp
AsyncEvent.cpp
Blackboard.cpp
Caliper.cpp
ChannelController.cpp
Expand Down
8 changes: 8 additions & 0 deletions src/caliper/Caliper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1065,6 +1065,14 @@ void Caliper::set(const Attribute& attr, const Variant& data)
handle_set(attr, data, prop, sG->process_blackboard, sT->tree);
}

void Caliper::async_event(SnapshotView info)
{
std::lock_guard<::siglock> g(sT->lock);

for (auto& channel : sG->active_channels)
channel.mP->events.async_event(this, &channel, info);
}

void Caliper::begin(Channel* channel, const Attribute& attr, const Variant& data)
{
int prop = attr.properties();
Expand Down
Loading

0 comments on commit 9895b2f

Please sign in to comment.