From fcb0d20d3543e48d4fbb1da33d3c82f5e92901f8 Mon Sep 17 00:00:00 2001 From: thamindumk Date: Tue, 11 Feb 2025 13:37:45 +0530 Subject: [PATCH] Do code refactoring --- src/frontend/JasmineGraphFrontEnd.cpp | 81 +++++--------------------- src/partitioner/stream/Partitioner.cpp | 2 +- src/util/Utils.cpp | 10 ++++ src/util/Utils.h | 2 + 4 files changed, 29 insertions(+), 66 deletions(-) diff --git a/src/frontend/JasmineGraphFrontEnd.cpp b/src/frontend/JasmineGraphFrontEnd.cpp index 6921d335..ea0028e7 100644 --- a/src/frontend/JasmineGraphFrontEnd.cpp +++ b/src/frontend/JasmineGraphFrontEnd.cpp @@ -966,14 +966,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c } // Get user response. - char exist_graph[FRONTEND_DATA_LENGTH + 1]; - bzero(exist_graph, FRONTEND_DATA_LENGTH + 1); - read(connFd, exist_graph, FRONTEND_DATA_LENGTH); - string exist_g(exist_graph); - exist_g = Utils::trim_copy(exist_g); - for (char &c : exist_g) { - c = tolower(c); - } + string exist_g = Utils::getFrontendInput(connFd); string graphId; string partitionAlgo; string direction; @@ -987,14 +980,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } // Get user response. - char exist_graph_id[FRONTEND_DATA_LENGTH + 1]; - bzero(exist_graph_id, FRONTEND_DATA_LENGTH + 1); - read(connFd, exist_graph_id, FRONTEND_DATA_LENGTH); - string exist_g_i(exist_graph_id); - exist_g_i = Utils::trim_copy(exist_g_i); - for (char &c : exist_g_i) { - c = tolower(c); - } + string exist_g_i = Utils::getFrontendInput(connFd); bool isExist = sqlite->isGraphIdExist(exist_g_i); if (!isExist) { @@ -1007,7 +993,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c } return; } - string exist_success_msg = "Set data streaming into graph ID: "+exist_g_i; + string exist_success_msg = "Set data streaming into graph ID: " + exist_g_i; result_wr = write(connFd, exist_success_msg.c_str(), exist_success_msg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1029,7 +1015,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } graphId = to_string(nextID); - string default_id = "Do you use Default graph ID: "+ graphId +"(y/n) ? "; + string default_id = "Do you use default graph ID: "+ graphId +"(y/n) ? "; int result_wr = write(connFd, default_id.c_str(), default_id.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1037,14 +1023,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } // Get user response. - char default_graph_id[FRONTEND_DATA_LENGTH + 1]; - bzero(default_graph_id, FRONTEND_DATA_LENGTH + 1); - read(connFd, default_graph_id, FRONTEND_DATA_LENGTH); - string default_g_i(default_graph_id); - default_g_i = Utils::trim_copy(default_g_i); - for (char &c : default_g_i) { - c = tolower(c); - } + string default_g_i = Utils::getFrontendInput(connFd); if (default_g_i != "y") { string input_graph_id = "Input your graph ID: "; @@ -1056,18 +1035,11 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c } // Get user response. - char graph_id[FRONTEND_DATA_LENGTH + 1]; - bzero(graph_id, FRONTEND_DATA_LENGTH + 1); - read(connFd, graph_id, FRONTEND_DATA_LENGTH); - string user_graph_id(graph_id); - user_graph_id = Utils::trim_copy(user_graph_id); - for (char &c : user_graph_id) { - c = tolower(c); - } + string user_graph_id = Utils::getFrontendInput(connFd); bool isExist = sqlite->isGraphIdExist(user_graph_id); if (isExist) { - string Err_msg = "Error: Graph ID you entered is already exist"; + string Err_msg = "Error: Graph ID you entered already exists"; result_wr = write(connFd, Err_msg.c_str(), Err_msg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1093,7 +1065,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c graphId = user_graph_id; } - std::string partition_selection = "Select the partition technique\n" + std::string partition_selection = "Select the partitioning technique\n" "\toption 1: Hash partitioning\n" "\toption 2: Fennel partitioning\n" "\toption 3: LDG partitioning\n" @@ -1105,17 +1077,10 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } // Get user response. - char partition_algo[FRONTEND_DATA_LENGTH + 1]; - bzero(partition_algo, FRONTEND_DATA_LENGTH + 1); - read(connFd, partition_algo, FRONTEND_DATA_LENGTH); - string partition_a(partition_algo); - partition_a = Utils::trim_copy(partition_a); - for (char &c : partition_a) { - c = tolower(c); - } + string partition_algo = Utils::getFrontendInput(connFd); - if (partition_a == "1" || partition_a == "2" || partition_a == "3") { - string partition_success_msg = "Set partition technique: "+partition_a; + if (partition_algo == "1" || partition_algo == "2" || partition_algo == "3") { + string partition_success_msg = "Set partition technique: " + partition_algo; result_wr = write(connFd, partition_success_msg.c_str(), partition_success_msg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1128,9 +1093,9 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c *loop_exit_p = true; return; } - partitionAlgo = partition_a; + partitionAlgo = partition_algo; } else { - string Err_msg = "Error: invalid partition option: "+partition_a; + string Err_msg = "Error: invalid partition option: "+partition_algo; result_wr = write(connFd, Err_msg.c_str(), Err_msg.length()); if (result_wr < 0) { frontend_logger.error("Error writing to socket"); @@ -1148,14 +1113,7 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c return; } // Get user response. - char isDirected[FRONTEND_DATA_LENGTH + 1]; - bzero(isDirected, FRONTEND_DATA_LENGTH + 1); - read(connFd, isDirected, FRONTEND_DATA_LENGTH); - string is_directed(isDirected); - is_directed = Utils::trim_copy(is_directed); - for (char &c : is_directed) { - c = tolower(c); - } + string is_directed = Utils::getFrontendInput(connFd); if (is_directed == "y") { direction = Conts::DIRECTED; } else { @@ -1186,17 +1144,10 @@ static void add_stream_kafka_command(int connFd, std::string &kafka_server_IP, c } // Get user response. - char user_res[FRONTEND_DATA_LENGTH + 1]; - bzero(user_res, FRONTEND_DATA_LENGTH + 1); - read(connFd, user_res, FRONTEND_DATA_LENGTH); - string user_res_s(user_res); - user_res_s = Utils::trim_copy(user_res_s); - for (char &c : user_res_s) { - c = tolower(c); - } + string default_kafka = Utils::getFrontendInput(connFd); // use default kafka consumer details string group_id = "knnect"; // TODO(sakeerthan): MOVE TO CONSTANT LATER - if (user_res_s == "y") { + if (default_kafka == "y") { kafka_server_IP = Utils::getJasmineGraphProperty("org.jasminegraph.server.streaming.kafka.host"); configs = {{"metadata.broker.list", kafka_server_IP}, {"group.id", group_id}}; } else { diff --git a/src/partitioner/stream/Partitioner.cpp b/src/partitioner/stream/Partitioner.cpp index ed483e24..f996c047 100644 --- a/src/partitioner/stream/Partitioner.cpp +++ b/src/partitioner/stream/Partitioner.cpp @@ -159,7 +159,7 @@ void Partitioner::updateMetaDB() { } bool Partitioner::getIsDirected() { - std::string sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = "+std::to_string(this->graphID); + std::string sqlStatement = "SELECT is_directed FROM graph WHERE idgraph = " + std::to_string(this->graphID); auto result = this->sqlite->runSelect(sqlStatement); if (result[0][0].second == "0") { return false; diff --git a/src/util/Utils.cpp b/src/util/Utils.cpp index cf7cbc3e..7dea17c2 100644 --- a/src/util/Utils.cpp +++ b/src/util/Utils.cpp @@ -1228,3 +1228,13 @@ void Utils::assignPartitionToWorker(int graphId, int partitionIndex, string hos delete sqlite; } + +string Utils::getFrontendInput(int connFd) { + char frontendInput[FRONTEND_DATA_LENGTH + 1]; + bzero(frontendInput, FRONTEND_DATA_LENGTH + 1); + read(connFd, frontendInput, FRONTEND_DATA_LENGTH); + std::string input(frontendInput); + input = Utils::trim_copy(input); + std::transform(input.begin(), input.end(), input.begin(), ::tolower); + return input; +} diff --git a/src/util/Utils.h b/src/util/Utils.h index ace8c3d6..2130fff9 100644 --- a/src/util/Utils.h +++ b/src/util/Utils.h @@ -27,6 +27,7 @@ limitations under the License. #include "../metadb/SQLiteDBInterface.h" #include "../performancedb/PerformanceSQLiteDBInterface.h" +#include "../frontend/JasmineGraphFrontEndProtocol.h" #include "Conts.h" using std::map; @@ -189,6 +190,7 @@ class Utils { static bool transferPartition(std::string sourceWorker, int sourceWorkerPort, std::string destinationWorker, int destinationWorkerDataPort, std::string graphID, std::string partitionID, std::string workerID, SQLiteDBInterface *sqlite); + static string getFrontendInput(int connFd); }; #endif // JASMINEGRAPH_UTILS_H