Skip to content

Commit

Permalink
Merge pull request #126 from code-mancers/implement-checks-around-mes…
Browse files Browse the repository at this point in the history
…sage-counter

Implement checks around message counter
  • Loading branch information
Hemant Kumar committed Nov 10, 2014
2 parents 6e3b1f3 + 56a77ca commit 428ab4b
Show file tree
Hide file tree
Showing 14 changed files with 38 additions and 184 deletions.
7 changes: 5 additions & 2 deletions rbkit-lib/model/objectaggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ RBKit::ObjectAggregator::ObjectAggregator()

void RBKit::ObjectAggregator::objCreated(RBKit::ObjectDetailPtr object)
{
if (idToName.end() == idToName.find(object->objectId)) {
++typeToCount[object->className];
++totalObjects;
}

idToName[object->objectId] = object->className;
++typeToCount[object->className];
++totalObjects;
}

void RBKit::ObjectAggregator::objDeleted(quint64 key)
Expand Down
29 changes: 0 additions & 29 deletions rbkit-lib/model/objectdetail.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,32 +53,3 @@ QString RBKit::ObjectDetail::getFileLine()
else
return QString("%0:%1").arg(fileName).arg(lineNumber);
}


// ============================== static helper methods ==============================

RBKit::ObjectDetailPtr RBKit::payloadToObject(const QVariantMap& map)
{
auto objectId = map["object_id"].toULongLong();
auto className = map["class_name"].toString();

RBKit::ObjectDetailPtr object(new RBKit::ObjectDetail(className, objectId));
object->fileName = map["file"].toString();
object->lineNumber = map["line"].toInt();
object->addReferences(map["references"].toList());
object->size = map["size"].toInt();

return object;
}

QList<RBKit::ObjectDetailPtr> RBKit::payloadToObjects(const QVariantList& list)
{
QList<RBKit::ObjectDetailPtr> objects;

for (auto& entry : list) {
auto object = RBKit::payloadToObject(entry.toMap());
objects.append(object);
}

return objects;
}
3 changes: 0 additions & 3 deletions rbkit-lib/model/objectdetail.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ namespace RBKit

// typedef for the pointer.
typedef QSharedPointer<ObjectDetail> ObjectDetailPtr;

QList<ObjectDetailPtr> payloadToObjects(const QVariantList& list);
ObjectDetailPtr payloadToObject(const QVariantMap& map);
}

