Skip to content

Commit

Permalink
added testing apis into daemon
Browse files Browse the repository at this point in the history
  • Loading branch information
visualDust committed Nov 21, 2023
1 parent 59c50c1 commit d93a841
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 22 deletions.
5 changes: 5 additions & 0 deletions neetbox/cli/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
import os
import json
import neetbox
<<<<<<< Updated upstream
=======
from neetbox.daemon._client_apis import get_status_of
from neetbox.logging.formatting import LogStyle
>>>>>>> Stashed changes
from neetbox.logging.logger import Logger
from neetbox.logging.formatting import LogStyle
from neetbox.daemon._apis import get_status_of
Expand Down
7 changes: 7 additions & 0 deletions neetbox/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
# URL: https://gong.host
# Date: 20230413

<<<<<<< Updated upstream
from neetbox.config._config import DEFAULT_CONFIG as default
=======
import inspect
from typing import Optional, Union

from neetbox.config._config import DEFAULT_WORKSPACE_CONFIG as default
>>>>>>> Stashed changes
from neetbox.config._config import get_current
from neetbox.utils.framing import *
from typing import Union, Optional
Expand Down
16 changes: 14 additions & 2 deletions neetbox/config/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,19 @@
from neetbox.utils.mvc import patch
from multiprocessing import current_process

DEFAULT_CONFIG = {
DEFAULT_GLOBAL_CONFIG = {
"daemon": {
"enable": True,
"allowIpython": False,
"servers": [{"host": "localhost", "port": "20202"},],
"mode": "detached",
"displayName": None,
"uploadInterval": 10,
"mute": True,
},
}

DEFAULT_WORKSPACE_CONFIG = {
"name": None,
"version": None,
"logging": {"logdir": None},
Expand All @@ -35,7 +47,7 @@
}
},
}
WORKSPACE_CONFIG: dict = DEFAULT_CONFIG.copy()
WORKSPACE_CONFIG: dict = DEFAULT_WORKSPACE_CONFIG.copy()


def update_with(cfg: dict):
Expand Down
12 changes: 11 additions & 1 deletion neetbox/daemon/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
import os
import json

pkg.is_installed("flask", try_install_if_not=True)
pkg.is_installed("setproctitle", try_install_if_not=True)


def __attach_daemon(daemon_config):
if not daemon_config["allowIpython"]:
Expand All @@ -31,7 +34,14 @@ def __attach_daemon(daemon_config):
_online_status = connect_daemon(daemon_config) # try to connect daemon
logger.log("daemon connection status: " + str(_online_status))
if not _online_status: # if no daemon online
logger.log(
if daemon_config["server"] not in ["localhost", "127.0.0.1", "0.0.0.0"]:
# daemon not running on localhost
logger.err(
f"No daemon running at {daemon_config['server']}:{daemon_config['port']}, daemon will not be attached. Continue anyway."
)
return False

logger.warn(
f"No daemon running at {daemon_config['server']}:{daemon_config['port']}, trying to create daemon..."
)

Expand Down
37 changes: 37 additions & 0 deletions neetbox/daemon/_client_apis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# -*- coding: utf-8 -*-
#
# Author: GavinGong aka VisualDust
# URL: https://gong.host
# Date: 20230414


from neetbox.daemon._local_http_client import _local_http_client
from neetbox.utils import pkg
from neetbox.utils.framing import get_frame_module_traceback

module_name = get_frame_module_traceback().__name__
assert pkg.is_installed(
"httpx", try_install_if_not=True
), f"{module_name} requires httpx which is not installed"
import json
import time

import httpx

from neetbox.config import get_module_level_config
from neetbox.logging import logger

logger = logger("NEETBOX DAEMON API")

__cfg = get_module_level_config()
daemon_address = f"{__cfg['server']}:{__cfg['port']}"
base_addr = f"http://{daemon_address}"


def get_status_of(name=None):
name = name or ""
api_addr = f"{base_addr}/status"
logger.info(f"Fetching from {api_addr}")
r = _local_http_client.get(api_addr)
_data = r.json()
return _data
40 changes: 21 additions & 19 deletions neetbox/daemon/_daemon_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,22 +30,23 @@ def _upload_thread(daemon_config, base_addr, display_name):
time.sleep(__TIME_UNIT_SEC)
if _ctr % _upload_interval: # not zero
continue
# upload data
# dump status as json
_data = json.dumps(_update_value_dict, default=str)
_headers = {"Content-Type": "application/json"}
try:
# upload data
resp = _local_http_client.post(_api_addr, data=_data, headers=_headers)
if resp.is_error:
if resp.is_error: # upload failed
raise IOError(f"Failed to upload data to daemon. ({resp.status_code})")
except Exception as e:
if _disconnect_flag:
if _disconnect_flag: # already in retries
_disconnect_retries -= 1
if not _disconnect_retries:
if not _disconnect_retries: # retry count down exceeded
logger.err(
"Failed to reconnect to daemon after {10} retries, Trying to launch new daemon..."
)
from neetbox.daemon import _try_attach_daemon

_try_attach_daemon()
time.sleep(__TIME_UNIT_SEC)
continue
Expand All @@ -61,7 +62,7 @@ def _upload_thread(daemon_config, base_addr, display_name):
_disconnect_retries = 10


def connect_daemon(daemon_config):
def connect_daemon(daemon_config, launch_upload_thread=True):
_display_name = get_module_level_config()["displayName"]
_launch_config = get_module_level_config("@")
_display_name = _display_name or _launch_config["name"]
Expand All @@ -70,30 +71,31 @@ def connect_daemon(daemon_config):
f"Connecting to daemon at {daemon_config['server']}:{daemon_config['port']} ..."
)
_daemon_address = f"{daemon_config['server']}:{daemon_config['port']}"
base_addr = f"http://{_daemon_address}"
_base_addr = f"http://{_daemon_address}"

