Skip to content

Commit

Permalink
Restore pyodide connection (#353)
Browse files Browse the repository at this point in the history
* Restore pyodide connection

* bump version
  • Loading branch information
oeway authored May 4, 2022
1 parent af739ec commit 04ba47c
Show file tree
Hide file tree
Showing 7 changed files with 46 additions and 196 deletions.
2 changes: 1 addition & 1 deletion javascript/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion javascript/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "imjoy-rpc",
"version": "0.5.9",
"version": "0.5.10",
"description": "Remote procedure calls for ImJoy.",
"module": "index.js",
"scripts": {
Expand Down
2 changes: 1 addition & 1 deletion python/imjoy_rpc/VERSION
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
"version": "0.5.9"
"version": "0.5.10"
}
74 changes: 28 additions & 46 deletions python/imjoy_rpc/connection/pyodide_connection.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import asyncio
import contextvars
import logging
import uuid
import sys
import logging
import asyncio
import traceback
import uuid

import js
import contextvars
import pyodide

from imjoy_rpc.core_connection import decode_msgpack, send_as_msgpack
from imjoy_rpc.rpc import RPC
from imjoy_rpc.utils import MessageEmitter, dotdict


import js
from js import Array, Object

logging.basicConfig(stream=sys.stdout)
logger = logging.getLogger("Pyodide Connection")

connection_id = contextvars.ContextVar("connection_id")
CHUNK_SIZE = 1024 * 512

# TODO: this is too high, we need to find a better approach
# see here: https://github.com/iodide-project/pyodide/issues/917#issuecomment-751307819
Expand Down Expand Up @@ -70,9 +70,9 @@ def register_codec(self, config):

self._codecs[config["name"]] = dotdict(config)

def start(self, plugin_id, on_ready_callback=None, on_error_callback=None):
def start(self, target="imjoy_rpc", on_ready_callback=None, on_error_callback=None):
try:
self._create_new_connection(plugin_id, on_ready_callback, on_error_callback)
self._create_new_connection(target, on_ready_callback, on_error_callback)
except Exception as ex:
if on_error_callback:
on_error_callback(ex)
Expand All @@ -85,10 +85,10 @@ def setup():

self.set_interface({"setup": setup}, config)

def _create_new_connection(self, plugin_id, on_ready_callback, on_error_callback):
def _create_new_connection(self, target, on_ready_callback, on_error_callback):
client_id = str(uuid.uuid4())
connection_id.set(client_id)
connection = PyodideConnection(self.default_config, plugin_id)
connection = PyodideConnection(self.default_config)

def initialize(data):
self.clients[client_id] = dotdict()
Expand Down Expand Up @@ -140,21 +140,18 @@ def ready(_):
)


def wrap_promise(promise):
loop = asyncio.get_event_loop()
fut = loop.create_future()
promise.then(fut.set_result).catch(fut.set_exception)
return fut


def install_requirements(requirements, resolve, reject):
import micropip

promises = []
for r in requirements:
p = micropip.install(r)
promises.append(p)
js.Promise.all(promises).then(resolve).catch(reject)
fut = micropip.install(requirements)

def done(fut):
if fut.exception():
reject(fut.exception())
else:
resolve(None)

fut.add_done_callback(done)


# This script template is a temporary workaround for the recursion error
Expand All @@ -175,28 +172,23 @@ def install_requirements(requirements, resolve, reject):


class PyodideConnection(MessageEmitter):
def __init__(self, config, plugin_id):
def __init__(self, config):
self.config = dotdict(config or {})
super().__init__(logger)
self.channel = self.config.get("channel") or "imjoy_rpc"
self._event_handlers = {}
self.peer_id = str(uuid.uuid4())
self.debug = True
self._send = js.sendMessage
self.accept_encoding = []
self.plugin_id = plugin_id
self._chunk_store = {}
self._post_message = js.sendMessage

def msg_cb(msg):
data = msg.to_py()
data = decode_msgpack(data, self._chunk_store)
if data is None:
return

