Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Bugs in Nativestore Implementation. #212

Merged
merged 20 commits into from
Dec 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 19 additions & 13 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ set(HEADERS src/backend/JasmineGraphBackend.h
src/backend/JasmineGraphBackendProtocol.h
src/centralstore/JasmineGraphHashMapCentralStore.h
src/centralstore/JasmineGraphHashMapDuplicateCentralStore.h
src/centralstore/incremental/DataPublisher.h
src/centralstore/incremental/NodeBlock.h
src/centralstore/incremental/NodeManager.h
src/centralstore/incremental/PropertyLink.h
src/centralstore/incremental/RelationBlock.h
src/exception/JasmineGraphException.h
src/frontend/JasmineGraphFrontEnd.h
src/frontend/JasmineGraphFrontEndProtocol.h
Expand Down Expand Up @@ -54,23 +49,26 @@ set(HEADERS src/backend/JasmineGraphBackend.h
src/util/dbutil/edgestore_generated.h
src/util/dbutil/partedgemapstore_generated.h
src/util/kafka/KafkaCC.h
src/util/kafka/StreamHandler.h
src/util/logger/Logger.h
src/util/scheduler/Cron.h
src/util/scheduler/InterruptableSleep.h
src/util/scheduler/Scheduler.h
src/util/scheduler/SchedulerService.h
src/util/scheduler/ctpl_stl.h
)
src/nativestore/NodeManager.h
src/nativestore/NodeBlock.h
src/nativestore/PropertyLink.h
src/nativestore/PropertyEdgeLink.h
src/nativestore/RelationBlock.h
src/nativestore/DataPublisher.h
src/partitioner/stream/Partition.h
)

set(SOURCES src/backend/JasmineGraphBackend.cpp
src/backend/JasmineGraphBackendProtocol.cpp
src/centralstore/JasmineGraphHashMapCentralStore.cpp
src/centralstore/JasmineGraphHashMapDuplicateCentralStore.cpp
src/centralstore/incremental/DataPublisher.cpp
src/centralstore/incremental/NodeBlock.cpp
src/centralstore/incremental/NodeManager.cpp
src/centralstore/incremental/PropertyLink.cpp
src/centralstore/incremental/RelationBlock.cpp
src/exception/JasmineGraphException.cpp
src/frontend/JasmineGraphFrontEnd.cpp
src/frontend/JasmineGraphFrontEndProtocol.cpp
Expand Down Expand Up @@ -108,9 +106,17 @@ set(SOURCES src/backend/JasmineGraphBackend.cpp
src/util/PlacesToNodeMapper.cpp
src/util/Utils.cpp
src/util/kafka/KafkaCC.cpp
src/util/kafka/StreamHandler.cpp
src/util/logger/Logger.cpp
src/util/scheduler/SchedulerService.cpp
)
src/nativestore/NodeManager.cpp
src/nativestore/NodeBlock.cpp
src/nativestore/PropertyLink.cpp
src/nativestore/PropertyEdgeLink.cpp
src/nativestore/RelationBlock.cpp
src/nativestore/DataPublisher.cpp
src/partitioner/stream/Partition.cpp
)

add_library(JasmineGraphLib ${HEADERS} ${SOURCES})
add_executable(JasmineGraph main.h main.cpp)
Expand Down Expand Up @@ -170,4 +176,4 @@ if (CMAKE_BUILD_TYPE STREQUAL "DEBUG")
EXECUTABLE JasmineGraphUnitTest
BASE_DIRECTORY "${PROJECT_SOURCE_DIR}/src"
)
endif ()
endif ()
360 changes: 0 additions & 360 deletions src/centralstore/incremental/RelationBlock.cpp

This file was deleted.

68 changes: 17 additions & 51 deletions src/frontend/JasmineGraphFrontEnd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ limitations under the License.
#include <set>
#include <thread>

