Skip to content

Commit

Permalink
Source Postgres : use more simple and comprehensive query to get sele…
Browse files Browse the repository at this point in the history
…ctable tables (#14251)

* use more simple and comprehensive query to get selectable tables

* cover case when schema is not specified

* add test to check discover with different ways of grants

* format

* incr ver

* incr ver

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
DoNotPanicUA and octavia-squidington-iii authored Jun 30, 2022
1 parent c6ff5ab commit 8b19a70
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.29
dockerImageTag: 0.4.30
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6854,7 +6854,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.4.29"
- dockerImage: "airbyte/source-postgres:0.4.30"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-postgres/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.29
LABEL io.airbyte.version=0.4.30
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -301,65 +301,17 @@ public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase
final CheckedFunction<Connection, PreparedStatement, SQLException> statementCreator = connection -> {
final PreparedStatement ps = connection.prepareStatement(
"""
SELECT DISTINCT table_catalog,
table_schema,
table_name,
privilege_type
FROM information_schema.table_privileges
WHERE grantee = ?
AND privilege_type = 'SELECT'
UNION ALL
SELECT r.rolname AS table_catalog,
n.nspname AS table_schema,
c.relname AS table_name,
-- the initial query is supposed to get a SELECT type. Since we use a UNION query
-- to get Views that we can read (i.e. select) - then lets fill this columns with SELECT
-- value to keep the backward-compatibility
COALESCE ('SELECT') AS privilege_type
SELECT nspname as table_schema,
relname as table_name
FROM pg_class c
JOIN pg_namespace n
ON n.oid = relnamespace
JOIN pg_roles r
ON r.oid = relowner,
Unnest(COALESCE(relacl::text[], Format('{%s=arwdDxt/%s}', rolname, rolname)::text[])) acl,
Regexp_split_to_array(acl, '=|/') s
WHERE r.rolname = ?
AND (
nspname = 'public'
OR nspname = ?)
-- 'm' means Materialized View
AND c.relkind = 'm'
AND (
-- all grants
c.relacl IS NULL
-- read grant
OR s[2] = 'r')
UNION
SELECT DISTINCT table_catalog,
table_schema,
table_name,
privilege_type
FROM information_schema.table_privileges p
JOIN information_schema.applicable_roles r ON p.grantee = r.role_name
WHERE r.grantee in
(WITH RECURSIVE membership_tree(grpid, userid) AS (
SELECT pg_roles.oid, pg_roles.oid
FROM pg_roles WHERE oid = (select oid from pg_roles where rolname=?)
UNION ALL
SELECT m_1.roleid, t_1.userid
FROM pg_auth_members m_1, membership_tree t_1
WHERE m_1.member = t_1.grpid
)
SELECT DISTINCT m.rolname AS grpname
FROM membership_tree t, pg_roles r, pg_roles m
WHERE t.grpid = m.oid AND t.userid = r.oid)
AND privilege_type = 'SELECT';
JOIN pg_namespace n on c.relnamespace = n.oid
WHERE has_table_privilege(c.oid, 'SELECT')
-- r = ordinary table, i = index, S = sequence, t = TOAST table, v = view, m = materialized view, c = composite type, f = foreign table, p = partitioned table, I = partitioned index
AND relkind in ('r', 'm', 'v', 't', 'f', 'p')
and ((? is null) OR nspname = ?)
""");
final String username = getUsername(database.getDatabaseConfig());
ps.setString(1, username);
ps.setString(2, username);
ps.setString(3, username);
ps.setString(4, username);
ps.setString(1, schema);
ps.setString(2, schema);
return ps;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,101 @@ void testDiscoverRecursiveRolePermissions() throws Exception {
}
}

@Test
void testDiscoverDifferentGrantAvailability() throws Exception {
try (final PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine")) {
db.start();
final JsonNode config = getConfig(db);
try (final DSLContext dslContext = getDslContext(config)) {
final Database database = new Database(dslContext);
database.query(ctx -> {
ctx.fetch("create table not_granted_table_name_1(column_1 integer);");
ctx.fetch("create table not_granted_table_name_2(column_1 integer);");
ctx.fetch("create table not_granted_table_name_3(column_1 integer);");
ctx.fetch("create table table_granted_by_role(column_1 integer);");
ctx.fetch("create table test_table_granted_directly(column_1 integer);");
ctx.fetch("create table table_granted_by_role_with_options(column_1 integer);");
ctx.fetch("create table test_table_granted_directly_with_options(column_1 integer);");

ctx.fetch("create materialized view not_granted_mv_name_1 as SELECT not_granted_table_name_1.column_1 FROM not_granted_table_name_1;");
ctx.fetch("create materialized view not_granted_mv_name_2 as SELECT not_granted_table_name_2.column_1 FROM not_granted_table_name_2;");
ctx.fetch("create materialized view not_granted_mv_name_3 as SELECT not_granted_table_name_3.column_1 FROM not_granted_table_name_3;");
ctx.fetch("create materialized view mv_granted_by_role as SELECT table_granted_by_role.column_1 FROM table_granted_by_role;");
ctx.fetch(
"create materialized view test_mv_granted_directly as SELECT test_table_granted_directly.column_1 FROM test_table_granted_directly;");
ctx.fetch(
"create materialized view mv_granted_by_role_with_options as SELECT table_granted_by_role_with_options.column_1 FROM table_granted_by_role_with_options;");
ctx.fetch(
"create materialized view test_mv_granted_directly_with_options as SELECT test_table_granted_directly_with_options.column_1 FROM test_table_granted_directly_with_options;");

ctx.fetch("create view not_granted_view_name_1(column_1) as SELECT not_granted_table_name_1.column_1 FROM not_granted_table_name_1;");
ctx.fetch("create view not_granted_view_name_2(column_1) as SELECT not_granted_table_name_2.column_1 FROM not_granted_table_name_2;");
ctx.fetch("create view not_granted_view_name_3(column_1) as SELECT not_granted_table_name_3.column_1 FROM not_granted_table_name_3;");
ctx.fetch("create view view_granted_by_role(column_1) as SELECT table_granted_by_role.column_1 FROM table_granted_by_role;");
ctx.fetch(
"create view test_view_granted_directly(column_1) as SELECT test_table_granted_directly.column_1 FROM test_table_granted_directly;");
ctx.fetch(
"create view view_granted_by_role_with_options(column_1) as SELECT table_granted_by_role_with_options.column_1 FROM table_granted_by_role_with_options;");
ctx.fetch(
"create view test_view_granted_directly_with_options(column_1) as SELECT test_table_granted_directly_with_options.column_1 FROM test_table_granted_directly_with_options;");

ctx.fetch("create role test_role;");

ctx.fetch("grant delete on not_granted_table_name_2 to test_role;");
ctx.fetch("grant delete on not_granted_mv_name_2 to test_role;");
ctx.fetch("grant delete on not_granted_view_name_2 to test_role;");

ctx.fetch("grant select on table_granted_by_role to test_role;");
ctx.fetch("grant select on mv_granted_by_role to test_role;");
ctx.fetch("grant select on view_granted_by_role to test_role;");

ctx.fetch("grant select on table_granted_by_role_with_options to test_role with grant option;");
ctx.fetch("grant select on mv_granted_by_role_with_options to test_role with grant option;");
ctx.fetch("grant select on view_granted_by_role_with_options to test_role with grant option;");

ctx.fetch("create user new_test_user;");
ctx.fetch("ALTER USER new_test_user WITH PASSWORD 'new_pass';");
ctx.fetch("GRANT CONNECT ON DATABASE test TO new_test_user;");

ctx.fetch("grant test_role to new_test_user;");

ctx.fetch("grant delete on not_granted_table_name_3 to new_test_user;");
ctx.fetch("grant delete on not_granted_mv_name_3 to new_test_user;");
ctx.fetch("grant delete on not_granted_view_name_3 to new_test_user;");

ctx.fetch("grant select on test_table_granted_directly to new_test_user;");
ctx.fetch("grant select on test_mv_granted_directly to new_test_user;");
ctx.fetch("grant select on test_view_granted_directly to new_test_user;");

ctx.fetch("grant select on test_table_granted_directly_with_options to test_role with grant option;");
ctx.fetch("grant select on test_mv_granted_directly_with_options to test_role with grant option;");
ctx.fetch("grant select on test_view_granted_directly_with_options to test_role with grant option;");
return null;
});
}

AirbyteCatalog actual = new PostgresSource().discover(getConfig(db, "new_test_user", "new_pass"));
Set<String> tableNames = actual.getStreams().stream().map(stream -> stream.getName()).collect(Collectors.toSet());
Set<String> expectedVisibleNames = Sets.newHashSet(
"table_granted_by_role",
"table_granted_by_role_with_options",
"test_table_granted_directly",
"test_table_granted_directly_with_options",
"mv_granted_by_role",
"mv_granted_by_role_with_options",
"test_mv_granted_directly",
"test_mv_granted_directly_with_options",
"test_view_granted_directly",
"test_view_granted_directly_with_options",
"view_granted_by_role",
"view_granted_by_role_with_options");

assertEquals(tableNames, expectedVisibleNames);

db.stop();
}
}

@Test
void testReadSuccess() throws Exception {
final ConfiguredAirbyteCatalog configuredCatalog =
Expand Down
Loading

0 comments on commit 8b19a70

Please sign in to comment.