Q_DECLARE_METATYPE(RBKit::ObjectDetail)
Expand Down
6 changes: 4 additions & 2 deletions rbkit-lib/rbeventparser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ RBKit::EventParser::parseEvents(const msgpack::object& objarray) const
RBKit::EventParser::EventParser(const QByteArray& message)
: rawMessage(message)
{
if (1*1024*1024 < message.size()) {
if (5*1024*1024 < message.size()) {
qDebug() << "probably got objdump" << QTime::currentTime();
}

Expand All @@ -94,8 +94,10 @@ RBKit::EventDataBase* RBKit::EventParser::parseEvent() const
auto timestamp = map[RBKit::EfTimestamp].as<double>();
auto ts = QDateTime::fromMSecsSinceEpoch(timestamp);

auto counter = map[RBKit::EfMessageCounter].as<unsigned long long>();

auto events = parseEvents(map[RBKit::EfPayload]);
return new RBKit::EvtCollection(ts, eventType, events);
return new RBKit::EvtCollection(ts, eventType, events, counter);
}


Expand Down
146 changes: 3 additions & 143 deletions rbkit-lib/rbevents.cpp
Original file line number Diff line number Diff line change
@@ -1,147 +1,5 @@
#include <msgpack.hpp>
#include "subscriber.h"
#include "rbevents.h"
#include "mpparser.h"


static QVariantList parseMsgpackObjectArray(const msgpack::object_array&);
static QVariantMap parseMsgpackObjectMap(const msgpack::object_map&);
static QList<RBKit::EventPtr> parseEventCollection(const QVariantList&);


static QVariant parseMsgpackObject(const msgpack::object& obj)
{
switch (obj.type) {
case msgpack::type::ARRAY :
return QVariant(parseMsgpackObjectArray(obj.via.array));
case msgpack::type::MAP :
return QVariant(parseMsgpackObjectMap(obj.via.map));

case msgpack::type::RAW :
return QVariant(RBKit::StringUtil::rawToQString(obj));
case msgpack::type::DOUBLE :
return QVariant(obj.via.dec);
case msgpack::type::POSITIVE_INTEGER :
return QVariant((unsigned long long int)(obj.via.u64));
case msgpack::type::NIL :
return QVariant("");

default:
qDebug() << "throwing error while parsing event" << obj.type;
throw "unknown object type";
}
}

// NOTE: This can be improved with the version that hemant is writing for GCStats.
static QVariantMap parseMsgpackObjectMap(const msgpack::object_map& obj)
{
QVariantMap map;

msgpack::object_kv* list = obj.ptr;
for (uint32_t iter = 0; iter != obj.size; ++iter) {
msgpack::object key = list->key;
msgpack::object val = list->val;

// qDebug() << key.type << val.type;

QString keyStr = RBKit::StringUtil::rawToQString(key);
map[keyStr] = parseMsgpackObject(val);

++list;
}

return map;
}

RBKit::EventDataBase* RBKit::makeEventFromQVariantMap(const QVariantMap &map) {
QDateTime timestamp = QDateTime::fromMSecsSinceEpoch(map["timestamp"].toULongLong());
auto eventType = static_cast<RBKit::EventType>( map["event_type"].toInt() );

RBKit::EventDataBase* event(nullptr);
switch (eventType) {
case RBKit::EtObjCreated:
{
auto object = RBKit::payloadToObject(map["payload"].toMap());
event = new RBKit::EvtNewObject(timestamp, eventType, object);
}
break;

case RBKit::EtObjDestroyed:
// event = new RBKit::EvtDelObject(timestamp, eventType, map["payload"].toMap());
break;

case RBKit::EtGcStats:
event = new RBKit::EvtGcStats(timestamp, eventType, map["payload"].toMap());
break;

case RBKit::EtGcStart:
event = new RBKit::EvtGcStart(timestamp, eventType);
break;

case RBKit::EtGcStartM:
event = new RBKit::EvtGcStartM(timestamp, eventType);
break;

case RBKit::EtGcEndS:
event = new RBKit::EvtGcStop(timestamp, eventType);
break;

case RBKit::EtObjectSpaceDump:
{
auto objects = RBKit::payloadToObjects(map["payload"].toList());
event = new RBKit::EvtObjectDump(timestamp, eventType, objects);
}
break;

case RBKit::EtEventCollection:
{
auto events = parseEventCollection(map["payload"].toList());
event = new RBKit::EvtCollection(timestamp, eventType, events);
}
break;

default:
qDebug() << "Unable to parse event of type: " << eventType;
}

return event;
}


static QVariantList parseMsgpackObjectArray(const msgpack::object_array& array)
{
QVariantList objList;

for (uint32_t iter = 0; iter != array.size; ++iter) {
objList.append(parseMsgpackObject(array.ptr[iter]));
}

return objList;
}


RBKit::EventDataBase* RBKit::parseEvent(const QByteArray& message)
{
msgpack::unpacked unpackedMessage;
msgpack::unpack(&unpackedMessage, message.data(), message.size());

msgpack::object_map obj = unpackedMessage.get().via.map;

QVariantMap map = parseMsgpackObjectMap(obj);
return makeEventFromQVariantMap(map);
}

static QList<RBKit::EventPtr> parseEventCollection(const QVariantList& list)
{
QList<RBKit::EventPtr> events;

for (auto& eventMap : list) {
auto event = RBKit::makeEventFromQVariantMap(eventMap.toMap());
events.append(RBKit::EventPtr(event));
}

return events;
}


// ============================== different events ==============================
Expand Down Expand Up @@ -231,9 +89,11 @@ void RBKit::EvtObjectDump::process(Subscriber& processor) const
}

