diff --git a/CHANGELOG.md b/CHANGELOG.md index 21034f8d48..a18a261c50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ Changes in this release: - Use files for all logs and split out logs, stdout and stderr in autostarted agents (#824, #234) - Introduce request_timeout option for transport settings - Port unit tests to pytest-asyncio and fix deprecation warnings (#743) +- Remove all io_loop references and only use current ioloop (#847) v 2018.3 (2018-12-07) Changes in this release: diff --git a/src/inmanta/agent/agent.py b/src/inmanta/agent/agent.py index 699149a3b8..fed74016cc 100644 --- a/src/inmanta/agent/agent.py +++ b/src/inmanta/agent/agent.py @@ -637,14 +637,14 @@ class Agent(AgentEndPoint): message bus for changes. """ - def __init__(self, io_loop, hostname=None, agent_map=None, code_loader=True, environment=None, poolsize=1, + def __init__(self, hostname=None, agent_map=None, code_loader=True, environment=None, poolsize=1, cricital_pool_size=5): - super().__init__("agent", io_loop, timeout=cfg.server_timeout.get(), reconnect_delay=cfg.agent_reconnect_delay.get()) + super().__init__("agent", timeout=cfg.server_timeout.get(), reconnect_delay=cfg.agent_reconnect_delay.get()) self.poolsize = poolsize self.ratelimiter = locks.Semaphore(poolsize) self.critical_ratelimiter = locks.Semaphore(cricital_pool_size) - self._sched = Scheduler(io_loop=self._io_loop) + self._sched = Scheduler() self.thread_pool = ThreadPoolExecutor(poolsize) if agent_map is None: diff --git a/src/inmanta/agent/handler.py b/src/inmanta/agent/handler.py index 6558787af6..68c3c37776 100644 --- a/src/inmanta/agent/handler.py +++ b/src/inmanta/agent/handler.py @@ -24,7 +24,9 @@ from concurrent.futures import Future from collections import defaultdict import typing -import tornado.concurrent + + +from tornado import concurrent, ioloop from inmanta.agent.io import get_io @@ -331,7 +333,6 @@ def __init__(self, agent, io=None) -> None: self._io = io self._client = None - self._ioloop = agent.process._io_loop def run_sync(self, func: typing.Callable) -> typing.Any: """ @@ -349,10 +350,10 @@ def run(): if result is not None: from tornado.gen import convert_yielded result = convert_yielded(result) - tornado.concurrent.chain_future(result, f) + concurrent.chain_future(result, f) except Exception as e: f.set_exception(e) - self._ioloop.add_callback(run) + ioloop.IOLoop.current().add_callback(run) return f.result() @@ -366,7 +367,7 @@ def get_client(self) -> protocol.AgentClient: :return: A client that is associated with the session of the agent that executes this handler. """ if self._client is None: - self._client = protocol.AgentClient("agent", self._agent.sessionid, self._ioloop) + self._client = protocol.AgentClient("agent", self._agent.sessionid) return self._client def process_events(self, ctx: HandlerContext, resource: resources.Resource, events: dict) -> None: diff --git a/src/inmanta/app.py b/src/inmanta/app.py index 3c7f90260d..d564b99fe8 100755 --- a/src/inmanta/app.py +++ b/src/inmanta/app.py @@ -72,7 +72,7 @@ def start_agent(options): from inmanta import agent io_loop = IOLoop.current() - a = agent.Agent(io_loop) + a = agent.Agent() a.start() try: @@ -297,10 +297,10 @@ def export(options): json.dump(modelexporter.export_all(), fh) if options.deploy: - conn = protocol.Client("compiler") + conn = protocol.SyncClient("compiler") LOGGER.info("Triggering deploy for version %d" % version) tid = cfg_env.get() - IOLoop.current().run_sync(lambda: conn.release_version(tid, version, True), 60) + conn.release_version(tid, version, True) log_levels = { diff --git a/src/inmanta/data.py b/src/inmanta/data.py index f434c2614c..d4eb345781 100644 --- a/src/inmanta/data.py +++ b/src/inmanta/data.py @@ -1622,8 +1622,8 @@ def create_indexes(): yield cls.create_indexes() -def connect(host, port, database, io_loop): - client = motor_tornado.MotorClient(host, port, io_loop=io_loop) +def connect(host, port, database): + client = motor_tornado.MotorClient(host, port) db = client[database] use_motor(db) diff --git a/src/inmanta/export.py b/src/inmanta/export.py index 820b8aa3b3..2119a7e4f6 100644 --- a/src/inmanta/export.py +++ b/src/inmanta/export.py @@ -29,8 +29,6 @@ from inmanta.config import Option, is_uuid_opt, is_list, is_str from inmanta.execute.proxy import DynamicProxy, UnknownException from inmanta.ast import RuntimeException, CompilerException, Locatable, OptionalValueException -from tornado.ioloop import IOLoop -from tornado import gen from inmanta.execute.runtime import Instance, ResultVariable from inmanta.util import hash_file import itertools @@ -38,6 +36,8 @@ from inmanta.util import groupby from inmanta.ast.attribute import RelationAttribute import inmanta.model as model + + LOGGER = logging.getLogger(__name__) unknown_parameters = [] @@ -65,17 +65,16 @@ def __str__(self, *args, **kwargs): return "Cycle in dependencies: %s" % self.cycle -@gen.coroutine def upload_code(conn, tid, version, resource_to_sourcemap): allfiles = {myhash: source_code for sourcemap in resource_to_sourcemap.values() for myhash, (file_name, module, source_code, req) in sourcemap.items()} - res = yield conn.stat_files(list(allfiles.keys())) + res = conn.stat_files(list(allfiles.keys())) if res is None or res.code != 200: raise Exception("Unable to upload handler plugin code to the server (msg: %s)" % res.result) for file in res.result["files"]: - res = yield conn.upload_file(id=file, content=base64.b64encode(allfiles[file].encode()).decode("ascii")) + res = conn.upload_file(id=file, content=base64.b64encode(allfiles[file].encode()).decode("ascii")) if res is None or res.code != 200: raise Exception("Unable to upload handler plugin code to the server (msg: %s)" % res.result) @@ -83,7 +82,7 @@ def upload_code(conn, tid, version, resource_to_sourcemap): myhash, (file_name, module, source_code, req)in sourcemap.items()} for resource, sourcemap in resource_to_sourcemap.items()} - res = yield conn.upload_code_batched(tid=tid, id=version, resources=compactmap) + res = conn.upload_code_batched(tid=tid, id=version, resources=compactmap) if res is None or res.code != 200: raise Exception("Unable to upload handler plugin code to the server (msg: %s)" % res.result) @@ -363,10 +362,7 @@ def resources_to_list(self) -> List[Dict[str, str]]: return resources - def run_sync(self, function): - return IOLoop.current(instance=True).run_sync(function, 300) - - def deploy_code(self, tid, version=None): + def deploy_code(self, conn, tid, version=None): """ Deploy code to the server """ @@ -390,13 +386,7 @@ def merge_dict(a, b): LOGGER.info("Uploading source files") - conn = protocol.Client("compiler") - - @gen.coroutine - def call(): - yield upload_code(conn, tid, version, sources) - - self.run_sync(call) + upload_code(conn, tid, version, sources) def commit_resources(self, version: int, resources: List[Dict[str, str]], metadata: Dict[str, str], model: Dict) -> None: @@ -408,19 +398,16 @@ def commit_resources(self, version: int, resources: List[Dict[str, str]], LOGGER.error("The environment for this model should be set!") return - self.deploy_code(tid, version) + conn = protocol.SyncClient("compiler") + self.deploy_code(conn, tid, version) - conn = protocol.Client("compiler") LOGGER.info("Uploading %d files" % len(self._file_store)) # collect all hashes and send them at once to the server to check # if they are already uploaded hashes = list(self._file_store.keys()) - def call(): - return conn.stat_files(files=hashes) - - res = self.run_sync(call) + res = conn.stat_files(files=hashes) if res.code != 200: raise Exception("Unable to check status of files at server") @@ -431,10 +418,7 @@ def call(): for hash_id in to_upload: content = self._file_store[hash_id] - def call(): - return conn.upload_file(id=hash_id, content=base64.b64encode(content).decode("ascii")) - - res = self.run_sync(call) + res = conn.upload_file(id=hash_id, content=base64.b64encode(content).decode("ascii")) if res.code != 200: LOGGER.error("Unable to upload file with hash %s" % hash_id) @@ -450,11 +434,8 @@ def call(): for res in resources: LOGGER.debug(" %s", res["id"]) - def put_call(): - return conn.put_version(tid=tid, version=version, resources=resources, unknowns=unknown_parameters, - resource_state=self._resource_state, version_info=version_info) - - res = self.run_sync(put_call) + res = conn.put_version(tid=tid, version=version, resources=resources, unknowns=unknown_parameters, + resource_state=self._resource_state, version_info=version_info) if res.code != 200: LOGGER.error("Failed to commit resource updates (%s)", res.result["message"]) diff --git a/src/inmanta/main.py b/src/inmanta/main.py index 904e1ab50a..a381202567 100644 --- a/src/inmanta/main.py +++ b/src/inmanta/main.py @@ -22,12 +22,10 @@ import datetime import json from collections import defaultdict -from concurrent.futures import Future from inmanta import protocol from inmanta.config import Config, cmdline_rest_transport -from tornado.ioloop import IOLoop import click import texttable from time import sleep @@ -36,15 +34,7 @@ class Client(object): log = logging.getLogger(__name__) - def __init__(self, host, port, io_loop): - self._client = None - if io_loop is not None: - self._io_loop = io_loop - self._own_loop = False - else: - self._io_loop = IOLoop.current() - self._own_loop = True - + def __init__(self, host, port): if host is None: self.host = cmdline_rest_transport.host.get() else: @@ -57,34 +47,7 @@ def __init__(self, host, port, io_loop): self.port = port Config.set("cmdline_rest_transport", "port", str(port)) - self._client = protocol.Client("cmdline") - - def run_sync(self, func): - if self._own_loop: - return self._io_loop.run_sync(func) - - else: - f = Future() - - def future_to_future(future): - exc = future.exception() - if exc is not None: - f.set_exception(exc) - else: - f.set_result(future.result()) - - def run(): - try: - result = func() - if result is not None: - from tornado.gen import convert_yielded - result = convert_yielded(result) - result.add_done_callback(future_to_future) - except Exception as e: - f.set_exception(e) - self._io_loop.add_callback(run) - - return f.result() + self._client = protocol.SyncClient("cmdline") def do_request(self, method_name, key_name=None, arguments={}, allow_none=False): """ @@ -97,11 +60,7 @@ def do_request(self, method_name, key_name=None, arguments={}, allow_none=False) raise Exception("API call %s is not available." % method_name) method = getattr(self._client, method_name) - - def call(): - return method(**arguments) - - result = self.run_sync(call) + result = method(**arguments) if result is None: raise Exception("Failed to call server.") @@ -226,7 +185,7 @@ def print_table(header, rows, data_type=None): @click.option("--port", help="The server port to connect to") @click.pass_context def cmd(ctx, host, port): - ctx.obj = Client(host, port, io_loop=ctx.obj) + ctx.obj = Client(host, port) @cmd.group("project") diff --git a/src/inmanta/protocol.py b/src/inmanta/protocol.py index 19e233b729..df46bb2254 100644 --- a/src/inmanta/protocol.py +++ b/src/inmanta/protocol.py @@ -728,11 +728,10 @@ class Endpoint(object): An end-point in the rpc framework """ - def __init__(self, io_loop, name): + def __init__(self, name): self._name = name self._node_name = inmanta_config.nodename.get() self._end_point_names = [] - self._io_loop = io_loop def add_future(self, future): """ @@ -744,7 +743,7 @@ def handle_result(f): except Exception as e: LOGGER.exception("An exception occurred while handling a future: %s", str(e)) - self._io_loop.add_future(future, handle_result) + IOLoop.current().add_future(future, handle_result) def get_end_point_names(self): return self._end_point_names @@ -782,11 +781,11 @@ class AgentEndPoint(Endpoint, metaclass=EndpointMeta): An endpoint for clients that make calls to a server and that receive calls back from the server using long-poll """ - def __init__(self, name, io_loop, timeout=120, transport=RESTTransport, reconnect_delay=5): - super().__init__(io_loop, name) + def __init__(self, name, timeout=120, transport=RESTTransport, reconnect_delay=5): + super().__init__(name) self._transport = transport self._client = None - self._sched = Scheduler(self._io_loop) + self._sched = Scheduler() self._env_id = None @@ -816,7 +815,7 @@ def start(self): assert self._env_id is not None LOGGER.log(3, "Starting agent for %s", str(self.sessionid)) self._client = AgentClient(self.name, self.sessionid, transport=self._transport, timeout=self.server_timeout) - self._io_loop.add_callback(self.perform_heartbeat) + IOLoop.current().add_callback(self.perform_heartbeat) def stop(self): self.running = False @@ -895,7 +894,7 @@ def submit_result(future): self._client.heartbeat_reply(self.sessionid, method_call["reply_id"], {"result": result_body, "code": status}) - self._io_loop.add_future(call_result, submit_result) + IOLoop.current().add_future(call_result, submit_result) class ClientMeta(type): @@ -931,10 +930,8 @@ class Client(Endpoint, metaclass=ClientMeta): A client that communicates with end-point based on its configuration """ - def __init__(self, name, ioloop=None, transport=RESTTransport): - if ioloop is None: - ioloop = IOLoop.current() - Endpoint.__init__(self, ioloop, name) + def __init__(self, name, transport=RESTTransport): + Endpoint.__init__(self, name) self._transport = transport self._transport_instance = None @@ -982,10 +979,8 @@ class AgentClient(Endpoint, metaclass=ClientMeta): A client that communicates with end-point based on its configuration """ - def __init__(self, name, sid, ioloop=None, transport=RESTTransport, timeout=120): - if ioloop is None: - ioloop = IOLoop.current() - Endpoint.__init__(self, ioloop, name) + def __init__(self, name, sid, transport=RESTTransport, timeout=120): + Endpoint.__init__(self, name) self._transport = transport self._transport_instance = None self._sid = sid diff --git a/src/inmanta/server/agentmanager.py b/src/inmanta/server/agentmanager.py index 8b00c022ac..6bcf56401b 100644 --- a/src/inmanta/server/agentmanager.py +++ b/src/inmanta/server/agentmanager.py @@ -35,7 +35,6 @@ import uuid from inmanta.server.protocol import ServerSlice from inmanta.server import config as opt -from tornado.ioloop import IOLoop from inmanta.protocol import encode_token from inmanta.resources import Id @@ -86,7 +85,7 @@ class AgentManager(ServerSlice): ''' def __init__(self, restserver, closesessionsonstart=True, fact_back_off=None): - super(AgentManager, self).__init__(IOLoop.current(), SLICE_AGENT_MANAGER) + super(AgentManager, self).__init__(SLICE_AGENT_MANAGER) self.restserver = restserver if fact_back_off is None: diff --git a/src/inmanta/server/bootloader.py b/src/inmanta/server/bootloader.py index 4b8bf28525..7896fc651f 100644 --- a/src/inmanta/server/bootloader.py +++ b/src/inmanta/server/bootloader.py @@ -15,7 +15,6 @@ Contact: code@inmanta.com """ -from tornado.ioloop import IOLoop from inmanta.server import server from inmanta.server.protocol import RESTServer from inmanta.server.agentmanager import AgentManager @@ -28,8 +27,7 @@ def __init__(self, agent_no_log=False): self.agent_no_log = agent_no_log def get_server_slice(self): - io_loop = IOLoop.current() - return server.Server(io_loop, agent_no_log=self.agent_no_log) + return server.Server(agent_no_log=self.agent_no_log) def get_agent_manager_slice(self): return AgentManager(self.restserver) diff --git a/src/inmanta/server/protocol.py b/src/inmanta/server/protocol.py index e7b3bf27f9..983e90fb53 100644 --- a/src/inmanta/server/protocol.py +++ b/src/inmanta/server/protocol.py @@ -46,7 +46,7 @@ def __init__(self, connection_timout=120): self.token = inmanta_config.Config.get(self.id, "token", None) self.connection_timout = connection_timout self.headers = set() - self.sessions_handler = SessionManager(IOLoop.current()) + self.sessions_handler = SessionManager() self.add_endpoint(self.sessions_handler) def add_endpoint(self, endpoint: "ServerSlice"): @@ -157,14 +157,13 @@ class ServerSlice(object): An API serving part of the server. """ - def __init__(self, io_loop, name): + def __init__(self, name): self._name = name - self._io_loop = io_loop self.create_endpoint_metadata() self._end_point_names = [] self._handlers = [] - self._sched = Scheduler(self._io_loop) + self._sched = Scheduler() def prestart(self, server: RESTServer): """Called by the RestServer host prior to start, can be used to collect references to other server slices""" @@ -202,7 +201,7 @@ def handle_result(f): except Exception as e: LOGGER.exception("An exception occurred while handling a future: %s", str(e)) - self._io_loop.add_future(future, handle_result) + IOLoop.current().add_future(future, handle_result) def schedule(self, call, interval=60): self._sched.add_action(call, interval) @@ -251,7 +250,7 @@ class Session(object): An environment that segments agents connected to the server """ - def __init__(self, sessionstore, io_loop, sid, hang_interval, timout, tid, endpoint_names, nodename): + def __init__(self, sessionstore, sid, hang_interval, timout, tid, endpoint_names, nodename): self._sid = sid self._interval = hang_interval self._timeout = timout @@ -264,8 +263,6 @@ def __init__(self, sessionstore, io_loop, sid, hang_interval, timout, tid, endpo self.endpoint_names = endpoint_names self.nodename = nodename - self._io_loop = io_loop - self._replies = {} self.check_expire() self._queue = queues.Queue() @@ -279,7 +276,7 @@ def check_expire(self): if ttw < 0: self.expire(self._seen - time.time()) else: - self._callhandle = self._io_loop.call_later(ttw, self.check_expire) + self._callhandle = IOLoop.current().call_later(ttw, self.check_expire) def get_id(self): return self._sid @@ -289,7 +286,7 @@ def get_id(self): def expire(self, timeout): self.expired = True if self._callhandle is not None: - self._io_loop.remove_timeout(self._callhandle) + IOLoop.current().remove_timeout(self._callhandle) self._sessionstore.expire(self, timeout) def seen(self): @@ -301,8 +298,8 @@ def on_timeout(): LOGGER.warning(log_message) future.set_exception(gen.TimeoutError()) - timeout_handle = self._io_loop.add_timeout(self._io_loop.time() + timeout, on_timeout) - future.add_done_callback(lambda _: self._io_loop.remove_timeout(timeout_handle)) + timeout_handle = IOLoop.current().add_timeout(IOLoop.current().time() + timeout, on_timeout) + future.add_done_callback(lambda _: IOLoop.current().remove_timeout(timeout_handle)) def put_call(self, call_spec, timeout=10): future = tornado.concurrent.Future() @@ -329,7 +326,7 @@ def get_calls(self): try: q = self._queue call_list = [] - call = yield q.get(timeout=self._io_loop.time() + self._interval) + call = yield q.get(timeout=IOLoop.current().time() + self._interval) call_list.append(call) while q.qsize() > 0: call = yield q.get() @@ -373,8 +370,8 @@ class SessionManager(ServerSlice): """ __methods__ = {} - def __init__(self, io_loop): - super().__init__(io_loop, SLICE_SESSION_MANAGER) + def __init__(self): + super().__init__(SLICE_SESSION_MANAGER) # Config interval = opt.agent_timeout.get() @@ -429,7 +426,7 @@ def get_or_create_session(self, sid, tid, endpoint_names, nodename): def new_session(self, sid, tid, endpoint_names, nodename): LOGGER.debug("New session with id %s on node %s for env %s with endpoints %s" % (sid, nodename, tid, endpoint_names)) - return Session(self, self._io_loop, sid, self.hangtime, self.interval, tid, endpoint_names, nodename) + return Session(self, sid, self.hangtime, self.interval, tid, endpoint_names, nodename) def expire(self, session: Session, timeout): LOGGER.debug("Expired session with id %s, last seen %d seconds ago" % (session.get_id(), timeout)) diff --git a/src/inmanta/server/server.py b/src/inmanta/server/server.py index 852cd0fc41..a1df5ca5fc 100644 --- a/src/inmanta/server/server.py +++ b/src/inmanta/server/server.py @@ -31,9 +31,7 @@ import dateutil import pymongo -from tornado import gen -from tornado import locks -from tornado import process +from tornado import gen, locks, process, ioloop from inmanta import const from inmanta import data, config @@ -59,8 +57,8 @@ class Server(protocol.ServerSlice): information """ - def __init__(self, io_loop, database_host=None, database_port=None, agent_no_log=False): - super().__init__(io_loop=io_loop, name=SLICE_SERVER) + def __init__(self, database_host=None, database_port=None, agent_no_log=False): + super().__init__(name=SLICE_SERVER) LOGGER.info("Starting server endpoint") self._server_storage = self.check_storage() @@ -73,10 +71,10 @@ def __init__(self, io_loop, database_host=None, database_port=None, agent_no_log if database_port is None: database_port = opt.db_port.get() - data.connect(database_host, database_port, opt.db_name.get(), self._io_loop) + data.connect(database_host, database_port, opt.db_name.get()) LOGGER.info("Connected to mongodb database %s on %s:%d", opt.db_name.get(), database_host, database_port) - self._io_loop.add_callback(data.create_indexes) + ioloop.IOLoop.current().add_callback(data.create_indexes) self._fact_expire = opt.server_fact_expire.get() self._fact_renew = opt.server_fact_renew.get() @@ -84,7 +82,7 @@ def __init__(self, io_loop, database_host=None, database_port=None, agent_no_log self.schedule(self.renew_expired_facts, self._fact_renew) self.schedule(self._purge_versions, opt.server_purge_version_interval.get()) - self._io_loop.add_callback(self._purge_versions) + ioloop.IOLoop.current().add_callback(self._purge_versions) self._recompiles = defaultdict(lambda: None) @@ -1459,7 +1457,7 @@ def _async_recompile(self, env, update_repo, wait=0, metadata={}): LOGGER.info("Last recompile longer than %s ago (last was at %s)", wait_time, last_recompile) self._recompiles[env.id] = self - self._io_loop.add_callback(self._recompile_environment, env.id, update_repo, wait, metadata) + ioloop.IOLoop.current().add_callback(self._recompile_environment, env.id, update_repo, wait, metadata) else: LOGGER.info("Not recompiling, last recompile less than %s ago (last was at %s)", wait_time, last_recompile) diff --git a/src/inmanta/util.py b/src/inmanta/util.py index a8d025e9b3..d517a7f68b 100644 --- a/src/inmanta/util.py +++ b/src/inmanta/util.py @@ -17,12 +17,13 @@ """ import functools +import hashlib +import itertools import logging -from pkg_resources import DistributionNotFound import pkg_resources -import itertools -import hashlib +from pkg_resources import DistributionNotFound +from tornado.ioloop import IOLoop LOGGER = logging.getLogger(__name__) @@ -84,9 +85,8 @@ class Scheduler(object): An event scheduler class """ - def __init__(self, io_loop): + def __init__(self): self._scheduled = set() - self._io_loop = io_loop def add_action(self, action, interval, initial_delay=None): """ @@ -111,9 +111,9 @@ def action_function(): LOGGER.exception("Uncaught exception while executing scheduled action") finally: - self._io_loop.call_later(interval, action_function) + IOLoop.current().call_later(interval, action_function) - self._io_loop.call_later(initial_delay, action_function) + IOLoop.current().call_later(initial_delay, action_function) self._scheduled.add(action) def remove(self, action): diff --git a/tests/conftest.py b/tests/conftest.py index 96e4711ccf..2f607517b5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -283,6 +283,15 @@ def client_multi(server_multi): yield client +@pytest.fixture(scope="function") +def sync_client_multi(server_multi): + from inmanta import protocol + + client = protocol.SyncClient("client") + + yield client + + @pytest.fixture(scope="function") async def environment(client, server): """ diff --git a/tests/test_2way_protocol.py b/tests/test_2way_protocol.py index e3cc55061d..f1b29c0736 100644 --- a/tests/test_2way_protocol.py +++ b/tests/test_2way_protocol.py @@ -53,7 +53,7 @@ def get_agent_status_x(self, id): class SessionSpy(SessionListener, ServerSlice): def __init__(self): - ServerSlice.__init__(self, IOLoop.current(), "sessionspy") + ServerSlice.__init__(self, "sessionspy") self.expires = 0 self.__sessions = [] @@ -142,7 +142,7 @@ async def test_2way_protocol(unused_tcp_port, logs=False): rs.add_endpoint(server) rs.start() - agent = Agent("agent", io_loop) + agent = Agent("agent") agent.add_end_point_name("agent") agent.set_environment(uuid.uuid4()) agent.start() @@ -179,7 +179,7 @@ async def test_timeout(unused_tcp_port): import inmanta.agent.config # nopep8 import inmanta.server.config # nopep8 - io_loop = IOLoop.current() + free_port = str(unused_tcp_port) free_port = str(unused_tcp_port) @@ -207,7 +207,7 @@ async def test_timeout(unused_tcp_port): env = uuid.uuid4() # agent 1 - agent = Agent("agent", io_loop) + agent = Agent("agent") agent.add_end_point_name("agent") agent.set_environment(env) agent.start() @@ -217,7 +217,7 @@ async def test_timeout(unused_tcp_port): assert len(server.get_sessions()) == 1 # agent 2 - agent2 = Agent("agent", io_loop) + agent2 = Agent("agent") agent2.add_end_point_name("agent") agent2.set_environment(env) agent2.start() diff --git a/tests/test_agent.py b/tests/test_agent.py index 16b7976e92..72c4ca6dc7 100644 --- a/tests/test_agent.py +++ b/tests/test_agent.py @@ -20,14 +20,12 @@ from utils import retry_limited from inmanta.agent import reporting from inmanta.server import SLICE_SESSION_MANAGER -from tornado.ioloop import IOLoop @pytest.mark.slowtest @pytest.mark.asyncio async def test_agent_get_status(server, environment): - myagent = agent.Agent(IOLoop.current(), hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, - code_loader=False) + myagent = agent.Agent(hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, code_loader=False) myagent.add_end_point_name("agent1") myagent.start() diff --git a/tests/test_deploy.py b/tests/test_deploy.py index 137302496e..7a43a4f76d 100644 --- a/tests/test_deploy.py +++ b/tests/test_deploy.py @@ -19,7 +19,7 @@ import collections from inmanta import deploy -from tornado import process, ioloop +from tornado import process import pytest @@ -39,7 +39,7 @@ async def test_deploy(snippetcompiler, tmpdir, mongo_db, motor): Options = collections.namedtuple("Options", ["no_agent_log", "dryrun", "map", "agent"]) options = Options(no_agent_log=False, dryrun=False, map="", agent="") - run = deploy.Deploy(ioloop.IOLoop.current(), mongoport=mongo_db.port) + run = deploy.Deploy(mongoport=mongo_db.port) try: run.run(options, only_setup=True) await run.do_deploy(False, "") diff --git a/tests/test_server.py b/tests/test_server.py index 03951ce7a7..ccbf55c149 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -30,7 +30,6 @@ from inmanta.export import upload_code from inmanta.util import hash_file from inmanta.export import unknown_parameters -from tornado.ioloop import IOLoop import asyncio LOGGER = logging.getLogger(__name__) @@ -187,7 +186,7 @@ async def test_get_resource_for_agent(motor, server_multi, client_multi, environ """ Test the server to manage the updates on a model during agent deploy """ - agent = Agent(IOLoop.current(), "localhost", {"nvblah": "localhost"}, environment=environment, code_loader=False) + agent = Agent("localhost", {"nvblah": "localhost"}, environment=environment, code_loader=False) agent.start() aclient = agent._client @@ -310,7 +309,7 @@ async def test_resource_update(client, server, environment): """ Test updating resources and logging """ - agent = Agent(IOLoop.current(), "localhost", {"blah": "localhost"}, environment=environment, code_loader=False) + agent = Agent("localhost", {"blah": "localhost"}, environment=environment, code_loader=False) agent.start() aclient = agent._client @@ -481,7 +480,7 @@ async def test_purge_on_delete_requires(client, server, environment): """ Test purge on delete of resources and inversion of requires """ - agent = Agent(IOLoop.current(), "localhost", {"blah": "localhost"}, environment=environment, code_loader=False) + agent = Agent("localhost", {"blah": "localhost"}, environment=environment, code_loader=False) agent.start() aclient = agent._client @@ -601,7 +600,7 @@ async def test_purge_on_delete_compile_failed(client, server, environment): """ Test purge on delete of resources """ - agent = Agent(IOLoop.current(), "localhost", {"blah": "localhost"}, environment=environment, code_loader=False) + agent = Agent("localhost", {"blah": "localhost"}, environment=environment, code_loader=False) agent.start() aclient = agent._client @@ -689,7 +688,7 @@ async def test_purge_on_delete(client, server, environment): """ Test purge on delete of resources """ - agent = Agent(IOLoop.current(), "localhost", {"blah": "localhost"}, environment=environment, code_loader=False) + agent = Agent("localhost", {"blah": "localhost"}, environment=environment, code_loader=False) agent.start() aclient = agent._client @@ -795,7 +794,7 @@ async def test_purge_on_delete_ignore(client, server, environment): """ Test purge on delete behavior for resources that have not longer purged_on_delete set """ - agent = Agent(IOLoop.current(), "localhost", {"blah": "localhost"}, environment=environment, code_loader=False) + agent = Agent("localhost", {"blah": "localhost"}, environment=environment, code_loader=False) agent.start() aclient = agent._client @@ -950,7 +949,7 @@ async def test_code_upload(motor, server_multi, client_multi, environment): @pytest.mark.asyncio(timeout=30) -async def test_batched_code_upload(motor, server_multi, client_multi, environment): +async def test_batched_code_upload(motor, server_multi, client_multi, sync_client_multi, environment): """ Test the server to manage the updates on a model during agent deploy """ @@ -983,7 +982,7 @@ async def test_batched_code_upload(motor, server_multi, client_multi, environmen "std:xxx": csources } - await upload_code(client_multi, environment, version, sources) + await asyncio.get_event_loop().run_in_executor(None, lambda: upload_code(sync_client_multi, environment, version, sources)) agent = protocol.Client("agent") diff --git a/tests/test_server_agent.py b/tests/test_server_agent.py index a991a2cd7f..097a070b17 100644 --- a/tests/test_server_agent.py +++ b/tests/test_server_agent.py @@ -42,7 +42,6 @@ from inmanta.server import SLICE_AGENT_MANAGER from typing import List, Tuple, Optional, Dict from inmanta.const import ResourceState -from tornado.ioloop import IOLoop logger = logging.getLogger("inmanta.test.server_agent") @@ -385,8 +384,7 @@ async def test_dryrun_and_deploy(server_multi, client_multi, resource_container) result = await client_multi.create_environment(project_id=project_id, name="dev") env_id = result.result["environment"]["id"] - agent = Agent(IOLoop.current(), hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() @@ -537,7 +535,6 @@ async def test_deploy_with_undefined(server_multi, client_multi, resource_contai resource_container.Provider.set_skip("agent2", "key1", 1) agent = Agent( - IOLoop.current(), hostname="node1", environment=env_id, agent_map={"agent1": "localhost", "agent2": "localhost"}, @@ -655,7 +652,7 @@ async def test_server_restart(resource_container, server, mongo_db, client): result = await client.create_environment(project_id=project_id, name="dev") env_id = result.result["environment"]["id"] - agent = Agent(IOLoop.current(), hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, + agent = Agent(hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() @@ -775,7 +772,7 @@ async def test_spontaneous_deploy(resource_container, server, client): Config.set("config", "agent-interval", "2") Config.set("config", "agent-splay", "2") - agent = Agent(IOLoop.current(), hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, + agent = Agent(hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() @@ -850,8 +847,7 @@ async def test_failing_deploy_no_handler(resource_container, server, client): result = await client.create_environment(project_id=project_id, name="dev") env_id = result.result["environment"]["id"] - agent = Agent(IOLoop.current(), hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(agentmanager.sessions) == 1, 10) @@ -898,8 +894,7 @@ async def test_dual_agent(resource_container, server, client, environment): dryrun and deploy a configuration model """ resource_container.Provider.reset() - myagent = agent.Agent(IOLoop.current(), hostname="node1", environment=environment, - agent_map={"agent1": "localhost", "agent2": "localhost"}, + myagent = agent.Agent(hostname="node1", environment=environment, agent_map={"agent1": "localhost", "agent2": "localhost"}, code_loader=False) myagent.add_end_point_name("agent1") myagent.add_end_point_name("agent2") @@ -984,12 +979,10 @@ async def test_server_agent_api(resource_container, client, server): result = await client.create_environment(project_id=project_id, name="dev") env_id = result.result["environment"]["id"] - agent = Agent(IOLoop.current(), environment=env_id, hostname="agent1", agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(environment=env_id, hostname="agent1", agent_map={"agent1": "localhost"}, code_loader=False) agent.start() - agent = Agent(IOLoop.current(), environment=env_id, hostname="agent2", agent_map={"agent2": "localhost"}, - code_loader=False) + agent = Agent(environment=env_id, hostname="agent2", agent_map={"agent2": "localhost"}, code_loader=False) agent.start() await retry_limited(lambda: len(agentmanager.sessions) == 2, 10) @@ -1074,8 +1067,7 @@ async def test_get_facts(resource_container, client, server): result = await client.create_environment(project_id=project_id, name="dev") env_id = result.result["environment"]["id"] - agent = Agent(IOLoop.current(), hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(server.get_endpoint("session")._sessions) == 1, 10) @@ -1119,8 +1111,7 @@ async def test_purged_facts(resource_container, client, server, environment): Test if facts are purged when the resource is purged. """ resource_container.Provider.reset() - agent = Agent(IOLoop.current(), hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(server.get_endpoint("session")._sessions) == 1, 10) @@ -1188,8 +1179,7 @@ async def test_get_facts_extended(server, client, resource_container, environmen agentmanager._fact_resource_block = 0.1 resource_container.Provider.reset() - agent = Agent(IOLoop.current(), hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(agentmanager.sessions) == 1, 10) @@ -1363,8 +1353,7 @@ async def test_unkown_parameters(resource_container, client, server): result = await client.create_environment(project_id=project_id, name="dev") env_id = result.result["environment"]["id"] - agent = Agent(IOLoop.current(), hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(server.get_endpoint("session")._sessions) == 1, 10) @@ -1416,8 +1405,7 @@ async def test_fail(resource_container, client, server): result = await client.create_environment(project_id=project_id, name="dev") env_id = result.result["environment"]["id"] - agent = Agent(IOLoop.current(), hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, - code_loader=False, poolsize=10) + agent = Agent(hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, code_loader=False, poolsize=10) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(server.get_endpoint("session")._sessions) == 1, 10) @@ -1509,8 +1497,7 @@ async def test_wait(resource_container, client, server): env_id = result.result["environment"]["id"] # setup agent - agent = Agent(IOLoop.current(), hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, - code_loader=False, poolsize=10) + agent = Agent(hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, code_loader=False, poolsize=10) agent.add_end_point_name("agent1") agent.start() @@ -1640,7 +1627,7 @@ async def test_multi_instance(resource_container, client, server): env_id = result.result["environment"]["id"] # setup agent - agent = Agent(IOLoop.current(), hostname="node1", environment=env_id, + agent = Agent(hostname="node1", environment=env_id, agent_map={"agent1": "localhost", "agent2": "localhost", "agent3": "localhost"}, code_loader=False, poolsize=1) agent.add_end_point_name("agent1") @@ -1756,14 +1743,12 @@ async def test_cross_agent_deps(resource_container, server, client): result = await client.create_environment(project_id=project_id, name="dev") env_id = result.result["environment"]["id"] - agent = Agent(IOLoop.current(), hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(agentmanager.sessions) == 1, 10) - agent2 = Agent(IOLoop.current(), hostname="node2", environment=env_id, agent_map={"agent2": "localhost"}, - code_loader=False) + agent2 = Agent(hostname="node2", environment=env_id, agent_map={"agent2": "localhost"}, code_loader=False) agent2.add_end_point_name("agent2") agent2.start() await retry_limited(lambda: len(agentmanager.sessions) == 2, 10) @@ -1849,8 +1834,7 @@ async def test_dryrun_scale(resource_container, server, client): result = await client.create_environment(project_id=project_id, name="dev") env_id = result.result["environment"]["id"] - agent = Agent(IOLoop.current(), hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(agentmanager.sessions) == 1, 10) @@ -1906,8 +1890,7 @@ async def test_dryrun_failures(resource_container, server, client): result = await client.create_environment(project_id=project_id, name="dev") env_id = result.result["environment"]["id"] - agent = Agent(IOLoop.current(), hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=env_id, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(agentmanager.sessions) == 1, 10) @@ -1986,8 +1969,7 @@ async def test_send_events(resource_container, environment, server, client): agentmanager = server.get_endpoint(SLICE_AGENT_MANAGER) resource_container.Provider.reset() - agent = Agent(IOLoop.current(), hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(agentmanager.sessions) == 1, 10) @@ -2044,14 +2026,12 @@ async def test_send_events_cross_agent(resource_container, environment, server, agentmanager = server.get_endpoint(SLICE_AGENT_MANAGER) resource_container.Provider.reset() - agent = Agent(IOLoop.current(), hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(agentmanager.sessions) == 1, 10) - agent2 = Agent(IOLoop.current(), hostname="node2", environment=environment, agent_map={"agent2": "localhost"}, - code_loader=False) + agent2 = Agent(hostname="node2", environment=environment, agent_map={"agent2": "localhost"}, code_loader=False) agent2.add_end_point_name("agent2") agent2.start() await retry_limited(lambda: len(agentmanager.sessions) == 2, 10) @@ -2112,8 +2092,7 @@ async def test_send_events_cross_agent_restart(resource_container, environment, agentmanager = server.get_endpoint(SLICE_AGENT_MANAGER) resource_container.Provider.reset() - agent2 = Agent(IOLoop.current(), hostname="node2", environment=environment, agent_map={"agent2": "localhost"}, - code_loader=False) + agent2 = Agent(hostname="node2", environment=environment, agent_map={"agent2": "localhost"}, code_loader=False) agent2.add_end_point_name("agent2") agent2.start() await retry_limited(lambda: len(agentmanager.sessions) == 1, 10) @@ -2156,8 +2135,7 @@ async def test_send_events_cross_agent_restart(resource_container, environment, # start agent 1 and wait for it to finish Config.set("config", "agent-splay", "0") - agent = Agent(IOLoop.current(), hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(agentmanager.sessions) == 2, 10) @@ -2188,8 +2166,7 @@ async def test_auto_deploy(server, client, resource_container, environment): agentmanager = server.get_endpoint(SLICE_AGENT_MANAGER) resource_container.Provider.reset() - agent = Agent(IOLoop.current(), hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, - code_loader=False) + agent = Agent(hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() await retry_limited(lambda: len(agentmanager.sessions) == 1, 10) @@ -2662,7 +2639,7 @@ async def test_deploy_and_events(client, server, environment, resource_container agentmanager = server.get_endpoint(SLICE_AGENT_MANAGER) resource_container.Provider.reset() - agent = Agent(IOLoop.current(), hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, + agent = Agent(hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() @@ -2725,7 +2702,7 @@ async def test_deploy_and_events_failed(client, server, environment, resource_co agentmanager = server.get_endpoint(SLICE_AGENT_MANAGER) resource_container.Provider.reset() - agent = Agent(IOLoop.current(), hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, + agent = Agent(hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start() @@ -2786,7 +2763,7 @@ async def test_reload(client, server, environment, resource_container, dep_state agentmanager = server.get_endpoint(SLICE_AGENT_MANAGER) resource_container.Provider.reset() - agent = Agent(IOLoop.current(), hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, + agent = Agent(hostname="node1", environment=environment, agent_map={"agent1": "localhost"}, code_loader=False) agent.add_end_point_name("agent1") agent.start()