
[Train Log] Ray Train Structured Logging #47806

Open · wants to merge 10 commits into master
3 changes: 2 additions & 1 deletion python/ray/__init__.py
@@ -256,6 +256,7 @@ def __getattr__(self, attr):
"actor",
"autoscaler",
"data",
"train",
"internal",
"util",
"widgets",
@@ -284,7 +285,7 @@ def __getattr__(self, attr):
def __getattr__(name: str):
    import importlib

    if name in ["data", "workflow", "autoscaler"]:
    if name in ["data", "workflow", "autoscaler", "train"]:
        return importlib.import_module("." + name, __name__)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

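For context on the hunk above: Ray's top-level package defers importing heavy subpackages through a module-level __getattr__ (PEP 562), and this PR adds "train" to that allow-list. A minimal standalone sketch of the same pattern; the package and list names here are illustrative, not part of the diff:

# lazy_pkg/__init__.py -- sketch of the PEP 562 lazy-import pattern
import importlib

_LAZY_SUBMODULES = ["data", "workflow", "autoscaler", "train"]


def __getattr__(name: str):
    # Invoked only when `name` isn't found by normal module attribute lookup,
    # so the submodule import cost is paid on first access, not at startup.
    if name in _LAZY_SUBMODULES:
        return importlib.import_module("." + name, __name__)
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")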
8 changes: 8 additions & 0 deletions python/ray/train/BUILD
@@ -367,6 +367,14 @@ py_test(
    deps = [":train_lib", ":conftest"]
)

py_test(
    name = "test_logging",
    size = "small",
    srcs = ["tests/test_logging.py"],
    tags = ["team:ml", "exclusive"],
    deps = [":train_lib", ":conftest"]
)

py_test(
    name = "test_torch_fsdp",
    size = "small",
3 changes: 3 additions & 0 deletions python/ray/train/__init__.py
@@ -20,6 +20,7 @@
# Import this first so it can be used in other modules
from ray.train._checkpoint import Checkpoint
from ray.train._internal.data_config import DataConfig
from ray.train._internal.logging import configure_logging
from ray.train._internal.session import get_checkpoint, get_dataset_shard, report
from ray.train._internal.syncer import SyncConfig
from ray.train.backend import BackendConfig
@@ -29,6 +30,8 @@

usage_lib.record_library_usage("train")

configure_logging()

Checkpoint.__module__ = "ray.train"

__all__ = [
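Since configure_logging() now runs when ray.train is imported, downstream code needs no setup beyond the import itself. A quick usage sketch; the child logger name ray.train.my_module is illustrative:

import logging

import ray
import ray.train  # importing the package invokes configure_logging()

ray.init()  # the session file handler needs the Ray session directory

# Any logger under the "ray.train" namespace inherits the configured handlers.
logger = logging.getLogger("ray.train.my_module")
logger.info("goes to the console and to ray-train.log")
logger.debug("goes to the file only; the console handler is set to INFO")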
143 changes: 143 additions & 0 deletions python/ray/train/_internal/logging.py
@@ -0,0 +1,143 @@
import logging
import logging.config
import os
from typing import Optional

import yaml

import ray
from ray.train.constants import (
    DEFAULT_RAY_TRAIN_LOG_CONFIG_PATH,
    RAY_TRAIN_JSON_LOG_ENCODING_FORMAT,
    RAY_TRAIN_LOG_CONFIG_PATH_ENV,
    RAY_TRAIN_LOG_ENCODING_ENV,
)

# Env. variable to specify the encoding of the file logs when using the default config.
RAY_TRAIN_LOG_ENCODING = os.environ.get(RAY_TRAIN_LOG_ENCODING_ENV, "").upper()
# Env. variable to specify the logging config path; defaults are used if not set.
RAY_TRAIN_LOG_CONFIG_PATH = os.environ.get(RAY_TRAIN_LOG_CONFIG_PATH_ENV)


class HiddenRecordFilter:
"""Filters out log records with the "hide" attribute set to True.

This filter allows you to override default logging behavior. For example, if errors
are printed by default, and you don't want to print a specific error, you can set
the "hide" attribute to avoid printing the message.

.. testcode::

import logging
logger = logging.getLogger("ray.train.spam")

# This warning won't be printed to the console.
logger.warning("ham", extra={"hide": True})
"""

def filter(self, record):
return not getattr(record, "hide", False)


class SessionFileHandler(logging.Handler):
"""A handler that writes to a log file in the Ray session directory.

The Ray session directory isn't available until Ray is initialized, so this handler
lazily creates the file handler when you emit a log record.

Args:
filename: The name of the log file. The file is created in the 'logs' directory
of the Ray session directory.
"""

def __init__(self, filename: str):
super().__init__()
self._filename = filename
self._handler = None
self._formatter = None
self._path = None

def emit(self, record):
if self._handler is None:
self._try_create_handler()
if self._handler is not None:
self._handler.emit(record)

def setFormatter(self, fmt: logging.Formatter) -> None:
if self._handler is not None:
self._handler.setFormatter(fmt)
self._formatter = fmt

def _try_create_handler(self):
assert self._handler is None

log_directory = get_log_directory()
if log_directory is None:
return

os.makedirs(log_directory, exist_ok=True)

self._path = os.path.join(log_directory, self._filename)
self._handler = logging.FileHandler(self._path)
if self._formatter is not None:
self._handler.setFormatter(self._formatter)


def configure_logging() -> None:
"""Configure the Python logger named 'ray.train'.

This function loads the configration YAML specified by "RAY_TRAIN_LOGGING_CONFIG"
environment variable. If the variable isn't set, this function loads the default
"logging.yaml" file that is adjacent to this module.

If "RAY_TRAIN_LOG_ENCODING" is specified as "JSON" we will enable JSON reading mode
if using the default logging config.
"""

def _load_logging_config(config_path: str):
with open(config_path) as file:
config = yaml.safe_load(file)
return config

if RAY_TRAIN_LOG_CONFIG_PATH is not None:
config = _load_logging_config(RAY_TRAIN_LOG_CONFIG_PATH)
else:
config = _load_logging_config(DEFAULT_RAY_TRAIN_LOG_CONFIG_PATH)
if RAY_TRAIN_LOG_ENCODING == RAY_TRAIN_JSON_LOG_ENCODING_FORMAT:
for logger in config["loggers"].values():
logger["handlers"].remove("file")
logger["handlers"].append("file_json")

logging.config.dictConfig(config)

# After configuring logger, warn if RAY_TRAIN_LOGGING_CONFIG_PATH is used with
# RAY_TRAIN_LOG_ENCODING, because they are not both supported together.
if RAY_TRAIN_LOG_CONFIG_PATH is not None and RAY_TRAIN_LOG_ENCODING is not None:
logger = logging.getLogger("ray.train")
logger.warning(
f"Using {RAY_TRAIN_LOG_ENCODING_ENV} is not supported with "
f"{RAY_TRAIN_LOG_CONFIG_PATH_ENV}."
)


def reset_logging() -> None:
"""Reset the logger named 'ray.train' to its initial state.

Used for testing.
"""
logger = logging.getLogger("ray.train")
logger.handlers.clear()
logger.setLevel(logging.NOTSET)


def get_log_directory() -> Optional[str]:
"""Return the directory where Ray Train writes log files.

If Ray isn't initialized, this function returns ``None``.
"""
global_node = ray._private.worker._global_node
if global_node is None:
return None

session_dir = global_node.get_session_dir_path()
return os.path.join(session_dir, "logs", "ray-train")
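Putting the two environment variables together: with the default config, setting RAY_TRAIN_LOG_ENCODING=JSON before ray.train is first imported swaps the file handler for file_json. A sketch of that flow, assuming the variable is set before the import (it's read once, at module import time):

import logging
import os

# Assumption: this must run before the first `import ray.train`, because
# RAY_TRAIN_LOG_ENCODING is read when ray.train._internal.logging is imported.
os.environ["RAY_TRAIN_LOG_ENCODING"] = "JSON"

import ray
import ray.train
from ray.train._internal.logging import get_log_directory

ray.init()
logging.getLogger("ray.train").info("hello")  # emitted as a JSON record

log_path = os.path.join(get_log_directory(), "ray-train.log")
# Each line of ray-train.log is now a JSON object rather than the
# tab-separated plain-text format.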
36 changes: 36 additions & 0 deletions python/ray/train/_internal/logging.yaml
@@ -0,0 +1,36 @@
version: 1
disable_existing_loggers: False

formatters:
  ray:
    format: "%(asctime)s\t%(levelname)s %(filename)s:%(lineno)s -- %(message)s"
  ray_json:
    class: ray._private.ray_logging.formatters.JSONFormatter

filters:
  console_filter:
    (): ray.train._internal.logging.HiddenRecordFilter
  core_context_filter:
    class: ray._private.ray_logging.filters.CoreContextFilter

handlers:
  file:
    class: ray.train._internal.logging.SessionFileHandler
    formatter: ray
    filename: ray-train.log
  file_json:
    class: ray.train._internal.logging.SessionFileHandler
    formatter: ray_json
    filename: ray-train.log
    filters: [core_context_filter]
  console:
    class: ray._private.log.PlainRayHandler
    formatter: ray
    level: INFO
    filters: [console_filter]

loggers:
  ray.train:
    level: DEBUG
    handlers: [file, console]
    propagate: False
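For comparison with the default config above, here is a minimal custom config that RAY_TRAIN_LOG_CONFIG_PATH could point at — it drops file logging entirely and raises the console threshold. The contents are illustrative, not part of this PR:

version: 1
disable_existing_loggers: False

handlers:
  console:
    class: logging.StreamHandler
    stream: ext://sys.stderr

loggers:
  ray.train:
    level: WARNING
    handlers: [console]
    propagate: False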
16 changes: 16 additions & 0 deletions python/ray/train/constants.py
@@ -44,6 +44,14 @@ def _get_ray_train_session_dir() -> str:
# Deprecated configs can use this value to detect if the user has set it.
_DEPRECATED_VALUE = "DEPRECATED"

# Default structured logging config file path used by Ray Train
DEFAULT_RAY_TRAIN_LOG_CONFIG_PATH = (
    Path(__file__).parent / "_internal/logging.yaml"
).as_posix()

# JSON Encoding format for Ray Train structured logging
RAY_TRAIN_JSON_LOG_ENCODING_FORMAT = "JSON"

# ==================================================
# Environment Variables
# ==================================================
@@ -86,6 +94,12 @@ def _get_ray_train_session_dir() -> str:
# Defaults to 0
RAY_TRAIN_ENABLE_STATE_TRACKING = "RAY_TRAIN_ENABLE_STATE_TRACKING"

# Env. variable to specify the encoding of the file logs when using the default config.
RAY_TRAIN_LOG_ENCODING_ENV = "RAY_TRAIN_LOG_ENCODING"

# Env. variable to specify the logging config path; defaults are used if not set.
RAY_TRAIN_LOG_CONFIG_PATH_ENV = "RAY_TRAIN_LOG_CONFIG_PATH"

# NOTE: When adding a new environment variable, please track it in this list.
TRAIN_ENV_VARS = {
    ENABLE_DETAILED_AUTOFILLED_METRICS_ENV,
@@ -96,6 +110,8 @@ def _get_ray_train_session_dir() -> str:
    RAY_CHDIR_TO_TRIAL_DIR,
    RAY_TRAIN_COUNT_PREEMPTION_AS_FAILURE,
    RAY_TRAIN_ENABLE_STATE_TRACKING,
    RAY_TRAIN_LOG_ENCODING_ENV,
    RAY_TRAIN_LOG_CONFIG_PATH_ENV,
}

# Key for AIR Checkpoint metadata in TrainingResult metadata
131 changes: 131 additions & 0 deletions python/ray/train/tests/test_logging.py
@@ -0,0 +1,131 @@
import logging
import os
import re
from datetime import datetime

import pytest
import yaml

import ray
from ray.tests.conftest import * # noqa
from ray.train._internal.logging import configure_logging, get_log_directory


@pytest.fixture(name="configure_logging")
def configure_logging_fixture():
    from ray.train._internal.logging import configure_logging

    configure_logging()
    yield


@pytest.fixture(name="reset_logging")
def reset_logging_fixture():
    from ray.train._internal.logging import reset_logging

    yield
    reset_logging()


def test_messages_logged_to_file(configure_logging, reset_logging, shutdown_only):
    ray.init()
    logger = logging.getLogger("ray.train.spam")

    logger.debug("ham")
    log_path = os.path.join(get_log_directory(), "ray-train.log")
    with open(log_path) as file:
        log_contents = file.read()
    assert "ham" in log_contents


def test_messages_printed_to_console(
    capsys,
    configure_logging,
    reset_logging,
    propagate_logs,
):
    logger = logging.getLogger("ray.train.spam")

    logger.info("ham")

    assert "ham" in capsys.readouterr().err


def test_hidden_messages_not_printed_to_console(
    capsys,
    configure_logging,
    reset_logging,
    propagate_logs,
):
    logger = logging.getLogger("ray.train.spam")

    logger.info("ham", extra={"hide": True})

    assert "ham" not in capsys.readouterr().err


def test_message_format(configure_logging, reset_logging, shutdown_only):
    ray.init()
    logger = logging.getLogger("ray.train.spam")

    logger.info("ham")

    log_path = os.path.join(get_log_directory(), "ray-train.log")
    with open(log_path, "r") as f:
        log_contents = f.read()
    (
        logged_ds,
        logged_ts,
        logged_level,
        logged_filepath,
        sep,
        logged_msg,
    ) = log_contents.split()

    try:
        datetime.strptime(f"{logged_ds} {logged_ts}", "%Y-%m-%d %H:%M:%S,%f")
    except ValueError:
        raise Exception(f"Invalid log timestamp: {logged_ds} {logged_ts}")

    assert logged_level == logging.getLevelName(logging.INFO)
    assert re.match(r"test_logging.py:\d+", logged_filepath)
    assert logged_msg == "ham"


def test_custom_config(reset_logging, monkeypatch, tmp_path):
    config_path = tmp_path / "logging.yaml"
    monkeypatch.setattr(
        ray.train._internal.logging, "RAY_TRAIN_LOG_CONFIG_PATH", config_path
    )

    handlers = {
        "console": {"class": "logging.StreamHandler", "stream": "ext://sys.stdout"}
    }
    loggers = {
        "ray.train": {
            "level": "CRITICAL",
            "handlers": ["console"],
        },
    }
    config = {
        "version": 1,
        "handlers": handlers,
        "loggers": loggers,
        "disable_existing_loggers": False,
    }
    with open(config_path, "w") as file:
        yaml.dump(config, file)

    configure_logging()

    logger = logging.getLogger("ray.train")

    assert logger.getEffectiveLevel() == logging.CRITICAL
    assert len(logger.handlers) == 1
    assert isinstance(logger.handlers[0], logging.StreamHandler)


if __name__ == "__main__":
    import sys

    sys.exit(pytest.main(["-v", __file__]))
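To exercise the new tests locally, either the Bazel target added in the BUILD file (bazel test //python/ray/train:test_logging, assuming a standard Ray dev checkout) or plain pytest on the file (pytest python/ray/train/tests/test_logging.py) should work.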