RBKit::EvtCollection::EvtCollection(QDateTime ts, RBKit::EventType eventType,
QList<RBKit::EventPtr> _events)
QList<RBKit::EventPtr> _events,
quint64 _counter)
: EventDataBase(ts, eventType)
, events(_events)
, messageCounter(_counter)
{}

void RBKit::EvtCollection::process(Subscriber& processor) const
Expand Down
7 changes: 3 additions & 4 deletions rbkit-lib/rbevents.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,13 @@ namespace RBKit
class EvtCollection : public EventDataBase
{
public:
EvtCollection(QDateTime ts, EventType eventType, QList<RBKit::EventPtr>);
EvtCollection(QDateTime ts, EventType eventType, QList<RBKit::EventPtr>,
quint64 counter);
void process(Subscriber& process) const;

QList<RBKit::EventPtr> events;
quint64 messageCounter;
};

EventDataBase* parseEvent(const QByteArray& rawMessage);
EventDataBase* makeEventFromQVariantMap(const QVariantMap& map);
}


Expand Down
19 changes: 18 additions & 1 deletion rbkit-lib/subscriber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,17 @@ void Subscriber::setContext(nzmqt::ZMQContext *value)
{
context = value;
}


Subscriber::Subscriber(RBKit::JsBridge* bridge)
:jsBridge(bridge), connectionEstablished(false)
: jsBridge(bridge)
, connectionEstablished(false)
, messageCounter(0)
{
qDebug() << "** Thread is is : " << QThread::currentThreadId();
}


void Subscriber::triggerGc() {
RBKit::CmdTriggerGC triggerGC_Command;
qDebug() << "Triggering GC";
Expand Down Expand Up @@ -209,6 +214,8 @@ void Subscriber::processEvent(const RBKit::EvtObjectDump& dump)

void Subscriber::processEvent(const RBKit::EvtCollection& evtCollection)
{
checkForMissingMessages(evtCollection.messageCounter);

for (auto& event : evtCollection.events) {
event->process(*this);
}
Expand Down Expand Up @@ -241,3 +248,13 @@ void Subscriber::onTimerExpiry()
QVariantMap map = hashToQVarMap(objectStore->liveStats());
jsBridge->sendMapToJs(eventName, QDateTime(), map);
}


void Subscriber::checkForMissingMessages(const quint64 counter)
{
if (counter != ++messageCounter) {
qDebug() << "missed message pack event messages from"
<< messageCounter << "to" << counter - 1;
messageCounter = counter;
}
}
5 changes: 5 additions & 0 deletions rbkit-lib/subscriber.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ class Subscriber : public QObject
nzmqt::ZMQContext *context;
bool connectionEstablished;

quint64 messageCounter;

public:
explicit Subscriber(RBKit::JsBridge* jsBridge);
~Subscriber();
Expand Down Expand Up @@ -76,6 +78,9 @@ public slots:
void triggerGc();
void takeSnapshot();
void startSubscriber();

private:
void checkForMissingMessages(const quint64 counter);
};

#endif // SUBSCRIBER_H
Binary file modified tests/msgpack/gc_start
Binary file not shown.
Binary file modified tests/msgpack/gcstats
Binary file not shown.
Binary file modified tests/msgpack/hugedump
Binary file not shown.
Binary file modified tests/msgpack/objcreated
Binary file not shown.
Binary file modified tests/msgpack/objdestroyed
Binary file not shown.
Binary file modified tests/msgpack/objectdump
Binary file not shown.

0 comments on commit 428ab4b

Please sign in to comment.