Skip to content

Commit

Permalink
fix(publish): Only send publish msgs to each client once
Browse files Browse the repository at this point in the history
  • Loading branch information
scottwittenburg committed Aug 10, 2021
1 parent 91b7cbb commit 65ab38d
Showing 1 changed file with 29 additions and 27 deletions.
56 changes: 29 additions & 27 deletions python/src/wslink/backends/aiohttp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def create_webserver(server_config):
routes = []

for route, server_protocol in ws_routes.items():
protocol_handler = WslinkHandler(server_protocol)
protocol_handler = WslinkHandler(server_protocol, web_app)
ws_routes[route] = protocol_handler
routes.append(
aiohttp_web.get(_fix_path(route), protocol_handler.handleWsRequest)
Expand Down Expand Up @@ -153,13 +153,34 @@ def create_webserver(server_config):


class WslinkHandler(object):
def __init__(self, protocol=None):
def __init__(self, protocol=None, web_app=None):
self.serverProtocol = protocol
self.web_app = web_app
self.functionMap = {}
self.attachmentsReceived = {}
self.attachmentsRecvQueue = []
self.connections = {}

# Build the rpc method dictionary, assuming we were given a serverprotocol
if self.getServerProtocol():
protocolList = self.getServerProtocol().getLinkProtocols()
protocolList.append(self.getServerProtocol())
for protocolObject in protocolList:
protocolObject.init(
self.publish,
self.addAttachment,
lambda: schedule_coroutine(0, _stop_server, self.web_app),
)
test = lambda x: inspect.ismethod(x) or inspect.isfunction(x)
for k in inspect.getmembers(protocolObject.__class__, test):
proc = k[1]
if "_wslinkuris" in proc.__dict__:
uri_info = proc.__dict__["_wslinkuris"][0]
if "uri" in uri_info:
uri = uri_info["uri"]
self.functionMap[uri] = (protocolObject, proc)
pub.publishManager.registerProtocol(self)

def setServerProtocol(self, protocol):
self.serverProtocol = protocol

Expand All @@ -176,6 +197,8 @@ async def disconnectClients(self):
code=aiohttp.WSCloseCode.GOING_AWAY, message="Server shutdown"
)

pub.publishManager.unregisterProtocol(self)

async def handleWsRequest(self, request):
aiohttp_app = request.app

Expand All @@ -192,7 +215,7 @@ async def handleWsRequest(self, request):

await current_ws.prepare(request)

await self.onConnect(request)
await self.onConnect()

async for msg in current_ws:
await self.onMessage(msg, client_id)
Expand All @@ -209,32 +232,11 @@ async def handleWsRequest(self, request):

return current_ws

async def onConnect(self, request):
aiohttp_app = request.app

# Build the rpc method dictionary. self.serverProtocol isn't set until connected.
if not self.getServerProtocol():
return
protocolList = self.getServerProtocol().getLinkProtocols()
protocolList.append(self.getServerProtocol())
for protocolObject in protocolList:
protocolObject.init(
self.publish,
self.addAttachment,
lambda: schedule_coroutine(0, _stop_server, aiohttp_app),
)
test = lambda x: inspect.ismethod(x) or inspect.isfunction(x)
for k in inspect.getmembers(protocolObject.__class__, test):
proc = k[1]
if "_wslinkuris" in proc.__dict__:
uri_info = proc.__dict__["_wslinkuris"][0]
if "uri" in uri_info:
uri = uri_info["uri"]
self.functionMap[uri] = (protocolObject, proc)
pub.publishManager.registerProtocol(self)
async def onConnect(self):
pass

async def onClose(self):
pub.publishManager.unregisterProtocol(self)
pass

async def handleSystemMessage(self, rpcid, methodName, args, client_id):
rpcList = rpcid.split(":")
Expand Down

0 comments on commit 65ab38d

Please sign in to comment.