Skip to content

Commit

Permalink
Do code refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
thamindumk committed Feb 11, 2025
1 parent 09752ec commit fcb0d20
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 66 deletions.
81 changes: 16 additions & 65 deletions src/frontend/JasmineGraphFrontEnd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -966,14 +966,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
}

// Get user response.
char exist_graph[FRONTEND_DATA_LENGTH + 1];
bzero(exist_graph, FRONTEND_DATA_LENGTH + 1);
read(connFd, exist_graph, FRONTEND_DATA_LENGTH);
string exist_g(exist_graph);
exist_g = Utils::trim_copy(exist_g);
for (char &c : exist_g) {
c = tolower(c);
}
string exist_g = Utils::getFrontendInput(connFd);
string graphId;
string partitionAlgo;
string direction;
Expand All @@ -987,14 +980,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
return;
}
// Get user response.
char exist_graph_id[FRONTEND_DATA_LENGTH + 1];
bzero(exist_graph_id, FRONTEND_DATA_LENGTH + 1);
read(connFd, exist_graph_id, FRONTEND_DATA_LENGTH);
string exist_g_i(exist_graph_id);
exist_g_i = Utils::trim_copy(exist_g_i);
for (char &c : exist_g_i) {
c = tolower(c);
}
string exist_g_i = Utils::getFrontendInput(connFd);

bool isExist = sqlite->isGraphIdExist(exist_g_i);
if (!isExist) {
Expand All @@ -1007,7 +993,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
}
return;
}
string exist_success_msg = "Set data streaming into graph ID: "+exist_g_i;
string exist_success_msg = "Set data streaming into graph ID: " + exist_g_i;
result_wr = write(connFd, exist_success_msg.c_str(), exist_success_msg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
Expand All @@ -1029,22 +1015,15 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
return;
}
graphId = to_string(nextID);
string default_id = "Do you use Default graph ID: "+ graphId +"(y/n) ? ";
string default_id = "Do you use default graph ID: "+ graphId +"(y/n) ? ";
int result_wr = write(connFd, default_id.c_str(), default_id.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
*loop_exit_p = true;
return;
}
// Get user response.
char default_graph_id[FRONTEND_DATA_LENGTH + 1];
bzero(default_graph_id, FRONTEND_DATA_LENGTH + 1);
read(connFd, default_graph_id, FRONTEND_DATA_LENGTH);
string default_g_i(default_graph_id);
default_g_i = Utils::trim_copy(default_g_i);
for (char &c : default_g_i) {
c = tolower(c);
}
string default_g_i = Utils::getFrontendInput(connFd);

if (default_g_i != "y") {
string input_graph_id = "Input your graph ID: ";
Expand All @@ -1056,18 +1035,11 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
}

// Get user response.
char graph_id[FRONTEND_DATA_LENGTH + 1];
bzero(graph_id, FRONTEND_DATA_LENGTH + 1);
read(connFd, graph_id, FRONTEND_DATA_LENGTH);
string user_graph_id(graph_id);
user_graph_id = Utils::trim_copy(user_graph_id);
for (char &c : user_graph_id) {
c = tolower(c);
}
string user_graph_id = Utils::getFrontendInput(connFd);

bool isExist = sqlite->isGraphIdExist(user_graph_id);
if (isExist) {
string Err_msg = "Error: Graph ID you entered is already exist";
string Err_msg = "Error: Graph ID you entered already exists";
result_wr = write(connFd, Err_msg.c_str(), Err_msg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
Expand All @@ -1093,7 +1065,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
graphId = user_graph_id;
}

std::string partition_selection = "Select the partition technique\n"
std::string partition_selection = "Select the partitioning technique\n"
"\toption 1: Hash partitioning\n"
"\toption 2: Fennel partitioning\n"
"\toption 3: LDG partitioning\n"
Expand All @@ -1105,17 +1077,10 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
return;
}
// Get user response.
char partition_algo[FRONTEND_DATA_LENGTH + 1];
bzero(partition_algo, FRONTEND_DATA_LENGTH + 1);
read(connFd, partition_algo, FRONTEND_DATA_LENGTH);
string partition_a(partition_algo);
partition_a = Utils::trim_copy(partition_a);
for (char &c : partition_a) {
c = tolower(c);
}
string partition_algo = Utils::getFrontendInput(connFd);

