From 25a540aab4c8236d41dff7277666d79f17e5609f Mon Sep 17 00:00:00 2001 From: Arjun Lall Date: Sun, 15 Dec 2024 22:26:06 -0500 Subject: [PATCH] Add pg_replication_slots table --- src/query_handler_test.go | 4 ++++ src/query_parser_table.go | 1 + src/select_remapper_table.go | 25 ++++++++++++++++++++++++- 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/src/query_handler_test.go b/src/query_handler_test.go index c1818a1..784f499 100644 --- a/src/query_handler_test.go +++ b/src/query_handler_test.go @@ -56,6 +56,10 @@ func TestHandleQuery(t *testing.T) { "description": {"oid"}, "values": {"13823"}, }, + "SELECT slot_name FROM pg_replication_slots": { + "description": {"slot_name"}, + "values": {}, + }, // pg_namespace "SELECT DISTINCT(nspname) FROM pg_catalog.pg_namespace WHERE nspname != 'information_schema' AND nspname != 'pg_catalog'": { "description": {"nspname"}, diff --git a/src/query_parser_table.go b/src/query_parser_table.go index 5edbf6b..45e9157 100644 --- a/src/query_parser_table.go +++ b/src/query_parser_table.go @@ -413,6 +413,7 @@ var PG_SYSTEM_TABLES = NewSet([]string{ "pg_publication_rel", "pg_range", "pg_replication_origin", + "pg_replication_slots" "pg_rewrite", "pg_roles", "pg_seclabel", diff --git a/src/select_remapper_table.go b/src/select_remapper_table.go index 9403c41..a0e98c9 100644 --- a/src/select_remapper_table.go +++ b/src/select_remapper_table.go @@ -17,6 +17,7 @@ const ( PG_TABLE_PG_ROLES = "pg_roles" PG_TABLE_PG_CLASS = "pg_class" PG_TABLE_PG_EXTENSION = "pg_extension" + PG_TABLE_PG_REPLICATION_SLOTS = "pg_replication_slots" PG_TABLE_TABLES = "tables" ) @@ -44,7 +45,6 @@ func NewSelectRemapperTable(config *Config, icebergReader *IcebergReader, duckdb func (remapper *SelectRemapperTable) RemapTable(node *pgQuery.Node) *pgQuery.Node { parser := remapper.parserTable qSchemaTable := parser.NodeToQuerySchemaTable(node) - // pg_catalog.pg_* system tables if parser.IsTableFromPgCatalog(qSchemaTable) { switch qSchemaTable.Table { @@ -76,6 +76,10 @@ func (remapper *SelectRemapperTable) RemapTable(node *pgQuery.Node) *pgQuery.Nod // pg_catalog.pg_extension -> return hard-coded extension info tableNode := parser.MakePgExtensionNode(qSchemaTable.Alias) return remapper.overrideTable(node, tableNode) + case PG_TABLE_PG_REPLICATION_SLOTS: + // pg_replication_slots -> return nothing + tableNode := parser.MakeEmptyTableNode(PG_REPLICATION_SLOTS_COLUMNS, qSchemaTable.Alias) + return remapper.overrideTable(node, tableNode) default: // pg_catalog.pg_* other system tables -> return as is return node @@ -188,3 +192,22 @@ var PG_STATIO_USER_TABLES_COLUMNS = []string{ "tidx_blks_read", "tidx_blks_hit", } + +var PG_REPLICATION_SLOTS_COLUMNS = []string{ + "slot_name", + "plugin", + "slot_type", + "datoid", + "database", + "temporary", + "active", + "active_pid", + "xmin", + "catalog_xmin", + "restart_lsn", + "confirmed_flush_lsn", + "wal_status", + "safe_wal_size", + "two_phase", + "conflicting", +}