Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure Nexus events written to the log are always applied (SYN-6807) #3518

Merged
merged 6 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions synapse/lib/nexus.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ async def __anit__(self, cell):
self.celliden = self.cell.iden
self.readonly = False
self.readonlyreason = None

self.applytask = None
self.applylock = asyncio.Lock()

self.ready = asyncio.Event()
Expand Down Expand Up @@ -312,7 +314,10 @@ async def eat(self, nexsiden, event, args, kwargs, meta):
if meta is None:
meta = {}

return await self._eat((nexsiden, event, args, kwargs, meta))
async with self.applylock:
# Keep a reference to the shielded task to ensure it isn't GC'd
self.applytask = asyncio.create_task(self._eat((nexsiden, event, args, kwargs, meta)))
invisig0th marked this conversation as resolved.
Show resolved Hide resolved
return await asyncio.shield(self.applytask)

async def index(self):
if self.donexslog:
Expand Down Expand Up @@ -365,11 +370,10 @@ async def _apply(self, indx, mesg):
nexus = self._nexskids[nexsiden]
func, passitem = nexus._nexshands[event]

async with self.applylock:
if passitem:
return await func(nexus, *args, nexsitem=(indx, mesg), **kwargs)
if passitem:
return await func(nexus, *args, nexsitem=(indx, mesg), **kwargs)

return await func(nexus, *args, **kwargs)
return await func(nexus, *args, **kwargs)

async def iter(self, offs: int, tellready=False) -> AsyncIterator[Any]:
'''
Expand Down
6 changes: 6 additions & 0 deletions synapse/tests/test_lib_agenda.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,9 +346,11 @@ def looptime():
adef = await agenda.add(cdef)
guid = adef.get('iden')

strt = await core.nexsroot.index()
# bypass the API because it would actually syntax check
unixtime += 60
self.eq((11, 'boom'), await asyncio.wait_for(core.callStorm('return($lib.queue.gen(visi).pop(wait=$lib.true))'), timeout=5))
await core.nexsroot.waitOffs(strt + 5)

appt = await agenda.get(guid)
self.eq(appt.isrunning, False)
Expand Down Expand Up @@ -399,6 +401,8 @@ def looptime():
await agenda.add(cdef)

# Lock user and advance time
strt = await core.nexsroot.index()

await visi.setLocked(True)

with self.getLoggerStream('synapse.lib.agenda', 'locked') as stream:
Expand All @@ -408,6 +412,8 @@ def looptime():
while not stream.wait(0.1):
await asyncio.sleep(0)

await core.nexsroot.waitOffs(strt + 4)

self.eq(2, appt.startcount)

async def test_agenda_persistence(self):
Expand Down
63 changes: 63 additions & 0 deletions synapse/tests/test_lib_nexus.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import asyncio
from unittest import mock

import synapse.exc as s_exc
import synapse.common as s_common
import synapse.cortex as s_cortex

import synapse.lib.cell as s_cell
import synapse.lib.nexus as s_nexus
import synapse.lib.hiveauth as s_hiveauth

import synapse.tests.utils as s_t_utils

Expand Down Expand Up @@ -201,3 +205,62 @@ async def test_nexus_setindex(self):
nexsindx = await core.getNexsIndx()
layrindx = max([await layr.getEditIndx() for layr in core.layers.values()])
self.ge(nexsindx, layrindx)

async def test_nexus_safety(self):

orig = s_hiveauth.Auth.reqUser
async def slowReq(self, iden):
await asyncio.sleep(0.2)
return await orig(self, iden)

with self.getTestDir() as dirn:
async with self.getTestCore(dirn=dirn) as core:

with mock.patch('synapse.lib.hiveauth.Auth.reqUser', slowReq):

vcnt = len(core.views)
deflayr = (await core.getLayerDef()).get('iden')

strt = await core.nexsroot.index()

vdef = {'layers': (deflayr,), 'name': 'nextview'}
core.schedCoro(core.addView(vdef))

for x in range(10):
vdef = {'layers': (deflayr,), 'name': f'someview{x}'}
core.schedCoro(core.addView(vdef))

await asyncio.sleep(0.1)

async with self.getTestCore(dirn=dirn) as core:

viewadds = 0
async for item in core.nexsroot.nexslog.iter(strt):
if item[1][1] == 'view:add':
viewadds += 1

self.eq(1, viewadds)
self.len(vcnt + viewadds, core.views)
self.len(1, [v for v in core.views.values() if (await v.pack())['name'] == 'nextview'])

vcnt = len(core.views)
strt = await core.nexsroot.index()

with mock.patch('synapse.lib.hiveauth.Auth.reqUser', slowReq):
for x in range(3):
vdef = {'layers': (deflayr,), 'name': f'someview{x}'}
with self.raises(TimeoutError):
await s_common.wait_for(core.addView(vdef), 0.1)

await core.nexsroot.waitOffs(strt + 3, timeout=2)

viewadds = 0
async for item in core.nexsroot.nexslog.iter(strt):
if item[1][1] == 'view:add':
viewadds += 1

self.eq(3, viewadds)
self.len(vcnt + viewadds, core.views)

async with self.getTestCore(dirn=dirn) as core:
self.len(vcnt + viewadds, core.views)
Loading