Skip to content

Commit

Permalink
FEAT-modin-project#1831: Add optional RPyC tracing and a simple trace…
Browse files Browse the repository at this point in the history
… parser (modin-project#1843)

For details see modin-project#1843 

Signed-off-by: Vasilij Litvinov <vasilij.n.litvinov@intel.com>
  • Loading branch information
vnlitvinov authored and aregm committed Sep 16, 2020
1 parent cb04f3c commit 7966a86
Show file tree
Hide file tree
Showing 4 changed files with 358 additions and 1 deletion.
9 changes: 8 additions & 1 deletion modin/experimental/cloud/rpyc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import types
import collections
import os

import rpyc
from rpyc.lib.compat import pickle
Expand All @@ -33,6 +34,9 @@ def _tuplize(arg):
return tuple(arg)


_TRACE_RPYC = os.environ.get("MODIN_TRACE_RPYC", "").title() == "True"


class WrappingConnection(rpyc.Connection):
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
Expand Down Expand Up @@ -231,7 +235,10 @@ def _init_deliver(self):


class WrappingService(rpyc.ClassicService):
_protocol = WrappingConnection
if _TRACE_RPYC:
from .tracing.tracing_connection import TracingWrappingConnection as _protocol
else:
_protocol = WrappingConnection

def on_connect(self, conn):
super().on_connect(conn)
Expand Down
12 changes: 12 additions & 0 deletions modin/experimental/cloud/tracing/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.
229 changes: 229 additions & 0 deletions modin/experimental/cloud/tracing/parse_rpyc_trace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""
Simple parser for rpyc traces produced by tracing_connection.py
"""

from rpyc.core import consts
import re
import collections
import sys


def read_log(fname):
with open(fname, "rb") as inp:
data = inp.read()
data = data.decode("utf8", "xmlcharrefreplace")
# split the last logging chunk
data = data.rsplit("---[", 1)[1]

items = []
for line in data.splitlines():
if ":args=" not in line:
continue
preargs, args = line.split(":args=", 1)
pieces = ("kind=" + preargs).split(":") + ["args=" + args]
item = dict(piece.split("=", 1) for piece in pieces)
item["timing"] = eval(item.get("timing", "0"))
items.append(item)

return items


def get_syncs(items):
# dels are asynchronous, assume others are synchronous
dels = {
i["seq"]
for i in items
if i["kind"] == "send" and i.get("req", "") == "HANDLE_DEL"
}
return [i for i in items if i["seq"] not in dels]


items = read_log((sys.argv[1:] or ["rpyc-trace.log"])[0])
syncs = get_syncs(items)

print( # noqa: T001
f"total time={sum(i['timing'] for i in syncs if i['kind'] == 'recv' and i['msg'] == 'MSG_REPLY')}"
)

longs = [i for i in syncs if i["timing"] > 0.5]
print(f'longs ({len(longs)}) time={sum(i["timing"] for i in longs)}') # noqa: T001

s_sends = [i for i in syncs if i["kind"] == "send"]

buckets = collections.defaultdict(list)

for i in s_sends:
buckets[i.get("req", "<REVERSE>")].append(i["args"])

print("-------------------") # noqa: T001
for k, v in buckets.items():
print(f"{k}={len(v)}") # noqa: T001
print("-------------------") # noqa: T001

sends = {
i["seq"]: i for i in items if i["kind"] == "send" and i["msg"] == "MSG_REQUEST"
}
pairs, responses = [], {}
for i in items:
if i["kind"] == "recv" and i["msg"] == "MSG_REPLY":
try:
pairs.append((sends[i["seq"]], i))
responses[i["seq"]] = i
except KeyError:
pass
getattrs = [p for p in pairs if p[0].get("req", "") == "HANDLE_GETATTR"]


def _unbox(package): # boxing
label, value = package
if label == consts.LABEL_VALUE:
return value
if label == consts.LABEL_TUPLE:
return tuple(_unbox(item) for item in value)
if label == consts.LABEL_LOCAL_REF:
id_pack = (str(value[0]), value[1], value[2]) # so value is a id_pack
return f"[local]{id_pack[0]}(cls={id_pack[1]}:inst={id_pack[2]})"
if label == consts.LABEL_REMOTE_REF:
id_pack = (str(value[0]), value[1], value[2]) # so value is a id_pack
return f"[remote]{id_pack[0]}(cls={id_pack[1]}:inst={id_pack[2]})"
raise ValueError("invalid label %r" % (label,))


def from_getattr_send(i, s=True):
_, args = eval(i["args"])
obj, attr = _unbox(args)
return f"{obj}::{attr}" if s else (obj, attr)


def from_getattr_recv(i, s=True):
if not i:
return ""
args = eval(i["args"])
return _unbox(args)


def from_hash_send(i, s=True):
_, args = eval(i["args"])
obj = _unbox(args)[0]
return obj


def _unwrap_obj(obj, remote):
try:
obj, attr = remote[obj.replace("[local]", "[remote]")]
except (KeyError, ValueError):
obj = "[>_<] " + obj
else:
obj = f"{obj.replace('[local]', '[remote]')}.{attr}"
return obj


def _stringify(obj):
if not isinstance(obj, str):
return str(obj)
if "[local]" in obj or "[remote]" in obj:
return obj
return repr(obj)


def _format_args(args, kw):
fargs = ", ".join(_stringify(x) for x in args)
fkw = ", ".join(f"{k}={_stringify(v)}" for (k, v) in kw)
if fargs and fkw:
fargs += ", "
return f"({fargs}{fkw})"


def from_callattr_send(i, s=True, remote=None):
_, args = eval(i["args"])
obj, name, args, kw = _unbox(args)
if remote:
obj = _unwrap_obj(obj, remote)
return f"{obj}.{name}{_format_args(args, kw)}" if s else (obj, name, args, kw)


def from_call_send(i, s=True, remote=None):
_, args = eval(i["args"])
obj, args, kw = _unbox(args)
if remote:
obj = _unwrap_obj(obj, remote)
if s:
res = f"{obj}{_format_args(args, kw)}"
return re.sub(r"\(cls=\d+:inst=", "(inst:", res)
return obj, args, kw


def _parse_msg(m, s=False, **kw):
if m["kind"] == "send":
if m.get("req") == "HANDLE_GETATTR":
return from_getattr_send(m, s, **kw)
if m.get("req") in ("HANDLE_HASH", "HANDLE_STR"):
return from_hash_send(m, s, **kw)
if m.get("req") == "HANDLE_CALLATTR":
return from_callattr_send(m, s, **kw)
if m.get("req") == "HANDLE_CALL":
return from_call_send(m, s, **kw)
return str(m)
return from_getattr_recv(m, s, **kw)


remote = {}
for gsend, grecv in pairs:
got, sent = _parse_msg(grecv, False), _parse_msg(gsend, False)
if isinstance(got, str):
remote[got] = sent
# remote[from_getattr_recv(grecv, False)] = from_getattr_send(gsend, False)

print(f"total time getattrs={sum(x[1]['timing'] for x in getattrs)}") # noqa: T001

# import pdb; pdb.set_trace()

print("\n\n----[ getattr ]----") # noqa: T001
for gsend, grecv in getattrs:
print(f"{from_getattr_send(gsend)} --> {from_getattr_recv(grecv)}") # noqa: T001


print("\n\n----[ hash ]----") # noqa: T001
for i in syncs:
if i.get("req", "") == "HANDLE_HASH" and i["kind"] == "send":
print( # noqa: T001
from_hash_send(i), "-->", from_getattr_recv(responses.get(i["seq"]))
)

print("\n\n----[ str ]----") # noqa: T001
for i in syncs:
if i.get("req", "") == "HANDLE_STR" and i["kind"] == "send":
print( # noqa: T001
from_hash_send(i), "-->", from_getattr_recv(responses.get(i["seq"]))
)

print("\n\n----[ callattr ]----") # noqa: T001
for i in syncs:
if i.get("req", "") == "HANDLE_CALLATTR" and i["kind"] == "send":
print( # noqa: T001
from_callattr_send(i, remote=remote),
"-->",
from_getattr_recv(responses.get(i["seq"])),
)

print("\n\n----[ call ]----") # noqa: T001
for i in syncs:
if i.get("req", "") == "HANDLE_CALL" and i["kind"] == "send":
print( # noqa: T001
from_call_send(i, remote=remote),
"-->",
from_getattr_recv(responses.get(i["seq"])),
)
109 changes: 109 additions & 0 deletions modin/experimental/cloud/tracing/tracing_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import threading
import time
import collections

from rpyc.core import brine, consts, netref

from ..rpyc_proxy import WrappingConnection

_msg_to_name = collections.defaultdict(dict)
for name in dir(consts):
if name.upper() == name:
category, _ = name.split("_", 1)
_msg_to_name[category][getattr(consts, name)] = name
_msg_to_name = dict(_msg_to_name)


class _Logger:
def __init__(self, conn, logname):
self.conn = conn
self.logname = logname

def __enter__(self):
with self.conn.logLock:
self.conn.logfiles.add(self.logname)
with open(self.logname, "a") as out:
out.write(f"------------[new trace at {time.asctime()}]----------\n")
return self

def __exit__(self, *a, **kw):
with self.conn.logLock:
self.conn.logfiles.remove(self.logname)


class TracingWrappingConnection(WrappingConnection):
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self.logLock = threading.RLock()
self.timings = {}
with open("rpyc-trace.log", "a") as out:
out.write(f"------------[new trace at {time.asctime()}]----------\n")
self.logfiles = set(["rpyc-trace.log"])

@classmethod
def __stringify(cls, args):
if isinstance(args, (tuple, list)):
return tuple(cls.__stringify(i) for i in args)
if isinstance(args, netref.BaseNetref):
return str(args.____id_pack__)
return args

@classmethod
def __to_text(cls, args):
return str(cls.__stringify(args))

def _send(self, msg, seq, args):
str_args = self.__to_text(args).replace("\r", "").replace("\n", "\tNEWLINE\t")
if msg == consts.MSG_REQUEST:
handler, _ = args
str_handler = f":req={_msg_to_name['HANDLE'][handler]}"
else:
str_handler = ""
with self.logLock:
for logfile in self.logfiles:
with open(logfile, "a") as out:
out.write(
f"send:msg={_msg_to_name['MSG'][msg]}:seq={seq}{str_handler}:args={str_args}\n"
)
self.timings[seq] = time.time()
return super()._send(msg, seq, args)

def _dispatch(self, data):
"""tracing only"""
got1 = time.time()
try:
return super()._dispatch(data)
finally:
got2 = time.time()
msg, seq, args = brine.load(data)
sent = self.timings.pop(seq, got1)
if msg == consts.MSG_REQUEST:
handler, args = args
str_handler = f":req={_msg_to_name['HANDLE'][handler]}"
else:
str_handler = ""
str_args = (
self.__to_text(args).replace("\r", "").replace("\n", "\tNEWLINE\t")
)
with self.logLock:
for logfile in self.logfiles:
with open(logfile, "a") as out:
out.write(
f"recv:timing={got1 - sent}+{got2 - got1}:msg={_msg_to_name['MSG'][msg]}:seq={seq}{str_handler}:args={str_args}\n"
)

def _log_extra(self, logname):
return _Logger(self, logname)

0 comments on commit 7966a86

Please sign in to comment.