From 53053375524cbafa702a8b10a38b96837219c96e Mon Sep 17 00:00:00 2001 From: reshke Date: Mon, 28 Oct 2024 20:07:32 +0000 Subject: [PATCH] Major refactoring --- Makefile | 4 +- include/gucs.h | 5 - include/io_adv.h | 12 +- include/url.h | 1 - include/util.h | 6 +- include/virtual_index.h | 5 + include/yezzey_meta.h | 1 - src/init.cpp | 2 - src/io_adv.cpp | 14 +- src/meta.cpp | 2 +- src/proxy.cpp | 3 - src/storage.cpp | 14 +- src/url.cpp | 4 +- src/util.cpp | 25 +-- src/virtual_index.cpp | 52 +++++ src/worker.cpp | 109 ---------- src/xvacuum.cpp | 16 +- worker.c | 455 ---------------------------------------- yezzey--1.8--1.8.1.sql | 5 +- yezzey.c | 92 ++++---- 20 files changed, 124 insertions(+), 703 deletions(-) delete mode 100644 src/worker.cpp delete mode 100644 worker.c diff --git a/Makefile b/Makefile index 297394b..8fe0bd4 100644 --- a/Makefile +++ b/Makefile @@ -24,18 +24,16 @@ OBJS = \ src/url.o \ src/io.o \ src/io_adv.o \ - src/worker.o \ src/offload_tablespace_map.o \ src/offload_policy.o \ src/offload.o \ src/virtual_tablespace.o \ src/partition.o \ src/xvacuum.o \ - src/yezzey_expire.o \ src/yproxy.o \ src/init.o \ src/meta.o \ - smgr.o worker.o yezzey.o + smgr.o yezzey.o EXTENSION = yezzey DATA = yezzey--1.0.sql yezzey--1.0--1.8.sql yezzey--1.8--1.8.1.sql diff --git a/include/gucs.h b/include/gucs.h index 087448d..d9a6128 100644 --- a/include/gucs.h +++ b/include/gucs.h @@ -7,11 +7,6 @@ extern int yezzey_ao_log_level; extern bool use_gpg_crypto; /* ----- STORAGE ----- */ -extern char *storage_prefix; -extern char *storage_bucket; -extern char *backup_bucket; -extern char *storage_config; -extern char *storage_host; extern char *storage_class; extern int multipart_chunksize; extern int multipart_threshold; diff --git a/include/io_adv.h b/include/io_adv.h index b600139..30e1d8c 100644 --- a/include/io_adv.h +++ b/include/io_adv.h @@ -12,12 +12,6 @@ struct IOadv { const std::string nspname; // relation name itself const std::string relname; - // s3 host - const std::string host; - // s3 bucked - const std::string bucket; - // wal-g specific prefix - const std::string external_storage_prefix; // s3 storage class const std::string storage_class; @@ -39,16 +33,14 @@ struct IOadv { const std::string yproxy_socket; IOadv(const std::string &nspname, - const std::string &relname, const std::string &host, - const std::string &bucket, const std::string &external_storage_prefix, + const std::string &relname, const std::string &storage_class, const int &multipart_chunksize, /*unparse coords*/ Oid spcNode, const std::string &fileName, const Oid reloid, bool use_gpg_crypto, const std::string &yproxy_socket); IOadv(const std::string &nspname, - const std::string &relname, const std::string &host, - const std::string &bucket, const std::string &external_storage_prefix, + const std::string &relname, const std::string &storage_class, const int &multipart_chunksize, const relnodeCoord &coords, const Oid reloid, bool use_gpg_crypto, const std::string &yproxy_socket); diff --git a/include/url.h b/include/url.h index dc3bbbb..f17309f 100644 --- a/include/url.h +++ b/include/url.h @@ -31,5 +31,4 @@ std::string craftStorageUnPrefixedPath(const std::shared_ptr &adv, std::string getYezzeyRelationUrl_internal(const std::string &nspname, const std::string &relname, - const std::string &external_storage_prefix, relnodeCoord coords, int32_t segid); diff --git a/include/util.h b/include/util.h index fccffc9..52c7e39 100644 --- a/include/util.h +++ b/include/util.h @@ -38,9 +38,6 @@ int64_t yezzey_calc_virtual_relation_size(std::shared_ptr adv, std::string storage_url_add_options(const std::string &s3path, const char *config_path); -std::string getYezzeyExtrenalStorageBucket(const char *host, - const char *bucket); - std::string make_yezzey_url(const std::string &prefix, int64_t modcounts, XLogRecPtr current_recptr); @@ -49,8 +46,7 @@ std::vector parseModcounts(const std::string &prefix, #endif EXTERNC void getYezzeyExternalStoragePathByCoords( - const char *nspname, const char *relname, const char *host, - const char *bucket, const char *storage_prefix, Oid spcNode, Oid dbNode, + const char *nspname, const char *relname, Oid spcNode, Oid dbNode, Oid relNode, int32_t segblockno /* segment no*/, int32_t segid, char **dest); diff --git a/include/virtual_index.h b/include/virtual_index.h index 8bc3566..d25115f 100644 --- a/include/virtual_index.h +++ b/include/virtual_index.h @@ -37,6 +37,8 @@ typedef struct { text x_path; /* external path */ } FormData_yezzey_virtual_index; +typedef FormData_yezzey_virtual_index* Form_yezzey_virtual_index; + #define Natts_yezzey_virtual_index 10 #define Anum_yezzey_virtual_index_reloid 1 #define Anum_yezzey_virtual_index_filenode 2 @@ -57,6 +59,9 @@ EXTERNC void emptyYezzeyIndex(Oid yezzey_index_oid, Oid relfilenode); EXTERNC void emptyYezzeyIndexBlkno(Oid yezzey_index_oid, Oid relation, Oid relfilenode, int blkno); +/* fixup virtual index entry for relation's relfilenode. */ +EXTERNC void YezzeyFixupVirtualIndex(Relation rel); + #ifdef __cplusplus void YezzeyVirtualIndexInsert(Oid yandexoid /*yezzey auxiliary index oid*/, Oid reloid, Oid relfilenodeOid, int64_t blkno, diff --git a/include/yezzey_meta.h b/include/yezzey_meta.h index 16c3587..90f452d 100644 --- a/include/yezzey_meta.h +++ b/include/yezzey_meta.h @@ -3,7 +3,6 @@ #include "pg.h" #include "url.h" #include "virtual_index.h" -#include "yezzey_expire.h" #ifdef __cplusplus #define EXTERNC extern "C" diff --git a/src/init.cpp b/src/init.cpp index 84be759..d92582f 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -3,10 +3,8 @@ #include "init.h" #include "offload_policy.h" -#include "yezzey_expire.h" void YezzeyInitMetadata(void) { (void)YezzeyCreateOffloadPolicyRelation(); - (void)YezzeyCreateRelationExpireIndex(); (void)YezzeyCreateVirtualIndex(); } \ No newline at end of file diff --git a/src/io_adv.cpp b/src/io_adv.cpp index 4b3129d..8c74dfa 100644 --- a/src/io_adv.cpp +++ b/src/io_adv.cpp @@ -3,16 +3,13 @@ #include "offload_tablespace_map.h" IOadv::IOadv(const std::string &nspname, - const std::string &relname, const std::string &host, - const std::string &bucket, - const std::string &external_storage_prefix, + const std::string &relname, const std::string &storage_class, const int &multipart_chunksize, const Oid spcNode, const std::string &fileName, const Oid reloid, bool use_gpg_crypto, const std::string &yproxy_socket) : - nspname(nspname), relname(relname), host(host), - bucket(bucket), external_storage_prefix(external_storage_prefix), + nspname(nspname), relname(relname), storage_class(storage_class), multipart_chunksize(multipart_chunksize), coords_(getRelnodeCoordinate(spcNode, fileName)), reloid(reloid), tableSpace(reloid ? YezzeyGetRelationOriginTablespace(nspname.c_str(), relname.c_str(), reloid) : "none"), @@ -21,14 +18,11 @@ IOadv::IOadv(const std::string &nspname, } IOadv::IOadv(const std::string &nspname, - const std::string &relname, const std::string &host, - const std::string &bucket, - const std::string &external_storage_prefix, + const std::string &relname, const std::string &storage_class, const int &multipart_chunksize, const relnodeCoord &coords, const Oid reloid,bool use_gpg_crypto, const std::string &yproxy_socket) - : nspname(nspname), relname(relname), host(host), - bucket(bucket), external_storage_prefix(external_storage_prefix), + : nspname(nspname), relname(relname), storage_class(storage_class), multipart_chunksize(multipart_chunksize), coords_(coords), reloid(reloid), tableSpace(reloid ? YezzeyGetRelationOriginTablespace(nspname.c_str(), relname.c_str(), reloid) : "none"), diff --git a/src/meta.cpp b/src/meta.cpp index 41d1de2..93d9272 100644 --- a/src/meta.cpp +++ b/src/meta.cpp @@ -8,5 +8,5 @@ void YezzeyUpdateMetadataRelations( YezzeyVirtualIndexInsert(yandexoid, reloid, relfilenodeOid, blkno, offset_start, offset_finish, encrypted, reused, modcount, lsn, x_path); - YezzeyUpsertLastUseLsn(reloid, relfilenodeOid, md5, lsn); + /* TODO: update yezzey relfilemap */ } \ No newline at end of file diff --git a/src/proxy.cpp b/src/proxy.cpp index 3630925..e5b9ac7 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -229,9 +229,6 @@ EXTERNC SMGRFile yezzey_AORelOpenSegFile(Oid reloid, char *nspname, if (!RecoveryInProgress()) { auto ioadv = std::make_shared( yfd.nspname, yfd.relname, - std::string(storage_host /*host*/), - std::string(storage_bucket /*bucket*/), - std::string(storage_prefix /*prefix*/), std::string(storage_class /* storage_class */), multipart_chunksize, DEFAULTTABLESPACE_OID, yfd.filepath /* coords */, reloid /* reloid */, diff --git a/src/storage.cpp b/src/storage.cpp index 74ee461..9d4c985 100644 --- a/src/storage.cpp +++ b/src/storage.cpp @@ -196,9 +196,7 @@ int loadSegmentFromExternalStorage(Relation rel, const std::string &nspname, /* FIXME */ auto ioadv = std::make_shared( - nspname, relname, - storage_host /* host */, storage_bucket /*bucket*/, - storage_prefix /*prefix*/, storage_class /* storage_class */, + nspname, relname, storage_class /* storage_class */, multipart_chunksize, coords /* filename */, rel->rd_id /* reloid */, use_gpg_crypto, yproxy_socket); @@ -348,9 +346,7 @@ int offloadRelationSegment(Relation aorel, int segno, int64 modcount, !external_storage_path ? "" : std::string(external_storage_path); ReleaseSysCache(tp); - auto ioadv = std::make_shared(nspname, relname, - storage_host /* host */, storage_bucket /*bucket*/, - storage_prefix /*prefix*/, storage_class /* storage_class */, + auto ioadv = std::make_shared(nspname, relname, storage_class /* storage_class */, multipart_chunksize, coords, aorel->rd_id /* reloid */, use_gpg_crypto, yproxy_socket); try { @@ -448,9 +444,6 @@ int statRelationSpaceUsage(Relation aorel, int segno, int64 modcount, auto ioadv = std::make_shared( nspname, std::string(aorel->rd_rel->relname.data), - std::string(storage_host /*host*/), - std::string(storage_bucket /*bucket*/), - std::string(storage_prefix /*prefix*/), std::string(storage_class /*storage_class*/), multipart_chunksize, coords /* coords */, aorel->rd_id /* reloid */, use_gpg_crypto, yproxy_socket); @@ -512,9 +505,6 @@ int statRelationSpaceUsagePerExternalChunk(Relation aorel, int segno, auto ioadv = std::make_shared( nspname, std::string(aorel->rd_rel->relname.data), - std::string(storage_host /*host*/), - std::string(storage_bucket /*bucket*/), - std::string(storage_prefix /*prefix*/), std::string(storage_class /*storage_class*/), multipart_chunksize, coords /* coords */, aorel->rd_id /* reloid */, use_gpg_crypto, yproxy_socket); diff --git a/src/url.cpp b/src/url.cpp index f1bfc9f..fffb855 100644 --- a/src/url.cpp +++ b/src/url.cpp @@ -77,8 +77,6 @@ std::string craftStorageUnPrefixedPath(const std::shared_ptr &adv, std::string getYezzeyRelationUrl_internal(const std::string &nspname, const std::string &relname, - const std::string &external_storage_prefix, relnodeCoord coords, int32_t segid) { - return external_storage_prefix + - yezzey_block_file_path(nspname, relname, coords, segid); + return yezzey_block_file_path(nspname, relname, coords, segid); } diff --git a/src/util.cpp b/src/util.cpp index 680c39f..50070ed 100644 --- a/src/util.cpp +++ b/src/util.cpp @@ -23,23 +23,6 @@ const char *baseYezzeyPath = "/basebackups_005/yezzey/"; -std::string getYezzeyExtrenalStorageBucket(const char *host, - const char *bucket) { - std::string url = "s3://"; - std::string toErase = "https://"; - std::string hostStr = host; - size_t pos = hostStr.find(toErase); - if (pos != std::string::npos) { - hostStr.erase(pos, toErase.length()); - } - url += hostStr; - url += "/"; - url += bucket; - url += "/"; - - return url; -} - std::string storage_url_add_options(const std::string &s3path, const char *config_path) { auto ret = s3path; @@ -87,18 +70,16 @@ relnodeCoord getRelnodeCoordinate(Oid spcNode, const std::string &fileName) { } void getYezzeyExternalStoragePathByCoords(const char *nspname, - const char *relname, const char *host, - const char *bucket, - const char *storage_prefix, + const char *relname, Oid spcNode, Oid dbNode, Oid relNode, int32_t segblockno /* segment no*/, int32_t segid, char **dest) { /* FIXME: Support for non-default table space? */ auto coords = relnodeCoord(spcNode, dbNode, relNode, segblockno); - auto prefix = getYezzeyRelationUrl_internal(nspname, relname, storage_prefix, + auto prefix = getYezzeyRelationUrl_internal(nspname, relname, coords, segid); - auto path = getYezzeyExtrenalStorageBucket(host, bucket) + prefix; + auto path = prefix; *dest = (char *)malloc(sizeof(char) * path.size()); strcpy(*dest, path.c_str()); diff --git a/src/virtual_index.cpp b/src/virtual_index.cpp index 6d43bcc..1a2c40c 100644 --- a/src/virtual_index.cpp +++ b/src/virtual_index.cpp @@ -213,6 +213,58 @@ void emptyYezzeyIndexBlkno(Oid yezzey_index_oid, Oid reloid /* not used */, CommandCounterIncrement(); } /* end emptyYezzeyIndexBlkno */ + +void YezzeyFixupVirtualIndex_internal(Oid yezzey_index_oid, Relation relation) { + + HeapTuple tuple; + ScanKeyData skey[1]; + bool nulls[Natts_yezzey_virtual_index]; + Datum values[Natts_yezzey_virtual_index]; + + memset(nulls, 0, sizeof(nulls)); + memset(values, 0, sizeof(values)); + + auto rel = heap_open(yezzey_index_oid, RowExclusiveLock); + + auto snap = RegisterSnapshot(GetTransactionSnapshot()); + + ScanKeyInit(&skey[0], Anum_yezzey_virtual_index_filenode, + BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relation->rd_node.relNode)); + + auto desc = yezzey_beginscan(rel, snap, YezzeyVirtualIndexScanCols, skey); + + while (HeapTupleIsValid(tuple = heap_getnext(desc, ForwardScanDirection))) { + // update + auto meta = (Form_yezzey_virtual_index)GETSTRUCT(tuple); + + // Assert(meta->yrelfileoid == relfileoid); + + values[Anum_yezzey_virtual_index_filenode - 1] = + ObjectIdGetDatum(relation->rd_node.relNode); + + auto yandxtuple = heap_form_tuple(RelationGetDescr(relation), values, nulls); + +#if IsGreenplum6 + simple_heap_update(relation, &tuple->t_self, yandxtuple); + CatalogUpdateIndexes(relation, yandxtuple); +#else + CatalogTupleUpdate(relation, &tuple->t_self, yandxtuple); +#endif + } + + yezzey_endscan(desc); + heap_close(rel, RowExclusiveLock); + + UnregisterSnapshot(snap); + + /* make changes visible*/ + CommandCounterIncrement(); +} + +void YezzeyFixupVirtualIndex(Relation relation) { + (void) YezzeyFixupVirtualIndex_internal(YezzeyFindAuxIndex(RelationGetRelid(relation)), relation); +} /* end YezzeyFixupVirtualIndex */ + void YezzeyVirtualIndexInsert(Oid yandexoid /*yezzey auxiliary index oid*/, Oid reloid, Oid relfilenodeOid, int64_t blkno, int64_t offset_start, int64_t offset_finish, diff --git a/src/worker.cpp b/src/worker.cpp deleted file mode 100644 index 3d23cb7..0000000 --- a/src/worker.cpp +++ /dev/null @@ -1,109 +0,0 @@ -/* - * file: src/worker.cpp - */ - -#include "worker.h" -#include "gucs.h" -#include "offload.h" -#include "offload_policy.h" - -#include "pg.h" -#include "yezzey_heap_api.h" - -void processPartitionOffload() { - SetCurrentStatementStartTimestamp(); - StartTransactionCommand(); - SPI_connect(); - PushActiveSnapshot(GetTransactionSnapshot()); - pgstat_report_activity(STATE_RUNNING, - "proccess relations to offload to yezzey"); - SetCurrentStatementStartTimestamp(); - - auto query = - "select yezzey_set_relation_expirity_seg(" - " parchildrelid," - " 1," - " yezzey_part_date_eval('select '::text || pg_get_expr(parrangeend, " - "0))::timestamp" - " ) from pg_partition_rule WHERE yezzey_check_part_exr(parrangeend);"; - - auto ret = SPI_execute(query, true, 1); - if (ret != SPI_OK_SELECT) { - elog(ERROR, "Error while processing partitions"); - } - - auto tupdesc = SPI_tuptable->tupdesc; - auto tuptable = SPI_tuptable; - auto cnt = SPI_processed; - - SPI_finish(); - PopActiveSnapshot(); - CommitTransactionCommand(); - pgstat_report_activity(STATE_IDLE, NULL); - pgstat_report_stat(true); -} - -/* - * processOffloadedRelations: - * check all expired relations and move then to - * external storage - */ -void processOffloadedRelations() { - /* - * Connect to the selected database - * - * Note: if we have selected a just-deleted database (due to using - * stale stats info), we'll fail and exit here. - */ - - /* And do an appropriate amount of work */ - StartTransactionCommand(); - (void)GetTransactionSnapshot(); - - HeapTuple tup; - - ScanKeyData skey[1]; - - auto offrel = heap_open(YEZZEY_OFFLOAD_POLICY_RELATION, AccessShareLock); - - /* - * We cannot open yezzey offloaded relations metatable - * in scheme 'yezzey'. This is possible when - * this code executes on bgworker, which - * executes on shared library loading, BEFORE - * CREATE EXTENSION command was done. - */ - if (!RelationIsValid(offrel)) { - return; - } - auto snap = RegisterSnapshot(GetTransactionSnapshot()); - - { - ScanKeyInit(&skey[0], Anum_offload_metadata_relext_time, - BTLessEqualStrategyNumber, F_TIMESTAMP_LT, - TimestampGetDatum(GetCurrentTimestamp())); - - auto desc = yezzey_beginscan(offrel, snap, 1, skey); - - while (HeapTupleIsValid(tup = heap_getnext(desc, ForwardScanDirection))) { - auto meta = (Form_yezzey_offload_metadata)GETSTRUCT(tup); - - /** - * @brief traverse pg_aoseg.pg_aoseg_ relation - * and try to lock each segment. If there is any transaction - * in progress accessing some segment, lock attemt will fail, - * because transactions holds locks on pg_aoseg.pg_aoseg_ rows - * and we are goind to lock row in X mode - */ - elog(INFO, "process expired relation (oid=%d)", meta->reloid); - YezzeyDefineOffloadPolicy(meta->reloid); - } - - yezzey_endscan(desc); - } - - UnregisterSnapshot(snap); - heap_close(offrel, AccessShareLock); - - CommitTransactionCommand(); -} \ No newline at end of file diff --git a/src/xvacuum.cpp b/src/xvacuum.cpp index 72bc1fa..15ffbb8 100644 --- a/src/xvacuum.cpp +++ b/src/xvacuum.cpp @@ -19,10 +19,7 @@ int yezzey_delete_chunk_internal(const char *external_chunk_path) { try { auto ioadv = std::make_shared( - "", "", std::string(storage_host /*host*/), - std::string(storage_bucket /*bucket*/), - std::string(storage_prefix /*prefix*/), - std::string(storage_class /*storage_class*/), multipart_chunksize, + "", "", std::string(storage_class /*storage_class*/), multipart_chunksize, DEFAULTTABLESPACE_OID, "" /* coords */, InvalidOid /* reloid */, use_gpg_crypto, yproxy_socket); @@ -52,9 +49,7 @@ int yezzey_delete_chunk_internal(const char *external_chunk_path) { int yezzey_vacuum_garbage_internal(int segindx, bool confirm, bool crazyDrop) { try { auto ioadv = std::make_shared( - "", "", std::string(storage_host /*host*/), - std::string(storage_bucket /*bucket*/), - std::string(storage_prefix /*prefix*/), + "", "", std::string(storage_class /*storage_class*/), multipart_chunksize, DEFAULTTABLESPACE_OID, "" /* coords */, InvalidOid /* reloid */, use_gpg_crypto, yproxy_socket); @@ -77,14 +72,9 @@ int yezzey_vacuum_garbage_internal(int segindx, bool confirm, bool crazyDrop) { } int yezzey_vacuum_garbage_relation_internal(Relation rel,int segindx, bool confirm,bool crazyDrop){ try { - auto ioadv = std::make_shared( - std::string(gpg_engine_path), std::string(gpg_key_id), - std::string(storage_config), "", "", std::string(storage_host /*host*/), - std::string(storage_bucket /*bucket*/), - std::string(storage_prefix /*prefix*/), + auto ioadv = std::make_shared("", "", std::string(storage_class /*storage_class*/), multipart_chunksize, DEFAULTTABLESPACE_OID, "" /* coords */, InvalidOid /* reloid */, - std::string(walg_bin_path), std::string(walg_config_path), use_gpg_crypto, yproxy_socket); auto tp = SearchSysCache1(NAMESPACEOID, diff --git a/worker.c b/worker.c deleted file mode 100644 index 977b807..0000000 --- a/worker.c +++ /dev/null @@ -1,455 +0,0 @@ - - -#include "postgres.h" - -#include -#include -#include -#include -#include -#include - -#include "miscadmin.h" -#include "utils/snapmgr.h" - -#include "worker.h" - -#if PG_VERSION_NUM >= 130000 -#include "postmaster/interrupt.h" -#endif - -#include "catalog/catalog.h" -#include "common/relpath.h" -#include "executor/spi.h" -#include "storage/ipc.h" -#include "storage/lwlock.h" -#include "storage/md.h" -#include "storage/shmem.h" -#include "storage/smgr.h" -#include "utils/builtins.h" - -#include "postmaster/bgworker.h" -#include "storage/dsm.h" - -#if PG_VERSION_NUM >= 100000 -#include "common/file_perm.h" -#else -#include "access/xact.h" -#endif - -#include "lib/stringinfo.h" -#include "pgstat.h" -#include "postmaster/bgworker.h" -#include "storage/latch.h" -#include "storage/proc.h" -#include "utils/guc.h" - -#include "utils/elog.h" - -#if IsGreenplum6 || IsModernYezzey -#include "cdb/cdbvars.h" -#endif - -#include "storage.h" -#include "yezzey.h" - -#include "yezzey_expire.h" -#include "offload_tablespace_map.h" - -#include "tcop/utility.h" -#include "xvacuum.h" -#include "cdb/cdbvars.h" - -#if PG_VERSION_NUM >= 100000 -void yezzey_main(Datum main_arg); -#else -static void yezzey_main(Datum arg); -#endif - -static volatile sig_atomic_t got_sigterm = false; -static volatile sig_atomic_t got_sighup = false; - -static int yezzey_naptime = 10000; -static char *worker_name = "yezzey"; - -/* Shared state information for yezzey bgworker. */ -typedef struct YezzeySharedState { - LWLock lock; /* mutual exclusion */ - pid_t bgworker_pid; /* for main bgworker */ - pid_t pid_using_dumpfile; /* for autoprewarm or block dump */ - - /* Following items are for communication with per-database worker */ - dsm_handle block_info_handle; - Oid database; - int yezzeystart_idx; - int yezzey_stop_idx; -} YezzeySharedState; - -static bool yezzey_init_shmem(void); -void yezzey_offload_databases(void); -void yezzey_ProcessConfigFile(void); -void yezzey_process_database(Datum main_arg); - -static void yezzey_define_gucs(); - -/* Pointer to shared-memory state. */ -static YezzeySharedState *yezzey_state = NULL; - -/* - * Allocate and initialize yezzey-related shared memory, if not already - * done, and set up backend-local pointer to that state. Returns true if an - * existing shared memory segment was found. - */ -static bool yezzey_init_shmem(void) { -#if PG_VERSION_NUM < 100000 - static LWLockTranche tranche; -#endif - bool found; - - LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE); - yezzey_state = ShmemInitStruct("yezzey", sizeof(YezzeySharedState), &found); - if (!found) { - LWLockInitialize(&yezzey_state->lock, LWLockNewTrancheId()); - yezzey_state->bgworker_pid = InvalidPid; - } - LWLockRelease(AddinShmemInitLock); - -#if PG_VERSION_NUM >= 100000 - LWLockRegisterTranche(yezzey_state->lock.tranche, "yezzey"); -#else - tranche.name = "yezzey"; - tranche.array_base = &yezzey_state->lock; - tranche.array_stride = sizeof(LWLock); - LWLockRegisterTranche(yezzey_state->lock.tranche, &tranche); - -#endif - - return found; -} - -// options for yezzey logging -static const struct config_enum_entry loglevel_options[] = { - {"debug5", DEBUG5, false}, {"debug4", DEBUG4, false}, - {"debug3", DEBUG3, false}, {"debug2", DEBUG2, false}, - {"debug1", DEBUG1, false}, {"debug", DEBUG2, true}, - {"info", INFO, false}, {"notice", NOTICE, false}, - {"warning", WARNING, false}, {"error", ERROR, false}, - {"log", LOG, false}, {"fatal", FATAL, false}, - {"panic", PANIC, false}, {NULL, 0, false}}; - -void yezzey_prepare(void) { - SetCurrentStatementStartTimestamp(); - StartTransactionCommand(); - SPI_connect(); - PushActiveSnapshot(GetTransactionSnapshot()); - pgstat_report_activity(STATE_RUNNING, - "proccess relations to offload to yezzey"); - SetCurrentStatementStartTimestamp(); -} - -void yezzey_finish(void) { - SPI_finish(); - PopActiveSnapshot(); - CommitTransactionCommand(); - pgstat_report_activity(STATE_IDLE, NULL); - pgstat_report_stat(true); -} - -static void yezzey_sigterm(SIGNAL_ARGS) { - int save_errno = errno; - got_sigterm = true; - if (MyProc) - SetLatch(&MyProc->procLatch); - errno = save_errno; -} - -static void yezzey_sighup(SIGNAL_ARGS) { - int save_errno = errno; - got_sighup = true; - if (MyProc) - SetLatch(&MyProc->procLatch); - errno = save_errno; -} - -/* TODO: make this guc*/ -const int processTableLimit = 10; - -void yezzey_process_database(Datum main_arg) { - Oid dboid; - char dbname[NAMEDATALEN]; - - dboid = DatumGetObjectId(main_arg); - - /* Establish signal handlers; once that's done, unblock signals. */ - // pqsignal(SIGTERM, die); - BackgroundWorkerUnblockSignals(); -#if IsGreenplum6 - Gp_session_role = GP_ROLE_UTILITY; -#endif - Gp_role = GP_ROLE_UTILITY; - InitPostgres(NULL, dboid, NULL, InvalidOid, dbname, true); - SetProcessingMode(NormalProcessing); -#if IsGreenplum6 - set_ps_display(dbname, false); -#else - set_ps_display(dbname); -#endif - ereport(LOG, (errmsg("yezzey bgworker: processing database \"%s\"", dbname))); - - - if (Gp_role == GP_ROLE_DISPATCH) { - (void)processOffloadedRelations(); - (void)processPartitionOffload(); - } -} - -/* - * Start yezzey per-database worker process. - */ -static void yezzey_start_database_worker(Oid dboid) { - BackgroundWorker worker; - BackgroundWorkerHandle *handle; - elog(yezzey_log_level, "[YEZZEY_SMGR] setting up bgworker"); - - memset(&worker, 0, sizeof(worker)); - worker.bgw_flags = - BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - worker.bgw_start_time = BgWorkerStart_ConsistentState; - worker.bgw_restart_time = BGW_NEVER_RESTART; - -#if PG_VERSION_NUM < 100000 - worker.bgw_main = yezzey_process_database; - worker.bgw_main_arg = ObjectIdGetDatum(dboid); -#endif - - snprintf(worker.bgw_name, BGW_MAXLEN, "%s-oflload worker for %d", worker_name, - dboid); -#if PG_VERSION_NUM >= 110000 - snprintf(worker.bgw_type, BGW_MAXLEN, "yezzey"); -#endif - -#if PG_VERSION_NUM >= 100000 - sprintf(worker.bgw_library_name, "yezzey"); - sprintf(worker.bgw_function_name, "yezzey_process_database"); -#endif - -#if PG_VERSION_NUM >= 90400 - worker.bgw_notify_pid = 0; -#endif - - RegisterDynamicBackgroundWorker(&worker, &handle); - - elog(yezzey_log_level, "[YEZZEY_SMGR] started datbase worker"); -} - -void yezzey_offload_databases() { - SPITupleTable *tuptable; - HeapTuple tuple; - char *dbOidRaw; - Oid dbOid; - TupleDesc tupdesc; - long unsigned int cnt; - int ret; - - StringInfoData buf; - - /* start transaction */ - yezzey_prepare(); - /* - */ - initStringInfo(&buf); - appendStringInfo(&buf, "" - "SELECT oid FROM pg_database WHERE datallowconn" - " ORDER BY random() LIMIT 1;"); - - pgstat_report_activity(STATE_RUNNING, buf.data); - ret = SPI_execute(buf.data, true, 1); - if (ret != SPI_OK_SELECT) { - elog(ERROR, "Error while selecting database list"); - } - - tupdesc = SPI_tuptable->tupdesc; - tuptable = SPI_tuptable; - cnt = SPI_processed; - - /** - * @brief traverse pg_aoseg.pg_aoseg_ relation - * and try to lock each segment. If there is any transaction - * in progress accessing some segment, lock attemt will fail, - * because transactions holds locks on pg_aoseg.pg_aoseg_ rows - * and we are goind to lock row in X mode - */ - tuple = tuptable->vals[0]; - dbOidRaw = SPI_getvalue(tuple, tupdesc, 1); - /* - * Convert to desired data type - */ - dbOid = strtoll(dbOidRaw, NULL, 10); - /* comlete transaction */ - yezzey_finish(); - - yezzey_start_database_worker(dbOid); -} - -void yezzey_ProcessConfigFile() {} - -static void yezzey_detach_shmem(int code, Datum arg); -/* - * Clear our PID from autoprewarm shared state. - */ -static void yezzey_detach_shmem(int code, Datum arg) { - LWLockAcquire(&yezzey_state->lock, LW_EXCLUSIVE); - if (yezzey_state->bgworker_pid == MyProcPid) - yezzey_state->bgworker_pid = InvalidPid; - LWLockRelease(&yezzey_state->lock); -} - -// Launcher worker -#if PG_VERSION_NUM < 100000 -static void -#else -void -#endif -yezzey_main(Datum main_arg) { - bool first_time; - - pqsignal(SIGHUP, yezzey_sighup); - pqsignal(SIGTERM, yezzey_sigterm); - - BackgroundWorkerUnblockSignals(); - - /* Create (if necessary) and attach to our shared memory area. */ - if (yezzey_init_shmem()) - first_time = false; - - /* Set on-detach hook so that our PID will be cleared on exit. */ - on_shmem_exit(yezzey_detach_shmem, 0); - - /* - * Store our PID in the shared memory area --- unless there's already - * another worker running, in which case just exit. - */ - LWLockAcquire(&yezzey_state->lock, LW_EXCLUSIVE); - if (yezzey_state->bgworker_pid != InvalidPid) { - LWLockRelease(&yezzey_state->lock); - ereport(LOG, (errmsg("yezzey worker is already running under PID %lu", - (unsigned long)yezzey_state->bgworker_pid))); - return; - } - yezzey_state->bgworker_pid = MyProcPid; - LWLockRelease(&yezzey_state->lock); - -#if IsGreenplum6 || IsModernYezzey - if (IS_QUERY_DISPATCHER()) { - return; - } -#endif - /* run only on query executers */ - - /* without this, we will fail out attempts - * to modify tables on segments (sql without QD) - */ - Gp_role = GP_ROLE_UTILITY; -#if IsGreenplum6 - Gp_session_role = GP_ROLE_UTILITY; -#endif - - /* - * acquire connection to database, - * otherwise we will not be able to run queries. - * yezzey axillary database will contain tables wirth - * relnodes meta-information, which can be used for - * backgroup relation offloading to external storage. - */ -#if PG_VERSION_NUM < 110000 - BackgroundWorkerInitializeConnection("postgres", NULL); -#else - BackgroundWorkerInitializeConnection("postgres", NULL, 0); -#endif - - while (!got_sigterm) { - int rc; - rc = WaitLatch(&MyProc->procLatch, - WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, -#if PG_VERSION_NUM < 100000 - yezzey_naptime -#else - yezzey_naptime, PG_WAIT_EXTENSION -#endif - ); - - ResetLatch(&MyProc->procLatch); - - if (rc & WL_POSTMASTER_DEATH) - proc_exit(1); - - if (got_sighup) { - yezzey_ProcessConfigFile(); - got_sighup = false; - ereport(LOG, (errmsg("bgworker yezzey signal: processed SIGHUP"))); - } - - if (got_sigterm) { - ereport(LOG, (errmsg("bgworker yezzey signal: processed SIGTERM"))); - proc_exit(0); - } - - elog(yezzey_log_level, - "[YEZZEY_SMGR_BG] start to process offload databases"); - yezzey_offload_databases(); - } -} - -/* - * Start yezzey primary worker process. - */ -static void yezzey_start_launcher_worker(void) { - BackgroundWorker worker; - - MemSet(&worker, 0, sizeof(BackgroundWorker)); - worker.bgw_flags = - BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; - /* to start */ - worker.bgw_start_time = BgWorkerStart_ConsistentState; - worker.bgw_restart_time = 10; - -#if PG_VERSION_NUM < 100000 - worker.bgw_main = yezzey_main; -#else - sprintf(worker.bgw_library_name, "yezzey"); - sprintf(worker.bgw_function_name, "yezzey_main"); -#endif - - snprintf(worker.bgw_name, BGW_MAXLEN, "%s", worker_name); -#if PG_VERSION_NUM >= 110000 - snprintf(worker.bgw_type, BGW_MAXLEN, "yezzey"); -#endif - -#if PG_VERSION_NUM >= 90400 - worker.bgw_notify_pid = 0; -#endif - - /* must set notify PID to wait for startup */ - // worker.bgw_notify_pid = MyProcPid; - - if (Gp_role == GP_ROLE_DISPATCH) { - /* run on master */ - RegisterBackgroundWorker(&worker); - } else { - /* - do we need to run bgworker on gp execute? - if (!RegisterDynamicBackgroundWorker(&worker, &handle)) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_RESOURCES), - errmsg("could not register background process"), - errhint("You may need to increase max_worker_processes."))); - - status = WaitForBackgroundWorkerStartup(handle, &pid); - if (status != BGWH_STARTED) - ereport(ERROR, - (errcode(ERRCODE_INSUFFICIENT_RESOURCES), - errmsg("could not start background process"), - errhint("More details may be available in the server log."))); - */ - } -} diff --git a/yezzey--1.8--1.8.1.sql b/yezzey--1.8--1.8.1.sql index c152260..86cf7e1 100644 --- a/yezzey--1.8--1.8.1.sql +++ b/yezzey--1.8--1.8.1.sql @@ -56,4 +56,7 @@ BEGIN PERFORM yezzey_vacuum_garbage_relation('public', i_offload_relname, confirm, crazyDrop); END; $$ -LANGUAGE PLPGSQL; \ No newline at end of file +LANGUAGE PLPGSQL; + +DROP IF EXISTS yezzey.yezzey_expire_index; + diff --git a/yezzey.c b/yezzey.c index 0fece65..b057147 100644 --- a/yezzey.c +++ b/yezzey.c @@ -58,22 +58,35 @@ #include "offload.h" #include "offload_policy.h" -#include "virtual_tablespace.h" #include "partition.h" -#include "xvacuum.h" -#include "yezzey_expire.h" + +#include "virtual_tablespace.h" #include "init.h" +#include "storage.h" +#include "yezzey.h" + +#include "offload_tablespace_map.h" + +#include "tcop/utility.h" +#include "xvacuum.h" +#include "cdb/cdbvars.h" + +// options for yezzey logging +static const struct config_enum_entry loglevel_options[] = { + {"debug5", DEBUG5, false}, {"debug4", DEBUG4, false}, + {"debug3", DEBUG3, false}, {"debug2", DEBUG2, false}, + {"debug1", DEBUG1, false}, {"debug", DEBUG2, true}, + {"info", INFO, false}, {"notice", NOTICE, false}, + {"warning", WARNING, false}, {"error", ERROR, false}, + {"log", LOG, false}, {"fatal", FATAL, false}, + {"panic", PANIC, false}, {NULL, 0, false}}; + #define GET_STR(textp) \ DatumGetCString(DirectFunctionCall1(textout, PointerGetDatum(textp))) /* STORAGE */ -char *storage_prefix = NULL; -char *storage_bucket = NULL; -char *backup_bucket = NULL; -char *storage_config = NULL; -char *storage_host = NULL; char *storage_class = NULL; int multipart_threshold = 0; int multipart_chunksize = 0; @@ -267,7 +280,7 @@ int yezzey_load_relation_internal(Oid reloid, const char *dest_path) { } /* record that relation on its current reloid is expired */ - YezzeyRecordRelationExpireLsn(aorel); + YezzeyFixupVirtualIndex(aorel); /* * Update relation status row in yezzet.offload_metadat @@ -481,8 +494,7 @@ Datum yezzey_show_relation_external_path(PG_FUNCTION_ARGS) { } (void)getYezzeyExternalStoragePathByCoords( - nspname, aorel->rd_rel->relname.data, storage_host /*host*/, - storage_bucket /*bucket*/, storage_prefix /*prefix*/, rnode.spcNode, rnode.dbNode, + nspname, aorel->rd_rel->relname.data, rnode.spcNode, rnode.dbNode, rnode.relNode, segno, GpIdentity.segindex, &ptr); pgptr = pstrdup(ptr); @@ -1241,6 +1253,27 @@ Datum yezzey_check_part_exr(PG_FUNCTION_ARGS) { } +/* Plugin provides a hook function matching this signature. */ +void yezzey_object_access_hook (ObjectAccessType access, + Oid classId, + Oid objectId, + int subId, + void *arg) { + Relation offRel; + if (classId != RelationRelationId) { + return; + } + + offRel = heap_open(objectId, AccessShareLock); + if (offRel->rd_node.spcNode != YEZZEYTABLESPACE_OID) { + return; + } + + (void)YezzeyFixupVirtualIndex(offRel); + + heap_close(offRel, AccessShareLock); +} + /* * Hook ProcessUtility to do external storage vacuum @@ -1329,29 +1362,6 @@ static void yezzey_ExecuterStartHook(QueryDesc *queryDesc, int eflags) { static bool yezzey_autooffload = true; /* start yezzey worker? */ static void yezzey_define_gucs() { - - DefineCustomStringVariable("yezzey.storage_prefix", "segment name prefix", - NULL, &storage_prefix, "", PGC_SUSET, 0, NULL, - NULL, NULL); - - DefineCustomStringVariable("yezzey.storage_bucket", "external storage bucket", - NULL, &storage_bucket, "", PGC_SUSET, 0, NULL, - NULL, NULL); - - DefineCustomStringVariable("yezzey.backup_bucket", "external storage backup bucket", - NULL, &backup_bucket, "", PGC_SUSET, 0, NULL, - NULL, NULL); - - DefineCustomStringVariable("yezzey.storage_config", - "Storage config path for yezzey external storage.", - NULL, &storage_config, "", PGC_SUSET, 0, NULL, - NULL, NULL); - - DefineCustomStringVariable("yezzey.storage_host", "external storage host", - NULL, &storage_host, "", PGC_SUSET, 0, NULL, NULL, - NULL); - - DefineCustomStringVariable("yezzey.storage_class", "external storage default storage class", NULL, &storage_class, "STANDARD", PGC_SUSET, 0, NULL, NULL, NULL); @@ -1380,10 +1390,6 @@ static void yezzey_define_gucs() { &yezzey_ao_log_level, DEBUG1, loglevel_options, PGC_SUSET, 0, NULL, NULL, NULL); - DefineCustomIntVariable( - "yezzey.oflload_worker_naptime", "Auto-offloading worker naptime", NULL, - &yezzey_naptime, 10000, 500, INT_MAX, PGC_SUSET, 0, NULL, NULL, NULL); - DefineCustomStringVariable("yezzey.yproxy_socket", "wal-g config path", NULL, &yproxy_socket, "/tmp/yproxy.sock", PGC_SUSET, 0, NULL, NULL, NULL); @@ -1395,15 +1401,6 @@ void _PG_init(void) { /* Yezzey GUCS define */ (void)yezzey_define_gucs(); - RequestAddinShmemSpace(MAXALIGN(sizeof(YezzeySharedState))); - - elog(yezzey_log_level, "[YEZZEY_SMGR] setting up bgworker"); - - if (yezzey_autooffload && false /*temp disable*/) { - /* dispatch yezzey worker */ - yezzey_start_launcher_worker(); - } - elog(yezzey_log_level, "[YEZZEY_SMGR] set hook"); smgr_hook = smgr_yezzey; @@ -1414,6 +1411,7 @@ void _PG_init(void) { /* set drop hook */ ProcessUtility_hook = yezzey_ProcessUtility_hook; + object_access_hook = yezzey_object_access_hook; ExecutorStart_hook = yezzey_ExecuterStartHook; ExecutorEnd_hook = yezzey_ExecuterEndHook;