# check if daemon is alive
def _check_daemon_alive():
def _check_daemon_alive(_api_addr):
_api_name = "hello"
_api_addr = f"{base_addr}/{_api_name}"
_api_addr = f"{_api_addr}/{_api_name}"
r = _local_http_client.get(_api_addr)
if r.is_error:
raise IOError(f"Daemon at {_api_addr} is not alive. ({r.status_code})")
logger.log(f"daemon response from {_api_addr} is {r} ({r.status_code})")

try:
_check_daemon_alive()
_check_daemon_alive(_base_addr)
logger.ok(f"daemon alive at {_base_addr}")
except Exception as e:
logger.err(e)
return False

global __upload_thread
if __upload_thread is None or not __upload_thread.is_alive():
__upload_thread = Thread(
target=_upload_thread,
daemon=True,
args=[daemon_config, base_addr, _display_name],
)
__upload_thread.start()
if launch_upload_thread:
global __upload_thread
if __upload_thread is None or not __upload_thread.is_alive():
__upload_thread = Thread(
target=_upload_thread,
daemon=True,
args=[daemon_config, _base_addr, _display_name],
)
__upload_thread.start()

return True
5 changes: 5 additions & 0 deletions neetbox/daemon/_daemon_launcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@
import os
import sys

<<<<<<< Updated upstream
=======
from neetbox.daemon._flask_server import daemon_process

>>>>>>> Stashed changes
# sys.stdout=open(r'D:\Projects\ML\neetbox\logdir\daemon.log', 'a+')

import json
Expand Down
94 changes: 94 additions & 0 deletions neetbox/daemon/_flask_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
# -*- coding: utf-8 -*-
#
# Author: GavinGong aka VisualDust
# URL: https://gong.host
# Date: 20230414

from neetbox.utils import pkg
from neetbox.utils.framing import get_frame_module_traceback

module_name = get_frame_module_traceback().__name__
assert pkg.is_installed(
"flask", try_install_if_not=True
), f"{module_name} requires flask which is not installed"
import sys
import time
from threading import Thread

from flask import Flask, abort, json, request

from neetbox.config import get_module_level_config

_STAT_POOL = {}
__DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC = 60 * 60 * 12 # 12 Hours
__COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC
__DAEMON_NAME = "NEETBOX DAEMON"


def daemon_process(daemon_config=None):
import setproctitle

setproctitle.setproctitle(__DAEMON_NAME)
daemon_config = daemon_config or get_module_level_config()
api = Flask(__DAEMON_NAME)

@api.route("/hello", methods=["GET"])
def just_send_hello():
return json.dumps({"hello": "hello"})

@api.route("/status", methods=["GET"], defaults={"name": None})
@api.route("/status/<name>", methods=["GET"])
def return_status_of(name):
global __COUNT_DOWN
global _STAT_POOL
__COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC
_returning_stat = dict(_STAT_POOL)
if not name:
pass # returning full dict
elif name in _returning_stat:
_returning_stat = _returning_stat[name] # returning specific status
else:
abort(404)
return _returning_stat

@api.route("/status/list", methods=["GET"])
def return_names_of_status(name):
global __COUNT_DOWN
global _STAT_POOL
__COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC
_names = {_STAT_POOL.keys()}
return _names

@api.route("/sync/<name>", methods=["POST"])
def sync_status_of(name):
global __COUNT_DOWN
global _STAT_POOL
__COUNT_DOWN = __DAEMON_SHUTDOWN_IF_NO_UPLOAD_TIMEOUT_SEC
_json_data = request.get_json()
_STAT_POOL[name] = _json_data
return "ok"

@api.route("/shutdown", methods=["POST"])
def shutdown():
global __COUNT_DOWN
__COUNT_DOWN = -1

def __sleep_and_shutdown(secs=3):
time.sleep(secs=secs)
sys.exit(0)

Thread(target=__sleep_and_shutdown, args=(3)).start() # shutdown after 3 seconds
return "ok"

def _count_down_thread():
global __COUNT_DOWN
while True:
__COUNT_DOWN -= 1
if not __COUNT_DOWN:
sys.exit(0)
time.sleep(1)

count_down_thread = Thread(target=_count_down_thread, daemon=True)
count_down_thread.start()

api.run(host="0.0.0.0", port=daemon_config["port"])

0 comments on commit d93a841

Please sign in to comment.