Skip to content

Commit

Permalink
Add pg_replication_slots table
Browse files Browse the repository at this point in the history
  • Loading branch information
arjunlol committed Dec 16, 2024
1 parent 14ffb05 commit 25a540a
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 1 deletion.
4 changes: 4 additions & 0 deletions src/query_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ func TestHandleQuery(t *testing.T) {
"description": {"oid"},
"values": {"13823"},
},
"SELECT slot_name FROM pg_replication_slots": {
"description": {"slot_name"},
"values": {},
},
// pg_namespace
"SELECT DISTINCT(nspname) FROM pg_catalog.pg_namespace WHERE nspname != 'information_schema' AND nspname != 'pg_catalog'": {
"description": {"nspname"},
Expand Down
1 change: 1 addition & 0 deletions src/query_parser_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ var PG_SYSTEM_TABLES = NewSet([]string{
"pg_publication_rel",
"pg_range",
"pg_replication_origin",
"pg_replication_slots"
"pg_rewrite",
"pg_roles",
"pg_seclabel",
Expand Down
25 changes: 24 additions & 1 deletion src/select_remapper_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const (
PG_TABLE_PG_ROLES = "pg_roles"
PG_TABLE_PG_CLASS = "pg_class"
PG_TABLE_PG_EXTENSION = "pg_extension"
PG_TABLE_PG_REPLICATION_SLOTS = "pg_replication_slots"

PG_TABLE_TABLES = "tables"
)
Expand Down Expand Up @@ -44,7 +45,6 @@ func NewSelectRemapperTable(config *Config, icebergReader *IcebergReader, duckdb
func (remapper *SelectRemapperTable) RemapTable(node *pgQuery.Node) *pgQuery.Node {
parser := remapper.parserTable
qSchemaTable := parser.NodeToQuerySchemaTable(node)

// pg_catalog.pg_* system tables
if parser.IsTableFromPgCatalog(qSchemaTable) {
switch qSchemaTable.Table {
Expand Down Expand Up @@ -76,6 +76,10 @@ func (remapper *SelectRemapperTable) RemapTable(node *pgQuery.Node) *pgQuery.Nod
// pg_catalog.pg_extension -> return hard-coded extension info
tableNode := parser.MakePgExtensionNode(qSchemaTable.Alias)
return remapper.overrideTable(node, tableNode)
case PG_TABLE_PG_REPLICATION_SLOTS:
// pg_replication_slots -> return nothing
tableNode := parser.MakeEmptyTableNode(PG_REPLICATION_SLOTS_COLUMNS, qSchemaTable.Alias)
return remapper.overrideTable(node, tableNode)
default:
// pg_catalog.pg_* other system tables -> return as is
return node
Expand Down Expand Up @@ -188,3 +192,22 @@ var PG_STATIO_USER_TABLES_COLUMNS = []string{
"tidx_blks_read",
"tidx_blks_hit",
}

var PG_REPLICATION_SLOTS_COLUMNS = []string{
"slot_name",
"plugin",
"slot_type",
"datoid",
"database",
"temporary",
"active",
"active_pid",
"xmin",
"catalog_xmin",
"restart_lsn",
"confirmed_flush_lsn",
"wal_status",
"safe_wal_size",
"two_phase",
"conflicting",
}

0 comments on commit 25a540a

Please sign in to comment.