Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

requirements: avoid installing ssh2-python #24

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 40 additions & 4 deletions pytest_mh/_private/logging.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import logging
import logging.handlers
import textwrap
from typing import Any

Expand Down Expand Up @@ -55,11 +56,17 @@ def Setup(cls, log_path: str) -> MultihostLogger:
if log_path == "/dev/stdout" or log_path == "/dev/stderr":
logger.allow_colors = True

handler = logging.FileHandler(log_path)
handler.setLevel(logging.DEBUG)
handler.setFormatter(logging.Formatter("%(levelname)-8s %(asctime)s %(message)s"))
# All messages go to a single file
main_handler = logging.FileHandler(log_path)
main_handler.setLevel(logging.DEBUG)
main_handler.setFormatter(logging.Formatter("%(levelname)-8s %(asctime)s %(message)s"))

logger.addHandler(handler)
# This handler can be flushed whenever needed to log each test case
# into a test case specific file.
intermediate_handler = ManualMemoryHandler()

logger.addHandler(main_handler)
logger.addHandler(intermediate_handler)
logger.addFilter(LogExtraDataFilter(logger=logger))
logger.setLevel(logging.DEBUG)

Expand Down Expand Up @@ -142,3 +149,32 @@ def filter(self, record):
)

return super().filter(record)


class ManualMemoryHandler(logging.handlers.MemoryHandler):
"""
Logs messages inside a memory.

All messages are logged. The amount of messages is unlimited and
:meth:`flush` must be called manually in order to send them to the target
handler and clear the buffer.
"""
def __init__(self, target: logging.Handler | None = None) -> None:
"""
:param target: Logging target when the messages are flushed, defaults to None
:type target: logging.Handler | None, optional
"""
super().__init__(capacity=0, flushLevel=0, target=target, flushOnClose=False)

def shouldFlush(self, record: logging.LogRecord) -> bool:
"""
Always returns ``False``. In order to flush the messages, call
:meth:`flush` manually.
"""
return False

def clear(self):
"""
Remove all records from the buffer.
"""
self.buffer.clear()
4 changes: 3 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
--no-dependencies parallel-ssh
colorama
parallel-ssh
gevent
pytest
PyYAML
ssh-python
Loading