Skip to content

Commit

Permalink
Issue/847 remove ioloop refs (#848)
Browse files Browse the repository at this point in the history
* Remove local ioloop references (#847)
  • Loading branch information
bartv authored Dec 19, 2018
1 parent 2ce6124 commit 6d567e3
Show file tree
Hide file tree
Showing 19 changed files with 124 additions and 212 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ Changes in this release:
- Use files for all logs and split out logs, stdout and stderr in autostarted agents (#824, #234)
- Introduce request_timeout option for transport settings
- Port unit tests to pytest-asyncio and fix deprecation warnings (#743)
- Remove all io_loop references and only use current ioloop (#847)

v 2018.3 (2018-12-07)
Changes in this release:
Expand Down
6 changes: 3 additions & 3 deletions src/inmanta/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,14 +637,14 @@ class Agent(AgentEndPoint):
message bus for changes.
"""

def __init__(self, io_loop, hostname=None, agent_map=None, code_loader=True, environment=None, poolsize=1,
def __init__(self, hostname=None, agent_map=None, code_loader=True, environment=None, poolsize=1,
cricital_pool_size=5):
super().__init__("agent", io_loop, timeout=cfg.server_timeout.get(), reconnect_delay=cfg.agent_reconnect_delay.get())
super().__init__("agent", timeout=cfg.server_timeout.get(), reconnect_delay=cfg.agent_reconnect_delay.get())

self.poolsize = poolsize
self.ratelimiter = locks.Semaphore(poolsize)
self.critical_ratelimiter = locks.Semaphore(cricital_pool_size)
self._sched = Scheduler(io_loop=self._io_loop)
self._sched = Scheduler()
self.thread_pool = ThreadPoolExecutor(poolsize)

if agent_map is None:
Expand Down
11 changes: 6 additions & 5 deletions src/inmanta/agent/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
from concurrent.futures import Future
from collections import defaultdict
import typing
import tornado.concurrent


from tornado import concurrent, ioloop


from inmanta.agent.io import get_io
Expand Down Expand Up @@ -331,7 +333,6 @@ def __init__(self, agent, io=None) -> None:
self._io = io

self._client = None
self._ioloop = agent.process._io_loop

def run_sync(self, func: typing.Callable) -> typing.Any:
"""
Expand All @@ -349,10 +350,10 @@ def run():
if result is not None:
from tornado.gen import convert_yielded
result = convert_yielded(result)
tornado.concurrent.chain_future(result, f)
concurrent.chain_future(result, f)
except Exception as e:
f.set_exception(e)
self._ioloop.add_callback(run)
ioloop.IOLoop.current().add_callback(run)

return f.result()

Expand All @@ -366,7 +367,7 @@ def get_client(self) -> protocol.AgentClient:
:return: A client that is associated with the session of the agent that executes this handler.
"""
if self._client is None:
self._client = protocol.AgentClient("agent", self._agent.sessionid, self._ioloop)
self._client = protocol.AgentClient("agent", self._agent.sessionid)
return self._client

def process_events(self, ctx: HandlerContext, resource: resources.Resource, events: dict) -> None:
Expand Down
6 changes: 3 additions & 3 deletions src/inmanta/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def start_agent(options):
from inmanta import agent
io_loop = IOLoop.current()

a = agent.Agent(io_loop)
a = agent.Agent()
a.start()

try:
Expand Down Expand Up @@ -297,10 +297,10 @@ def export(options):
json.dump(modelexporter.export_all(), fh)

if options.deploy:
conn = protocol.Client("compiler")
conn = protocol.SyncClient("compiler")
LOGGER.info("Triggering deploy for version %d" % version)
tid = cfg_env.get()
IOLoop.current().run_sync(lambda: conn.release_version(tid, version, True), 60)
conn.release_version(tid, version, True)


log_levels = {
Expand Down
4 changes: 2 additions & 2 deletions src/inmanta/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1622,8 +1622,8 @@ def create_indexes():
yield cls.create_indexes()


def connect(host, port, database, io_loop):
client = motor_tornado.MotorClient(host, port, io_loop=io_loop)
def connect(host, port, database):
client = motor_tornado.MotorClient(host, port)
db = client[database]

use_motor(db)
45 changes: 13 additions & 32 deletions src/inmanta/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@
from inmanta.config import Option, is_uuid_opt, is_list, is_str
from inmanta.execute.proxy import DynamicProxy, UnknownException
from inmanta.ast import RuntimeException, CompilerException, Locatable, OptionalValueException
from tornado.ioloop import IOLoop
from tornado import gen
from inmanta.execute.runtime import Instance, ResultVariable
from inmanta.util import hash_file
import itertools
from inmanta.ast.entity import Entity
from inmanta.util import groupby
from inmanta.ast.attribute import RelationAttribute
import inmanta.model as model


LOGGER = logging.getLogger(__name__)

unknown_parameters = []
Expand Down Expand Up @@ -65,25 +65,24 @@ def __str__(self, *args, **kwargs):
return "Cycle in dependencies: %s" % self.cycle


@gen.coroutine
def upload_code(conn, tid, version, resource_to_sourcemap):
allfiles = {myhash: source_code for sourcemap in resource_to_sourcemap.values()
for myhash, (file_name, module, source_code, req) in sourcemap.items()}

res = yield conn.stat_files(list(allfiles.keys()))
res = conn.stat_files(list(allfiles.keys()))
if res is None or res.code != 200:
raise Exception("Unable to upload handler plugin code to the server (msg: %s)" % res.result)

for file in res.result["files"]:
res = yield conn.upload_file(id=file, content=base64.b64encode(allfiles[file].encode()).decode("ascii"))
res = conn.upload_file(id=file, content=base64.b64encode(allfiles[file].encode()).decode("ascii"))
if res is None or res.code != 200:
raise Exception("Unable to upload handler plugin code to the server (msg: %s)" % res.result)

compactmap = {resource: {myhash: (file_name, module, req) for
myhash, (file_name, module, source_code, req)in sourcemap.items()}
for resource, sourcemap in resource_to_sourcemap.items()}

res = yield conn.upload_code_batched(tid=tid, id=version, resources=compactmap)
res = conn.upload_code_batched(tid=tid, id=version, resources=compactmap)
if res is None or res.code != 200:
raise Exception("Unable to upload handler plugin code to the server (msg: %s)" % res.result)

Expand Down Expand Up @@ -363,10 +362,7 @@ def resources_to_list(self) -> List[Dict[str, str]]:

return resources

def run_sync(self, function):
return IOLoop.current(instance=True).run_sync(function, 300)

def deploy_code(self, tid, version=None):
def deploy_code(self, conn, tid, version=None):
"""
Deploy code to the server
"""
Expand All @@ -390,13 +386,7 @@ def merge_dict(a, b):

LOGGER.info("Uploading source files")

conn = protocol.Client("compiler")

@gen.coroutine
def call():
yield upload_code(conn, tid, version, sources)

self.run_sync(call)
upload_code(conn, tid, version, sources)

def commit_resources(self, version: int, resources: List[Dict[str, str]],
metadata: Dict[str, str], model: Dict) -> None:
Expand All @@ -408,19 +398,16 @@ def commit_resources(self, version: int, resources: List[Dict[str, str]],
LOGGER.error("The environment for this model should be set!")
return

self.deploy_code(tid, version)
conn = protocol.SyncClient("compiler")
self.deploy_code(conn, tid, version)

conn = protocol.Client("compiler")
LOGGER.info("Uploading %d files" % len(self._file_store))

# collect all hashes and send them at once to the server to check
# if they are already uploaded
hashes = list(self._file_store.keys())

def call():
return conn.stat_files(files=hashes)

res = self.run_sync(call)
res = conn.stat_files(files=hashes)

if res.code != 200:
raise Exception("Unable to check status of files at server")
Expand All @@ -431,10 +418,7 @@ def call():
for hash_id in to_upload:
content = self._file_store[hash_id]

def call():
return conn.upload_file(id=hash_id, content=base64.b64encode(content).decode("ascii"))

res = self.run_sync(call)
res = conn.upload_file(id=hash_id, content=base64.b64encode(content).decode("ascii"))

if res.code != 200:
LOGGER.error("Unable to upload file with hash %s" % hash_id)
Expand All @@ -450,11 +434,8 @@ def call():
for res in resources:
LOGGER.debug(" %s", res["id"])

def put_call():
return conn.put_version(tid=tid, version=version, resources=resources, unknowns=unknown_parameters,
resource_state=self._resource_state, version_info=version_info)

res = self.run_sync(put_call)
res = conn.put_version(tid=tid, version=version, resources=resources, unknowns=unknown_parameters,
resource_state=self._resource_state, version_info=version_info)

if res.code != 200:
LOGGER.error("Failed to commit resource updates (%s)", res.result["message"])
Expand Down
49 changes: 4 additions & 45 deletions src/inmanta/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@
import datetime
import json
from collections import defaultdict
from concurrent.futures import Future


from inmanta import protocol
from inmanta.config import Config, cmdline_rest_transport
from tornado.ioloop import IOLoop
import click
import texttable
from time import sleep
Expand All @@ -36,15 +34,7 @@
class Client(object):
log = logging.getLogger(__name__)

def __init__(self, host, port, io_loop):
self._client = None
if io_loop is not None:
self._io_loop = io_loop
self._own_loop = False
else:
self._io_loop = IOLoop.current()
self._own_loop = True

def __init__(self, host, port):
if host is None:
self.host = cmdline_rest_transport.host.get()
else:
Expand All @@ -57,34 +47,7 @@ def __init__(self, host, port, io_loop):
self.port = port
Config.set("cmdline_rest_transport", "port", str(port))

self._client = protocol.Client("cmdline")

def run_sync(self, func):
if self._own_loop:
return self._io_loop.run_sync(func)

else:
f = Future()

def future_to_future(future):
exc = future.exception()
if exc is not None:
f.set_exception(exc)
else:
f.set_result(future.result())

def run():
try:
result = func()
if result is not None:
from tornado.gen import convert_yielded
result = convert_yielded(result)
result.add_done_callback(future_to_future)
except Exception as e:
f.set_exception(e)
self._io_loop.add_callback(run)

return f.result()
self._client = protocol.SyncClient("cmdline")

def do_request(self, method_name, key_name=None, arguments={}, allow_none=False):
"""
Expand All @@ -97,11 +60,7 @@ def do_request(self, method_name, key_name=None, arguments={}, allow_none=False)
raise Exception("API call %s is not available." % method_name)

method = getattr(self._client, method_name)

def call():
return method(**arguments)

result = self.run_sync(call)
result = method(**arguments)

if result is None:
raise Exception("Failed to call server.")
Expand Down Expand Up @@ -226,7 +185,7 @@ def print_table(header, rows, data_type=None):
@click.option("--port", help="The server port to connect to")
@click.pass_context
def cmd(ctx, host, port):
ctx.obj = Client(host, port, io_loop=ctx.obj)
ctx.obj = Client(host, port)


@cmd.group("project")
Expand Down
27 changes: 11 additions & 16 deletions src/inmanta/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -728,11 +728,10 @@ class Endpoint(object):
An end-point in the rpc framework
"""

def __init__(self, io_loop, name):
def __init__(self, name):
self._name = name
self._node_name = inmanta_config.nodename.get()
self._end_point_names = []
self._io_loop = io_loop

def add_future(self, future):
"""
Expand All @@ -744,7 +743,7 @@ def handle_result(f):
except Exception as e:
LOGGER.exception("An exception occurred while handling a future: %s", str(e))

self._io_loop.add_future(future, handle_result)
IOLoop.current().add_future(future, handle_result)

def get_end_point_names(self):
return self._end_point_names
Expand Down Expand Up @@ -782,11 +781,11 @@ class AgentEndPoint(Endpoint, metaclass=EndpointMeta):
An endpoint for clients that make calls to a server and that receive calls back from the server using long-poll
"""

def __init__(self, name, io_loop, timeout=120, transport=RESTTransport, reconnect_delay=5):
super().__init__(io_loop, name)
def __init__(self, name, timeout=120, transport=RESTTransport, reconnect_delay=5):
super().__init__(name)
self._transport = transport
self._client = None
self._sched = Scheduler(self._io_loop)
self._sched = Scheduler()

self._env_id = None

Expand Down Expand Up @@ -816,7 +815,7 @@ def start(self):
assert self._env_id is not None
LOGGER.log(3, "Starting agent for %s", str(self.sessionid))
self._client = AgentClient(self.name, self.sessionid, transport=self._transport, timeout=self.server_timeout)
self._io_loop.add_callback(self.perform_heartbeat)
IOLoop.current().add_callback(self.perform_heartbeat)

def stop(self):
self.running = False
Expand Down Expand Up @@ -895,7 +894,7 @@ def submit_result(future):
self._client.heartbeat_reply(self.sessionid, method_call["reply_id"],
{"result": result_body, "code": status})

self._io_loop.add_future(call_result, submit_result)
IOLoop.current().add_future(call_result, submit_result)


class ClientMeta(type):
Expand Down Expand Up @@ -931,10 +930,8 @@ class Client(Endpoint, metaclass=ClientMeta):
A client that communicates with end-point based on its configuration
"""

def __init__(self, name, ioloop=None, transport=RESTTransport):
if ioloop is None:
ioloop = IOLoop.current()
Endpoint.__init__(self, ioloop, name)
def __init__(self, name, transport=RESTTransport):
Endpoint.__init__(self, name)
self._transport = transport
self._transport_instance = None

Expand Down Expand Up @@ -982,10 +979,8 @@ class AgentClient(Endpoint, metaclass=ClientMeta):
A client that communicates with end-point based on its configuration
"""

def __init__(self, name, sid, ioloop=None, transport=RESTTransport, timeout=120):
if ioloop is None:
ioloop = IOLoop.current()
Endpoint.__init__(self, ioloop, name)
def __init__(self, name, sid, transport=RESTTransport, timeout=120):
Endpoint.__init__(self, name)
self._transport = transport
self._transport_instance = None
self._sid = sid
Expand Down
Loading

0 comments on commit 6d567e3

Please sign in to comment.