Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cpp backend logging and load model #1814

Closed
wants to merge 12 commits into from
2 changes: 1 addition & 1 deletion cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ set(CMAKE_CXX_STANDARD_REQUIRED True)
set(CMAKE_CXX_EXTENSIONS OFF)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -W -Wall -Wextra")

# set(CMAKE_BUILD_TYPE Debug)
find_package(Boost REQUIRED)
find_package(folly REQUIRED)
find_package(fmt REQUIRED)
find_package(gflags REQUIRED)
find_package(glog REQUIRED)
find_package(Torch REQUIRED)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${TORCH_CXX_FLAGS}")

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/backends/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ target_include_directories(model_worker_socket PRIVATE
)
target_link_libraries(model_worker_socket
PRIVATE ts_backends_core ts_backends_protocol ts_backends_torch_scripted ${FOLLY_LIBRARIES})
install(TARGETS model_worker_socket DESTINATION ${torchserve_cpp_SOURCE_DIR}/_build/bin)
install(TARGETS model_worker_socket DESTINATION ${torchserve_cpp_SOURCE_DIR}/_build/bin)
62 changes: 33 additions & 29 deletions cpp/src/backends/process/model_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,56 +15,57 @@ namespace torchserve {
if (socket_type == "unix") {
socket_family = AF_UNIX;
if (socket_name.empty()) {
LOG(FATAL) << "Wrong arguments passed. No socket name given.";
TS_LOG(FATAL, "Wrong arguments passed. No socket name given.");
}
std::filesystem::path s_name_path(socket_name);
if (std::remove(socket_name.c_str()) != 0 && std::filesystem::exists(s_name_path)) {
LOG(FATAL) << "socket already in use: " << socket_name;

std::experimental::filesystem::path s_name_path(socket_name);
if (std::remove(socket_name.c_str()) != 0 && std::experimental::filesystem::exists(s_name_path)) {
TS_LOGF(FATAL, "socket already in use: {}", socket_name);
}
socket_name_ = socket_name;
} else if (socket_type == "tcp") {
if (host_addr.empty()) {
socket_name_ = "127.0.0.1";
} else {
socket_name_ = host_addr;
if (port_num.empty())
LOG(FATAL) << "Wrong arguments passed. No socket port given.";
if (port_num.empty()) {
TS_LOG(FATAL, "Wrong arguments passed. No socket port given.");
}
port_ = htons(stoi(port_num));
}
} else {
LOG(FATAL) << "Incomplete data provided";
TS_LOG(FATAL, "Incomplete data provided");
}

LOG(INFO) << "Listening on port: " << socket_name;
TS_LOGF(INFO, "Listening on port: {}", socket_name);
server_socket_ = socket(socket_family, SOCK_STREAM, 0);
if (server_socket_ == -1) {
LOG(FATAL) << "Failed to create socket descriptor. errno: " << errno;
TS_LOGF(FATAL, "Failed to create socket descriptor. errno: {}", errno);
}

if (!CreateBackend(runtime_type, model_dir)) {
LOG(FATAL) << "Failed to create backend, model_dir: " << model_dir;
TS_LOGF(FATAL, "Failed to create backend, model_dir: {}", model_dir);
}
}

void SocketServer::Run() {
// TODO: Add sock accept timeout
int on = 1;
if (setsockopt(server_socket_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
LOG(FATAL) << "Failed to setsockopt. errno: " << errno;
TS_LOGF(FATAL, "Failed to setsockopt. errno: {}", errno);
}

sockaddr* srv_sock_address, client_sock_address{};
if (socket_type_ == "unix") {
LOG(INFO) << "Binding to unix socket";
TS_LOG(INFO, "Binding to unix socket");
sockaddr_un sock_addr{};
std::memset(&sock_addr, 0, sizeof(sock_addr));
sock_addr.sun_family = AF_UNIX;
std::strcpy(sock_addr.sun_path, socket_name_.c_str());
// TODO: Fix truncation of socket name to 14 chars when casting
srv_sock_address = reinterpret_cast<sockaddr*>(&sock_addr);
} else {
LOG(INFO) << "Binding to udp socket";
TS_LOG(INFO, "Binding to tcp socket");
sockaddr_in sock_addr{};
std::memset(&sock_addr, 0, sizeof(sock_addr));
sock_addr.sin_family = AF_INET;
Expand All @@ -74,22 +75,22 @@ namespace torchserve {
}

if (bind(server_socket_, srv_sock_address, sizeof(*srv_sock_address)) < 0) {
LOG(FATAL) << "Could not bind socket. errno: " << errno;
TS_LOGF(FATAL, "Could not bind socket. errno: {}", errno);
}
if (listen(server_socket_, 1) == -1) {
LOG(FATAL) << "Failed to listen on socket. errno: " << errno;
TS_LOGF(FATAL, "Failed to listen on socket. errno: {}", errno);
}
LOG(INFO) << "Socket bind successful";
LOG(INFO) << "[PID]" << getpid();
LOG(INFO) << "Torchserve worker started.";
TS_LOG(INFO, "Socket bind successful");
TS_LOGF(INFO, "[PID] {}", getpid());
TS_LOG(INFO, "Torchserve worker started.");

while (true) {
socklen_t len = sizeof(client_sock_address);
auto client_sock = accept(server_socket_, (sockaddr *)&client_sock_address, &len);
if (client_sock < 0) {
LOG(FATAL) << "Failed to accept client. errno: " << errno;
TS_LOGF(FATAL, "Failed to accept client. errno: {}", errno);
}
LOG(INFO) << "Connection accepted: " << socket_name_;
TS_LOGF(INFO, "Connection accepted: {}", socket_name_);
auto model_worker = std::make_unique<torchserve::SocketModelWorker>(client_sock, backend_);
model_worker->Run();
}
Expand All @@ -98,32 +99,35 @@ namespace torchserve {
bool SocketServer::CreateBackend(
    const torchserve::Manifest::RuntimeType& runtime_type,
    const std::string& model_dir) {
  // "LSP" (libtorch scripted) is currently the only supported runtime type;
  // it matches the default set by the runtime_type gflag in
  // model_worker_socket.cc. Any other runtime type is rejected.
  if (runtime_type == "LSP") {
    backend_ = std::make_shared<torchserve::torchscripted::Backend>();
    // Initialize loads backend state from the model directory; its success
    // is this function's success.
    return backend_->Initialize(model_dir);
  }
  return false;
}

[[noreturn]] void SocketModelWorker::Run() {
LOG(INFO) << "Handle connection";
TS_LOG(INFO, "Handle connection");
while (true) {
char cmd = torchserve::OTFMessage::RetrieveCmd(client_socket_);

if (cmd == 'I') {
LOG(INFO) << "INFER request received";
TS_LOG(INFO, "INFER request received");
auto model_instance = backend_->GetModelInstance();
if (!model_instance) {
LOG(ERROR) << "Model is not loaded yet, not able to process this inference request.";
TS_LOG(ERROR, "Model is not loaded yet, not able to process this inference request.");
} else {
//auto response = model_instance->Predict(torchserve::OTFMessage::RetrieveInferenceMsg(client_socket_));
}
} else if (cmd == 'L') {
LOG(INFO) << "LOAD request received";
TS_LOG(INFO, "LOAD request received");
// TODO: error handling
auto response = backend_->LoadModel(torchserve::OTFMessage::RetrieveLoadMsg(client_socket_));
auto backend_response = backend_->LoadModel(torchserve::OTFMessage::RetrieveLoadMsg(client_socket_));
if (!torchserve::OTFMessage::SendLoadModelResponse(client_socket_, std::move(backend_response))) {
TS_LOG(ERROR, "Error writing response to socket");
}
} else {
LOG(ERROR) << "Received unknown command: " << cmd;
TS_LOGF(ERROR, "Received unknown command: {}", cmd);
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/backends/process/model_worker.hh
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

#include <arpa/inet.h>
#include <cstdio>
#include <filesystem>
#include <glog/logging.h>
#include <experimental/filesystem>
#include <netinet/in.h>
#include <string>
#include <sys/socket.h>
Expand All @@ -16,6 +15,7 @@
#include "src/backends/core/backend.hh"
#include "src/backends/protocol/otf_message.hh"
#include "src/utils/config.hh"
#include "src/utils/logging.hh"
#include "src/utils/model_archive.hh"

namespace torchserve {
Expand Down
9 changes: 3 additions & 6 deletions cpp/src/backends/process/model_worker_socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,12 @@ DEFINE_string(runtime_type, "LSP", "model runtime type");
DEFINE_string(device_type, "cpu", "cpu, or gpu");
// TODO: discuss multiple backends support
DEFINE_string(model_dir, "", "model path");
// TODO: change to file based config
DEFINE_string(logger_config, "INFO:consoleLogger;consoleLogger=stream:stream=stdout,async=true", "Logging config");

int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);

// Init logging
google::InitGoogleLogging("ts_cpp_backend");
FLAGS_logtostderr = 1;
// TODO: Set logging format same as python worker
LOG(INFO) << "Initializing Libtorch backend worker...";
torchserve::Logger::InitLogger(FLAGS_logger_config);

torchserve::SocketServer server = torchserve::SocketServer::GetInstance();

Expand Down
73 changes: 51 additions & 22 deletions cpp/src/backends/protocol/otf_message.cc
Original file line number Diff line number Diff line change
@@ -1,21 +1,50 @@
#include <arpa/inet.h>
#include <sys/socket.h>
#include <glog/logging.h>

#include "src/backends/protocol/otf_message.hh"

namespace torchserve {
byte_buffer OTFMessage::CreateLoadModelResponse(StatusCode code, const std::string& message) {
LoadModelResponse response = {
code,
static_cast<int>(message.length()),
message
};
std::byte msg[sizeof(LoadModelResponse)];
std::memcpy(msg, &response, sizeof(LoadModelResponse));
byte_buffer response_byte_buffer;
std::copy(response_byte_buffer.begin(), response_byte_buffer.end(), msg);
return response_byte_buffer;
bool OTFMessage::SendAll(Socket conn, char *data, size_t length) {
  // Writes exactly `length` bytes from `data` to `conn`, looping over the
  // partial writes send(2) is allowed to perform on a stream socket.
  // Returns false on any send failure.
  char* pkt = data;
  while (length > 0) {
    ssize_t pkt_size = send(conn, pkt, length, 0);
    // Treat 0 as a failure too: the original `< 0` check would loop forever
    // if send() ever returned 0 with bytes still pending.
    if (pkt_size <= 0) {
      return false;
    }
    pkt += pkt_size;
    length -= pkt_size;
  }
  return true;
}

void OTFMessage::CreateLoadModelResponse(std::unique_ptr<torchserve::LoadModelResponse> response, char* data) {
  // Serializes a LoadModelResponse into the caller-provided buffer `data`
  // using the frontend's wire format:
  //   int32 code | int32 length | `length` message bytes | int32 marker
  // All integers are written in network byte order. The caller must size
  // `data` to at least 3 * sizeof(int32_t) + response->length bytes.
  char* p = data;
  // Serialize response code
  int32_t s_code = htonl(response->code);
  memcpy(p, &s_code, sizeof(s_code));
  p += sizeof(s_code);
  // Serialize response message length
  int32_t resp_length = htonl(response->length);
  memcpy(p, &resp_length, sizeof(resp_length));
  p += sizeof(resp_length);
  // Serialize response message. memcpy of exactly `length` bytes: the
  // original strcpy also wrote a trailing NUL (length + 1 bytes), one byte
  // past the message field.
  memcpy(p, response->buf.c_str(), response->length);
  p += response->length;
  // Expectation from frontend deserializer is a -1
  // at the end of a LoadModelResponse
  int32_t no_predict = htonl(response->predictions);
  memcpy(p, &no_predict, sizeof(no_predict));
}

bool OTFMessage::SendLoadModelResponse(Socket client_socket_, std::unique_ptr<torchserve::LoadModelResponse> response) {
  // Serializes `response` and writes it to the frontend socket.
  // Returns false if the socket write fails.
  //
  // The serialized size is 4-byte code + 4-byte length + message bytes +
  // 4-byte end marker. sizeof(LoadModelResponse) would be wrong here (as
  // flagged in review): the struct holds a std::string handle, not the
  // inline message bytes, so its size is unrelated to the wire size.
  const size_t data_size = 3 * sizeof(int32_t) + response->length;
  // RAII buffer: the original `new char[]` leaked when SendAll failed,
  // since `return false` was reached before `delete[]`.
  auto data = std::make_unique<char[]>(data_size);
  torchserve::OTFMessage::CreateLoadModelResponse(std::move(response), data.get());
  return torchserve::OTFMessage::SendAll(client_socket_, data.get(), data_size);
}

char OTFMessage::RetrieveCmd(Socket conn) {
Expand Down Expand Up @@ -62,7 +91,7 @@ namespace torchserve {
RetrieveBuffer(conn, length, data);
std::string handler(data, length);
delete[] data;
LOG(INFO) << "Received handler in message, will be ignored: " << handler;
TS_LOGF(INFO, "Received handler in message, will be ignored: {}", handler);

// GPU ID
auto gpu_id = RetrieveInt(conn);
Expand All @@ -77,13 +106,13 @@ namespace torchserve {
// Limit max image pixels
auto limit_max_image_pixels = RetrieveBool(conn);

LOG(INFO) << "Model Name: " << model_name;
LOG(INFO) << "Model path: " << model_dir;
LOG(INFO) << "Batch size: " << batch_size;
LOG(INFO) << "Handler: " << handler;
LOG(INFO) << "GPU_id: " << gpu_id;
LOG(INFO) << "Envelope: " << envelope;
LOG(INFO) << "Limit max image pixels: " << limit_max_image_pixels;
TS_LOGF(DEBUG, "Model Name: {}", model_name);
TS_LOGF(DEBUG, "Model dir: {}", model_dir);
TS_LOGF(DEBUG, "Batch size: {}", batch_size);
TS_LOGF(DEBUG, "Handler: {}", handler);
TS_LOGF(DEBUG, "GPU_id: {}", gpu_id);
TS_LOGF(DEBUG, "Envelope: {}", envelope);
TS_LOGF(DEBUG, "Limit max image pixels: {}", limit_max_image_pixels);

return std::make_shared<LoadModelRequest>(
model_dir, model_name, gpu_id, handler,
Expand All @@ -95,7 +124,7 @@ namespace torchserve {
while (length > 0) {
ssize_t pkt_size = recv(conn, pkt, length, 0);
if (pkt_size == 0) {
LOG(INFO) << "Frontend disconnected.";
TS_LOG(INFO, "Frontend disconnected.");
exit(0);
}
pkt += pkt_size;
Expand All @@ -119,4 +148,4 @@ namespace torchserve {
std::memcpy(&value, data, BOOL_STD_SIZE);
return value;
}
} //namespace torchserve
} //namespace torchserve
6 changes: 4 additions & 2 deletions cpp/src/backends/protocol/otf_message.hh
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
#include <vector>

#include "src/utils/message.hh"
#include "src/utils/logging.hh"

namespace torchserve {
using Socket = int;
using StatusCode = int;
typedef std::vector<std::byte> byte_buffer;

//https://docs.python.org/3/library/struct.html#format-characters
#define BOOL_STD_SIZE 1
Expand All @@ -28,7 +28,9 @@ namespace torchserve {

class OTFMessage {
public:
static byte_buffer CreateLoadModelResponse(StatusCode code, const std::string& message);
static bool SendAll(Socket conn, char *data, size_t length);
static void CreateLoadModelResponse(std::unique_ptr<torchserve::LoadModelResponse> response, char* data);
maaquib marked this conversation as resolved.
Show resolved Hide resolved
static bool SendLoadModelResponse(Socket conn, std::unique_ptr<torchserve::LoadModelResponse> response);
static char RetrieveCmd(Socket conn);
static std::shared_ptr<LoadModelRequest> RetrieveLoadMsg(Socket conn);
// TODO: impl.
Expand Down
Loading