Skip to content

Commit

Permalink
patch work
Browse files Browse the repository at this point in the history
  • Loading branch information
visualDust committed Nov 24, 2023
1 parent a5c298c commit d8599ef
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 195 deletions.
Empty file added neetbox/core/packing.py
Empty file.
1 change: 1 addition & 0 deletions neetbox/daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ def __attach_daemon(daemon_config):
f"No daemon running at {daemon_config['host']}:{daemon_config['port']}, trying to create daemon..."
)

print(["--config", json.dumps(daemon_config)])
popen = DaemonableProcess(
target="neetbox.daemon.server._daemon_launcher",
args=["--config", json.dumps(daemon_config)],
Expand Down
39 changes: 33 additions & 6 deletions neetbox/daemon/client/_connection.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import asyncio
import functools
import logging
from threading import Thread
from typing import Callable, Optional

import httpx
import websockets
import websocket

from neetbox.config import get_module_level_config
from neetbox.core import Registry
Expand All @@ -23,10 +24,10 @@
class ClientConn(metaclass=Singleton):
http: httpx.Client = None

__ws_client: None # _websocket_client
__ws_client: websocket.WebSocketApp = None # _websocket_client
__ws_subscription = Registry("__client_ws_subscription") # { event-type-name : list(Callable)}

def __init__(self) -> None:
def _init_ws():
cfg = get_module_level_config()

def __load_http_client():
Expand All @@ -41,8 +42,34 @@ def __load_http_client():
# create htrtp client
ClientConn.http = __load_http_client()

ws_server_addr = f"ws://{cfg['host']}/{CLIENT_API_ROOT}"
# todo establishing socket connection
# ws server url
ClientConn.ws_server_addr = f"ws://{cfg['host']}/{CLIENT_API_ROOT}"

# todo wait for server online
# create websocket app
websocket.WebSocketApp(
ClientConn.ws_server_addr,
on_open=ClientConn.__on_ws_open,
on_message=ClientConn.__on_ws_message,
on_error=ClientConn.__on_ws_err,
on_close=ClientConn.__on_ws_close,
).run_forever(reconnect=True)

# assign self to websocket log writer
from neetbox.logging._writer import _assign_connection_to_WebSocketLogWriter

_assign_connection_to_WebSocketLogWriter(ClientConn)

def __on_ws_open(ws):
ClientConn.__ws_client = ws
logger.ok(f"client websocket {ws} connected")

def __on_ws_err(ws, msg):
logger.err(f"client websocket {ws} encountered {msg}")

def __on_ws_close(ws):
ClientConn.__ws_client = None
logger.warn(f"client websocket {ws} closed")

def __on_ws_message(ws, msg):
logger.debug(f"ws received {msg}")
Expand Down Expand Up @@ -87,5 +114,5 @@ def _ws_subscribe(function: Callable, event_type_name: str):


# singleton
ClientConn() # run init
# ClientConn() # !!! DO NOT run init. websocket client should be initialzed only after server is ready
connection = ClientConn
1 change: 1 addition & 0 deletions neetbox/logging/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from neetbox.logging.logger import set_log_level

_cfg = get_module_level_config()
print(_cfg)
logger.set_log_dir(_cfg["logdir"])
set_log_level(_cfg["level"])

Expand Down
188 changes: 23 additions & 165 deletions neetbox/logging/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ def write(self, raw_log):
pass


# ================== DEFINE LOG TYPE =====================


@dataclass
class RawLog:
rich_msg: str
Expand Down Expand Up @@ -107,158 +110,6 @@ def write(self, raw_log: RawLog):
# ================== FILE LOG WRITER =====================


class LogMetadata:
def __init__(self, writer: "_AutoSplitLogWriter"):
self.written_bytes = 0
self.log_writer = writer


SplitStrategyCallable = Callable[[LogMetadata], Union[str, Iterable[str]]]


class LogSplitStrategies:
@staticmethod
def by_date() -> SplitStrategyCallable:
def _split_strategy(metadata: LogMetadata):
return date.today().strftime("%Y%m%d")

return _split_strategy

@staticmethod
def by_hour() -> SplitStrategyCallable:
def _split_strategy(metadata: LogMetadata):
return datetime.now().strftime("%Y%m%d-%H")

return _split_strategy

@staticmethod
def by_date_and_size(size_in_bytes: int) -> SplitStrategyCallable:
class DateSizeSplitStrategy:
def __init__(self):
self.file_id = None

def _already_exists(self, metadata: LogMetadata, file_id: int) -> bool:
f = metadata.log_writer.make_logfile_path(self.make_result(file_id))
return f.exists()

def make_result(self, file_id):
return date.today().strftime("%Y%m%d"), str(file_id)

def __call__(self, metadata: LogMetadata):
if self.file_id is None:
self.file_id = 0
while self._already_exists(metadata, self.file_id):
self.file_id += 1
return self.make_result(self.file_id + metadata.written_bytes // size_in_bytes)

return DateSizeSplitStrategy()


class _AutoSplitLogWriter(io.TextIOBase):
class ReentrantCounter:
def __init__(self):
self._count = 0

def __enter__(self):
self._count += 1

def __exit__(self, exc_type, exc_val, exc_tb):
self._count -= 1

def __bool__(self):
return self._count > 0

_writer: Union[io.IOBase, None]
_filename_template: str
_split_strategy: Union[SplitStrategyCallable, Callable]
_current_logfile: Union[pathlib.Path, None]

def __init__(
self,
base_dir,
filename_template,
split_strategy: Optional[SplitStrategyCallable],
*,
encoding="utf-8",
open_on_creation=True,
overwrite_existing=False,
) -> None:
self._writer = None
self._current_logfile = None
self._filename_template = filename_template
self._base_dir = pathlib.Path(str(base_dir))
self._encoding = encoding
self._open_mode = "wb" if overwrite_existing else "ab"
self._split_lock = _AutoSplitLogWriter.ReentrantCounter()

self._split_strategy = (lambda *_: None) if split_strategy is None else split_strategy

self._stats = LogMetadata(self)

if open_on_creation:
self.open()

def _apply_filename_template(self, provider_supplied):
if provider_supplied is None:
return self._filename_template
if isinstance(provider_supplied, str):
return provider_supplied
if isinstance(provider_supplied, Iterable):
return self._filename_template.format(*provider_supplied)

raise ValueError("Filename provider must return either a string or an iterable of strings")

def make_logfile_path(self, provider_supplied):
return self._base_dir / self._apply_filename_template(provider_supplied)

def _create_logfile(self):
expected_logfile = self.make_logfile_path(self._split_strategy(self._stats))
if expected_logfile != self._current_logfile:
if self._writer is not None:
self._writer.close()
expected_logfile.parent.mkdir(parents=True, exist_ok=True)
self._current_logfile = expected_logfile
self._writer = open(self._current_logfile, self._open_mode) # type: ignore

def _check_open(self):
if self._writer is None:
raise ValueError("Writer not opened")

def write(self, __s):
self._check_open()
if not self._split_lock:
self._create_logfile()

print("writing")
bytes = __s.encode(self._encoding)
self._stats.written_bytes += len(bytes)
self._writer.write(bytes)

def writelines(self, __lines: Iterable[str]) -> None:
for line in __lines:
self.write(line + "\n")

def open(self):
self._create_logfile()

def __enter__(self):
if self._writer is None:
self.open()

def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()

def flush(self):
self._writer.flush()

def close(self):
if self._writer is not None:
self._writer.close()

def split_lock(self):
return self._split_lock


class FileLogWriter(LogWriter):
# class level static pool
PATH_2_FILE_WRITER = {}
Expand Down Expand Up @@ -289,7 +140,7 @@ def __init__(self, path) -> None:
def write(self, raw_log: RawLog):
_msg_dict = raw_log.json()
_style = raw_log.style
text_msg = (
text_msg = str(
_msg_dict["prefix"]
+ _msg_dict["datetime"]
+ _style.split_char_txt * min(len(_msg_dict["datetime"]), 1)
Expand All @@ -301,33 +152,40 @@ def write(self, raw_log: RawLog):
self.file_writer.write(text_msg)


# ================== JSON LOG WRITER =====================
# ================== WS LOG WRITER =====================


class JsonLogWriter(FileLogWriter):
class __WebSocketLogWriter(LogWriter):
# class level statics
connection = None # connection should be assigned by neetbox.daemon.client._connection to avoid recursive import

def write(self, raw_log: RawLog):
# todo convert to json and write
pass
json_date = raw_log.json()
if __WebSocketLogWriter.connection:
__WebSocketLogWriter.connection.ws_send(json_date)


# ================== WS LOG WRITER =====================
def _assign_connection_to_WebSocketLogWriter(conn):
__WebSocketLogWriter.connection = conn


class __WebSocketLogWriter(LogWriter):
def __init__(self) -> None:
pass
webSocketLogWriter = __WebSocketLogWriter()

def write(self, raw_log: RawLog):
pass

# ================== JSON LOG WRITER =====================

webSocketLogWriter = __WebSocketLogWriter()

class JsonLogWriter(FileLogWriter):
def write(self, raw_log: RawLog):
# todo convert to json and write to file
pass


# ================== POST INIT TYPE REF =====================

RawLog.name2writerType = {
"console": __ConsoleLogWriter,
"stdout": __ConsoleLogWriter,
"file": FileLogWriter,
"ws": __WebSocketLogWriter,
"json": JsonLogWriter,
}
Loading

0 comments on commit d8599ef

Please sign in to comment.