diff --git a/include/ymanipulator.h b/include/ymanipulator.h new file mode 100644 index 0000000..bca758a --- /dev/null +++ b/include/ymanipulator.h @@ -0,0 +1,14 @@ +#pragma once +#include + +class YTrashManipulator { +public: + virtual ~YTrashManipulator(){ + + }; + + virtual bool close() = 0; + bool collect_obsolete_chunks(); + bool delete_obsolete_chunks(); +}; + diff --git a/include/yproxy.h b/include/yproxy.h index b83faa1..f991f65 100644 --- a/include/yproxy.h +++ b/include/yproxy.h @@ -7,6 +7,7 @@ #include "ylister.h" #include "yreader.h" #include "ywriter.h" +#include "ymanipulator.h" #include #include #include @@ -143,5 +144,26 @@ class YProxyLister : public YLister { std::shared_ptr adv_; ssize_t segindx_; + int client_fd_{-1}; +}; + +struct YProxyTrashManipulator : public YTrashManipulator { +public: + explicit YProxyTrashManipulator(std::shared_ptr adv); + + virtual ~YProxyTrashManipulator(); + + bool collect_obsolete_chunks(); + bool delete_obsolete_chunks(); + + + virtual bool close(); +protected: + std::vector ConstructRequest(char reqnum); + int prepareYproxyConnection(); + +private: + std::shared_ptr adv_; + int client_fd_{-1}; }; \ No newline at end of file diff --git a/src/xvacuum.cpp b/src/xvacuum.cpp index 69a928f..c2e351e 100644 --- a/src/xvacuum.cpp +++ b/src/xvacuum.cpp @@ -9,7 +9,7 @@ #include #include #include - +#include "info_gp.h" /* * yezzey_delete_chunk_internal: * Given external chunk path, remove it from external storage @@ -113,3 +113,44 @@ int yezzey_vacuum_garbage_relation_internal_oid(Oid reloid,int segindx, bool con relation_close(rel,NoLock); return rc; } + + +int yezzey_delete_obsolete_chunks_internal(){ +try { + auto ioadv = std::make_shared("", "", + std::string(storage_class /*storage_class*/), multipart_chunksize, + DEFAULTTABLESPACE_OID, "" /* coords */, InvalidOid /* reloid */, + use_gpg_crypto, yproxy_socket); + + + auto manipulator = + std::make_shared(ioadv); + manipulator->delete_obsolete_chunks(); + + return -1; + } catch (...) { + elog(ERROR, "failed to prepare x-storage reader for chunk"); + return 0; + } + return 0; +} + +int yezzey_collect_obsolete_chunks_internal(){ +try { + auto ioadv = std::make_shared("", "", + std::string(storage_class /*storage_class*/), multipart_chunksize, + DEFAULTTABLESPACE_OID, "" /* coords */, InvalidOid /* reloid */, + use_gpg_crypto, yproxy_socket); + auto manipulator = + std::make_shared(ioadv); + manipulator->collect_obsolete_chunks(); + /* external reader destruct */ + return 0; + } catch (...) { + return -1; + } + + + return 0; +} + diff --git a/src/yproxy.cpp b/src/yproxy.cpp index 5dcc44b..d6db7c6 100644 --- a/src/yproxy.cpp +++ b/src/yproxy.cpp @@ -42,6 +42,8 @@ const char MessageTypeReadyForQuery = 45; const char MessageTypeCopyData = 46; const char MessageTypeList = 48; const char MessageTypeObjectMeta = 49; +const char MessageTypeDeleteObsolette = 56; +const char MessageTypeUpdateObsolette = 57; const char MessageTypeDelete = 47; @@ -805,3 +807,129 @@ YProxyLister::readObjectMetaBody(std::vector *body) { return res; } + +YProxyTrashManipulator::YProxyTrashManipulator(std::shared_ptr adv) + : adv_(adv) {} + + +int YProxyTrashManipulator::prepareYproxyConnection() { + // open unix data socket + + client_fd_ = socket(AF_UNIX, SOCK_STREAM, 0); + if (client_fd_ == -1) { + elog(WARNING, "failed to create unix socket, errno: %m"); + // throw here? + return -1; + } + + struct sockaddr_un addr; + /* Bind socket to socket name. */ + + memset(&addr, 0, sizeof(addr)); + + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, adv_->yproxy_socket.c_str(), + sizeof(addr.sun_path) - 1); + + auto ret = + ::connect(client_fd_, (const struct sockaddr *)&addr, sizeof(addr)); + + if (ret == -1) { + elog(WARNING, + "failed to acquire connection to unix socket on %s, errno: %m", + adv_->yproxy_socket.c_str()); + return -1; + } + return 0; +} +std::vector YProxyTrashManipulator::ConstructRequest(char reqnum){ + + auto bytes = std::vector(MSG_HEADER_SIZE + 1,0);// size (8 bytes) + reqnum (1 byte) + uint64_t len = bytes.size(); + + uint64_t cp = len; + for (ssize_t i = 7; i >= 0; --i) { + bytes[i] = cp & ((1 << 8) - 1); + cp >>= 8; + } + bytes[8] = reqnum; + return bytes; + +} +bool YProxyTrashManipulator::collect_obsolete_chunks() { + if (client_fd_ == -1) { + if (prepareYproxyConnection() == -1) { + // Throw here? + close(); + return false; + } + } + + // TODO: split to chunks + auto msg = ConstructRequest(MessageTypeUpdateObsolette); + + if (commonWriteFull(client_fd_, msg) == -1) { + close(); + return false; + } + // *amount does not need to change in case of successfull write + + msg = CommonCostructCommandCompleteRequest(); + // signal that current chunk is full + if (commonWriteFull(client_fd_, msg) == -1) { + close(); + return false; + } + // wait for responce + if (commonReadRFQResponce(client_fd_) != 0) { + close(); + return false; + } + + return true; +} + + +bool YProxyTrashManipulator::delete_obsolete_chunks(){ + if (client_fd_ == -1) { + if (prepareYproxyConnection() == -1) { + // Throw here? + close(); + return false; + } + } + + // TODO: split to chunks + auto msg = ConstructRequest(MessageTypeDeleteObsolette); + + if (commonWriteFull(client_fd_, msg) == -1) { + close(); + return false; + } + // *amount does not need to change in case of successfull write + + msg = CommonCostructCommandCompleteRequest(); + // signal that current chunk is full + if (commonWriteFull(client_fd_, msg) == -1) { + close(); + return false; + } + // wait for responce + if (commonReadRFQResponce(client_fd_) != 0) { + close(); + return false; + } + + return true; +} + + +YProxyTrashManipulator::~YProxyTrashManipulator() { close(); } + +bool YProxyTrashManipulator::close() { + if (client_fd_ != -1) { + ::close(client_fd_); + client_fd_ = -1; + } + return true; +} \ No newline at end of file