Skip to content

Commit

Permalink
Merge pull request #375 from JeffersonLab/nbrei_arrow_cleanup
Browse files Browse the repository at this point in the history
Arrow cleanup
  • Loading branch information
nathanwbrei authored Oct 18, 2024
2 parents 073d8a4 + dc13597 commit 7b51136
Show file tree
Hide file tree
Showing 30 changed files with 346 additions and 567 deletions.
19 changes: 10 additions & 9 deletions src/examples/SubeventCUDAExample/SubeventCUDAExample.cu
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

#include <JANA/JApplication.h>
#include <JANA/JObject.h>
#include <JANA/Engine/JSubeventArrow.h>
#include <JANA/JEventSource.h>
#include <JANA/JEventProcessor.h>
#include "JANA/Engine/JTopologyBuilder.h"
#include <JANA/Topology/JSubeventArrow.h>


struct MyInput : public JObject {
Expand Down Expand Up @@ -128,8 +128,7 @@ int main() {
JMailbox <SubeventWrapper<MyOutput>> subevents_out;

auto split_arrow = new JSplitArrow<MyInput, MyOutput>("split", &processor, &events_in, &subevents_in);
auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in,
&subevents_out);
auto subprocess_arrow = new JSubeventArrow<MyInput, MyOutput>("subprocess", &processor, &subevents_in, &subevents_out);
auto merge_arrow = new JMergeArrow<MyInput, MyOutput>("merge", &processor, &subevents_out, &events_out);

JApplication app;
Expand All @@ -140,12 +139,14 @@ int main() {
source->SetNEvents(10); // limit ourselves to 10 events. Note that the 'jana:nevents' param won't work
// here because we aren't using JComponentManager to manage the EventSource

auto topology = app.GetService<JTopologyBuilder>()->create_empty();
auto source_arrow = new JEventSourceArrow("simpleSource",
{source},
&events_in,
topology->event_pool);
auto proc_arrow = new JEventProcessorArrow("simpleProcessor", &events_out, nullptr, topology->event_pool);
auto topology = app.GetService<JTopologyBuilder>();
auto source_arrow = new JEventSourceArrow("simpleSource", {source});
source_arrow->set_input(topology->event_pool);
source_arrow->set_output(&events_in);

auto proc_arrow = new JEventMapArrow("simpleProcessor");
proc_arrow->set_input(&events_out);
proc_arrow->set_output(topology->event_pool);
proc_arrow->add_processor(new SimpleProcessor);

topology->arrows.push_back(source_arrow);
Expand Down
6 changes: 3 additions & 3 deletions src/examples/SubeventExample/SubeventExample.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
#include <JANA/JEventProcessor.h>

#include <JANA/Topology/JEventSourceArrow.h>
#include <JANA/Topology/JEventProcessorArrow.h>
#include <JANA/Topology/JEventMapArrow.h>
#include <JANA/Topology/JSubeventArrow.h>
#include "JANA/Topology/JTopologyBuilder.h"
#include <JANA/Topology/JTopologyBuilder.h>


struct MyInput : public JObject {
Expand Down Expand Up @@ -111,7 +111,7 @@ int main() {
source_arrow->set_input(topology->event_pool);
source_arrow->set_output(&events_in);

auto proc_arrow = new JEventProcessorArrow("simpleProcessor");
auto proc_arrow = new JEventMapArrow("simpleProcessor");
proc_arrow->set_input(&events_out);
proc_arrow->set_output(topology->event_pool);
proc_arrow->add_processor(new SimpleProcessor);
Expand Down
1 change: 0 additions & 1 deletion src/libraries/JANA/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ set(JANA2_SOURCES
Engine/JPerfMetrics.cc
Engine/JPerfSummary.cc

Topology/JEventProcessorArrow.cc
Topology/JEventSourceArrow.cc
Topology/JEventMapArrow.cc
Topology/JEventTapArrow.cc
Expand Down
84 changes: 37 additions & 47 deletions src/libraries/JANA/Topology/JArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,24 @@
#include <JANA/JLogger.h>
#include <JANA/JException.h>
#include <JANA/Topology/JMailbox.h>
#include <JANA/Topology/JPool.h>
#include <JANA/Topology/JEventPool.h>


#ifndef JANA2_ARROWDATA_MAX_SIZE
#define JANA2_ARROWDATA_MAX_SIZE 10
#endif

struct PlaceRefBase;
struct Place;

using EventT = std::shared_ptr<JEvent>;

class JArrow {
private:
const std::string m_name; // Used for human understanding
const bool m_is_parallel; // Whether or not it is safe to parallelize
const bool m_is_source; // Whether or not this arrow should activate/drain the topology
bool m_is_sink; // Whether or not tnis arrow contributes to the final event count
JArrowMetrics m_metrics; // Performance information accumulated over all workers
std::string m_name; // Used for human understanding
bool m_is_parallel; // Whether or not it is safe to parallelize
bool m_is_source; // Whether or not this arrow should activate/drain the topology
bool m_is_sink; // Whether or not tnis arrow contributes to the final event count
JArrowMetrics m_metrics; // Performance information accumulated over all workers

friend class JScheduler;
std::vector<JArrow *> m_listeners; // Downstream Arrows
Expand All @@ -34,27 +36,22 @@ class JArrow {
// This is usable by subclasses.
JLogger m_logger;
friend class JTopologyBuilder;
std::vector<PlaceRefBase*> m_places; // Will eventually supplant m_listeners, m_chunksize
std::vector<Place*> m_places; // Will eventually supplant m_listeners

public:
std::string get_name() { return m_name; }
JLogger& get_logger() { return m_logger; }
bool is_parallel() { return m_is_parallel; }
bool is_source() { return m_is_source; }
bool is_sink() { return m_is_sink; }
JArrowMetrics& get_metrics() { return m_metrics; }

std::string get_name() { return m_name; }

void set_logger(JLogger logger) {
m_logger = logger;
}
void set_name(std::string name) { m_name = name; }
void set_logger(JLogger logger) { m_logger = logger; }
void set_is_parallel(bool is_parallel) { m_is_parallel = is_parallel; }
void set_is_source(bool is_source) { m_is_source = is_source; }
void set_is_sink(bool is_sink) { m_is_sink = is_sink; }

void set_is_sink(bool is_sink) {
m_is_sink = is_sink;
}

// TODO: Metrics should be encapsulated so that only actions are to update, clear, or summarize
JArrowMetrics& get_metrics() {
return m_metrics;
}

JArrow(std::string name, bool is_parallel, bool is_source, bool is_sink) :
m_name(std::move(name)), m_is_parallel(is_parallel), m_is_source(is_source), m_is_sink(is_sink) {
Expand All @@ -77,16 +74,15 @@ class JArrow {
m_listeners.push_back(downstream);
};

void attach(PlaceRefBase* place) {
void attach(Place* place) {
if (std::find(m_places.begin(), m_places.end(), place) == m_places.end()) {
m_places.push_back(place);
}
};
};

template <typename T>
struct Data {
std::array<T*, JANA2_ARROWDATA_MAX_SIZE> items;
std::array<EventT*, JANA2_ARROWDATA_MAX_SIZE> items;
size_t item_count = 0;
size_t reserve_count = 0;
size_t location_id;
Expand All @@ -96,59 +92,53 @@ struct Data {
}
};

struct PlaceRefBase {
struct Place {
void* place_ref = nullptr;
bool is_queue = true;
bool is_input = false;
size_t min_item_count = 1;
size_t max_item_count = 1;

virtual size_t get_pending() { return 0; }
};

template <typename T>
struct PlaceRef : public PlaceRefBase {

PlaceRef(JArrow* parent, bool is_input, size_t min_item_count, size_t max_item_count) {
Place(JArrow* parent, bool is_input, size_t min_item_count, size_t max_item_count) {
assert(parent != nullptr);
parent->attach(this);
this->is_input = is_input;
this->min_item_count = min_item_count;
this->max_item_count = max_item_count;
}

void set_queue(JMailbox<T*>* queue) {
void set_queue(JMailbox<EventT*>* queue) {
assert(queue != nullptr);
this->place_ref = queue;
this->is_queue = true;
}

void set_pool(JPool<T>* pool) {
void set_pool(JEventPool* pool) {
assert(pool != nullptr);
this->place_ref = pool;
this->is_queue = false;
}

size_t get_pending() override {
size_t get_pending() {
assert(place_ref != nullptr);
if (is_input && is_queue) {
auto queue = static_cast<JMailbox<T*>*>(place_ref);
auto queue = static_cast<JMailbox<EventT*>*>(place_ref);
return queue->size();
}
return 0;
}

bool pull(Data<T>& data) {
bool pull(Data& data) {
assert(place_ref != nullptr);
if (is_input) { // Actually pull the data
if (is_queue) {
auto queue = static_cast<JMailbox<T*>*>(place_ref);
auto queue = static_cast<JMailbox<EventT*>*>(place_ref);
data.item_count = queue->pop_and_reserve(data.items.data(), min_item_count, max_item_count, data.location_id);
data.reserve_count = data.item_count;
return (data.item_count >= min_item_count);
}
else {
auto pool = static_cast<JPool<T>*>(place_ref);
auto pool = static_cast<JEventPool*>(place_ref);
data.item_count = pool->pop(data.items.data(), min_item_count, max_item_count, data.location_id);
data.reserve_count = 0;
return (data.item_count >= min_item_count);
Expand All @@ -158,7 +148,7 @@ struct PlaceRef : public PlaceRefBase {
if (is_queue) {
// Reserve a space on the output queue
data.item_count = 0;
auto queue = static_cast<JMailbox<T*>*>(place_ref);
auto queue = static_cast<JMailbox<EventT*>*>(place_ref);
data.reserve_count = queue->reserve(min_item_count, max_item_count, data.location_id);
return (data.reserve_count >= min_item_count);
}
Expand All @@ -171,31 +161,31 @@ struct PlaceRef : public PlaceRefBase {
}
}

void revert(Data<T>& data) {
void revert(Data& data) {
assert(place_ref != nullptr);
if (is_queue) {
auto queue = static_cast<JMailbox<T*>*>(place_ref);
auto queue = static_cast<JMailbox<EventT*>*>(place_ref);
queue->push_and_unreserve(data.items.data(), data.item_count, data.reserve_count, data.location_id);
}
else {
if (is_input) {
auto pool = static_cast<JPool<T>*>(place_ref);
auto pool = static_cast<JEventPool*>(place_ref);
pool->push(data.items.data(), data.item_count, false, data.location_id);
}
}
}

size_t push(Data<T>& data) {
size_t push(Data& data) {
assert(place_ref != nullptr);
if (is_queue) {
auto queue = static_cast<JMailbox<T*>*>(place_ref);
auto queue = static_cast<JMailbox<EventT*>*>(place_ref);
queue->push_and_unreserve(data.items.data(), data.item_count, data.reserve_count, data.location_id);
data.item_count = 0;
data.reserve_count = 0;
return is_input ? 0 : data.item_count;
}
else {
auto pool = static_cast<JPool<T>*>(place_ref);
auto pool = static_cast<JEventPool*>(place_ref);
pool->push(data.items.data(), data.item_count, !is_input, data.location_id);
data.item_count = 0;
data.reserve_count = 0;
Expand All @@ -206,7 +196,7 @@ struct PlaceRef : public PlaceRefBase {

inline size_t JArrow::get_pending() {
size_t sum = 0;
for (PlaceRefBase* place : m_places) {
for (Place* place : m_places) {
sum += place->get_pending();
}
return sum;
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/JANA/Topology/JEventMapArrow.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ void JEventMapArrow::add_processor(JEventProcessor* processor) {
m_procs.push_back(processor);
}

void JEventMapArrow::process(Event* event, bool& success, JArrowMetrics::Status& status) {
void JEventMapArrow::process(std::shared_ptr<JEvent>* event, bool& success, JArrowMetrics::Status& status) {

LOG_DEBUG(m_logger) << "Executing arrow " << get_name() << " for event# " << (*event)->GetEventNumber() << LOG_END;
for (JEventSource* source : m_sources) {
Expand Down
6 changes: 2 additions & 4 deletions src/libraries/JANA/Topology/JEventMapArrow.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,8 @@ class JEventUnfolder;
class JEventProcessor;
class JEvent;

using Event = std::shared_ptr<JEvent>;
using EventQueue = JMailbox<Event*>;

class JEventMapArrow : public JPipelineArrow<JEventMapArrow, Event> {
class JEventMapArrow : public JPipelineArrow<JEventMapArrow> {

private:
std::vector<JEventSource*> m_sources;
Expand All @@ -28,7 +26,7 @@ class JEventMapArrow : public JPipelineArrow<JEventMapArrow, Event> {
void add_unfolder(JEventUnfolder* unfolder);
void add_processor(JEventProcessor* proc);

void process(Event* event, bool& success, JArrowMetrics::Status& status);
void process(std::shared_ptr<JEvent>* event, bool& success, JArrowMetrics::Status& status);

void initialize() final;
void finalize() final;
Expand Down
Loading

0 comments on commit 7b51136

Please sign in to comment.