Skip to content

Commit

Permalink
feat: only GRANT when ingesting into a new table
Browse files Browse the repository at this point in the history
Copying the SELECT permissions on each batch is semantically fine, but

- Unnecessary at best
- And at worst, can cause hanging and "tuple concurrently updated" errors when
  other clients try to do GRANT SELECT on the tables in question

This change makes it so the copy of SELECT privileges only happens when we
ingest the first batch into a new table, and never otherwise.
  • Loading branch information
michalc committed Mar 19, 2024
1 parent c83ee73 commit 52ace18
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions pg_bulk_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,24 +423,25 @@ def escape_string(text):
**index.dialect_kwargs,
).create(bind=conn)

logger.info("Copying privileges for %s.%s", str(target_table.schema), str(target_table.name))
grantees = conn.execute(sa.text(sql.SQL('''
SELECT grantee
FROM information_schema.role_table_grants
WHERE table_schema = {schema} AND table_name = {table}
AND privilege_type = 'SELECT'
AND grantor != grantee
''').format(schema=sql.Literal(target_table.schema), table=sql.Literal(target_table.name))
.as_string(conn.connection.driver_connection)
)).fetchall()
for grantee in grantees:
conn.execute(sa.text(sql.SQL('GRANT SELECT ON {schema_table} TO {user}')
.format(
schema_table=sql.Identifier(ingest_table.schema, ingest_table.name),
user=sql.Identifier(grantee[0]),
if ingest_table is not target_table:
logger.info("Copying privileges for %s.%s", str(target_table.schema), str(target_table.name))
grantees = conn.execute(sa.text(sql.SQL('''
SELECT grantee
FROM information_schema.role_table_grants
WHERE table_schema = {schema} AND table_name = {table}
AND privilege_type = 'SELECT'
AND grantor != grantee
''').format(schema=sql.Literal(target_table.schema), table=sql.Literal(target_table.name))
.as_string(conn.connection.driver_connection)
)).fetchall()
for grantee in grantees:
conn.execute(sa.text(sql.SQL('GRANT SELECT ON {schema_table} TO {user}')
.format(
schema_table=sql.Identifier(ingest_table.schema, ingest_table.name),
user=sql.Identifier(grantee[0]),
)
.as_string(conn.connection.driver_connection))
)
.as_string(conn.connection.driver_connection))
)

for ingest_table in batch_ingest_tables.values():
logger.info('Calling on_before_visible callback')
Expand Down

0 comments on commit 52ace18

Please sign in to comment.