Skip to content

Commit

Permalink
Add test
Browse files Browse the repository at this point in the history
  • Loading branch information
fantix committed May 30, 2024
1 parent 4b7a993 commit 1893c4b
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 0 deletions.
6 changes: 6 additions & 0 deletions edgedb/_testbase.py
Original file line number Diff line number Diff line change
Expand Up @@ -372,10 +372,16 @@ def make_test_client(
database='edgedb',
user='edgedb',
password='test',
host=...,
port=...,
connection_class=...,
):
conargs = cls.get_connect_args(
cluster=cluster, database=database, user=user, password=password)
if host is not ...:
conargs['host'] = host
if port is not ...:
conargs['port'] = port
if connection_class is ...:
connection_class = (
asyncio_client.AsyncIOConnection
Expand Down
75 changes: 75 additions & 0 deletions tests/test_sync_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
#


import asyncio
import threading
import queue
import unittest.mock
from concurrent import futures

Expand Down Expand Up @@ -254,3 +256,76 @@ def test_sync_transaction_interface_errors(self):
with tx:
with tx:
pass

def test_sync_retry_parse(self):
loop = asyncio.new_event_loop()
q = queue.Queue()

async def init():
return asyncio.Event(), asyncio.Event()

reconnect, terminate = loop.run_until_complete(init())

async def proxy(r, w):
try:
while True:
buf = await r.read(65536)
if not buf:
w.close()
break
w.write(buf)
except asyncio.CancelledError:
pass

async def cb(ri, wi):
try:
args = self.get_connect_args()
ro, wo = await asyncio.open_connection(
args["host"], args["port"]
)
try:
fs = [
asyncio.create_task(proxy(ri, wo)),
asyncio.create_task(proxy(ro, wi)),
asyncio.create_task(terminate.wait()),
]
if not reconnect.is_set():
fs.append(asyncio.create_task(reconnect.wait()))
_, pending = await asyncio.wait(
fs, return_when=asyncio.FIRST_COMPLETED
)
for f in pending:
f.cancel()
finally:
wo.close()
finally:
wi.close()

async def proxy_server():
srv = await asyncio.start_server(cb, host="127.0.0.1", port=0)
try:
q.put(srv.sockets[0].getsockname()[1])
await terminate.wait()
finally:
srv.close()
await srv.wait_closed()

with futures.ThreadPoolExecutor(1) as pool:
pool.submit(loop.run_until_complete, proxy_server())
try:
client = self.make_test_client(
host="127.0.0.1",
port=q.get(),
database=self.get_database_name(),
)

# Fill the connection pool with a healthy connection
self.assertEqual(client.query_single("SELECT 42"), 42)

# Cut the connection to simulate an Internet interruption
loop.call_soon_threadsafe(reconnect.set)

# Run a new query that was never compiled, retry should work
self.assertEqual(client.query_single("SELECT 1*2+3-4"), 1)
finally:
loop.call_soon_threadsafe(terminate.set)

0 comments on commit 1893c4b

Please sign in to comment.