From 56acab72d6aad58fdb17041f7a9cd378ae2b7ae4 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Thu, 4 Nov 2021 14:33:09 +0800 Subject: [PATCH] Reduce time cost of `cpu_percent()` calls (#2567) --- azure-pipelines.yml | 3 +- .../zh_CN/LC_MESSAGES/development/operand.po | 6 ++-- mars/deploy/oscar/session.py | 5 ++-- mars/oscar/backends/communication/dummy.py | 2 +- mars/oscar/backends/message.py | 11 +++++-- mars/resource.py | 24 ++++++++++----- mars/services/cluster/uploader.py | 30 ++++++++++++++----- mars/services/context.py | 4 ++- mars/services/session/api/oscar.py | 1 + 9 files changed, 60 insertions(+), 26 deletions(-) diff --git a/azure-pipelines.yml b/azure-pipelines.yml index abbfd54b8d..d04b2b949a 100644 --- a/azure-pipelines.yml +++ b/azure-pipelines.yml @@ -83,7 +83,8 @@ jobs: if [[ "$(mars.test.module)" == "learn" ]]; then # remove version limit when blue-yonder/tsfresh#897 is fixed. - pip install xgboost lightgbm tensorflow faiss-cpu torch torchvision \ + # remove keras version after https://github.com/tensorflow/tensorflow/issues/52922 is fixed. + pip install xgboost lightgbm keras==2.6.0 tensorflow faiss-cpu torch torchvision \ statsmodels\<0.13.0 tsfresh fi fi diff --git a/docs/source/locale/zh_CN/LC_MESSAGES/development/operand.po b/docs/source/locale/zh_CN/LC_MESSAGES/development/operand.po index 6f86004cb8..4189b15813 100644 --- a/docs/source/locale/zh_CN/LC_MESSAGES/development/operand.po +++ b/docs/source/locale/zh_CN/LC_MESSAGES/development/operand.po @@ -55,7 +55,7 @@ msgid "" "which is useful for serialization. If the type is uncertain, ``AnyField``" " will work." msgstr "" -"对于 ``SimpleReadCSV`` 算子,类持有的 ``path`` 属性记录 csv 的文件地址,这里使用``StringField`` " +"对于 ``SimpleReadCSV`` 算子,类持有的 ``path`` 属性记录 csv 的文件地址,这里使用 ``StringField`` " "表示改属性的类型是字符串,指定类型主要是为了方便序列化算子,如果某个属性的类型是不确定的,可以用 ``AnyField`` 表示。" #: ../../source/development/operand.rst:52 @@ -93,7 +93,7 @@ msgid "" " at delimiter boundaries." msgstr "" "当拆分好的子任务被分发到执行器时,Mars 会调用算子的 ``execute`` 方法来做计算,对于 ``read_csv`` " -"的子任务,在函数里需要根据 ``offset`` 和 ``length``读取对应的数据块,但是这两个值只是一个粗略的值,因为 csv " +"的子任务,在函数里需要根据 ``offset`` 和 ``length`` 读取对应的数据块,但是这两个值只是一个粗略的值,因为 csv " "文件不能从一行的中间读取,所以每次执行的时候需要计算出分隔符所在的起始位置。" #: ../../source/development/operand.rst:190 @@ -117,7 +117,7 @@ msgid "" "taken to infer some meta information of Mars DataFrame, such as dtypes, " "columns, index, etc." 
msgstr "" -"最后,需要定义一个暴露给用户的函数接口 ``read_csv``。在这个函数里,我们需要创建``SimpleReadCSV`` " +"最后,需要定义一个暴露给用户的函数接口 ``read_csv``。在这个函数里,我们需要创建 ``SimpleReadCSV`` " "算子,并且需要读取一小块采样数据,推断出输出的 DataFrame 的dtypes, columns, index 等元信息。" #: ../../source/development/operand.rst:223 diff --git a/mars/deploy/oscar/session.py b/mars/deploy/oscar/session.py index 52462e3b0e..2368c8306f 100644 --- a/mars/deploy/oscar/session.py +++ b/mars/deploy/oscar/session.py @@ -1206,8 +1206,6 @@ async def _init( if new: # create new session await session_api.create_session(session_id) - else: - await session_api.get_session_address(session_id) lifecycle_api = WebLifecycleAPI(session_id, address) meta_api = WebMetaAPI(session_id, address) task_api = WebTaskAPI(session_id, address) @@ -1838,6 +1836,7 @@ def new_session( session_id: str = None, backend: str = "oscar", default: bool = True, + new: bool = True, **kwargs, ) -> AbstractSession: ensure_isolation_created(kwargs) @@ -1851,7 +1850,7 @@ def new_session( session_id = _new_session_id() session = SyncSession.init( - address, session_id=session_id, backend=backend, new=True, **kwargs + address, session_id=session_id, backend=backend, new=new, **kwargs ) if default: session.as_default() diff --git a/mars/oscar/backends/communication/dummy.py b/mars/oscar/backends/communication/dummy.py index 1f3b042633..abb8d09cce 100644 --- a/mars/oscar/backends/communication/dummy.py +++ b/mars/oscar/backends/communication/dummy.py @@ -64,7 +64,7 @@ async def send(self, message: Any): if self._closed.is_set(): # pragma: no cover raise ChannelClosed("Channel already closed, cannot send message") # put message directly into queue - await self._out_queue.put(message) + self._out_queue.put_nowait(message) @implements(Channel.recv) async def recv(self): diff --git a/mars/oscar/backends/message.py b/mars/oscar/backends/message.py index 60ae01a427..88f73a12d7 100644 --- a/mars/oscar/backends/message.py +++ b/mars/oscar/backends/message.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import os from abc import ABC, abstractmethod from dataclasses import dataclass from enum import Enum @@ -25,6 +24,14 @@ from ...utils import classproperty, dataslots, implements from ..core import ActorRef +try: + from random import randbytes +except ImportError: + from random import getrandbits + + def randbytes(n: int) -> bytes: + return getrandbits(n * 8).to_bytes(n, "little") + # make sure traceback can be pickled pickling_support.install() @@ -368,4 +375,4 @@ def _get_slots(message_cls: Type[_MessageBase]): def new_message_id(): - return os.urandom(32) + return randbytes(32) diff --git a/mars/resource.py b/mars/resource.py index 1c49a7546d..bee901717c 100644 --- a/mars/resource.py +++ b/mars/resource.py @@ -151,7 +151,9 @@ def cpu_count(): _last_cgroup_cpu_measure = None _last_proc_cpu_measure = None +_last_psutil_measure = None _last_cpu_percent = None +_cpu_percent_interval = 0.1 def _take_process_cpu_snapshot(): @@ -167,7 +169,7 @@ def _take_process_cpu_snapshot(): def cpu_percent(): - global _last_cgroup_cpu_measure, _last_proc_cpu_measure, _last_cpu_percent + global _last_cgroup_cpu_measure, _last_proc_cpu_measure, _last_cpu_percent, _last_psutil_measure if _cpu_use_cgroup_stat: # see https://www.kernel.org/doc/Documentation/cgroup-v1/cpuacct.txt with open(CGROUP_CPU_STAT_FILE, "r") as cgroup_file: @@ -179,15 +181,15 @@ def cpu_percent(): last_cpu_acct, last_sample_time = _last_cgroup_cpu_measure time_delta = sample_time - last_sample_time - if time_delta < 1e-2: - return _last_cpu_percent + if time_delta < _cpu_percent_interval: + return _last_cpu_percent or 0 _last_cgroup_cpu_measure = (cpu_acct, sample_time) # nanoseconds / seconds * 100, we shall divide 1e7. _last_cpu_percent = round( (cpu_acct - last_cpu_acct) / (sample_time - last_sample_time) / 1e7, 1 ) - return _last_cpu_percent + return _last_cpu_percent or 0 elif _cpu_use_process_stat: pts, sts = _take_process_cpu_snapshot() @@ -206,14 +208,22 @@ def cpu_percent(): delta_proc = (pt2.user - pt1.user) + (pt2.system - pt1.system) time_delta = sts[pid] - old_sts[pid] - if time_delta < 1e-2: + if time_delta < _cpu_percent_interval: return _last_cpu_percent or 0 percents.append((delta_proc / time_delta) * 100) _last_proc_cpu_measure = (pts, sts) _last_cpu_percent = round(sum(percents), 1) - return _last_cpu_percent + return _last_cpu_percent or 0 else: - return sum(psutil.cpu_percent(percpu=True)) + measure_time = time.time() + if ( + _last_psutil_measure is not None + and measure_time - _last_psutil_measure < _cpu_percent_interval + ): + return _last_cpu_percent or 0 + _last_psutil_measure = measure_time + _last_cpu_percent = psutil.cpu_percent() * _cpu_total + return _last_cpu_percent or 0 def disk_usage(d): diff --git a/mars/services/cluster/uploader.py b/mars/services/cluster/uploader.py index 2ec4eba53f..655657ca2b 100644 --- a/mars/services/cluster/uploader.py +++ b/mars/services/cluster/uploader.py @@ -44,6 +44,7 @@ def __init__(self, role=None, interval=None, band_to_slots=None, use_gpu=True): self._interval = interval or DEFAULT_INFO_UPLOAD_INTERVAL self._upload_task = None self._upload_enabled = False + self._uploaded_future = asyncio.Future() self._node_ready_event = asyncio.Event() self._use_gpu = use_gpu @@ -54,7 +55,8 @@ def __init__(self, role=None, interval=None, band_to_slots=None, use_gpu=True): self._disk_infos = [] async def __post_create__(self): - await self.upload_node_info() + self._upload_task = asyncio.create_task(self._periodical_upload_node_info()) + await self._uploaded_future async def 
__pre_destroy__(self): self._upload_task.cancel() @@ -77,10 +79,27 @@ async def _get_node_info_ref(self): NodeInfoCollectorActor.default_uid(), address=supervisor_addr ) + async def _periodical_upload_node_info(self): + while True: + try: + await self.upload_node_info() + if not self._uploaded_future.done(): + self._uploaded_future.set_result(None) + except asyncio.CancelledError: # pragma: no cover + break + except Exception as ex: # pragma: no cover # noqa: E722 # nosec # pylint: disable=bare-except + logger.error(f"Failed to upload node info: {ex}") + if not self._uploaded_future.done(): + self._uploaded_future.set_exception(ex) + try: + await asyncio.sleep(self._interval) + except asyncio.CancelledError: # pragma: no cover + break + async def mark_node_ready(self): self._upload_enabled = True # upload info in time to reduce latency - await self.upload_node_info(call_next=False, status=NodeStatus.READY) + await self.upload_node_info(status=NodeStatus.READY) self._node_ready_event.set() def is_node_ready(self): @@ -89,7 +108,7 @@ def is_node_ready(self): async def wait_node_ready(self): return self._node_ready_event.wait() - async def upload_node_info(self, call_next: bool = True, status: NodeStatus = None): + async def upload_node_info(self, status: NodeStatus = None): try: if not self._info.env: self._info.env = await asyncio.to_thread(gather_node_env) @@ -133,11 +152,6 @@ async def upload_node_info(self, call_next: bool = True, status: NodeStatus = No except: # noqa: E722 # nosec # pylint: disable=bare-except # pragma: no cover logger.exception(f"Failed to upload node info") raise - finally: - if call_next: - self._upload_task = self.ref().upload_node_info.tell_delay( - delay=self._interval - ) def get_bands(self) -> Dict[BandType, int]: band_slots = dict() diff --git a/mars/services/context.py b/mars/services/context.py index 008c4215a9..07ef9a2bfd 100644 --- a/mars/services/context.py +++ b/mars/services/context.py @@ -84,7 +84,9 @@ def _call(self, coro): def get_current_session(self) -> SessionType: from ..deploy.oscar.session import new_session - return new_session(self.supervisor_address, self.session_id, default=False) + return new_session( + self.supervisor_address, self.session_id, new=False, default=False + ) @implements(Context.get_supervisor_addresses) def get_supervisor_addresses(self) -> List[str]: diff --git a/mars/services/session/api/oscar.py b/mars/services/session/api/oscar.py index b492691ef9..c1f407abcc 100644 --- a/mars/services/session/api/oscar.py +++ b/mars/services/session/api/oscar.py @@ -65,6 +65,7 @@ async def has_session(self, session_id: str) -> bool: async def delete_session(self, session_id: str): await self._session_manager_ref.delete_session(session_id) + @alru_cache(cache_exceptions=False) async def get_session_address(self, session_id: str) -> str: """ Get session address.
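
The main saving in mars/resource.py comes from throttling: cpu_percent() now keeps the last computed value and returns it unchanged when it is called again within _cpu_percent_interval (0.1 s), and the psutil branch records _last_psutil_measure so that rapid repeat calls never reach psutil at all. Below is a minimal sketch of that pattern; the names throttled_cpu_percent, MIN_INTERVAL, _last_sample_time and _last_value are illustrative only and are not identifiers from the patch.

    import time

    import psutil

    MIN_INTERVAL = 0.1        # assumed minimum spacing between real samples
    _last_sample_time = None  # when the last real sample was taken
    _last_value = None        # cached result reused for rapid repeat calls


    def throttled_cpu_percent() -> float:
        """Total CPU usage in percent, sampling at most once per MIN_INTERVAL seconds."""
        global _last_sample_time, _last_value

        now = time.time()
        if _last_sample_time is not None and now - _last_sample_time < MIN_INTERVAL:
            # Called again too soon: skip the measurement and reuse the cache.
            return _last_value or 0

        _last_sample_time = now
        # psutil.cpu_percent() without an interval returns usage since the previous
        # call and does not block; multiplying by the logical core count keeps the
        # same scale as the old sum(psutil.cpu_percent(percpu=True)).
        _last_value = psutil.cpu_percent() * psutil.cpu_count()
        return _last_value or 0

The "or 0" fallbacks mirror the patch: psutil documents that the first non-blocking cpu_percent() call may report a meaningless 0.0, and the cgroup/procfs branches may not have a previous measurement yet, so callers always receive a number rather than None.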
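
A second change with the same goal is in mars/services/cluster/uploader.py: instead of each upload_node_info() call re-scheduling itself through ref().upload_node_info.tell_delay(...), the actor now runs one long-lived asyncio task and sets a future once the first upload finishes, so __post_create__ still waits for the initial upload before the node is considered created. A rough sketch of that create_task-plus-first-completion-future pattern, using a hypothetical PeriodicUploader class rather than the actor from the patch:

    import asyncio


    class PeriodicUploader:
        """Hypothetical illustration of a periodic background task with a 'first run done' future."""

        def __init__(self, interval: float = 1.0):
            self._interval = interval
            self._first_upload = None
            self._task = None

        async def start(self):
            # Run the loop in the background, but do not return until the
            # first upload has either succeeded or failed.
            self._first_upload = asyncio.Future()
            self._task = asyncio.create_task(self._upload_loop())
            await self._first_upload

        async def _upload_loop(self):
            while True:
                try:
                    await self._upload_once()
                    if not self._first_upload.done():
                        self._first_upload.set_result(None)
                except asyncio.CancelledError:
                    break
                except Exception as ex:
                    # Report the failure to start() if it is still waiting,
                    # otherwise keep retrying on the next tick.
                    if not self._first_upload.done():
                        self._first_upload.set_exception(ex)
                await asyncio.sleep(self._interval)

        async def _upload_once(self):
            """Placeholder for the real work; the actor calls upload_node_info() here."""

        def stop(self):
            if self._task is not None:
                self._task.cancel()

Keeping a single task also gives __pre_destroy__ one handle to cancel, instead of an arbitrary pending tell_delay message.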