Skip to content

Commit

Permalink
FEAT-modin-project#1831: Move TracingWrappingConnection to separate p…
Browse files Browse the repository at this point in the history
…ackage

Signed-off-by: Vasilij Litvinov <vasilij.n.litvinov@intel.com>
  • Loading branch information
vnlitvinov committed Jul 28, 2020
1 parent 525666f commit 06c112d
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 88 deletions.
97 changes: 9 additions & 88 deletions modin/experimental/cloud/rpyc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from rpyc.lib.compat import pickle
from rpyc.lib import get_methods

from rpyc.core import brine, netref, AsyncResult, consts
from rpyc.core import netref, AsyncResult, consts

from . import get_connection
from .meta_magic import _LOCAL_ATTRS, RemoteMeta, _KNOWN_DUALS
Expand All @@ -35,14 +35,11 @@ def _tuplize(arg):
"""turns any sequence or iterator into a flat tuple"""
return tuple(arg)

_TRACE_RPYC = os.environ.get('MODIN_TRACE_RPYC', '').title() == 'True'

_msg_to_name = collections.defaultdict(list)
for name in dir(consts):
if name.upper() == name:
category, _ = name.split("_", 1)
_msg_to_name[category][getattr(consts, name)] = name
_msg_to_name = dict(_msg_to_name)
_TRACE_RPYC = os.environ.get("MODIN_TRACE_RPYC", "").title() == "True"





class WrappingConnection(rpyc.Connection):
Expand Down Expand Up @@ -241,88 +238,12 @@ def _init_deliver(self):
"modin.experimental.cloud.rpyc_proxy"
]._tuplize

class TracingWrappingConnection(WrappingConnection):
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self.logLock = threading.RLock()
self.timings = {}
with open("rpyc-trace.log", "a") as out:
out.write(f"------------[new trace at {time.asctime()}]----------\n")
self.logfiles = set(["rpyc-trace.log"])

@classmethod
def __stringify(cls, args):
if isinstance(args, (tuple, list)):
return tuple(cls.__stringify(i) for i in args)
if isinstance(args, netref.BaseNetref):
return str(args.____id_pack__)
return args

@classmethod
def __to_text(cls, args):
return str(cls.__stringify(args))

def _send(self, msg, seq, args):
str_args = self.__to_text(args).replace("\r", "").replace("\n", "\tNEWLINE\t")
if msg == consts.MSG_REQUEST:
handler, _ = args
str_handler = f":req={_msg_to_name['HANDLE'][handler]}"
else:
str_handler = ""
with self.logLock:
for logfile in self.logfiles:
with open(logfile, "a") as out:
out.write(
f"send:msg={_msg_to_name['MSG'][msg]}:seq={seq}{str_handler}:args={str_args}\n"
)
self.timings[seq] = time.time()
return super()._send(msg, seq, args)

def _dispatch(self, data):
"""tracing only"""
got1 = time.time()
try:
return super()._dispatch(data)
finally:
got2 = time.time()
msg, seq, args = brine.load(data)
sent = self.timings.pop(seq, got1)
if msg == consts.MSG_REQUEST:
handler, args = args
str_handler = f":req={_msg_to_name['HANDLE'][handler]}"
else:
str_handler = ""
str_args = self.__to_text(args).replace("\r", "").replace("\n", "\tNEWLINE\t")
with self.logLock:
for logfile in self.logfiles:
with open(logfile, "a") as out:
out.write(
f"recv:timing={got1 - sent}+{got2 - got1}:msg={_msg_to_name['MSG'][msg]}:seq={seq}{str_handler}:args={str_args}\n"
)
class _Logger:
def __init__(self, conn, logname):
self.conn = conn
self.logname = logname

def __enter__(self):
with self.conn.logLock:
self.conn.logfiles.add(self.logname)
with open(self.logname, "a") as out:
out.write(
f"------------[new trace at {time.asctime()}]----------\n"
)
return self

def __exit__(self, *a, **kw):
with self.conn.logLock:
self.conn.logfiles.remove(self.logname)

def _logmore(self, logname):
return self._Logger(self, logname)


class WrappingService(rpyc.ClassicService):
_protocol = TracingWrappingConnection if _TRACE_RPYC else WrappingConnection
if _TRACE_RPYC:
from .tracing.tracing_connection import TracingWrappingConnection as _protocol
else:
_protocol = WrappingConnection

def on_connect(self, conn):
super().on_connect(conn)
Expand Down
12 changes: 12 additions & 0 deletions modin/experimental/cloud/tracing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.
109 changes: 109 additions & 0 deletions modin/experimental/cloud/tracing/tracing_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import threading
import time
import collections

from rpyc.core import brine, consts, netref

from ..rpyc_proxy import WrappingConnection

_msg_to_name = collections.defaultdict(dict)
for name in dir(consts):
if name.upper() == name:
category, _ = name.split("_", 1)
_msg_to_name[category][getattr(consts, name)] = name
_msg_to_name = dict(_msg_to_name)


class _Logger:
def __init__(self, conn, logname):
self.conn = conn
self.logname = logname

def __enter__(self):
with self.conn.logLock:
self.conn.logfiles.add(self.logname)
with open(self.logname, "a") as out:
out.write(f"------------[new trace at {time.asctime()}]----------\n")
return self

def __exit__(self, *a, **kw):
with self.conn.logLock:
self.conn.logfiles.remove(self.logname)


class TracingWrappingConnection(WrappingConnection):
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self.logLock = threading.RLock()
self.timings = {}
with open("rpyc-trace.log", "a") as out:
out.write(f"------------[new trace at {time.asctime()}]----------\n")
self.logfiles = set(["rpyc-trace.log"])

@classmethod
def __stringify(cls, args):
if isinstance(args, (tuple, list)):
return tuple(cls.__stringify(i) for i in args)
if isinstance(args, netref.BaseNetref):
return str(args.____id_pack__)
return args

@classmethod
def __to_text(cls, args):
return str(cls.__stringify(args))

def _send(self, msg, seq, args):
str_args = self.__to_text(args).replace("\r", "").replace("\n", "\tNEWLINE\t")
if msg == consts.MSG_REQUEST:
handler, _ = args
str_handler = f":req={_msg_to_name['HANDLE'][handler]}"
else:
str_handler = ""
with self.logLock:
for logfile in self.logfiles:
with open(logfile, "a") as out:
out.write(
f"send:msg={_msg_to_name['MSG'][msg]}:seq={seq}{str_handler}:args={str_args}\n"
)
self.timings[seq] = time.time()
return super()._send(msg, seq, args)

def _dispatch(self, data):
"""tracing only"""
got1 = time.time()
try:
return super()._dispatch(data)
finally:
got2 = time.time()
msg, seq, args = brine.load(data)
sent = self.timings.pop(seq, got1)
if msg == consts.MSG_REQUEST:
handler, args = args
str_handler = f":req={_msg_to_name['HANDLE'][handler]}"
else:
str_handler = ""
str_args = (
self.__to_text(args).replace("\r", "").replace("\n", "\tNEWLINE\t")
)
with self.logLock:
for logfile in self.logfiles:
with open(logfile, "a") as out:
out.write(
f"recv:timing={got1 - sent}+{got2 - got1}:msg={_msg_to_name['MSG'][msg]}:seq={seq}{str_handler}:args={str_args}\n"
)

def _log_extra(self, logname):
return _Logger(self, logname)

0 comments on commit 06c112d

Please sign in to comment.