Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix for mirror window consumer #1264

Merged
merged 5 commits into from
Jun 6, 2019
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,12 +788,11 @@ async def _initCoreMirror(self, url):
await self.fini()
return

logger.warning(f'mirror loop connected ({url}')

# assume only the main layer for now...
layr = self.getLayer()

offs = await layr.getOffset(layr.iden)
logger.warning(f'mirror loop connected ({url} offset={offs})')

if offs == 0:
stat = await layr.stat()
Expand Down Expand Up @@ -842,7 +841,7 @@ async def consume(x):
await layr.setOffset(layr.iden, items[-1][0])

except asyncio.CancelledError: # pragma: no cover
raise
return
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is the main function for the coroutine, catching and handling CancelledError is ok here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't matter either way since schedCoro will swallow the cancellederror


except Exception:
logger.exception('error in initCoreMirror loop')
Expand Down
20 changes: 15 additions & 5 deletions synapse/lib/layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,16 +195,26 @@ async def stor(self, sops, splices=None):
await func(oper)

if splices:
await self._storFireSplices(splices)

await self._storSplices(splices)
async def _storFireSplices(self, splices):
'''
Fire events, windows, etc for splices.
'''
indx = await self._storSplices(splices)

self.spliced.set()
self.spliced.clear()
self.spliced.set()
self.spliced.clear()

# go fast and protect against edit-while-iter issues
[(await wind.puts(splices)) for wind in tuple(self.windows)]
items = [(indx + i, s) for (i, s) in enumerate(splices)]
# go fast and protect against edit-while-iter issues
[(await wind.puts(items)) for wind in tuple(self.windows)]

async def _storSplices(self, splices): # pragma: no cover
'''
Store the splices into a sequentially accessible storage structure.
Returns the indx of the first splice stored.
'''
raise NotImplementedError

async def _liftByFormRe(self, oper):
Expand Down
9 changes: 4 additions & 5 deletions synapse/lib/lmdblayer.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,7 @@ async def stor(self, sops, splices=None):
func(oper)

if splices:
self._storSplicesSync(splices)
self.spliced.set()
self.spliced.clear()
await self._storFireSplices(splices)

def _migrate_db_pre010(self, dbname, newslab):
'''
Expand Down Expand Up @@ -397,8 +395,9 @@ def _storPropDel(self, oper):
if univ:
self.layrslab.delete(penc + oldi, pvvalu, db=self.byuniv)

def _storSplicesSync(self, splices):
self.splicelog.save(splices)
async def _storSplices(self, splices):
info = self.splicelog.save(splices)
return info.get('orig')

async def _liftByIndx(self, oper):
# ('indx', (<dbname>, <prefix>, (<indxopers>...))
Expand Down
1 change: 0 additions & 1 deletion synapse/lib/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ async def _setops(self, name, valu, editatom, init=False):
raise s_exc.ReadOnlyProp(name=prop.full)

# not setting a set-once prop unless we are init...
await self.snap.warn(f'ReadOnlyProp: name={prop.full}')
return False

# check for type specific merging...
Expand Down
3 changes: 1 addition & 2 deletions synapse/lib/slabseqn.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,8 @@ def save(self, items):

origindx = self.indx
self.indx = indx
return {'indx': indx, 'size': size, 'count': len(items), 'time': tick, 'took': took}

return origindx
return {'indx': indx, 'size': size, 'count': len(items), 'time': tick, 'took': took, 'orig': origindx}

def append(self, item):

Expand Down
16 changes: 13 additions & 3 deletions synapse/tests/test_servers_cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,22 @@ async def test_server_mirror(self):
'--mirror', core00.getLocalUrl(),
path01]

# add a node for core01 to sync before window
await core00.nodes('[ inet:ipv4=5.5.5.5 ]')

async with await s_s_cortex.main(argv, outp=outp) as core01:
evnt = await core01._getWaitFor('inet:ipv4', '5.5.5.5')
await core00.nodes('[ inet:ipv4=5.5.5.5 ]')
await evnt.wait()

evnt00 = await core01._getWaitFor('inet:ipv4', '5.5.5.5')
await evnt00.wait()
self.len(1, await core01.nodes('inet:ipv4=5.5.5.5'))

# add a node for core01 to sync via window
evnt01 = await core01._getWaitFor('inet:ipv4', '6.6.6.6')
self.len(1, await core00.nodes('[ inet:ipv4=6.6.6.6 ]'))

await evnt01.wait()
self.len(1, await core01.nodes('inet:ipv4=6.6.6.6'))

async def test_server_mirror_badiden(self):

with self.getTestDir() as dirn:
Expand Down