# TODO: remove the exception for "initialize"
if data.get("peer_id") == self.peer_id or data.get("type") == "initialize":
if data.get("type") == "initialize":
self.accept_encoding = data.get("accept_encoding", [])
if "type" in data:
if data["type"] == "execute":
self.execute(data)
return
self._fire(data["type"], data)
else:
logger.warn(
Expand Down Expand Up @@ -237,14 +229,4 @@ def disconnect(self):
pass

def emit(self, msg):
msg["plugin_id"] = self.plugin_id
if (
msg.get("type") in ["initialized", "imjoyRPCReady"]
or "msgpack" not in self.accept_encoding
):
# Notify the server that the plugin supports msgpack decoding
if msg.get("type") == "initialized":
msg["accept_encoding"] = ["msgpack", "gzip"]
asyncio.ensure_future(self._send(msg))
else:
send_as_msgpack(msg, self._send, self.accept_encoding)
self._post_message(msg)
42 changes: 12 additions & 30 deletions python/imjoy_rpc/connection/socketio_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@

import socketio

from imjoy_rpc.core_connection import decode_msgpack, send_as_msgpack
from imjoy_rpc.rpc import RPC
from imjoy_rpc.utils import MessageEmitter, dotdict

logging.basicConfig(stream=sys.stdout)
logger = logging.getLogger("SocketIOConnection")

connection_id = contextvars.ContextVar("connection_id")
CHUNK_SIZE = 1024 * 512


class SocketIOManager:
Expand Down Expand Up @@ -82,7 +80,7 @@ def start(
):
"""Start."""
sio = socketio.AsyncClient()
self.connected = False
self.is_reconnect = False
self.url = url
socketio_path = urlparse(url).path.rstrip("/") + "/socket.io"
self.client_params = {
Expand All @@ -93,7 +91,6 @@ def start(
def registered(config):
"""Handle registration."""
if config.get("success"):
self.connected = True
client_id = str(uuid.uuid4())
self._create_new_connection(
sio,
Expand All @@ -111,11 +108,14 @@ def registered(config):
@sio.event
async def connect():
"""Handle connected."""
if self.connected:
logger.warn("Skipping reconnect to the server")
return
logger.info("connected to the server")
await sio.emit("register_plugin", self.default_config, callback=registered)
if not self.is_reconnect:
logger.info("connected to the server")
await sio.emit(
"register_plugin", self.default_config, callback=registered
)
self.is_reconnect = True
else:
logger.info("Skipping reconnect to the server")

self.sio = sio
fut = asyncio.ensure_future(self.sio.connect(self.url, **self.client_params))
Expand Down Expand Up @@ -203,20 +203,12 @@ def __init__(self, config, sio, plugin_id, client_channel):
self.peer_id = client_channel
self.client_channel = client_channel
self.plugin_id = plugin_id
self._chunk_store = {}
self.accept_encoding = []

self.sio = sio

@sio.event
def plugin_message(data):
data = decode_msgpack(data, self._chunk_store)
if data is None:
return

if data.get("peer_id") == self.peer_id or data.get("type") == "initialize":
if data.get("type") == "initialize":
self.accept_encoding = data.get("accept_encoding", [])
if "type" in data:
self._fire(data["type"], data)
else:
Expand Down Expand Up @@ -249,19 +241,9 @@ def _msg_callback(self, data):
if not data.get("success"):
self._fire("error", data.get("detail"))

async def _send(self, data):
await self.sio.emit("plugin_message", data, callback=self._msg_callback)

def emit(self, msg):
"""Emit a message."""
msg["plugin_id"] = self.plugin_id
if (
msg.get("type") in ["initialized", "imjoyRPCReady"]
or "msgpack" not in self.accept_encoding
):
# Notify the server that the plugin supports msgpack decoding
if msg.get("type") == "initialized":
msg["accept_encoding"] = ["msgpack", "gzip"]
asyncio.ensure_future(self._send(msg))
else:
send_as_msgpack(msg, self._send, self.accept_encoding)
asyncio.ensure_future(
self.sio.emit("plugin_message", msg, callback=self._msg_callback)
)
Loading

0 comments on commit 04ba47c

Please sign in to comment.