Skip to content

Commit

Permalink
perf: GRANT all SELECT privileges in a single query
Browse files Browse the repository at this point in the history
This reduces the number of queries run when copying existing SELECT privileges,
which happens at the end of the first batch in cases where the ingest does not
go directly into the live table.

The existing test that checks privileges are preserved is extended to use
multiple grantees, to make sure the `','.join(...)` behaviour is correct.
  • Loading branch information
michalc committed Mar 19, 2024
1 parent 26d5ba9 commit 8dbf1a0
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 27 deletions.
9 changes: 6 additions & 3 deletions pg_bulk_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -434,11 +434,14 @@ def escape_string(text):
''').format(schema=sql.Literal(target_table.schema), table=sql.Literal(target_table.name))
.as_string(conn.connection.driver_connection)
)).fetchall()
for grantee in grantees:
conn.execute(sa.text(sql.SQL('GRANT SELECT ON {schema_table} TO {user}')
if grantees:
conn.execute(sa.text(sql.SQL('GRANT SELECT ON {schema_table} TO {users}')
.format(
schema_table=sql.Identifier(ingest_table.schema, ingest_table.name),
user=sql.Identifier(grantee[0]),
users=sql.SQL(',').join(
sql.Identifier(grantee[0])
for grantee in grantees
),
)
.as_string(conn.connection.driver_connection))
)
Expand Down
51 changes: 27 additions & 24 deletions test_pg_bulk_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -1089,28 +1089,30 @@ def batches_1(high_watermark):
with engine.connect() as conn:
ingest(conn, metadata_1, batches_1)

user_id = uuid.uuid4().hex[:16]
with engine.connect() as conn:
conn.execute(sa.text(sql.SQL('''
CREATE USER {user_id} WITH PASSWORD 'password';
''').format(user_id=sql.Identifier(user_id)).as_string(conn.connection.driver_connection)))
conn.execute(sa.text(sql.SQL('''
GRANT CONNECT ON DATABASE postgres TO {user_id};
''').format(user_id=sql.Identifier(user_id)).as_string(conn.connection.driver_connection)))
conn.execute(sa.text(sql.SQL('''
GRANT USAGE ON SCHEMA my_schema TO {user_id};
''').format(user_id=sql.Identifier(user_id)).as_string(conn.connection.driver_connection)))
conn.commit()
conn.execute(sa.text(sql.SQL('''
GRANT SELECT ON my_schema.{table} TO {user_id};
''').format(table=sql.Identifier(my_table_1.name), user_id=sql.Identifier(user_id)).as_string(conn.connection.driver_connection)))
conn.commit()

user_engine = sa.create_engine(f'{engine_type}://{user_id}:password@127.0.0.1:5432/postgres', **engine_future)
with user_engine.connect() as conn:
results = conn.execute(sa.select(my_table_1).order_by('id')).fetchall()

assert results == [(1, 'a', 'b')]
user_ids = [uuid.uuid4().hex[:16], uuid.uuid4().hex[:16]]
with engine.connect() as conn:
for user_id in user_ids:
conn.execute(sa.text(sql.SQL('''
CREATE USER {user_id} WITH PASSWORD 'password';
''').format(user_id=sql.Identifier(user_id)).as_string(conn.connection.driver_connection)))
conn.execute(sa.text(sql.SQL('''
GRANT CONNECT ON DATABASE postgres TO {user_id};
''').format(user_id=sql.Identifier(user_id)).as_string(conn.connection.driver_connection)))
conn.execute(sa.text(sql.SQL('''
GRANT USAGE ON SCHEMA my_schema TO {user_id};
''').format(user_id=sql.Identifier(user_id)).as_string(conn.connection.driver_connection)))
conn.commit()
conn.execute(sa.text(sql.SQL('''
GRANT SELECT ON my_schema.{table} TO {user_id};
''').format(table=sql.Identifier(my_table_1.name), user_id=sql.Identifier(user_id)).as_string(conn.connection.driver_connection)))
conn.commit()

for user_id in user_ids:
user_engine = sa.create_engine(f'{engine_type}://{user_id}:password@127.0.0.1:5432/postgres', **engine_future)
with user_engine.connect() as conn:
results = conn.execute(sa.select(my_table_1).order_by('id')).fetchall()

assert results == [(1, 'a', 'b')]

metadata_2 = sa.MetaData()
my_table_2 = sa.Table(
Expand All @@ -1129,10 +1131,11 @@ def batches_2(high_watermark):
with engine.connect() as conn:
ingest(conn, metadata_2, batches_2)

with user_engine.connect() as conn:
for user_id in user_ids:
user_engine = sa.create_engine(f'{engine_type}://{user_id}:password@127.0.0.1:5432/postgres', **engine_future)
results = conn.execute(sa.select(my_table_2).order_by('id')).fetchall()

assert results == [(1, 'a', None, 'b')]
assert results == [(1, 'a', None, 'b')]


def test_migrate_add_column_not_at_end_no_data():
Expand Down

0 comments on commit 8dbf1a0

Please sign in to comment.