Skip to content

Commit

Permalink
Per review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
renukamanavalan committed Jun 2, 2022
1 parent 3328970 commit 66632f6
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 31 deletions.
28 changes: 14 additions & 14 deletions common/events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,11 @@ EventSubscriber::~EventSubscriber()
/* Shadow the cache init request, as it is async */
m_event_service.send_recv(EVENT_ECHO);

/* read for 2 seconds in non-block mode, to drain any local cache */
/*
* read for a second in non-block mode, to drain any local cache.
* Break if no event locally available or 1 second passed, whichever
* comes earlier.
*/
chrono::steady_clock::time_point start = chrono::steady_clock::now();
while(true) {
string source, evt_str;
Expand All @@ -214,19 +218,15 @@ EventSubscriber::~EventSubscriber()
rc = zmq_message_read(m_socket, ZMQ_DONTWAIT, source, evt_data);
if (rc == -1) {
if (zerrno == EAGAIN) {
/* Try again after a small pause */
this_thread::sleep_for(chrono::milliseconds(10));
}
else {
break;
rc = 0;
}
break;
}
else {
serialize(evt_data, evt_str);
events.push_back(evt_str);
}

serialize(evt_data, evt_str);
events.push_back(evt_str);
chrono::steady_clock::time_point now = chrono::steady_clock::now();
if (chrono::duration_cast<std::chrono::milliseconds>(now - start).count() >
if (chrono::duration_cast<chrono::milliseconds>(now - start).count() >
CACHE_DRAIN_IN_MILLISECS)
break;
}
Expand Down Expand Up @@ -399,12 +399,12 @@ events_init_subscriber(bool use_cache, int recv_timeout,
const event_subscribe_sources_t *sources)
{
if (s_subscriber == NULL) {
EventSubscriber *p = new EventSubscriber();
EventSubscriber *sub = new EventSubscriber();

RET_ON_ERR(p->init(use_cache, recv_timeout, sources) == 0,
RET_ON_ERR(sub->init(use_cache, recv_timeout, sources) == 0,
"Failed to init subscriber");

s_subscriber = p;
s_subscriber = sub;
}
out:
return s_subscriber;
Expand Down
6 changes: 3 additions & 3 deletions common/events_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,11 @@ get_config(const string key)
const string
get_timestamp()
{
std::stringstream ss, sfrac;
stringstream ss, sfrac;

auto timepoint = system_clock::now();
std::time_t tt = system_clock::to_time_t (timepoint);
struct std::tm * ptm = std::localtime(&tt);
time_t tt = system_clock::to_time_t (timepoint);
struct tm * ptm = localtime(&tt);

uint64_t ms = duration_cast<microseconds>(timepoint.time_since_epoch()).count();
uint64_t sec = duration_cast<seconds>(timepoint.time_since_epoch()).count();
Expand Down
26 changes: 13 additions & 13 deletions common/events_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,34 +63,34 @@ extern int running_ut;
* std::cout << type_name<decltype(t)>() << '\n';
* std::cout << type_name<decltype(tt_t)>() << '\n';
*/
template <typename T> std::string type_name();
template <typename T> string type_name();

template <class T>
std::string
string
type_name()
{
typedef typename std::remove_reference<T>::type TR;
std::unique_ptr<char, void(*)(void*)> own
typedef typename remove_reference<T>::type TR;
unique_ptr<char, void(*)(void*)> own
(
abi::__cxa_demangle(typeid(TR).name(), nullptr,
nullptr, nullptr),
std::free
free
);
std::string r = own != nullptr ? own.get() : typeid(TR).name();
if (std::is_const<TR>::value)
string r = own != nullptr ? own.get() : typeid(TR).name();
if (is_const<TR>::value)
r += " const";
if (std::is_volatile<TR>::value)
if (is_volatile<TR>::value)
r += " volatile";
if (std::is_lvalue_reference<T>::value)
if (is_lvalue_reference<T>::value)
r += "&";
else if (std::is_rvalue_reference<T>::value)
else if (is_rvalue_reference<T>::value)
r += "&&";
return r;
}


template <class T>
std::string
string
get_typename(T &val)
{
return type_name<decltype(val)>();
Expand Down Expand Up @@ -174,7 +174,7 @@ int
serialize(const Map& data, string &s)
{
s.clear();
std::stringstream _ser_ss;
stringstream _ser_ss;
boost::archive::text_oarchive oarch(_ser_ss);

try {
Expand All @@ -195,7 +195,7 @@ template <typename Map>
int
deserialize(const string& s, Map& data)
{
std::stringstream ss(s);
stringstream ss(s);
boost::archive::text_iarchive iarch(ss);

try {
Expand Down
2 changes: 1 addition & 1 deletion common/events_pi.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ class EventPublisher : public events_base

int init(const string event_source);

int publish(const std::string event_tag,
int publish(const string event_tag,
const event_params_t *params);
private:

Expand Down

0 comments on commit 66632f6

Please sign in to comment.