From b76c3a3cdca9a972298c554bf505aa63e7a8a41f Mon Sep 17 00:00:00 2001 From: Ville Brofeldt <33317356+villebro@users.noreply.github.com> Date: Thu, 20 Jun 2024 16:19:41 +0300 Subject: [PATCH] fix(key-value): use flush instead of commit (#29286) --- superset/commands/dashboard/permalink/create.py | 2 ++ superset/commands/explore/permalink/create.py | 2 ++ superset/commands/key_value/create.py | 2 +- superset/commands/key_value/delete.py | 2 +- superset/commands/key_value/delete_expired.py | 2 +- superset/commands/key_value/update.py | 2 +- superset/commands/key_value/upsert.py | 2 +- superset/extensions/metastore_cache.py | 7 ++++++- superset/key_value/shared_entries.py | 2 ++ superset/utils/lock.py | 3 +++ tests/integration_tests/key_value/commands/delete_test.py | 5 +++-- tests/integration_tests/key_value/commands/fixtures.py | 2 +- tests/integration_tests/key_value/commands/get_test.py | 4 ++-- 13 files changed, 26 insertions(+), 11 deletions(-) diff --git a/superset/commands/dashboard/permalink/create.py b/superset/commands/dashboard/permalink/create.py index 3387d432d5e02..76b7b8e83453c 100644 --- a/superset/commands/dashboard/permalink/create.py +++ b/superset/commands/dashboard/permalink/create.py @@ -18,6 +18,7 @@ from sqlalchemy.exc import SQLAlchemyError +from superset import db from superset.commands.dashboard.permalink.base import BaseDashboardPermalinkCommand from superset.commands.key_value.upsert import UpsertKeyValueCommand from superset.daos.dashboard import DashboardDAO @@ -62,6 +63,7 @@ def run(self) -> str: codec=self.codec, ).run() assert key.id # for type checks + db.session.commit() return encode_permalink_key(key=key.id, salt=self.salt) except KeyValueCodecEncodeException as ex: raise DashboardPermalinkCreateFailedError(str(ex)) from ex diff --git a/superset/commands/explore/permalink/create.py b/superset/commands/explore/permalink/create.py index befb1d5a47e28..731e0b5ce8a02 100644 --- a/superset/commands/explore/permalink/create.py +++ b/superset/commands/explore/permalink/create.py @@ -19,6 +19,7 @@ from sqlalchemy.exc import SQLAlchemyError +from superset import db from superset.commands.explore.permalink.base import BaseExplorePermalinkCommand from superset.commands.key_value.create import CreateKeyValueCommand from superset.explore.permalink.exceptions import ExplorePermalinkCreateFailedError @@ -58,6 +59,7 @@ def run(self) -> str: key = command.run() if key.id is None: raise ExplorePermalinkCreateFailedError("Unexpected missing key id") + db.session.commit() return encode_permalink_key(key=key.id, salt=self.salt) except KeyValueCodecEncodeException as ex: raise ExplorePermalinkCreateFailedError(str(ex)) from ex diff --git a/superset/commands/key_value/create.py b/superset/commands/key_value/create.py index a39607b5885d0..7308321e44a73 100644 --- a/superset/commands/key_value/create.py +++ b/superset/commands/key_value/create.py @@ -99,5 +99,5 @@ def create(self) -> Key: except ValueError as ex: raise KeyValueCreateFailedError() from ex db.session.add(entry) - db.session.commit() + db.session.flush() return Key(id=entry.id, uuid=entry.uuid) diff --git a/superset/commands/key_value/delete.py b/superset/commands/key_value/delete.py index 8b9095c09c9b2..37eb7087e6a20 100644 --- a/superset/commands/key_value/delete.py +++ b/superset/commands/key_value/delete.py @@ -59,6 +59,6 @@ def delete(self) -> bool: filter_ = get_filter(self.resource, self.key) if entry := db.session.query(KeyValueEntry).filter_by(**filter_).first(): db.session.delete(entry) - db.session.commit() + db.session.flush() return True return False diff --git a/superset/commands/key_value/delete_expired.py b/superset/commands/key_value/delete_expired.py index 166a9f6f87abe..92d45683f222e 100644 --- a/superset/commands/key_value/delete_expired.py +++ b/superset/commands/key_value/delete_expired.py @@ -62,4 +62,4 @@ def delete_expired(self) -> None: ) .delete() ) - db.session.commit() + db.session.flush() diff --git a/superset/commands/key_value/update.py b/superset/commands/key_value/update.py index ca940adf60282..098c9f860d1b6 100644 --- a/superset/commands/key_value/update.py +++ b/superset/commands/key_value/update.py @@ -84,7 +84,7 @@ def update(self) -> Optional[Key]: entry.expires_on = self.expires_on entry.changed_on = datetime.now() entry.changed_by_fk = get_user_id() - db.session.commit() + db.session.flush() return Key(id=entry.id, uuid=entry.uuid) return None diff --git a/superset/commands/key_value/upsert.py b/superset/commands/key_value/upsert.py index 84f02cb9cd223..2c985530bf20a 100644 --- a/superset/commands/key_value/upsert.py +++ b/superset/commands/key_value/upsert.py @@ -88,7 +88,7 @@ def upsert(self) -> Key: entry.expires_on = self.expires_on entry.changed_on = datetime.now() entry.changed_by_fk = get_user_id() - db.session.commit() + db.session.flush() return Key(entry.id, entry.uuid) return CreateKeyValueCommand( diff --git a/superset/extensions/metastore_cache.py b/superset/extensions/metastore_cache.py index 1c89e8459774d..7b4e39677e48f 100644 --- a/superset/extensions/metastore_cache.py +++ b/superset/extensions/metastore_cache.py @@ -22,6 +22,7 @@ from flask import current_app, Flask, has_app_context from flask_caching import BaseCache +from superset import db from superset.key_value.exceptions import KeyValueCreateFailedError from superset.key_value.types import ( KeyValueCodec, @@ -94,6 +95,7 @@ def set(self, key: str, value: Any, timeout: Optional[int] = None) -> bool: codec=self.codec, expires_on=self._get_expiry(timeout), ).run() + db.session.commit() return True def add(self, key: str, value: Any, timeout: Optional[int] = None) -> bool: @@ -109,6 +111,7 @@ def add(self, key: str, value: Any, timeout: Optional[int] = None) -> bool: key=self.get_key(key), expires_on=self._get_expiry(timeout), ).run() + db.session.commit() return True except KeyValueCreateFailedError: return False @@ -133,4 +136,6 @@ def delete(self, key: str) -> Any: # pylint: disable=import-outside-toplevel from superset.commands.key_value.delete import DeleteKeyValueCommand - return DeleteKeyValueCommand(resource=RESOURCE, key=self.get_key(key)).run() + ret = DeleteKeyValueCommand(resource=RESOURCE, key=self.get_key(key)).run() + db.session.commit() + return ret diff --git a/superset/key_value/shared_entries.py b/superset/key_value/shared_entries.py index 130313157a53d..f472838d2e090 100644 --- a/superset/key_value/shared_entries.py +++ b/superset/key_value/shared_entries.py @@ -18,6 +18,7 @@ from typing import Any, Optional from uuid import uuid3 +from superset import db from superset.key_value.types import JsonKeyValueCodec, KeyValueResource, SharedKey from superset.key_value.utils import get_uuid_namespace, random_key @@ -45,6 +46,7 @@ def set_shared_value(key: SharedKey, value: Any) -> None: key=uuid_key, codec=CODEC, ).run() + db.session.commit() def get_permalink_salt(key: SharedKey) -> str: diff --git a/superset/utils/lock.py b/superset/utils/lock.py index ae982e974ab05..3cd3c8ead53ab 100644 --- a/superset/utils/lock.py +++ b/superset/utils/lock.py @@ -24,6 +24,7 @@ from datetime import datetime, timedelta from typing import Any, cast, TypeVar, Union +from superset import db from superset.exceptions import CreateKeyValueDistributedLockFailedException from superset.key_value.exceptions import KeyValueCreateFailedError from superset.key_value.types import JsonKeyValueCodec, KeyValueResource @@ -92,10 +93,12 @@ def KeyValueDistributedLock( # pylint: disable=invalid-name value=True, expires_on=datetime.now() + LOCK_EXPIRATION, ).run() + db.session.commit() yield key DeleteKeyValueCommand(resource=KeyValueResource.LOCK, key=key).run() + db.session.commit() logger.debug("Removed lock on namespace %s for key %s", namespace, key) except KeyValueCreateFailedError as ex: raise CreateKeyValueDistributedLockFailedException( diff --git a/tests/integration_tests/key_value/commands/delete_test.py b/tests/integration_tests/key_value/commands/delete_test.py index ac51fa640b0d3..b45a5d075d21a 100644 --- a/tests/integration_tests/key_value/commands/delete_test.py +++ b/tests/integration_tests/key_value/commands/delete_test.py @@ -49,7 +49,7 @@ def key_value_entry() -> KeyValueEntry: value=bytes(json.dumps(JSON_VALUE), encoding="utf-8"), ) db.session.add(entry) - db.session.commit() + db.session.flush() return entry @@ -61,6 +61,7 @@ def test_delete_id_entry( from superset.commands.key_value.delete import DeleteKeyValueCommand assert DeleteKeyValueCommand(resource=RESOURCE, key=ID_KEY).run() is True + db.session.commit() def test_delete_uuid_entry( @@ -71,12 +72,12 @@ def test_delete_uuid_entry( from superset.commands.key_value.delete import DeleteKeyValueCommand assert DeleteKeyValueCommand(resource=RESOURCE, key=UUID_KEY).run() is True + db.session.commit() def test_delete_entry_missing( app_context: AppContext, admin: User, # noqa: F811 - key_value_entry: KeyValueEntry, ) -> None: from superset.commands.key_value.delete import DeleteKeyValueCommand diff --git a/tests/integration_tests/key_value/commands/fixtures.py b/tests/integration_tests/key_value/commands/fixtures.py index c0262124b6c4f..74bf809301c1d 100644 --- a/tests/integration_tests/key_value/commands/fixtures.py +++ b/tests/integration_tests/key_value/commands/fixtures.py @@ -56,7 +56,7 @@ def key_value_entry() -> Generator[KeyValueEntry, None, None]: value=bytes(json.dumps(JSON_VALUE), encoding="utf-8"), ) db.session.add(entry) - db.session.commit() + db.session.flush() yield entry db.session.delete(entry) db.session.commit() diff --git a/tests/integration_tests/key_value/commands/get_test.py b/tests/integration_tests/key_value/commands/get_test.py index 8ea4ccd87d9d4..131b615b7c2e1 100644 --- a/tests/integration_tests/key_value/commands/get_test.py +++ b/tests/integration_tests/key_value/commands/get_test.py @@ -76,7 +76,7 @@ def test_get_expired_entry(app_context: AppContext) -> None: expires_on=datetime.now() - timedelta(days=1), ) db.session.add(entry) - db.session.commit() + db.session.flush() value = GetKeyValueCommand(resource=RESOURCE, key=ID_KEY, codec=JSON_CODEC).run() assert value is None db.session.delete(entry) @@ -96,7 +96,7 @@ def test_get_future_expiring_entry(app_context: AppContext) -> None: expires_on=datetime.now() + timedelta(days=1), ) db.session.add(entry) - db.session.commit() + db.session.flush() value = GetKeyValueCommand(resource=RESOURCE, key=id_, codec=JSON_CODEC).run() assert value == JSON_VALUE db.session.delete(entry)