if (partition_a == "1" || partition_a == "2" || partition_a == "3") {
string partition_success_msg = "Set partition technique: "+partition_a;
if (partition_algo == "1" || partition_algo == "2" || partition_algo == "3") {
string partition_success_msg = "Set partition technique: " + partition_algo;
result_wr = write(connFd, partition_success_msg.c_str(), partition_success_msg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
Expand All @@ -1128,9 +1093,9 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
*loop_exit_p = true;
return;
}
partitionAlgo = partition_a;
partitionAlgo = partition_algo;
} else {
string Err_msg = "Error: invalid partition option: "+partition_a;
string Err_msg = "Error: invalid partition option: "+partition_algo;
result_wr = write(connFd, Err_msg.c_str(), Err_msg.length());
if (result_wr < 0) {
frontend_logger.error("Error writing to socket");
Expand All @@ -1148,14 +1113,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
return;
}
// Get user response.
char isDirected[FRONTEND_DATA_LENGTH + 1];
bzero(isDirected, FRONTEND_DATA_LENGTH + 1);
read(connFd, isDirected, FRONTEND_DATA_LENGTH);
string is_directed(isDirected);
is_directed = Utils::trim_copy(is_directed);
for (char &c : is_directed) {
c = tolower(c);
}
string is_directed = Utils::getFrontendInput(connFd);
if (is_directed == "y") {
direction = Conts::DIRECTED;
} else {
Expand Down Expand Up @@ -1186,17 +1144,10 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c
}

// Get user response.
char user_res[FRONTEND_DATA_LENGTH + 1];
bzero(user_res, FRONTEND_DATA_LENGTH + 1);
read(connFd, user_res, FRONTEND_DATA_LENGTH);
string user_res_s(user_res);
user_res_s = Utils::trim_copy(user_res_s);
for (char &c : user_res_s) {
c = tolower(c);
}
string default_kafka = Utils::getFrontendInput(connFd);
// use default kafka consumer details
string group_id = "knnect"; // TODO(sakeerthan): MOVE TO CONSTANT LATER
if (user_res_s == "y") {
if (default_kafka == "y") {
kafka_server_IP = Utils::getJasmineGraphProperty("org.jasminegraph.server.streaming.kafka.host");
configs = {{"metadata.broker.list", kafka_server_IP}, {"group.id", group_id}};
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/partitioner/stream/Partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ void Partitioner::updateMetaDB() {
}

bool Partitioner::getIsDirected() {
std::string sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = "+std::to_string(this->graphID);
std::string sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = " + std::to_string(this->graphID);
auto result = this->sqlite->runSelect(sqlStatement);
if (result[0][0].second == "0") {
return false;
Expand Down
10 changes: 10 additions & 0 deletions src/util/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1228,3 +1228,13 @@ void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hos

delete sqlite;
}

string Utils::getFrontendInput(int connFd) {
char frontendInput[FRONTEND_DATA_LENGTH + 1];
bzero(frontendInput, FRONTEND_DATA_LENGTH + 1);
read(connFd, frontendInput, FRONTEND_DATA_LENGTH);
std::string input(frontendInput);
input = Utils::trim_copy(input);
std::transform(input.begin(), input.end(), input.begin(), ::tolower);
return input;
}
2 changes: 2 additions & 0 deletions src/util/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ limitations under the License.

#include "../metadb/SQLiteDBInterface.h"
#include "../performancedb/PerformanceSQLiteDBInterface.h"
#include "../frontend/JasmineGraphFrontEndProtocol.h"
#include "Conts.h"

using std::map;
Expand Down Expand Up @@ -189,6 +190,7 @@ class Utils {
static bool transferPartition(std::string sourceWorker, int sourceWorkerPort, std::string destinationWorker,
int destinationWorkerDataPort, std::string graphID, std::string partitionID,
std::string workerID, SQLiteDBInterface *sqlite);
static string getFrontendInput(int connFd);
};

#endif // JASMINEGRAPH_UTILS_H

0 comments on commit fcb0d20

Please sign in to comment.