diff --git a/Makefile b/Makefile index 297394b..8fe0bd4 100644 --- a/Makefile +++ b/Makefile @@ -24,18 +24,16 @@ OBJS = \ src/url.o \ src/io.o \ src/io_adv.o \ - src/worker.o \ src/offload_tablespace_map.o \ src/offload_policy.o \ src/offload.o \ src/virtual_tablespace.o \ src/partition.o \ src/xvacuum.o \ - src/yezzey_expire.o \ src/yproxy.o \ src/init.o \ src/meta.o \ - smgr.o worker.o yezzey.o + smgr.o yezzey.o EXTENSION = yezzey DATA = yezzey--1.0.sql yezzey--1.0--1.8.sql yezzey--1.8--1.8.1.sql diff --git a/include/gucs.h b/include/gucs.h index 087448d..d9a6128 100644 --- a/include/gucs.h +++ b/include/gucs.h @@ -7,11 +7,6 @@ extern int yezzey_ao_log_level; extern bool use_gpg_crypto; /* ----- STORAGE ----- */ -extern char *storage_prefix; -extern char *storage_bucket; -extern char *backup_bucket; -extern char *storage_config; -extern char *storage_host; extern char *storage_class; extern int multipart_chunksize; extern int multipart_threshold; diff --git a/include/io_adv.h b/include/io_adv.h index b600139..30e1d8c 100644 --- a/include/io_adv.h +++ b/include/io_adv.h @@ -12,12 +12,6 @@ struct IOadv { const std::string nspname; // relation name itself const std::string relname; - // s3 host - const std::string host; - // s3 bucked - const std::string bucket; - // wal-g specific prefix - const std::string external_storage_prefix; // s3 storage class const std::string storage_class; @@ -39,16 +33,14 @@ struct IOadv { const std::string yproxy_socket; IOadv(const std::string &nspname, - const std::string &relname, const std::string &host, - const std::string &bucket, const std::string &external_storage_prefix, + const std::string &relname, const std::string &storage_class, const int &multipart_chunksize, /*unparse coords*/ Oid spcNode, const std::string &fileName, const Oid reloid, bool use_gpg_crypto, const std::string &yproxy_socket); IOadv(const std::string &nspname, - const std::string &relname, const std::string &host, - const std::string &bucket, const std::string &external_storage_prefix, + const std::string &relname, const std::string &storage_class, const int &multipart_chunksize, const relnodeCoord &coords, const Oid reloid, bool use_gpg_crypto, const std::string &yproxy_socket); diff --git a/include/json.h b/include/json.h deleted file mode 100644 index d9cc440..0000000 --- a/include/json.h +++ /dev/null @@ -1,7 +0,0 @@ -#pragma once -#if YEZZEY_JSON_PARSE_BACKUPS -#include - -// for convenience -using json = nlohmann::json; -#endif diff --git a/include/offload_policy.h b/include/offload_policy.h index 5e6f5e8..78c9720 100644 --- a/include/offload_policy.h +++ b/include/offload_policy.h @@ -61,6 +61,8 @@ EXTERNC void YezzeyDefineOffloadPolicy(Oid reloid); EXTERNC void YezzeyDefineOffloadPolicyPrepare(Oid reloid); +EXTERNC void FixupOffloadMetadata(Oid reloid); + /* * YezzeyLoadRealtion: * do all offload-metadata related work for relation loading: diff --git a/include/url.h b/include/url.h index dc3bbbb..f17309f 100644 --- a/include/url.h +++ b/include/url.h @@ -31,5 +31,4 @@ std::string craftStorageUnPrefixedPath(const std::shared_ptr &adv, std::string getYezzeyRelationUrl_internal(const std::string &nspname, const std::string &relname, - const std::string &external_storage_prefix, relnodeCoord coords, int32_t segid); diff --git a/include/util.h b/include/util.h index fccffc9..52c7e39 100644 --- a/include/util.h +++ b/include/util.h @@ -38,9 +38,6 @@ int64_t yezzey_calc_virtual_relation_size(std::shared_ptr adv, std::string storage_url_add_options(const std::string &s3path, const char *config_path); -std::string getYezzeyExtrenalStorageBucket(const char *host, - const char *bucket); - std::string make_yezzey_url(const std::string &prefix, int64_t modcounts, XLogRecPtr current_recptr); @@ -49,8 +46,7 @@ std::vector parseModcounts(const std::string &prefix, #endif EXTERNC void getYezzeyExternalStoragePathByCoords( - const char *nspname, const char *relname, const char *host, - const char *bucket, const char *storage_prefix, Oid spcNode, Oid dbNode, + const char *nspname, const char *relname, Oid spcNode, Oid dbNode, Oid relNode, int32_t segblockno /* segment no*/, int32_t segid, char **dest); diff --git a/include/virtual_index.h b/include/virtual_index.h index 8bc3566..d25115f 100644 --- a/include/virtual_index.h +++ b/include/virtual_index.h @@ -37,6 +37,8 @@ typedef struct { text x_path; /* external path */ } FormData_yezzey_virtual_index; +typedef FormData_yezzey_virtual_index* Form_yezzey_virtual_index; + #define Natts_yezzey_virtual_index 10 #define Anum_yezzey_virtual_index_reloid 1 #define Anum_yezzey_virtual_index_filenode 2 @@ -57,6 +59,9 @@ EXTERNC void emptyYezzeyIndex(Oid yezzey_index_oid, Oid relfilenode); EXTERNC void emptyYezzeyIndexBlkno(Oid yezzey_index_oid, Oid relation, Oid relfilenode, int blkno); +/* fixup virtual index entry for relation's relfilenode. */ +EXTERNC void YezzeyFixupVirtualIndex(Relation rel); + #ifdef __cplusplus void YezzeyVirtualIndexInsert(Oid yandexoid /*yezzey auxiliary index oid*/, Oid reloid, Oid relfilenodeOid, int64_t blkno, diff --git a/include/yezzey_expire.h b/include/yezzey_expire.h deleted file mode 100644 index 944b659..0000000 --- a/include/yezzey_expire.h +++ /dev/null @@ -1,54 +0,0 @@ -#ifndef YEZZEY_EXPIRE -#define YEZZEY_EXPIRE - -#include "pg.h" - -#ifdef __cplusplus -#define EXTERNC extern "C" -#else -#define EXTERNC -#endif - -#define YEZZEY_EXPIRE_INDEX_RELATION 8700 -#define YEZZEY_EXPIRE_INDEX_RELATION_INDX 8701 -/* - -CREATE TABLE yezzey.yezzey_expire_index( - reloid OID, - relfileoid OID, - fqnmd5 TEXT, - last_use_lsn LSN, - expire_lsn LSN -) -DISTRIBUTED REPLICATED; -*/ - -// Relfileoid changes after alter table stmt. -// We save md5 of fully qualified name (relschema.relname) - -typedef struct { - Oid yreloid; /* Expired Yezzey relation OID */ - Oid yrelfileoid; /* Expired fileoid. */ - XLogRecPtr last_use_lsn; /* Last Alive LSN */ - XLogRecPtr expire_lsn; /* Last Alive LSN */ - text fqdnmd5; /* md5(relschema.relname) */ -} FormData_yezzey_expire_index; - -typedef FormData_yezzey_expire_index *Form_yezzey_expire_index; - -#define Natts_yezzey_expire_index 5 -#define Anum_yezzey_expire_index_reloid 1 -#define Anum_yezzey_expire_index_relfileoid 2 -#define Anum_yezzey_expire_index_last_use_lsn 3 -#define Anum_yezzey_expire_index_lsn 4 -#define Anum_yezzey_expire_index_fqnmd5 5 - -/* record that current relation relfilenode is expired */ -EXTERNC void YezzeyRecordRelationExpireLsn(Relation rel); - -EXTERNC void YezzeyCreateRelationExpireIndex(void); - -EXTERNC void YezzeyUpsertLastUseLsn(Oid reloid, Oid relfileoid, const char *md5, - XLogRecPtr lsn); - -#endif /* YEZZEY_EXPIRE */ \ No newline at end of file diff --git a/include/yezzey_meta.h b/include/yezzey_meta.h index 16c3587..90f452d 100644 --- a/include/yezzey_meta.h +++ b/include/yezzey_meta.h @@ -3,7 +3,6 @@ #include "pg.h" #include "url.h" #include "virtual_index.h" -#include "yezzey_expire.h" #ifdef __cplusplus #define EXTERNC extern "C" diff --git a/src/init.cpp b/src/init.cpp index 84be759..d92582f 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -3,10 +3,8 @@ #include "init.h" #include "offload_policy.h" -#include "yezzey_expire.h" void YezzeyInitMetadata(void) { (void)YezzeyCreateOffloadPolicyRelation(); - (void)YezzeyCreateRelationExpireIndex(); (void)YezzeyCreateVirtualIndex(); } \ No newline at end of file diff --git a/src/io_adv.cpp b/src/io_adv.cpp index 4b3129d..8c74dfa 100644 --- a/src/io_adv.cpp +++ b/src/io_adv.cpp @@ -3,16 +3,13 @@ #include "offload_tablespace_map.h" IOadv::IOadv(const std::string &nspname, - const std::string &relname, const std::string &host, - const std::string &bucket, - const std::string &external_storage_prefix, + const std::string &relname, const std::string &storage_class, const int &multipart_chunksize, const Oid spcNode, const std::string &fileName, const Oid reloid, bool use_gpg_crypto, const std::string &yproxy_socket) : - nspname(nspname), relname(relname), host(host), - bucket(bucket), external_storage_prefix(external_storage_prefix), + nspname(nspname), relname(relname), storage_class(storage_class), multipart_chunksize(multipart_chunksize), coords_(getRelnodeCoordinate(spcNode, fileName)), reloid(reloid), tableSpace(reloid ? YezzeyGetRelationOriginTablespace(nspname.c_str(), relname.c_str(), reloid) : "none"), @@ -21,14 +18,11 @@ IOadv::IOadv(const std::string &nspname, } IOadv::IOadv(const std::string &nspname, - const std::string &relname, const std::string &host, - const std::string &bucket, - const std::string &external_storage_prefix, + const std::string &relname, const std::string &storage_class, const int &multipart_chunksize, const relnodeCoord &coords, const Oid reloid,bool use_gpg_crypto, const std::string &yproxy_socket) - : nspname(nspname), relname(relname), host(host), - bucket(bucket), external_storage_prefix(external_storage_prefix), + : nspname(nspname), relname(relname), storage_class(storage_class), multipart_chunksize(multipart_chunksize), coords_(coords), reloid(reloid), tableSpace(reloid ? YezzeyGetRelationOriginTablespace(nspname.c_str(), relname.c_str(), reloid) : "none"), diff --git a/src/meta.cpp b/src/meta.cpp index 41d1de2..93d9272 100644 --- a/src/meta.cpp +++ b/src/meta.cpp @@ -8,5 +8,5 @@ void YezzeyUpdateMetadataRelations( YezzeyVirtualIndexInsert(yandexoid, reloid, relfilenodeOid, blkno, offset_start, offset_finish, encrypted, reused, modcount, lsn, x_path); - YezzeyUpsertLastUseLsn(reloid, relfilenodeOid, md5, lsn); + /* TODO: update yezzey relfilemap */ } \ No newline at end of file diff --git a/src/offload_policy.cpp b/src/offload_policy.cpp index 119f490..f4a25da 100644 --- a/src/offload_policy.cpp +++ b/src/offload_policy.cpp @@ -407,4 +407,38 @@ void YezzeyLoadRealtion(Oid i_reloid) { /* make changes visible*/ CommandCounterIncrement(); +} + +void FixupOffloadMetadata(Oid i_reloid) { + /**/ + ScanKeyData skey[1]; + + auto snap = RegisterSnapshot(GetTransactionSnapshot()); + + auto offrel = heap_open(YEZZEY_OFFLOAD_POLICY_RELATION, RowExclusiveLock); + + /* INSERT INTO yezzey.offload_metadata VALUES(v_reloid, 1, NULL, NOW()); */ + + ScanKeyInit(&skey[0], Anum_offload_metadata_reloid, BTEqualStrategyNumber, + F_OIDEQ, ObjectIdGetDatum(i_reloid)); + + auto scan = yezzey_beginscan(offrel, snap, 1, skey); + + auto oldtuple = heap_getnext(scan, ForwardScanDirection); + + if (HeapTupleIsValid(oldtuple)) { + auto meta = (Form_yezzey_offload_metadata)GETSTRUCT(oldtuple); + + Assert(meta->reloid == i_reloid); + + simple_heap_delete(offrel, &oldtuple->t_self); + } + + heap_close(offrel, RowExclusiveLock); + + yezzey_endscan(scan); + UnregisterSnapshot(snap); + + /* make changes visible */ + CommandCounterIncrement(); } \ No newline at end of file diff --git a/src/proxy.cpp b/src/proxy.cpp index 3630925..e5b9ac7 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -229,9 +229,6 @@ EXTERNC SMGRFile yezzey_AORelOpenSegFile(Oid reloid, char *nspname, if (!RecoveryInProgress()) { auto ioadv = std::make_shared( yfd.nspname, yfd.relname, - std::string(storage_host /*host*/), - std::string(storage_bucket /*bucket*/), - std::string(storage_prefix /*prefix*/), std::string(storage_class /* storage_class */), multipart_chunksize, DEFAULTTABLESPACE_OID, yfd.filepath /* coords */, reloid /* reloid */, diff --git a/src/storage.cpp b/src/storage.cpp index 74ee461..9d4c985 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -196,9 +196,7 @@ int loadSegmentFromExternalStorage(Relation rel, const std::string &nspname, /* FIXME */ auto ioadv = std::make_shared( - nspname, relname, - storage_host /* host */, storage_bucket /*bucket*/, - storage_prefix /*prefix*/, storage_class /* storage_class */, + nspname, relname, storage_class /* storage_class */, multipart_chunksize, coords /* filename */, rel->rd_id /* reloid */, use_gpg_crypto, yproxy_socket); @@ -348,9 +346,7 @@ int offloadRelationSegment(Relation aorel, int segno, int64 modcount, !external_storage_path ? "" : std::string(external_storage_path); ReleaseSysCache(tp); - auto ioadv = std::make_shared(nspname, relname, - storage_host /* host */, storage_bucket /*bucket*/, - storage_prefix /*prefix*/, storage_class /* storage_class */, + auto ioadv = std::make_shared(nspname, relname, storage_class /* storage_class */, multipart_chunksize, coords, aorel->rd_id /* reloid */, use_gpg_crypto, yproxy_socket); try { @@ -448,9 +444,6 @@ int statRelationSpaceUsage(Relation aorel, int segno, int64 modcount, auto ioadv = std::make_shared( nspname, std::string(aorel->rd_rel->relname.data), - std::string(storage_host /*host*/), - std::string(storage_bucket /*bucket*/), - std::string(storage_prefix /*prefix*/), std::string(storage_class /*storage_class*/), multipart_chunksize, coords /* coords */, aorel->rd_id /* reloid */, use_gpg_crypto, yproxy_socket); @@ -512,9 +505,6 @@ int statRelationSpaceUsagePerExternalChunk(Relation aorel, int segno, auto ioadv = std::make_shared( nspname, std::string(aorel->rd_rel->relname.data), - std::string(storage_host /*host*/), - std::string(storage_bucket /*bucket*/), - std::string(storage_prefix /*prefix*/), std::string(storage_class /*storage_class*/), multipart_chunksize, coords /* coords */, aorel->rd_id /* reloid */, use_gpg_crypto, yproxy_socket); diff --git a/src/url.cpp b/src/url.cpp index f1bfc9f..fffb855 100644 --- a/src/url.cpp +++ b/src/url.cpp @@ -77,8 +77,6 @@ std::string craftStorageUnPrefixedPath(const std::shared_ptr &adv, std::string getYezzeyRelationUrl_internal(const std::string &nspname, const std::string &relname, - const std::string &external_storage_prefix, relnodeCoord coords, int32_t segid) { - return external_storage_prefix + - yezzey_block_file_path(nspname, relname, coords, segid); + return yezzey_block_file_path(nspname, relname, coords, segid); } diff --git a/src/util.cpp b/src/util.cpp index 680c39f..50070ed 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -23,23 +23,6 @@ const char *baseYezzeyPath = "/basebackups_005/yezzey/"; -std::string getYezzeyExtrenalStorageBucket(const char *host, - const char *bucket) { - std::string url = "s3://"; - std::string toErase = "https://"; - std::string hostStr = host; - size_t pos = hostStr.find(toErase); - if (pos != std::string::npos) { - hostStr.erase(pos, toErase.length()); - } - url += hostStr; - url += "/"; - url += bucket; - url += "/"; - - return url; -} - std::string storage_url_add_options(const std::string &s3path, const char *config_path) { auto ret = s3path; @@ -87,18 +70,16 @@ relnodeCoord getRelnodeCoordinate(Oid spcNode, const std::string &fileName) { } void getYezzeyExternalStoragePathByCoords(const char *nspname, - const char *relname, const char *host, - const char *bucket, - const char *storage_prefix, + const char *relname, Oid spcNode, Oid dbNode, Oid relNode, int32_t segblockno /* segment no*/, int32_t segid, char **dest) { /* FIXME: Support for non-default table space? */ auto coords = relnodeCoord(spcNode, dbNode, relNode, segblockno); - auto prefix = getYezzeyRelationUrl_internal(nspname, relname, storage_prefix, + auto prefix = getYezzeyRelationUrl_internal(nspname, relname, coords, segid); - auto path = getYezzeyExtrenalStorageBucket(host, bucket) + prefix; + auto path = prefix; *dest = (char *)malloc(sizeof(char) * path.size()); strcpy(*dest, path.c_str()); diff --git a/src/virtual_index.cpp b/src/virtual_index.cpp index 6d43bcc..6a17f3c 100644 --- a/src/virtual_index.cpp +++ b/src/virtual_index.cpp @@ -48,7 +48,7 @@ yezzey_create_index_internal(Oid relid, const std::string &relname, relname.c_str() /* relname */, YEZZEY_AUX_NAMESPACE /* namespace */, 0 /* tablespace */, relid /* relid */, GetNewObjectId() /* reltype oid */, InvalidOid /* reloftypeid */, relowner /* owner */, - tupdesc /* rel tuple */, NIL, InvalidOid /* relam */, RELKIND_YEZZEYINDEX, + tupdesc /* rel tuple */, NIL, InvalidOid /* relam */, RELKIND_RELATION /*relkind*/, relpersistence, RELSTORAGE_HEAP, shared_relation, mapped_relation, true, 0, ONCOMMIT_NOOP, NULL /* GP Policy */, (Datum)0, false /* use_user_acl */, true, true, false /* valid_opts */, @@ -188,10 +188,6 @@ void emptyYezzeyIndexBlkno(Oid yezzey_index_oid, Oid reloid /* not used */, auto snap = RegisterSnapshot(GetTransactionSnapshot()); - // ScanKeyInit(&skey[0], Anum_yezzey_virtual_index_reloid, - // BTEqualStrategyNumber, - // F_OIDEQ, ObjectIdGetDatum(reloid)); - ScanKeyInit(&skey[0], Anum_yezzey_virtual_index_filenode, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relfilenode)); @@ -213,6 +209,61 @@ void emptyYezzeyIndexBlkno(Oid yezzey_index_oid, Oid reloid /* not used */, CommandCounterIncrement(); } /* end emptyYezzeyIndexBlkno */ + +void YezzeyFixupVirtualIndex_internal(Oid yezzey_index_oid, Relation relation) { + + HeapTuple tuple; + ScanKeyData skey[1]; + bool nulls[Natts_yezzey_virtual_index]; + Datum values[Natts_yezzey_virtual_index]; + + memset(nulls, 0, sizeof(nulls)); + memset(values, 0, sizeof(values)); + + auto rel = heap_open(yezzey_index_oid, RowExclusiveLock); + + auto snap = RegisterSnapshot(GetTransactionSnapshot()); + + ScanKeyInit(&skey[0], Anum_yezzey_virtual_index_filenode, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relation->rd_node.relNode)); + + auto desc = yezzey_beginscan(rel, snap, YezzeyVirtualIndexScanCols, skey); + + while (HeapTupleIsValid(tuple = heap_getnext(desc, ForwardScanDirection))) { + // update + auto meta = (Form_yezzey_virtual_index)GETSTRUCT(tuple); + + // Assert(meta->yrelfileoid == relfileoid); + if (meta->reloid == RelationGetRelid(relation)) { + continue; + } + + values[Anum_yezzey_virtual_index_reloid - 1] = + ObjectIdGetDatum(RelationGetRelid(relation)); + + auto yandxtuple = heap_form_tuple(RelationGetDescr(relation), values, nulls); + +#if IsGreenplum6 + simple_heap_update(relation, &tuple->t_self, yandxtuple); + CatalogUpdateIndexes(relation, yandxtuple); +#else + CatalogTupleUpdate(relation, &tuple->t_self, yandxtuple); +#endif + } + + yezzey_endscan(desc); + heap_close(rel, RowExclusiveLock); + + UnregisterSnapshot(snap); + + /* make changes visible*/ + CommandCounterIncrement(); +} + +void YezzeyFixupVirtualIndex(Relation relation) { + (void) YezzeyFixupVirtualIndex_internal(YezzeyFindAuxIndex(RelationGetRelid(relation)), relation); +} /* end YezzeyFixupVirtualIndex */ + void YezzeyVirtualIndexInsert(Oid yandexoid /*yezzey auxiliary index oid*/, Oid reloid, Oid relfilenodeOid, int64_t blkno, int64_t offset_start, int64_t offset_finish, diff --git a/src/worker.cpp b/src/worker.cpp deleted file mode 100644 index 3d23cb7..0000000 --- a/src/worker.cpp +++ /dev/null @@ -1,109 +0,0 @@ -/* - * file: src/worker.cpp - */ - -#include "worker.h" -#include "gucs.h" -#include "offload.h" -#include "offload_policy.h" - -#include "pg.h" -#include "yezzey_heap_api.h" - -void processPartitionOffload() { - SetCurrentStatementStartTimestamp(); - StartTransactionCommand(); - SPI_connect(); - PushActiveSnapshot(GetTransactionSnapshot()); - pgstat_report_activity(STATE_RUNNING, - "proccess relations to offload to yezzey"); - SetCurrentStatementStartTimestamp(); - - auto query = - "select yezzey_set_relation_expirity_seg(" - " parchildrelid," - " 1," - " yezzey_part_date_eval('select '::text || pg_get_expr(parrangeend, " - "0))::timestamp" - " ) from pg_partition_rule WHERE yezzey_check_part_exr(parrangeend);"; - - auto ret = SPI_execute(query, true, 1); - if (ret != SPI_OK_SELECT) { - elog(ERROR, "Error while processing partitions"); - } - - auto tupdesc = SPI_tuptable->tupdesc; - auto tuptable = SPI_tuptable; - auto cnt = SPI_processed; - - SPI_finish(); - PopActiveSnapshot(); - CommitTransactionCommand(); - pgstat_report_activity(STATE_IDLE, NULL); - pgstat_report_stat(true); -} - -/* - * processOffloadedRelations: - * check all expired relations and move then to - * external storage - */ -void processOffloadedRelations() { - /* - * Connect to the selected database - * - * Note: if we have selected a just-deleted database (due to using - * stale stats info), we'll fail and exit here. - */ - - /* And do an appropriate amount of work */ - StartTransactionCommand(); - (void)GetTransactionSnapshot(); - - HeapTuple tup; - - ScanKeyData skey[1]; - - auto offrel = heap_open(YEZZEY_OFFLOAD_POLICY_RELATION, AccessShareLock); - - /* - * We cannot open yezzey offloaded relations metatable - * in scheme 'yezzey'. This is possible when - * this code executes on bgworker, which - * executes on shared library loading, BEFORE - * CREATE EXTENSION command was done. - */ - if (!RelationIsValid(offrel)) { - return; - } - auto snap = RegisterSnapshot(GetTransactionSnapshot()); - - { - ScanKeyInit(&skey[0], Anum_offload_metadata_relext_time, - BTLessEqualStrategyNumber, F_TIMESTAMP_LT, - TimestampGetDatum(GetCurrentTimestamp())); - - auto desc = yezzey_beginscan(offrel, snap, 1, skey); - - while (HeapTupleIsValid(tup = heap_getnext(desc, ForwardScanDirection))) { - auto meta = (Form_yezzey_offload_metadata)GETSTRUCT(tup); - - /** - * @brief traverse pg_aoseg.pg_aoseg_ relation - * and try to lock each segment. If there is any transaction - * in progress accessing some segment, lock attemt will fail, - * because transactions holds locks on pg_aoseg.pg_aoseg_ rows - * and we are goind to lock row in X mode - */ - elog(INFO, "process expired relation (oid=%d)", meta->reloid); - YezzeyDefineOffloadPolicy(meta->reloid); - } - - yezzey_endscan(desc); - } - - UnregisterSnapshot(snap); - heap_close(offrel, AccessShareLock); - - CommitTransactionCommand(); -} \ No newline at end of file diff --git a/src/xvacuum.cpp b/src/xvacuum.cpp index 72bc1fa..15ffbb8 100644 --- a/src/xvacuum.cpp +++ b/src/xvacuum.cpp @@ -19,10 +19,7 @@ int yezzey_delete_chunk_internal(const char *external_chunk_path) { try { auto ioadv = std::make_shared( - "", "", std::string(storage_host /*host*/), - std::string(storage_bucket /*bucket*/), - std::string(storage_prefix /*prefix*/), - std::string(storage_class /*storage_class*/), multipart_chunksize, + "", "", std::string(storage_class /*storage_class*/), multipart_chunksize, DEFAULTTABLESPACE_OID, "" /* coords */, InvalidOid /* reloid */, use_gpg_crypto, yproxy_socket); @@ -52,9 +49,7 @@ int yezzey_delete_chunk_internal(const char *external_chunk_path) { int yezzey_vacuum_garbage_internal(int segindx, bool confirm, bool crazyDrop) { try { auto ioadv = std::make_shared( - "", "", std::string(storage_host /*host*/), - std::string(storage_bucket /*bucket*/), - std::string(storage_prefix /*prefix*/), + "", "", std::string(storage_class /*storage_class*/), multipart_chunksize, DEFAULTTABLESPACE_OID, "" /* coords */, InvalidOid /* reloid */, use_gpg_crypto, yproxy_socket); @@ -77,14 +72,9 @@ int yezzey_vacuum_garbage_internal(int segindx, bool confirm, bool crazyDrop) { } int yezzey_vacuum_garbage_relation_internal(Relation rel,int segindx, bool confirm,bool crazyDrop){ try { - auto ioadv = std::make_shared( - std::string(gpg_engine_path), std::string(gpg_key_id), - std::string(storage_config), "", "", std::string(storage_host /*host*/), - std::string(storage_bucket /*bucket*/), - std::string(storage_prefix /*prefix*/), + auto ioadv = std::make_shared("", "", std::string(storage_class /*storage_class*/), multipart_chunksize, DEFAULTTABLESPACE_OID, "" /* coords */, InvalidOid /* reloid */, - std::string(walg_bin_path), std::string(walg_config_path), use_gpg_crypto, yproxy_socket); auto tp = SearchSysCache1(NAMESPACEOID, diff --git a/src/yezzey_expire.cpp b/src/yezzey_expire.cpp deleted file mode 100644 index 6a3dec2..0000000 --- a/src/yezzey_expire.cpp +++ /dev/null @@ -1,346 +0,0 @@ -/* - * - * file: src/yezzey_expire.cpp - */ - -#include "yezzey_expire.h" -#include "offload_policy.h" -#include "string" -#include "url.h" -#include "util.h" -#include "yezzey_heap_api.h" - -#include "ygpver.h" -#include "gucs.h" - -void YezzeyRecordRelationExpireLsn(Relation rel) { - if (Gp_role != GP_ROLE_EXECUTE) { - return; - } - /* check that relation is yezzey-related. */ - if (rel->rd_node.spcNode != YEZZEYTABLESPACE_OID) { - /* noop */ - return; - } - - auto tp = SearchSysCache1(NAMESPACEOID, - ObjectIdGetDatum(RelationGetNamespace(rel))); - - if (!HeapTupleIsValid(tp)) { - elog(ERROR, "yezzey: failed to get namescape name of relation %d", - RelationGetNamespace(rel)); - } - - Form_pg_namespace nsptup = (Form_pg_namespace)GETSTRUCT(tp); - auto namespaceName = std::string(nsptup->nspname.data); - auto md = yezzey_fqrelname_md5(namespaceName, RelationGetRelationName(rel)); - - ReleaseSysCache(tp); - - /* - * It it important that we drop all chunks related to - * relation filenode, not oid, since owner of particular - * relifilenode may may with time thought operations - * like vacuum, expand (the main case) etc... - */ - /* clear yezzey index. */ - emptyYezzeyIndex(YezzeyFindAuxIndex(RelationGetRelid(rel)), - rel->rd_node.relNode); - -#define YezzeySetExpireIndexCols 1 - ScanKeyData skey[YezzeySetExpireIndexCols]; - - bool nulls[Natts_yezzey_expire_index]; - Datum values[Natts_yezzey_expire_index]; - - memset(nulls, 0, sizeof(nulls)); - memset(values, 0, sizeof(values)); - - /* upsert yezzey expire index */ - auto yexprel = heap_open(YEZZEY_EXPIRE_INDEX_RELATION, RowExclusiveLock); - - auto reloid = RelationGetRelid(rel); - auto relfileoid = rel->rd_node.relNode; - - auto lsn = yezzeyGetXStorageInsertLsn(); - - ScanKeyInit(&skey[0], Anum_yezzey_expire_index_reloid, BTEqualStrategyNumber, - F_OIDEQ, ObjectIdGetDatum(reloid)); - - auto snap = RegisterSnapshot(GetTransactionSnapshot()); - - auto desc = yezzey_beginscan(yexprel, snap, YezzeySetExpireIndexCols, skey); - - HeapTuple tuple; - - values[Anum_yezzey_expire_index_reloid - 1] = Int64GetDatum(reloid); - - values[Anum_yezzey_expire_index_lsn - 1] = LSNGetDatum(lsn); - - while (HeapTupleIsValid(tuple = heap_getnext(desc, ForwardScanDirection))) { - // update - auto meta = (Form_yezzey_expire_index)GETSTRUCT(tuple); - - // Assert(meta->yrelfileoid == relfileoid); - - values[Anum_yezzey_expire_index_relfileoid - 1] = - ObjectIdGetDatum(meta->yrelfileoid); - - values[Anum_yezzey_expire_index_last_use_lsn - 1] = - LSNGetDatum(meta->last_use_lsn); - - values[Anum_yezzey_expire_index_fqnmd5 - 1] = - PointerGetDatum(cstring_to_text(md.c_str())); - - auto yandxtuple = heap_form_tuple(RelationGetDescr(yexprel), values, nulls); - -#if IsGreenplum6 - simple_heap_update(yexprel, &tuple->t_self, yandxtuple); - CatalogUpdateIndexes(yexprel, yandxtuple); -#else - CatalogTupleUpdate(yexprel, &tuple->t_self, yandxtuple); -#endif - - heap_freetuple(yandxtuple); - } - - yezzey_endscan(desc); - heap_close(yexprel, RowExclusiveLock); - - UnregisterSnapshot(snap); - - /* make changes visible*/ - CommandCounterIncrement(); -} // YezzeyRecordRelationExpireLsn - -const std::string yezzey_expire_index_relname = "yezzey_expire_index"; -const std::string yezzey_expire_index_indx_relname = "yezzey_expire_index_indx"; - -void YezzeyCreateRelationExpireIndex(void) { - { /* check existed, if no, return */ - } - TupleDesc tupdesc; - - ObjectAddress baseobject; - ObjectAddress yezzey_ao_auxiliaryobject; - -#if IsGreenplum6 - tupdesc = CreateTemplateTupleDesc(Natts_yezzey_expire_index, false); -#else - tupdesc = CreateTemplateTupleDesc(Natts_yezzey_expire_index); -#endif - - TupleDescInitEntry(tupdesc, (AttrNumber)Anum_yezzey_expire_index_reloid, - "reloid", OIDOID, -1, 0); - - TupleDescInitEntry(tupdesc, (AttrNumber)Anum_yezzey_expire_index_relfileoid, - "relfileoid", OIDOID, -1, 0); - - TupleDescInitEntry(tupdesc, (AttrNumber)Anum_yezzey_expire_index_fqnmd5, - "fqnmd5", TEXTOID, -1, 0); - - TupleDescInitEntry(tupdesc, (AttrNumber)Anum_yezzey_expire_index_last_use_lsn, - "last_use_lsn", LSNOID, -1, 0); - - TupleDescInitEntry(tupdesc, (AttrNumber)Anum_yezzey_expire_index_lsn, - "expire_lsn", LSNOID, -1, 0); - -#if IsGreenplum6 - (void)heap_create_with_catalog( - yezzey_expire_index_relname.c_str() /* relname */, - YEZZEY_AUX_NAMESPACE /* namespace */, 0 /* tablespace */, - YEZZEY_EXPIRE_INDEX_RELATION /* relid */, - GetNewObjectId() /* reltype oid */, InvalidOid /* reloftypeid */, - GetUserId() /* owner */, tupdesc /* rel tuple */, NIL, - InvalidOid /* relam */, RELKIND_RELATION, RELPERSISTENCE_PERMANENT, - RELSTORAGE_HEAP, false, false, true, 0, ONCOMMIT_NOOP, - NULL /* GP Policy */, (Datum)0, false /* use_user_acl */, true, true, - false /* valid_opts */, false /* is_part_child */, - false /* is part parent */, NULL); - -#else - (void)heap_create_with_catalog( - yezzey_expire_index_relname.c_str() /* relname */, - YEZZEY_AUX_NAMESPACE /* namespace */, 0 /* tablespace */, - YEZZEY_EXPIRE_INDEX_RELATION /* relid */, - GetNewObjectId() /* reltype oid */, InvalidOid /* reloftypeid */, - GetUserId() /* owner */, HEAP_TABLE_AM_OID /* access method*/, - tupdesc /* rel tuple */, NIL, RELKIND_RELATION /*relkind*/, - RELPERSISTENCE_PERMANENT, false /*shared*/, false /*mapped*/, - ONCOMMIT_NOOP, NULL /* GP Policy */, (Datum)0, false /* use_user_acl */, - true, true, InvalidOid /*relrewrite*/, NULL, false /* valid_opts */); -#endif - - /* Make this table visible, else yezzey virtual index creation will fail */ - CommandCounterIncrement(); - - /* ShareLock is not really needed here, but take it anyway */ - auto yezzey_rel = heap_open(YEZZEY_EXPIRE_INDEX_RELATION, ShareLock); - char *col1 = "reloid"; - char *col2 = "relfileoid"; - auto indexColNames = list_make2(col1, col2); - - auto indexInfo = makeNode(IndexInfo); - - Oid collationObjectId[2]; - Oid classObjectId[2]; - int16 coloptions[2]; - - indexInfo->ii_NumIndexAttrs = 2; -#if IsGreenplum6 - indexInfo->ii_KeyAttrNumbers[0] = Anum_yezzey_expire_index_reloid; - indexInfo->ii_KeyAttrNumbers[1] = Anum_yezzey_expire_index_relfileoid; -#else - indexInfo->ii_IndexAttrNumbers[0] = Anum_yezzey_expire_index_reloid; - indexInfo->ii_IndexAttrNumbers[1] = Anum_yezzey_expire_index_relfileoid; - indexInfo->ii_NumIndexKeyAttrs = indexInfo->ii_NumIndexAttrs; -#endif - - indexInfo->ii_Expressions = NIL; - indexInfo->ii_ExpressionsState = NIL; - indexInfo->ii_Predicate = NIL; -#if IsGreenplum6 - indexInfo->ii_PredicateState = NIL; -#else - indexInfo->ii_PredicateState = NULL; -#endif - indexInfo->ii_ExclusionOps = NULL; - indexInfo->ii_ExclusionProcs = NULL; - indexInfo->ii_ExclusionStrats = NULL; - indexInfo->ii_Unique = true; - indexInfo->ii_ReadyForInserts = true; - indexInfo->ii_Concurrent = false; - indexInfo->ii_BrokenHotChain = false; - - collationObjectId[0] = InvalidOid; - classObjectId[0] = OID_BTREE_OPS_OID; - coloptions[0] = 0; - - collationObjectId[1] = InvalidOid; - classObjectId[1] = OID_BTREE_OPS_OID; - coloptions[1] = 0; - -#if IsGreenplum6 - (void)index_create(yezzey_rel, yezzey_expire_index_indx_relname.c_str(), - YEZZEY_EXPIRE_INDEX_RELATION_INDX, InvalidOid, InvalidOid, - InvalidOid, indexInfo, indexColNames, BTREE_AM_OID, - 0 /* tablespace */, collationObjectId, classObjectId, - coloptions, (Datum)0, true, false, false, false, true, - false, false, true, NULL); -#else - - bits16 flags, constr_flags; - flags = constr_flags = 0; - (void)index_create(yezzey_rel, yezzey_expire_index_indx_relname.c_str(), - YEZZEY_EXPIRE_INDEX_RELATION_INDX, InvalidOid, InvalidOid, - InvalidOid, indexInfo, indexColNames, BTREE_AM_OID, - 0 /* tablespace */, collationObjectId, classObjectId, - coloptions, (Datum)0, flags, constr_flags, true, true, - NULL); -#endif - - /* Unlock target table -- no one can see it */ - heap_close(yezzey_rel, ShareLock); - - /* Unlock the index -- no one can see it anyway */ - UnlockRelationOid(YEZZEY_EXPIRE_INDEX_RELATION_INDX, AccessExclusiveLock); - - CommandCounterIncrement(); - - /* - * Register dependency from the auxiliary table to the master, so that the - * aoseg table will be deleted if the master is. - */ - baseobject.classId = ExtensionRelationId; - baseobject.objectId = get_extension_oid("yezzey", false); - baseobject.objectSubId = 0; - yezzey_ao_auxiliaryobject.classId = RelationRelationId; - yezzey_ao_auxiliaryobject.objectId = YEZZEY_EXPIRE_INDEX_RELATION; - yezzey_ao_auxiliaryobject.objectSubId = 0; - - recordDependencyOn(&yezzey_ao_auxiliaryobject, &baseobject, - DEPENDENCY_INTERNAL); - - /* - * Make changes visible - */ - CommandCounterIncrement(); -} - -#define YezzeyExpireIndexCols 3 - -void YezzeyUpsertLastUseLsn(Oid reloid, Oid relfileoid, const char *md5, - XLogRecPtr lsn) { - ScanKeyData skey[YezzeyExpireIndexCols]; - - bool nulls[Natts_yezzey_expire_index]; - Datum values[Natts_yezzey_expire_index]; - - memset(nulls, 0, sizeof(nulls)); - memset(values, 0, sizeof(values)); - - /* upsert yezzey expire index */ - auto rel = heap_open(YEZZEY_EXPIRE_INDEX_RELATION, RowExclusiveLock); - - ScanKeyInit(&skey[0], Anum_yezzey_expire_index_reloid, BTEqualStrategyNumber, - F_OIDEQ, ObjectIdGetDatum(reloid)); - - ScanKeyInit(&skey[1], Anum_yezzey_expire_index_relfileoid, - BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relfileoid)); - - ScanKeyInit(&skey[2], Anum_yezzey_expire_index_fqnmd5, BTEqualStrategyNumber, - F_TEXTEQ, PointerGetDatum(cstring_to_text(md5))); - - auto snap = RegisterSnapshot(GetTransactionSnapshot()); - - auto desc = yezzey_beginscan(rel, snap, YezzeyExpireIndexCols, skey); - - auto tuple = heap_getnext(desc, ForwardScanDirection); - - values[Anum_yezzey_expire_index_reloid - 1] = Int64GetDatum(reloid); - values[Anum_yezzey_expire_index_relfileoid - 1] = - ObjectIdGetDatum(relfileoid); - - values[Anum_yezzey_expire_index_last_use_lsn - 1] = LSNGetDatum(lsn); - values[Anum_yezzey_expire_index_lsn - 1] = LSNGetDatum(0); - values[Anum_yezzey_expire_index_fqnmd5 - 1] = - PointerGetDatum(cstring_to_text(md5)); - - if (HeapTupleIsValid(tuple)) { - // update - auto meta = (Form_yezzey_expire_index)GETSTRUCT(tuple); - - values[Anum_yezzey_expire_index_lsn - 1] = Int32GetDatum(meta->expire_lsn); - - auto yandxtuple = heap_form_tuple(RelationGetDescr(rel), values, nulls); - -#if IsGreenplum6 - simple_heap_update(rel, &tuple->t_self, yandxtuple); - CatalogUpdateIndexes(rel, yandxtuple); -#else - CatalogTupleUpdate(rel, &tuple->t_self, yandxtuple); -#endif - - heap_freetuple(yandxtuple); - } else { - // insert - auto yandxtuple = heap_form_tuple(RelationGetDescr(rel), values, nulls); - -#if IsGreenplum6 - simple_heap_insert(rel, yandxtuple); - CatalogUpdateIndexes(rel, yandxtuple); -#else - CatalogTupleInsert(rel, yandxtuple); -#endif - - heap_freetuple(yandxtuple); - } - - yezzey_endscan(desc); - heap_close(rel, RowExclusiveLock); - - UnregisterSnapshot(snap); - - /* make changes visible*/ - CommandCounterIncrement(); -} diff --git a/worker.c b/worker.c deleted file mode 100644 index 622e13e..0000000 --- a/worker.c +++ /dev/null @@ -1,636 +0,0 @@ - - -#include "postgres.h" - -#include -#include -#include -#include -#include -#include - -#include "miscadmin.h" -#include "utils/snapmgr.h" - -#include "worker.h" - -#if PG_VERSION_NUM >= 130000 -#include "postmaster/interrupt.h" -#endif - -#include "catalog/catalog.h" -#include "common/relpath.h" -#include "executor/spi.h" -#include "storage/ipc.h" -#include "storage/lwlock.h" -#include "storage/md.h" -#include "storage/shmem.h" -#include "storage/smgr.h" -#include "utils/builtins.h" - -#include "postmaster/bgworker.h" -#include "storage/dsm.h" - -#if PG_VERSION_NUM >= 100000 -#include "common/file_perm.h" -#else -#include "access/xact.h" -#endif - -#include "lib/stringinfo.h" -#include "pgstat.h" -#include "postmaster/bgworker.h" -#include "storage/latch.h" -#include "storage/proc.h" -#include "utils/guc.h" - -#include "utils/elog.h" - -#if IsGreenplum6 || IsModernYezzey -#include "cdb/cdbvars.h" -#endif - -#include "storage.h" -#include "yezzey.h" - -#include "yezzey_expire.h" -#include "offload_tablespace_map.h" - -#include "tcop/utility.h" -#include "xvacuum.h" -#include "cdb/cdbvars.h" - -#if PG_VERSION_NUM >= 100000 -void yezzey_main(Datum main_arg); -#else -static void yezzey_main(Datum arg); -#endif - -static volatile sig_atomic_t got_sigterm = false; -static volatile sig_atomic_t got_sighup = false; - -static int yezzey_naptime = 10000; -static char *worker_name = "yezzey"; - -/* Shared state information for yezzey bgworker. */ -typedef struct YezzeySharedState { - LWLock lock; /* mutual exclusion */ - pid_t bgworker_pid; /* for main bgworker */ - pid_t pid_using_dumpfile; /* for autoprewarm or block dump */ - - /* Following items are for communication with per-database worker */ - dsm_handle block_info_handle; - Oid database; - int yezzeystart_idx; - int yezzey_stop_idx; -} YezzeySharedState; - -static bool yezzey_init_shmem(void); -void yezzey_offload_databases(void); -void yezzey_ProcessConfigFile(void); -void yezzey_process_database(Datum main_arg); - -static void yezzey_define_gucs(); - -/* Pointer to shared-memory state. */ -static YezzeySharedState *yezzey_state = NULL; - -/* - * Allocate and initialize yezzey-related shared memory, if not already - * done, and set up backend-local pointer to that state. Returns true if an - * existing shared memory segment was found. - */ -static bool yezzey_init_shmem(void) { -#if PG_VERSION_NUM < 100000 - static LWLockTranche tranche; -#endif - bool found; - - LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - yezzey_state = ShmemInitStruct("yezzey", sizeof(YezzeySharedState), &found); - if (!found) { - LWLockInitialize(&yezzey_state->lock, LWLockNewTrancheId()); - yezzey_state->bgworker_pid = InvalidPid; - } - LWLockRelease(AddinShmemInitLock); - -#if PG_VERSION_NUM >= 100000 - LWLockRegisterTranche(yezzey_state->lock.tranche, "yezzey"); -#else - tranche.name = "yezzey"; - tranche.array_base = &yezzey_state->lock; - tranche.array_stride = sizeof(LWLock); - LWLockRegisterTranche(yezzey_state->lock.tranche, &tranche); - -#endif - - return found; -} - -// options for yezzey logging -static const struct config_enum_entry loglevel_options[] = { - {"debug5", DEBUG5, false}, {"debug4", DEBUG4, false}, - {"debug3", DEBUG3, false}, {"debug2", DEBUG2, false}, - {"debug1", DEBUG1, false}, {"debug", DEBUG2, true}, - {"info", INFO, false}, {"notice", NOTICE, false}, - {"warning", WARNING, false}, {"error", ERROR, false}, - {"log", LOG, false}, {"fatal", FATAL, false}, - {"panic", PANIC, false}, {NULL, 0, false}}; - -void yezzey_prepare(void) { - SetCurrentStatementStartTimestamp(); - StartTransactionCommand(); - SPI_connect(); - PushActiveSnapshot(GetTransactionSnapshot()); - pgstat_report_activity(STATE_RUNNING, - "proccess relations to offload to yezzey"); - SetCurrentStatementStartTimestamp(); -} - -void yezzey_finish(void) { - SPI_finish(); - PopActiveSnapshot(); - CommitTransactionCommand(); - pgstat_report_activity(STATE_IDLE, NULL); - pgstat_report_stat(true); -} - -static void yezzey_sigterm(SIGNAL_ARGS) { - int save_errno = errno; - got_sigterm = true; - if (MyProc) - SetLatch(&MyProc->procLatch); - errno = save_errno; -} - -static void yezzey_sighup(SIGNAL_ARGS) { - int save_errno = errno; - got_sighup = true; - if (MyProc) - SetLatch(&MyProc->procLatch); - errno = save_errno; -} - -/* TODO: make this guc*/ -const int processTableLimit = 10; - -void yezzey_process_database(Datum main_arg) { - Oid dboid; - char dbname[NAMEDATALEN]; - - dboid = DatumGetObjectId(main_arg); - - /* Establish signal handlers; once that's done, unblock signals. */ - // pqsignal(SIGTERM, die); - BackgroundWorkerUnblockSignals(); -#if IsGreenplum6 - Gp_session_role = GP_ROLE_UTILITY; -#endif - Gp_role = GP_ROLE_UTILITY; - InitPostgres(NULL, dboid, NULL, InvalidOid, dbname, true); - SetProcessingMode(NormalProcessing); -#if IsGreenplum6 - set_ps_display(dbname, false); -#else - set_ps_display(dbname); -#endif - ereport(LOG, (errmsg("yezzey bgworker: processing database \"%s\"", dbname))); - - - if (Gp_role == GP_ROLE_DISPATCH) { - (void)processOffloadedRelations(); - (void)processPartitionOffload(); - } -} - -/* - * Start yezzey per-database worker process. - */ -static void yezzey_start_database_worker(Oid dboid) { - BackgroundWorker worker; - BackgroundWorkerHandle *handle; - elog(yezzey_log_level, "[YEZZEY_SMGR] setting up bgworker"); - - memset(&worker, 0, sizeof(worker)); - worker.bgw_flags = - BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - worker.bgw_start_time = BgWorkerStart_ConsistentState; - worker.bgw_restart_time = BGW_NEVER_RESTART; - -#if PG_VERSION_NUM < 100000 - worker.bgw_main = yezzey_process_database; - worker.bgw_main_arg = ObjectIdGetDatum(dboid); -#endif - - snprintf(worker.bgw_name, BGW_MAXLEN, "%s-oflload worker for %d", worker_name, - dboid); -#if PG_VERSION_NUM >= 110000 - snprintf(worker.bgw_type, BGW_MAXLEN, "yezzey"); -#endif - -#if PG_VERSION_NUM >= 100000 - sprintf(worker.bgw_library_name, "yezzey"); - sprintf(worker.bgw_function_name, "yezzey_process_database"); -#endif - -#if PG_VERSION_NUM >= 90400 - worker.bgw_notify_pid = 0; -#endif - - RegisterDynamicBackgroundWorker(&worker, &handle); - - elog(yezzey_log_level, "[YEZZEY_SMGR] started datbase worker"); -} - -void yezzey_offload_databases() { - SPITupleTable *tuptable; - HeapTuple tuple; - char *dbOidRaw; - Oid dbOid; - TupleDesc tupdesc; - long unsigned int cnt; - int ret; - - StringInfoData buf; - - /* start transaction */ - yezzey_prepare(); - /* - */ - initStringInfo(&buf); - appendStringInfo(&buf, "" - "SELECT oid FROM pg_database WHERE datallowconn" - " ORDER BY random() LIMIT 1;"); - - pgstat_report_activity(STATE_RUNNING, buf.data); - ret = SPI_execute(buf.data, true, 1); - if (ret != SPI_OK_SELECT) { - elog(ERROR, "Error while selecting database list"); - } - - tupdesc = SPI_tuptable->tupdesc; - tuptable = SPI_tuptable; - cnt = SPI_processed; - - /** - * @brief traverse pg_aoseg.pg_aoseg_ relation - * and try to lock each segment. If there is any transaction - * in progress accessing some segment, lock attemt will fail, - * because transactions holds locks on pg_aoseg.pg_aoseg_ rows - * and we are goind to lock row in X mode - */ - tuple = tuptable->vals[0]; - dbOidRaw = SPI_getvalue(tuple, tupdesc, 1); - /* - * Convert to desired data type - */ - dbOid = strtoll(dbOidRaw, NULL, 10); - /* comlete transaction */ - yezzey_finish(); - - yezzey_start_database_worker(dbOid); -} - -void yezzey_ProcessConfigFile() {} - -static void yezzey_detach_shmem(int code, Datum arg); -/* - * Clear our PID from autoprewarm shared state. - */ -static void yezzey_detach_shmem(int code, Datum arg) { - LWLockAcquire(&yezzey_state->lock, LW_EXCLUSIVE); - if (yezzey_state->bgworker_pid == MyProcPid) - yezzey_state->bgworker_pid = InvalidPid; - LWLockRelease(&yezzey_state->lock); -} - -// Launcher worker -#if PG_VERSION_NUM < 100000 -static void -#else -void -#endif -yezzey_main(Datum main_arg) { - bool first_time; - - pqsignal(SIGHUP, yezzey_sighup); - pqsignal(SIGTERM, yezzey_sigterm); - - BackgroundWorkerUnblockSignals(); - - /* Create (if necessary) and attach to our shared memory area. */ - if (yezzey_init_shmem()) - first_time = false; - - /* Set on-detach hook so that our PID will be cleared on exit. */ - on_shmem_exit(yezzey_detach_shmem, 0); - - /* - * Store our PID in the shared memory area --- unless there's already - * another worker running, in which case just exit. - */ - LWLockAcquire(&yezzey_state->lock, LW_EXCLUSIVE); - if (yezzey_state->bgworker_pid != InvalidPid) { - LWLockRelease(&yezzey_state->lock); - ereport(LOG, (errmsg("yezzey worker is already running under PID %lu", - (unsigned long)yezzey_state->bgworker_pid))); - return; - } - yezzey_state->bgworker_pid = MyProcPid; - LWLockRelease(&yezzey_state->lock); - -#if IsGreenplum6 || IsModernYezzey - if (IS_QUERY_DISPATCHER()) { - return; - } -#endif - /* run only on query executers */ - - /* without this, we will fail out attempts - * to modify tables on segments (sql without QD) - */ - Gp_role = GP_ROLE_UTILITY; -#if IsGreenplum6 - Gp_session_role = GP_ROLE_UTILITY; -#endif - - /* - * acquire connection to database, - * otherwise we will not be able to run queries. - * yezzey axillary database will contain tables wirth - * relnodes meta-information, which can be used for - * backgroup relation offloading to external storage. - */ -#if PG_VERSION_NUM < 110000 - BackgroundWorkerInitializeConnection("postgres", NULL); -#else - BackgroundWorkerInitializeConnection("postgres", NULL, 0); -#endif - - while (!got_sigterm) { - int rc; - rc = WaitLatch(&MyProc->procLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, -#if PG_VERSION_NUM < 100000 - yezzey_naptime -#else - yezzey_naptime, PG_WAIT_EXTENSION -#endif - ); - - ResetLatch(&MyProc->procLatch); - - if (rc & WL_POSTMASTER_DEATH) - proc_exit(1); - - if (got_sighup) { - yezzey_ProcessConfigFile(); - got_sighup = false; - ereport(LOG, (errmsg("bgworker yezzey signal: processed SIGHUP"))); - } - - if (got_sigterm) { - ereport(LOG, (errmsg("bgworker yezzey signal: processed SIGTERM"))); - proc_exit(0); - } - - elog(yezzey_log_level, - "[YEZZEY_SMGR_BG] start to process offload databases"); - yezzey_offload_databases(); - } -} - -/* - * Start yezzey primary worker process. - */ -static void yezzey_start_launcher_worker(void) { - BackgroundWorker worker; - - MemSet(&worker, 0, sizeof(BackgroundWorker)); - worker.bgw_flags = - BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - /* to start */ - worker.bgw_start_time = BgWorkerStart_ConsistentState; - worker.bgw_restart_time = 10; - -#if PG_VERSION_NUM < 100000 - worker.bgw_main = yezzey_main; -#else - sprintf(worker.bgw_library_name, "yezzey"); - sprintf(worker.bgw_function_name, "yezzey_main"); -#endif - - snprintf(worker.bgw_name, BGW_MAXLEN, "%s", worker_name); -#if PG_VERSION_NUM >= 110000 - snprintf(worker.bgw_type, BGW_MAXLEN, "yezzey"); -#endif - -#if PG_VERSION_NUM >= 90400 - worker.bgw_notify_pid = 0; -#endif - - /* must set notify PID to wait for startup */ - // worker.bgw_notify_pid = MyProcPid; - - if (Gp_role == GP_ROLE_DISPATCH) { - /* run on master */ - RegisterBackgroundWorker(&worker); - } else { - /* - do we need to run bgworker on gp execute? - if (!RegisterDynamicBackgroundWorker(&worker, &handle)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_RESOURCES), - errmsg("could not register background process"), - errhint("You may need to increase max_worker_processes."))); - - status = WaitForBackgroundWorkerStartup(handle, &pid); - if (status != BGWH_STARTED) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_RESOURCES), - errmsg("could not start background process"), - errhint("More details may be available in the server log."))); - */ - } -} - - - -/* - * Hook ProcessUtility to do external storage vacuum - */ -static void -yezzey_ProcessUtility_hook(Node *parsetree, - const char *queryString, - ProcessUtilityContext context, - ParamListInfo params, - DestReceiver *dest, - char *completionTag) { - - switch (nodeTag(parsetree)) - { - /* - * ******************** yezzey vacuum ******************** - */ - break; - case T_VacuumStmt: -#if IsGreenplum6 - { - VacuumStmt *stmt = (VacuumStmt *) parsetree; - if(!stmt->relation){ - break; - } - Relation rel = relation_openrv(stmt->relation,NoLock); - if (stmt->options & VACOPT_YEZZEY & (rel->rd_node.spcNode == YEZZEYTABLESPACE_OID)) { - if (Gp_role == GP_ROLE_EXECUTE) { - Assert(GpIdentity.segindex != -1); - yezzey_vacuum_garbage_relation_internal(rel, GpIdentity.segindex,true,false); - } - } - relation_close(rel,NoLock); - } -#endif - break; - default: - break; - } - return standard_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); -} - -static void yezzey_ExecuterEndHook(QueryDesc *queryDesc) { - (void) standard_ExecutorEnd(queryDesc); - - YezzeyTruncateOTMHint(); -} - - -static void yezzey_ExecuterStartHook(QueryDesc *queryDesc, int eflags) { - (void) standard_ExecutorStart(queryDesc, eflags); - - typedef struct - { - DestReceiver pub; /* publicly-known function pointers */ - IntoClause *into; /* target relation specification */ - /* These fields are filled by intorel_startup: */ - Relation rel; /* relation to write to */ - CommandId output_cid; /* cmin to insert in output tuples */ - int hi_options; /* heap_insert performance options */ - BulkInsertState bistate; /* bulk insert state */ - - struct AppendOnlyInsertDescData *ao_insertDesc; /* descriptor to AO tables */ - struct AOCSInsertDescData *aocs_insertDes; /* descriptor for aocs */ - } DR_intorel; - - - IntoClause *iclause; - Oid sourceOid; - - if (queryDesc->plannedstmt->intoClause != NULL) { - iclause = queryDesc->plannedstmt->intoClause; - if (strcmp(iclause->tableSpaceName, "yezzey(cloud-storage)") == 0) { - if (queryDesc->plannedstmt->relationOids->length != 1) { - elog(ERROR, "unexpected plan relation size for yezzey alter: %d", queryDesc->plannedstmt->relationOids->length); - } - sourceOid = lfirst(queryDesc->plannedstmt->relationOids->head); - /* so, target relation is yezzey. This should be expand or alter table reorg; */ - YezzeyCopyOTM(iclause->rel, sourceOid); - } - } -} - - -/* GUC variables. */ -static bool yezzey_autooffload = true; /* start yezzey worker? */ - -static void yezzey_define_gucs() { - - DefineCustomStringVariable("yezzey.storage_prefix", "segment name prefix", - NULL, &storage_prefix, "", PGC_SUSET, 0, NULL, - NULL, NULL); - - DefineCustomStringVariable("yezzey.storage_bucket", "external storage bucket", - NULL, &storage_bucket, "", PGC_SUSET, 0, NULL, - NULL, NULL); - - DefineCustomStringVariable("yezzey.backup_bucket", "external storage backup bucket", - NULL, &backup_bucket, "", PGC_SUSET, 0, NULL, - NULL, NULL); - - DefineCustomStringVariable("yezzey.storage_config", - "Storage config path for yezzey external storage.", - NULL, &storage_config, "", PGC_SUSET, 0, NULL, - NULL, NULL); - - DefineCustomStringVariable("yezzey.storage_host", "external storage host", - NULL, &storage_host, "", PGC_SUSET, 0, NULL, NULL, - NULL); - - - DefineCustomStringVariable("yezzey.storage_class", "external storage default storage class", - NULL, &storage_class, "STANDARD", PGC_SUSET, 0, NULL, NULL, - NULL); - - DefineCustomIntVariable("yezzey.multipart_chunksize", "external storage default multipart chunksize", - NULL, &multipart_chunksize, 16*1024*1024, 0, INT_MAX, PGC_SUSET, 0, NULL, NULL, NULL); - - DefineCustomIntVariable("yezzey.multipart_threshold", "external storage default multipart threshold", - NULL, &multipart_threshold, 64*1024*1024, 0, INT_MAX, PGC_SUSET, 0, NULL, NULL, NULL); - - DefineCustomBoolVariable("yezzey.use_gpg_crypto", "use gpg crypto", NULL, - &use_gpg_crypto, true, PGC_SUSET, 0, NULL, NULL, - NULL); - - DefineCustomBoolVariable( - "yezzey.autooffload", "enable auto-offloading worker", NULL, - &yezzey_autooffload, false, PGC_USERSET, 0, NULL, NULL, NULL); - - DefineCustomEnumVariable("yezzey.log_level", - "Log level for yezzey functions.", NULL, - &yezzey_log_level, DEBUG1, loglevel_options, - PGC_SUSET, 0, NULL, NULL, NULL); - - DefineCustomEnumVariable("yezzey.ao_log_level", - "Log level for yezzey functions.", NULL, - &yezzey_ao_log_level, DEBUG1, loglevel_options, - PGC_SUSET, 0, NULL, NULL, NULL); - - DefineCustomIntVariable( - "yezzey.oflload_worker_naptime", "Auto-offloading worker naptime", NULL, - &yezzey_naptime, 10000, 500, INT_MAX, PGC_SUSET, 0, NULL, NULL, NULL); - - DefineCustomStringVariable("yezzey.yproxy_socket", "wal-g config path", - NULL, &yproxy_socket, "/tmp/yproxy.sock", - PGC_SUSET, 0, NULL, NULL, NULL); -} - -void _PG_init(void) { - /* Allocate shared memory for yezzey workers */ - - /* Yezzey GUCS define */ - (void)yezzey_define_gucs(); - - RequestAddinShmemSpace(MAXALIGN(sizeof(YezzeySharedState))); - - elog(yezzey_log_level, "[YEZZEY_SMGR] setting up bgworker"); - - if (yezzey_autooffload && false /*temp disable*/) { - /* dispatch yezzey worker */ - yezzey_start_launcher_worker(); - } - - elog(yezzey_log_level, "[YEZZEY_SMGR] set hook"); - - smgr_hook = smgr_yezzey; -#if IsGreenplum6 - smgrao_hook = smgrao_yezzey; -#endif - smgr_init_hook = smgr_init_yezzey; - - /* set drop hook */ - RelationDropStorage_hook = YezzeyRecordRelationExpireLsn; - - ProcessUtility_hook = yezzey_ProcessUtility_hook; - - ExecutorStart_hook = yezzey_ExecuterStartHook; - ExecutorEnd_hook = yezzey_ExecuterEndHook; -} diff --git a/yezzey--1.8--1.8.1.sql b/yezzey--1.8--1.8.1.sql index c152260..28bc5c0 100644 --- a/yezzey--1.8--1.8.1.sql +++ b/yezzey--1.8--1.8.1.sql @@ -1,4 +1,5 @@ +-- New utilities & functions CREATE OR REPLACE FUNCTION yezzey_vacuum_garbage( confirm BOOLEAN DEFAULT FALSE, crazyDrop BOOLEAN DEFAULT FALSE @@ -56,4 +57,68 @@ BEGIN PERFORM yezzey_vacuum_garbage_relation('public', i_offload_relname, confirm, crazyDrop); END; $$ -LANGUAGE PLPGSQL; \ No newline at end of file +LANGUAGE PLPGSQL; + + +-- metadata migration + +DROP TABLE IF EXISTS yezzey.yezzey_expire_index; + +-- will preserve NULL distrib policy +CREATE TABLE yezzey.yezzey_virtual_index_stale AS select * from yezzey.yezzey_virtual_index limit 0; +CREATE TABLE yezzey.offload_metadata_stale AS select * from yezzey.offload_metadata limit 0; + +CREATE OR REPLACE FUNCTION +yezzey.fixup_stale_data() RETURNS VOID +AS +$$ + WITH stale_data AS ( + SELECT * FROM + yezzey.yezzey_virtual_index vi + WHERE NOT EXISTS (SELECT 1 FROM pg_class WHERE relfilenode = vi.filenode) + ) + INSERT INTO yezzey.yezzey_virtual_index_stale TABLE stale_data; + + DELETE FROM + yezzey.yezzey_virtual_index vi + WHERE NOT EXISTS (SELECT 1 FROM pg_class WHERE relfilenode = vi.filenode); + + WITH stale_offload_data AS ( + SELECT * FROM + yezzey.offload_metadata op + WHERE NOT EXISTS (SELECT 1 FROM pg_class WHERE oid = op.reloid) + ) + INSERT INTO yezzey.offload_metadata_stale TABLE stale_offload_data; + + DELETE FROM + yezzey.offload_metadata op + WHERE NOT EXISTS (SELECT 1 FROM pg_class WHERE oid = op.reloid); + +$$ LANGUAGE SQL +EXECUTE ON ALL SEGMENTS; + +create function yezzey.yezzey_fixup_yvi() +returns void as +$$ +update pg_class set relkind = 'r' where oid = 8500; +$$ +language sql +execute on all segments; + +CREATE TABLE yezzey.yezzey_expire_hint +( + x_path TEXT PRIMARY KEY, + lsn pg_lsn +) with (appendonly=false); + + +set allow_segment_dml to on; + +SELECT yezzey.fixup_stale_data(); +SELECT yezzey.yezzey_fixup_yvi(); + +RESET allow_segment_DML; + +CREATE INDEX yezzey_virtual_index_x_path ON yezzey.yezzey_virtual_index(x_path); + + diff --git a/yezzey.c b/yezzey.c index a5d9c87..f2bc4b9 100644 --- a/yezzey.c +++ b/yezzey.c @@ -58,22 +58,35 @@ #include "offload.h" #include "offload_policy.h" -#include "virtual_tablespace.h" #include "partition.h" -#include "xvacuum.h" -#include "yezzey_expire.h" + +#include "virtual_tablespace.h" #include "init.h" +#include "storage.h" +#include "yezzey.h" + +#include "offload_tablespace_map.h" + +#include "tcop/utility.h" +#include "xvacuum.h" +#include "cdb/cdbvars.h" + +// options for yezzey logging +static const struct config_enum_entry loglevel_options[] = { + {"debug5", DEBUG5, false}, {"debug4", DEBUG4, false}, + {"debug3", DEBUG3, false}, {"debug2", DEBUG2, false}, + {"debug1", DEBUG1, false}, {"debug", DEBUG2, true}, + {"info", INFO, false}, {"notice", NOTICE, false}, + {"warning", WARNING, false}, {"error", ERROR, false}, + {"log", LOG, false}, {"fatal", FATAL, false}, + {"panic", PANIC, false}, {NULL, 0, false}}; + #define GET_STR(textp) \ DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) /* STORAGE */ -char *storage_prefix = NULL; -char *storage_bucket = NULL; -char *backup_bucket = NULL; -char *storage_config = NULL; -char *storage_host = NULL; char *storage_class = NULL; int multipart_threshold = 0; int multipart_chunksize = 0; @@ -267,7 +280,7 @@ int yezzey_load_relation_internal(Oid reloid, const char *dest_path) { } /* record that relation on its current reloid is expired */ - YezzeyRecordRelationExpireLsn(aorel); + YezzeyFixupVirtualIndex(aorel); /* * Update relation status row in yezzet.offload_metadat @@ -481,8 +494,7 @@ Datum yezzey_show_relation_external_path(PG_FUNCTION_ARGS) { } (void)getYezzeyExternalStoragePathByCoords( - nspname, aorel->rd_rel->relname.data, storage_host /*host*/, - storage_bucket /*bucket*/, storage_prefix /*prefix*/, rnode.spcNode, rnode.dbNode, + nspname, aorel->rd_rel->relname.data, rnode.spcNode, rnode.dbNode, rnode.relNode, segno, GpIdentity.segindex, &ptr); pgptr = pstrdup(ptr); @@ -1239,3 +1251,161 @@ Datum yezzey_check_part_exr(PG_FUNCTION_ARGS) { text *expr = PG_GETARG_TEXT_P(0); PG_RETURN_TEXT_P(yezzey_get_expr_worker(expr)); } + + +/* Plugin provides a hook function matching this signature. */ +void yezzey_object_access_hook (ObjectAccessType access, + Oid classId, + Oid objectId, + int subId, + void *arg) { + Relation offRel; + if (classId != RelationRelationId) { + return; + } + + offRel = relation_open(objectId, AccessShareLock); + if (offRel->rd_node.spcNode != YEZZEYTABLESPACE_OID) { + relation_close(offRel, AccessShareLock); + return; + } + + + if (access == OAT_DROP) { + (void)emptyYezzeyIndex(YezzeyFindAuxIndex(RelationGetRelid(offRel)), offRel->rd_node.relNode); + (void)FixupOffloadMetadata(RelationGetRelid(offRel)); + } else if (access == OAT_POST_ALTER) { + /* TODO: implement properly */ + // (void)YezzeyFixupVirtualIndex(offRel); + } + + relation_close(offRel, AccessShareLock); +} + + +/* + * Hook ProcessUtility to do external storage vacuum + */ +static void +yezzey_ProcessUtility_hook(Node *parsetree, + const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, + DestReceiver *dest, + char *completionTag) { + + switch (nodeTag(parsetree)) + { + /* + * ******************** yezzey vacuum ******************** + */ + break; + case T_VacuumStmt: +#if IsGreenplum6 + { + VacuumStmt *stmt = (VacuumStmt *) parsetree; + if(!stmt->relation){ + break; + } + Relation rel = relation_openrv(stmt->relation,NoLock); + if (stmt->options & VACOPT_YEZZEY & (rel->rd_node.spcNode == YEZZEYTABLESPACE_OID)) { + if (Gp_role == GP_ROLE_EXECUTE) { + Assert(GpIdentity.segindex != -1); + yezzey_vacuum_garbage_relation_internal(rel, GpIdentity.segindex,true,false); + } + } + relation_close(rel,NoLock); + } +#endif + break; + default: + break; + } + return standard_ProcessUtility(parsetree, queryString, context, params, dest, completionTag); +} + +static void yezzey_ExecuterEndHook(QueryDesc *queryDesc) { + (void) standard_ExecutorEnd(queryDesc); + + YezzeyTruncateOTMHint(); +} + + +static void yezzey_ExecuterStartHook(QueryDesc *queryDesc, int eflags) { + (void) standard_ExecutorStart(queryDesc, eflags); + + IntoClause *iclause; + Oid sourceOid; + + if (queryDesc->plannedstmt->intoClause != NULL) { + iclause = queryDesc->plannedstmt->intoClause; + if (iclause->tableSpaceName && strcmp(iclause->tableSpaceName, "yezzey(cloud-storage)") == 0) { + if (queryDesc->plannedstmt->relationOids->length != 1) { + elog(ERROR, "unexpected plan relation size for yezzey alter: %d", queryDesc->plannedstmt->relationOids->length); + } + sourceOid = lfirst(queryDesc->plannedstmt->relationOids->head); + /* so, target relation is yezzey. This should be expand or alter table reorg; */ + YezzeyCopyOTM(iclause->rel, sourceOid); + } + } +} + + +/* GUC variables. */ +static bool yezzey_autooffload = true; /* start yezzey worker? */ + +static void yezzey_define_gucs() { + DefineCustomStringVariable("yezzey.storage_class", "external storage default storage class", + NULL, &storage_class, "STANDARD", PGC_SUSET, 0, NULL, NULL, + NULL); + + DefineCustomIntVariable("yezzey.multipart_chunksize", "external storage default multipart chunksize", + NULL, &multipart_chunksize, 16*1024*1024, 0, INT_MAX, PGC_SUSET, 0, NULL, NULL, NULL); + + DefineCustomIntVariable("yezzey.multipart_threshold", "external storage default multipart threshold", + NULL, &multipart_threshold, 64*1024*1024, 0, INT_MAX, PGC_SUSET, 0, NULL, NULL, NULL); + + DefineCustomBoolVariable("yezzey.use_gpg_crypto", "use gpg crypto", NULL, + &use_gpg_crypto, true, PGC_SUSET, 0, NULL, NULL, + NULL); + + DefineCustomBoolVariable( + "yezzey.autooffload", "enable auto-offloading worker", NULL, + &yezzey_autooffload, false, PGC_USERSET, 0, NULL, NULL, NULL); + + DefineCustomEnumVariable("yezzey.log_level", + "Log level for yezzey functions.", NULL, + &yezzey_log_level, DEBUG1, loglevel_options, + PGC_SUSET, 0, NULL, NULL, NULL); + + DefineCustomEnumVariable("yezzey.ao_log_level", + "Log level for yezzey functions.", NULL, + &yezzey_ao_log_level, DEBUG1, loglevel_options, + PGC_SUSET, 0, NULL, NULL, NULL); + + DefineCustomStringVariable("yezzey.yproxy_socket", "wal-g config path", + NULL, &yproxy_socket, "/tmp/yproxy.sock", + PGC_SUSET, 0, NULL, NULL, NULL); +} + +void _PG_init(void) { + /* Allocate shared memory for yezzey workers */ + + /* Yezzey GUCS define */ + (void)yezzey_define_gucs(); + + elog(yezzey_log_level, "[YEZZEY_SMGR] set hook"); + + smgr_hook = smgr_yezzey; +#if IsGreenplum6 + smgrao_hook = smgrao_yezzey; +#endif + smgr_init_hook = smgr_init_yezzey; + + /* set drop hook */ + ProcessUtility_hook = yezzey_ProcessUtility_hook; + object_access_hook = yezzey_object_access_hook; + + ExecutorStart_hook = yezzey_ExecuterStartHook; + ExecutorEnd_hook = yezzey_ExecuterEndHook; +} \ No newline at end of file