#include "../centralstore/incremental/DataPublisher.h"
#include "../centralstore/incremental/RelationBlock.h"
#include "../nativestore/DataPublisher.h"
#include "../nativestore/RelationBlock.h"
#include "../metadb/SQLiteDBInterface.h"
#include "../ml/trainer/JasmineGraphTrainingSchedular.h"
#include "../partitioner/local/JSONParser.h"
Expand All @@ -41,6 +41,7 @@ limitations under the License.
#include "../server/JasmineGraphServer.h"
#include "../util/Conts.h"
#include "../util/kafka/KafkaCC.h"
#include "../util/kafka/StreamHandler.h"
#include "../util/logger/Logger.h"
#include "JasmineGraphFrontEndProtocol.h"
#include "core/CoreConstants.h"
Expand Down Expand Up @@ -69,7 +70,7 @@ static void add_graph_cust_command(std::string masterIP, int connFd, SQLiteDBInt
static void remove_graph_command(std::string masterIP, int connFd, SQLiteDBInterface sqlite, bool *loop_exit_p);
static void add_model_command(int connFd, SQLiteDBInterface sqlite, bool *loop_exit_p);
static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, cppkafka::Configuration &configs,
KafkaConnector *&kstream, thread &input_stream_handler,
KafkaConnector *&kstream, thread &input_stream_handler_thread,
vector<DataPublisher *> &workerClients, int numberOfPartitions, bool *loop_exit_p);
static void stop_stream_kafka_command(int connFd, KafkaConnector *kstream, bool *loop_exit_p);
static void process_dataset_command(int connFd, bool *loop_exit_p);
Expand All @@ -88,44 +89,6 @@ static void predict_command(std::string masterIP, int connFd, SQLiteDBInterface
static void start_remote_worker_command(int connFd, bool *loop_exit_p);
static void sla_command(int connFd, SQLiteDBInterface sqlite, PerformanceSQLiteDBInterface perfSqlite,
bool *loop_exit_p);
// Thread function
void listen_to_kafka_topic(KafkaConnector *kstream, Partitioner &graphPartitioner,
vector<DataPublisher *> &workerClients) {
while (true) {
cppkafka::Message msg = kstream->consumer.poll();
if (!msg || msg.get_error()) {
continue;
}
string data(msg.get_payload());
if (data == "-1") { // Marks the end of stream
frontend_logger.info("Received the end of `" + stream_topic_name + "` input kafka stream");
break;
}
auto edgeJson = json::parse(data);
auto sourceJson = edgeJson["source"];
auto destinationJson = edgeJson["destination"];
std::string sId = std::string(sourceJson["id"]);
std::string dId = std::string(destinationJson["id"]);
partitionedEdge partitionedEdge = graphPartitioner.addEdge({sId, dId});
sourceJson["pid"] = partitionedEdge[0].second;
destinationJson["pid"] = partitionedEdge[1].second;
string source = sourceJson.dump();
string destination = destinationJson.dump();
json obj;
obj["source"] = sourceJson;
obj["destination"] = destinationJson;
long temp_s = partitionedEdge[0].second;
long temp_d = partitionedEdge[1].second;
workerClients.at((int)partitionedEdge[0].second)->publish(sourceJson.dump());
workerClients.at((int)partitionedEdge[1].second)->publish(destinationJson.dump());
// storing Node block
if (temp_s == temp_d) {
        // +miyurud: Temporarily commenting the following line to make the code build
// workerClients.at((int) partitionedEdge[0].second)->publish_relation(obj.dump());
}
}
graphPartitioner.printStats();
}

void *frontendservicesesion(std::string masterIP, int connFd, SQLiteDBInterface sqlite,
PerformanceSQLiteDBInterface perfSqlite, JobScheduler jobScheduler) {
Expand Down Expand Up @@ -1151,10 +1114,10 @@ static void add_model_command(int connFd, SQLiteDBInterface sqlite, bool *loop_e
}

static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, cppkafka::Configuration &configs,
KafkaConnector *&kstream, thread &input_stream_handler,
KafkaConnector *&kstream, thread &input_stream_handler_thread,
vector<DataPublisher *> &workerClients, int numberOfPartitions,
bool *loop_exit_p) {
string msg_1 = "DO you want to use default KAFKA consumer(y/n) ? ";
string msg_1 = "Do you want to use default KAFKA consumer(y/n) ?";
int result_wr = write(connFd, msg_1.c_str(), msg_1.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
Expand Down Expand Up @@ -1244,25 +1207,28 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
char topic_name[FRONTEND_DATA_LENGTH + 1];
bzero(topic_name, FRONTEND_DATA_LENGTH + 1);
read(connFd, topic_name, FRONTEND_DATA_LENGTH);

string topic_name_s(topic_name);
topic_name_s = Utils::trim_copy(topic_name_s, " \f\n\r\t\v");
string con_message = "Received the kafka topic";
int con_result_wr = write(connFd, con_message.c_str(), con_message.length());
if (con_result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}

// create kafka consumer and graph partitioner
// create kafka consumer and graph partitioner
kstream = new KafkaConnector(configs);
// Create the Partitioner object.
Partitioner graphPartitioner(numberOfPartitions, 1, spt::Algorithms::HASH);

string topic_name_s(topic_name);
topic_name_s = Utils::trim_copy(topic_name_s, " \f\n\r\t\v");
stream_topic_name = topic_name_s;
// Create the KafkaConnector object.
kstream = new KafkaConnector(configs);
// Subscribe to the Kafka topic.
kstream->Subscribe(topic_name_s);
// Create the StreamHandler object.
StreamHandler* stream_handler = new StreamHandler(kstream, graphPartitioner, workerClients);

frontend_logger.info("Start listening to " + topic_name_s);
input_stream_handler = thread(listen_to_kafka_topic, kstream, std::ref(graphPartitioner), std::ref(workerClients));
input_stream_handler_thread = thread(&StreamHandler::listen_to_kafka_topic, stream_handler);
}

static void stop_stream_kafka_command(int connFd, KafkaConnector *kstream, bool *loop_exit_p) {
Expand Down
1 change: 1 addition & 0 deletions src/frontend/JasmineGraphFrontEndProtocol.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const string SHTDN = "shdn";
const string SEND = "send";
const string ERROR = "error";
const string ADD_STREAM_KAFKA = "adstrmk";
const string ADD_STREAM_KAFKA_CSV = "adstrmkcsv";
const string STOP_STREAM_KAFKA = "stopstrm";
const string STREAM_TOPIC_NAME = "topicnm";
const string PROCESS_DATASET = "process_dataset";
Expand Down
15 changes: 10 additions & 5 deletions src/localstore/incremental/JasmineGraphIncrementalLocalStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ limitations under the License.
#include <memory>
#include <stdexcept>

#include "../../centralstore/incremental/RelationBlock.h"
#include "../../nativestore/RelationBlock.h"
#include "../../util/logger/Logger.h"

Logger incremental_localstore_logger;
Expand All @@ -34,11 +34,11 @@ std::pair<std::string, unsigned int> JasmineGraphIncrementalLocalStore::getIDs(s
auto edgeJson = json::parse(edgeString);
if (edgeJson.contains("properties")) {
auto edgeProperties = edgeJson["properties"];
return {edgeProperties["graphId"], 0};
return {edgeProperties["graphId"], edgeJson["PID"]};
}
} catch (const std::exception&) { // TODO tmkasun: Handle multiple types of exceptions
} catch (const std::exception& e) { // TODO tmkasun: Handle multiple types of exceptions
incremental_localstore_logger.log(
"Error while processing edge data = " + edgeString +
"Error while processing edge data = " + std::string(e.what()) +
"Could be due to JSON parsing error or error while persisting the data to disk",
"error");
}
Expand All @@ -54,7 +54,12 @@ void JasmineGraphIncrementalLocalStore::addEdgeFromString(std::string edgeString
std::string sId = std::string(sourceJson["id"]);
std::string dId = std::string(destinationJson["id"]);

RelationBlock* newRelation = this->nm->addEdge({sId, dId});
RelationBlock* newRelation;
if (edgeJson["EdgeType"] == "Central") {
newRelation = this->nm->addCentralEdge({sId, dId});
} else {
newRelation = this->nm->addEdge({sId, dId});
}
if (!newRelation) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ limitations under the License.
#include <string>
using json = nlohmann::json;

#include "../../centralstore/incremental/NodeManager.h"
#include "../../nativestore/NodeManager.h"
#ifndef Incremental_LocalStore
#define Incremental_LocalStore

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@

#include "./DataPublisher.h"

#include "../../server/JasmineGraphInstanceProtocol.h"
#include "../../util/Utils.h"
#include "../../util/logger/Logger.h"
#include "../server/JasmineGraphInstanceProtocol.h"
#include "../util/Utils.h"
#include "../util/logger/Logger.h"

Logger data_publisher_logger;

Expand All @@ -26,7 +26,7 @@ DataPublisher::DataPublisher(int worker_port, std::string worker_address) {

server = gethostbyname(worker_address.c_str());
if (server == NULL) {
data_publisher_logger.error("ERROR, no host named " + worker_address);
data_publisher_logger.log("ERROR, no host named " + worker_address, "error");
exit(0);
}

Expand All @@ -45,60 +45,47 @@ DataPublisher::DataPublisher(int worker_port, std::string worker_address) {
DataPublisher::~DataPublisher() { close(sock); }

void DataPublisher::publish(std::string message) {
char recever_buffer[MAX_STREAMING_DATA_LENGTH] = {0};
char receiver_buffer[MAX_STREAMING_DATA_LENGTH] = {0};

// Send initial start sending edge command
send(this->sock, JasmineGraphInstanceProtocol::GRAPH_STREAM_START.c_str(),
JasmineGraphInstanceProtocol::GRAPH_STREAM_START.length(), 0);

char start_ack[1024] = {0};
    // Wait to receive an ACK for initial start sending edge command
char start_ack[ACK_MESSAGE_SIZE] = {0};
auto ack_return_status = recv(this->sock, &start_ack, sizeof(start_ack), 0);
std::string ack(start_ack);
std::cout << ack << std::endl;
std::cout << JasmineGraphInstanceProtocol::GRAPH_STREAM_START_ACK << std::endl;
if (JasmineGraphInstanceProtocol::GRAPH_STREAM_START_ACK != ack) {
data_publisher_logger.error("Error while receiving start command ack\n");
}

int message_length = message.length();
int converted_number = htonl(message_length);
data_publisher_logger.info("Sending content length\n");
// Sending edge data content length
send(this->sock, &converted_number, sizeof(converted_number), 0);

int received_int = 0;
data_publisher_logger.info("Waiting for content length ack\n");
auto return_status = recv(this->sock, &received_int, sizeof(received_int), 0);
    // Receive ack for edge data content length

if (return_status > 0) {
data_publisher_logger.info("Received int =" + std::to_string(ntohl(received_int)));
} else {
data_publisher_logger.error("Error while receiving content length ack\n");
}
// Sending edge data
send(this->sock, message.c_str(), message.length(), 0);
data_publisher_logger.info("Edge data sent\n");
char CRLF;
do {
// read a single byte
auto return_status = recv(this->sock, &CRLF, sizeof(CRLF), 0);
if (return_status < 1) {
// error or disconnect
return;
}

// has end of line been reached?
if (CRLF == '\r') {
// read a single byte
auto return_status = recv(this->sock, &CRLF, sizeof(CRLF), 0);
if (return_status < 1) {
// error or disconnect
return;
}
if (CRLF == '\n') {
break; // yes
break;
}
}
} while (true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#ifndef WORKER_DATA_PUBLISHER
#define WORKER_DATA_PUBLISHER

static const int ACK_MESSAGE_SIZE = 1024;

class DataPublisher {
private:
int sock = 0, valread, worker_port;
Expand All @@ -33,7 +35,12 @@ class DataPublisher {
public:
DataPublisher(int, std::string);
void publish(std::string);
void publish_relation(std::string);
void publish_edge(std::string);

~DataPublisher();

void publish_central_relation(std::string message);
};

#endif // !Worker_data_publisher
Loading