Skip to content

Commit

Permalink
path from sql interface to yproxy
Browse files Browse the repository at this point in the history
  • Loading branch information
visill committed Dec 5, 2024
1 parent b5aaca4 commit ce5234b
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 1 deletion.
14 changes: 14 additions & 0 deletions include/ymanipulator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#pragma once
#include <stdint.h>

class YTrashManipulator {
public:
virtual ~YTrashManipulator(){

};

virtual bool close() = 0;
bool collect_obsolete_chunks();
bool delete_obsolete_chunks();
};

22 changes: 22 additions & 0 deletions include/yproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "ylister.h"
#include "yreader.h"
#include "ywriter.h"
#include "ymanipulator.h"
#include <memory>
#include <string>
#include <vector>
Expand Down Expand Up @@ -143,5 +144,26 @@ class YProxyLister : public YLister {
std::shared_ptr<IOadv> adv_;
ssize_t segindx_;

int client_fd_{-1};
};

struct YProxyTrashManipulator : public YTrashManipulator {
public:
explicit YProxyTrashManipulator(std::shared_ptr<IOadv> adv);

virtual ~YProxyTrashManipulator();

bool collect_obsolete_chunks();
bool delete_obsolete_chunks();


virtual bool close();
protected:
std::vector<char> ConstructRequest(char reqnum);
int prepareYproxyConnection();

private:
std::shared_ptr<IOadv> adv_;

int client_fd_{-1};
};
43 changes: 42 additions & 1 deletion src/xvacuum.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
#include <string>
#include <url.h>
#include <util.h>

#include "info_gp.h"
/*
* yezzey_delete_chunk_internal:
* Given external chunk path, remove it from external storage
Expand Down Expand Up @@ -113,3 +113,44 @@ int yezzey_vacuum_garbage_relation_internal_oid(Oid reloid,int segindx, bool con
relation_close(rel,NoLock);
return rc;
}


int yezzey_delete_obsolete_chunks_internal(){
try {
auto ioadv = std::make_shared<IOadv>("", "",
std::string(storage_class /*storage_class*/), multipart_chunksize,
DEFAULTTABLESPACE_OID, "" /* coords */, InvalidOid /* reloid */,
use_gpg_crypto, yproxy_socket);


auto manipulator =
std::make_shared<YProxyTrashManipulator>(ioadv);
manipulator->delete_obsolete_chunks();

return -1;
} catch (...) {
elog(ERROR, "failed to prepare x-storage reader for chunk");
return 0;
}
return 0;
}

int yezzey_collect_obsolete_chunks_internal(){
try {
auto ioadv = std::make_shared<IOadv>("", "",
std::string(storage_class /*storage_class*/), multipart_chunksize,
DEFAULTTABLESPACE_OID, "" /* coords */, InvalidOid /* reloid */,
use_gpg_crypto, yproxy_socket);
auto manipulator =
std::make_shared<YProxyTrashManipulator>(ioadv);
manipulator->collect_obsolete_chunks();
/* external reader destruct */
return 0;
} catch (...) {
return -1;
}


return 0;
}

128 changes: 128 additions & 0 deletions src/yproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const char MessageTypeReadyForQuery = 45;
const char MessageTypeCopyData = 46;
const char MessageTypeList = 48;
const char MessageTypeObjectMeta = 49;
const char MessageTypeDeleteObsolette = 56;
const char MessageTypeUpdateObsolette = 57;

const char MessageTypeDelete = 47;

Expand Down Expand Up @@ -805,3 +807,129 @@ YProxyLister::readObjectMetaBody(std::vector<char> *body) {

return res;
}

YProxyTrashManipulator::YProxyTrashManipulator(std::shared_ptr<IOadv> adv)
: adv_(adv) {}


int YProxyTrashManipulator::prepareYproxyConnection() {
// open unix data socket

client_fd_ = socket(AF_UNIX, SOCK_STREAM, 0);
if (client_fd_ == -1) {
elog(WARNING, "failed to create unix socket, errno: %m");
// throw here?
return -1;
}

struct sockaddr_un addr;
/* Bind socket to socket name. */

memset(&addr, 0, sizeof(addr));

addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, adv_->yproxy_socket.c_str(),
sizeof(addr.sun_path) - 1);

auto ret =
::connect(client_fd_, (const struct sockaddr *)&addr, sizeof(addr));

if (ret == -1) {
elog(WARNING,
"failed to acquire connection to unix socket on %s, errno: %m",
adv_->yproxy_socket.c_str());
return -1;
}
return 0;
}
std::vector<char> YProxyTrashManipulator::ConstructRequest(char reqnum){

auto bytes = std::vector<char>(MSG_HEADER_SIZE + 1,0);// size (8 bytes) + reqnum (1 byte)
uint64_t len = bytes.size();

uint64_t cp = len;
for (ssize_t i = 7; i >= 0; --i) {
bytes[i] = cp & ((1 << 8) - 1);
cp >>= 8;
}
bytes[8] = reqnum;
return bytes;

}
bool YProxyTrashManipulator::collect_obsolete_chunks() {
if (client_fd_ == -1) {
if (prepareYproxyConnection() == -1) {
// Throw here?
close();
return false;
}
}

// TODO: split to chunks
auto msg = ConstructRequest(MessageTypeUpdateObsolette);

if (commonWriteFull(client_fd_, msg) == -1) {
close();
return false;
}
// *amount does not need to change in case of successfull write

msg = CommonCostructCommandCompleteRequest();
// signal that current chunk is full
if (commonWriteFull(client_fd_, msg) == -1) {
close();
return false;
}
// wait for responce
if (commonReadRFQResponce(client_fd_) != 0) {
close();
return false;
}

return true;
}


bool YProxyTrashManipulator::delete_obsolete_chunks(){
if (client_fd_ == -1) {
if (prepareYproxyConnection() == -1) {
// Throw here?
close();
return false;
}
}

// TODO: split to chunks
auto msg = ConstructRequest(MessageTypeDeleteObsolette);

if (commonWriteFull(client_fd_, msg) == -1) {
close();
return false;
}
// *amount does not need to change in case of successfull write

msg = CommonCostructCommandCompleteRequest();
// signal that current chunk is full
if (commonWriteFull(client_fd_, msg) == -1) {
close();
return false;
}
// wait for responce
if (commonReadRFQResponce(client_fd_) != 0) {
close();
return false;
}

return true;
}


YProxyTrashManipulator::~YProxyTrashManipulator() { close(); }

bool YProxyTrashManipulator::close() {
if (client_fd_ != -1) {
::close(client_fd_);
client_fd_ = -1;
}
return true;
}

0 comments on commit ce5234b

Please sign in to comment.