diff --git a/tests/integration_tests/access_tests.py b/tests/integration_tests/access_tests.py index d888dbf53c19f..66067ae2d359a 100644 --- a/tests/integration_tests/access_tests.py +++ b/tests/integration_tests/access_tests.py @@ -19,15 +19,16 @@ import json import unittest from unittest import mock + +import pytest +from sqlalchemy import inspect + from tests.integration_tests.fixtures.birth_names_dashboard import ( load_birth_names_dashboard_with_slices, ) - -import pytest from tests.integration_tests.fixtures.world_bank_dashboard import ( load_world_bank_dashboard_with_slices, ) - from tests.integration_tests.fixtures.energy_dashboard import ( load_energy_table_with_slice, ) @@ -38,6 +39,7 @@ from superset.connectors.sqla.models import SqlaTable from superset.models import core as models from superset.models.datasource_access_request import DatasourceAccessRequest +from superset.utils.core import get_example_database from .base_tests import SupersetTestCase @@ -152,16 +154,23 @@ def test_override_role_permissions_is_admin_only(self): @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices") def test_override_role_permissions_1_table(self): + database = get_example_database() + engine = database.get_sqla_engine() + schema = inspect(engine).default_schema_name + + perm_data = ROLE_TABLES_PERM_DATA.copy() + perm_data["database"][0]["schema"][0]["name"] = schema + response = self.client.post( "/superset/override_role_permissions/", - data=json.dumps(ROLE_TABLES_PERM_DATA), + data=json.dumps(perm_data), content_type="application/json", ) self.assertEqual(201, response.status_code) updated_override_me = security_manager.find_role("override_me") self.assertEqual(1, len(updated_override_me.permissions)) - birth_names = self.get_table(name="birth_names") + birth_names = self.get_table(name="birth_names", schema=schema) self.assertEqual( birth_names.perm, updated_override_me.permissions[0].view_menu.name ) @@ -171,6 +180,12 @@ def test_override_role_permissions_1_table(self): @pytest.mark.usefixtures("load_birth_names_dashboard_with_slices") def test_override_role_permissions_druid_and_table(self): + database = get_example_database() + engine = database.get_sqla_engine() + schema = inspect(engine).default_schema_name + + perm_data = ROLE_ALL_PERM_DATA.copy() + perm_data["database"][0]["schema"][0]["name"] = schema response = self.client.post( "/superset/override_role_permissions/", data=json.dumps(ROLE_ALL_PERM_DATA), @@ -190,7 +205,7 @@ def test_override_role_permissions_druid_and_table(self): "datasource_access", updated_role.permissions[1].permission.name ) - birth_names = self.get_table(name="birth_names") + birth_names = self.get_table(name="birth_names", schema=schema) self.assertEqual(birth_names.perm, perms[2].view_menu.name) self.assertEqual( "datasource_access", updated_role.permissions[2].permission.name @@ -201,24 +216,31 @@ def test_override_role_permissions_druid_and_table(self): "load_energy_table_with_slice", "load_birth_names_dashboard_with_slices" ) def test_override_role_permissions_drops_absent_perms(self): + database = get_example_database() + engine = database.get_sqla_engine() + schema = inspect(engine).default_schema_name + override_me = security_manager.find_role("override_me") override_me.permissions.append( security_manager.find_permission_view_menu( - view_menu_name=self.get_table(name="energy_usage").perm, + view_menu_name=self.get_table(name="energy_usage", schema=schema).perm, permission_name="datasource_access", ) ) db.session.flush() + perm_data = ROLE_TABLES_PERM_DATA.copy() + perm_data["database"][0]["schema"][0]["name"] = schema + response = self.client.post( "/superset/override_role_permissions/", - data=json.dumps(ROLE_TABLES_PERM_DATA), + data=json.dumps(perm_data), content_type="application/json", ) self.assertEqual(201, response.status_code) updated_override_me = security_manager.find_role("override_me") self.assertEqual(1, len(updated_override_me.permissions)) - birth_names = self.get_table(name="birth_names") + birth_names = self.get_table(name="birth_names", schema=schema) self.assertEqual( birth_names.perm, updated_override_me.permissions[0].view_menu.name )