Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(python) Add autocommit flag to transaction #162

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 38 additions & 6 deletions wrappers/python/aries_askar/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,9 @@ def session(self, profile: str = None) -> "OpenSession":
"""Open a new session on the store without starting a transaction."""
return OpenSession(self._handle, profile, False)

def transaction(self, profile: str = None) -> "OpenSession":
def transaction(self, profile: str = None, *, autocommit=None) -> "OpenSession":
"""Open a new transactional session on the store."""
return OpenSession(self._handle, profile, True)
return OpenSession(self._handle, profile, True, autocommit)

async def close(self, *, remove: bool = False) -> bool:
"""Close and free the pool instance."""
Expand All @@ -431,11 +431,28 @@ def __repr__(self) -> str:
class Session:
"""An opened Session instance."""

def __init__(self, store: StoreHandle, handle: SessionHandle, is_txn: bool):
def __init__(
self,
store: StoreHandle,
handle: SessionHandle,
is_txn: bool = False,
autocommit: Optional[bool] = None,
):
"""Initialize the Session instance."""
self._store = store
self._handle = handle
self._is_txn = is_txn
self._autocommit = autocommit or False

@property
def autocommit(self) -> bool:
"""Determine if autocommit is enabled for a transaction."""
return self._autocommit

@autocommit.setter
def autocommit(self, val: bool):
"""Set the autocommit flag for a transaction."""
self._autocommit = val or False

@property
def is_transaction(self) -> bool:
Expand Down Expand Up @@ -641,21 +658,33 @@ async def rollback(self):
async def close(self):
"""Close the session without specifying the commit behaviour."""
if self._handle:
await self._handle.close(commit=False)
await self._handle.close(commit=self._autocommit)
self._handle = None

def __repr__(self) -> str:
return f"<Session(handle={self._handle}, is_transaction={self._is_txn})>"
"""Format a string representation of the session."""
return (
f"<Session(handle={self._handle}, "
f"is_transaction={self._is_txn}, "
f"autocommit={self._autocommit})>"
)


class OpenSession:
"""A pending session instance."""

def __init__(self, store: StoreHandle, profile: Optional[str], is_txn: bool):
def __init__(
self,
store: StoreHandle,
profile: Optional[str],
is_txn: bool,
autocommit: Optional[bool] = None,
):
"""Initialize the OpenSession instance."""
self._store = store
self._profile = profile
self._is_txn = is_txn
self._autocommit = autocommit
self._session: Session = None

@property
Expand All @@ -675,6 +704,7 @@ async def _open(self) -> Session:
self._store,
await bindings.session_start(self._store, self._profile, self._is_txn),
self._is_txn,
self._autocommit,
)

def __await__(self) -> Session:
Expand All @@ -690,4 +720,6 @@ async def __aexit__(self, exc_type, exc, tb):
"""Terminate the async context and close the session."""
session = self._session
self._session = None
if exc:
session.autocommit = False
await session.close()
40 changes: 36 additions & 4 deletions wrappers/python/tests/test_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ async def store() -> Store:

@mark.asyncio
async def test_insert_update(store: Store):

async with store as session:
# Insert a new entry
await session.insert(
Expand Down Expand Up @@ -84,7 +83,6 @@ async def test_insert_update(store: Store):

@mark.asyncio
async def test_remove_all(store: Store):

async with store as session:
# Insert a new entry
await session.insert(
Expand All @@ -108,7 +106,6 @@ async def test_remove_all(store: Store):

@mark.asyncio
async def test_scan(store: Store):

async with store as session:
await session.insert(
TEST_ENTRY["category"],
Expand Down Expand Up @@ -139,7 +136,6 @@ async def test_scan(store: Store):
@mark.asyncio
async def test_txn_basic(store: Store):
async with store.transaction() as txn:

# Insert a new entry
await txn.insert(
TEST_ENTRY["category"],
Expand Down Expand Up @@ -171,6 +167,42 @@ async def test_txn_basic(store: Store):
assert dict(found) == TEST_ENTRY


@mark.asyncio
async def test_txn_autocommit(store: Store):
with raises(Exception):
async with store.transaction(autocommit=True) as txn:
# Insert a new entry
await txn.insert(
TEST_ENTRY["category"],
TEST_ENTRY["name"],
TEST_ENTRY["value"],
TEST_ENTRY["tags"],
)

found = await txn.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])
assert dict(found) == TEST_ENTRY

raise Exception()

# Row should not have been inserted
async with store as session:
assert (await session.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])) is None

async with store.transaction(autocommit=True) as txn:
# Insert a new entry
await txn.insert(
TEST_ENTRY["category"],
TEST_ENTRY["name"],
TEST_ENTRY["value"],
TEST_ENTRY["tags"],
)

# Transaction should have been committed
async with store as session:
found = await session.fetch(TEST_ENTRY["category"], TEST_ENTRY["name"])
assert dict(found) == TEST_ENTRY


@mark.asyncio
async def test_txn_contention(store: Store):
async with store.transaction() as txn:
Expand Down