[Train Log] Ray Train Structured Logging #47806

Open
hongpeng-guo wants to merge 35 commits into master from hpguo/train_structured_log
Changes from 32 commits
Commits
35 commits
48c0961
copy over ray data structure log
hongpeng-guo Sep 20, 2024
90b6893
remove TRACE level for ray train
hongpeng-guo Sep 20, 2024
dc79a60
move constant variables to the constant.py file
hongpeng-guo Sep 23, 2024
d388da5
fix default yaml config path
hongpeng-guo Sep 23, 2024
ace76a7
fix unittests
hongpeng-guo Sep 24, 2024
d6b64c0
Merge branch 'master' into hpguo/train_structured_log
hongpeng-guo Sep 24, 2024
ddadc1c
add unittest to BUILD file
hongpeng-guo Sep 24, 2024
6216d4f
modify setup.py to include yaml file into the python package
hongpeng-guo Sep 24, 2024
c77a181
update log
hongpeng-guo Sep 27, 2024
427b861
update logging
hongpeng-guo Sep 27, 2024
58bbf5d
Merge branch 'master' into hpguo/train_structured_log
hongpeng-guo Oct 7, 2024
e4d8b63
use () key instead of class for filter
hongpeng-guo Oct 7, 2024
99266ae
handle comments
hongpeng-guo Oct 28, 2024
cbf4677
add test json logging configuration
hongpeng-guo Oct 28, 2024
d3aeff8
handle comments
hongpeng-guo Oct 28, 2024
02d439f
resolve merge conflict
hongpeng-guo Oct 28, 2024
c3adf12
remove redundant __init__ part
hongpeng-guo Oct 28, 2024
0fed499
add back the change on __init__.py
hongpeng-guo Oct 28, 2024
35249c8
change yaml file to constant yaml string
hongpeng-guo Oct 28, 2024
6a5fc2d
clean code changes
hongpeng-guo Oct 28, 2024
1768dc6
handle comments
hongpeng-guo Oct 31, 2024
c1a9955
handle comments
hongpeng-guo Oct 31, 2024
82b23ad
add TrainContextFilter that add ranks information to worker nodes
hongpeng-guo Nov 1, 2024
d79119f
Merge remote-tracking branch 'origin' into hpguo/train_structured_log
hongpeng-guo Nov 1, 2024
b6c8cbd
fix corner cases
hongpeng-guo Nov 1, 2024
1576354
split core context filter and train context filter into two filters
hongpeng-guo Nov 4, 2024
441b043
Merge remote-tracking branch 'origin' into hpguo/train_structured_log
hongpeng-guo Nov 5, 2024
6bcb2a2
Merge remote-tracking branch 'origin' into hpguo/train_structured_log
hongpeng-guo Nov 6, 2024
7abb76d
add run_id and hidden_default keys
hongpeng-guo Nov 7, 2024
50d2067
change where the run_id is introduced
hongpeng-guo Nov 8, 2024
dee3fde
add run_id to function_trainable
hongpeng-guo Nov 8, 2024
921fd4c
change the logging behavior similar to ray core, that we may write TE…
hongpeng-guo Nov 8, 2024
6335cf0
remove constants from the constants file
hongpeng-guo Nov 8, 2024
2a4a9fb
Update python/ray/tune/trainable/function_trainable.py
hongpeng-guo Nov 13, 2024
263d2b9
Update python/ray/tune/trainable/function_trainable.py
hongpeng-guo Nov 13, 2024
3 changes: 2 additions & 1 deletion python/ray/__init__.py
@@ -256,6 +256,7 @@ def __getattr__(self, attr):
"actor",
"autoscaler",
"data",
"train",
"internal",
"util",
"widgets",
@@ -284,7 +285,7 @@ def __getattr__(self, attr):
def __getattr__(name: str):
import importlib

if name in ["data", "workflow", "autoscaler"]:
if name in ["data", "workflow", "autoscaler", "train"]:
hongpeng-guo marked this conversation as resolved.
return importlib.import_module("." + name, __name__)
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

8 changes: 8 additions & 0 deletions python/ray/train/BUILD
@@ -367,6 +367,14 @@ py_test(
deps = [":train_lib", ":conftest"]
)

py_test(
name = "test_logging",
size = "small",
srcs = ["tests/test_logging.py"],
tags = ["team:ml", "exclusive"],
deps = [":train_lib", ":conftest"]
)

py_test(
name = "test_torch_fsdp",
size = "small",
3 changes: 3 additions & 0 deletions python/ray/train/__init__.py
@@ -20,6 +20,7 @@
# Import this first so it can be used in other modules
from ray.train._checkpoint import Checkpoint
from ray.train._internal.data_config import DataConfig
from ray.train._internal.logging import configure_logging
from ray.train._internal.session import get_checkpoint, get_dataset_shard, report
from ray.train._internal.syncer import SyncConfig
from ray.train.backend import BackendConfig
@@ -29,6 +30,8 @@

usage_lib.record_library_usage("train")

configure_logging()

Checkpoint.__module__ = "ray.train"

__all__ = [
241 changes: 241 additions & 0 deletions python/ray/train/_internal/logging.py
@@ -0,0 +1,241 @@
import logging
import logging.config
import os
from enum import Enum
from typing import Optional

import yaml

import ray
from ray.train._internal.session import _get_session
from ray.train.constants import LOG_CONFIG_PATH_ENV, LOG_ENCODING_ENV

# JSON Encoding format for Ray Train structured logging
DEFAULT_JSON_LOG_ENCODING_FORMAT = "JSON"

# Default logging configuration for Ray Train
DEFAULT_LOG_CONFIG_JSON_STRING = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"ray": {
"format": "%(asctime)s\t%(levelname)s %(filename)s:%(lineno)s -- %(message)s" # noqa: E501
},
"ray_json": {"class": "ray._private.ray_logging.formatters.JSONFormatter"},
},
"filters": {
"console_filter": {"()": "ray.train._internal.logging.HiddenRecordFilter"},
"core_context_filter": {
"()": "ray._private.ray_logging.filters.CoreContextFilter"
},
"train_context_filter": {
"()": "ray.train._internal.logging.TrainContextFilter"
},
},
"handlers": {
"console_json": {
"class": "logging.StreamHandler",
"formatter": "ray_json",
"filters": ["core_context_filter", "train_context_filter"],
},
"console_text": {
"class": "ray._private.log.PlainRayHandler",
"formatter": "ray",
"level": "INFO",
"filters": ["train_context_filter", "console_filter"],
},
Comment on lines +40 to +50
Contributor: Why do we switch between the core_context_filter vs. the console_filter?

Let's just use the core_context_filter and remove the console_filter (HiddenRecordFilter), since Alan mentions this HiddenRecordFilter is not needed.

Contributor Author: Sure, I don't think this HiddenRecordFilter is very useful either. I will remove it.

"file": {
"class": "ray.train._internal.logging.SessionFileHandler",
"formatter": "ray_json",
"filename": "ray-train.log",
"filters": ["core_context_filter", "train_context_filter"],
},
},
"loggers": {
"ray.train": {
"level": "DEBUG",
"handlers": ["file", "console_text"],
Comment on lines +60 to +61
Contributor: Can we limit console output to INFO and above? DEBUG is ok for the log-viewer since people can filter it out, but it will spam the console.

Contributor Author: We actually specify level INFO in the console_text handler.

  1. In text mode, the logger passes every message at DEBUG and above; when a record reaches the console_text handler, its INFO level filters out the DEBUG messages, making the console less spammy.
  2. In JSON mode, the file handler doesn't specify an extra level, so DEBUG records still show up in JSON mode and can be filtered out by the user.

However, if we set the default level of the ray.train logger to INFO, DEBUG records would be dropped at the logger level, and the file handler could no longer ingest that extra information in JSON mode.

"propagate": False,
},
},
}


class TrainLogKey(str, Enum):
RUN_ID = "run_id"
WORLD_SIZE = "world_size"
WORLD_RANK = "world_rank"
LOCAL_WORLD_SIZE = "local_world_size"
LOCAL_RANK = "local_rank"
NODE_RANK = "node_rank"
Comment on lines +70 to +74
Contributor: I think we should only tag world_rank, local_rank, and node_rank. world_size / local_world_size is confusing to filter by.

Contributor Author: Good point, the size information is not useful for log searching / filtering.

# This key is used to hide the log record if the value is True.
# By default, train workers that are not ranked zero will hide
# the log record.
HIDE = "hide"
Contributor: I don't think hide=True is necessary. At least, the product won't utilize this field.

Contributor Author: Got it! A follow-up question: what format should we follow so that the product team can filter which logs are shown or hidden by default in the log viewer?

Contributor: We can just filter out non-"rank 0" records, or implement whatever behavior is needed based on the other fields.



class HiddenRecordFilter(logging.Filter):
hongpeng-guo marked this conversation as resolved.
"""Filters out log records with the "hide" attribute set to True.

This filter allows you to override default logging behavior. For example, if errors
are printed by default, and you don't want to print a specific error, you can set
the "hide" attribute to avoid printing the message.

.. testcode::

import logging
logger = logging.getLogger("ray.train.spam")

# This warning won't be printed to the console.
logger.warning("ham", extra={"hide": True})
"""

def filter(self, record):
return not getattr(record, TrainLogKey.HIDE, False)


class TrainContextFilter(logging.Filter):
"""Add rank and size information to the log record if the log is from a train worker.
Contributor suggested changing this docstring line to:
"""Add training worker rank information to the log record.


This filter is a subclass of CoreContextFilter, which adds the job_id, worker_id,
and node_id to the log record. This filter adds the rank and size information of
the train context to the log record.
Comment on lines +99 to +102
Contributor suggested change: remove these three lines from the docstring.

"""

# TODO (hpguo): This implementation is subject to change in Train V2.
def _is_worker_process(self) -> bool:
# If this process does not have a train session, it is a driver process,
# not a worker process.
if not _get_session():
return False
# If this process has a train session, but its world size field is None,
# it means the process is the train controller process created from Tune.
# It is not a worker process, either.
return _get_session().world_size is not None

def filter(self, record):
# If this process does not have a train session, it is a driver process,
# not a worker process. We don't need to add any extra information to the log.
if not _get_session():
return True
# Add the run_id info to the log for all Ray Train processes, including the
# controller process created from Tune and the worker processes.
setattr(record, TrainLogKey.RUN_ID, _get_session().run_id)
# If the process is a train controller process created from Tune, we don't
# need to add the rank and size information to the log record.
if not self._is_worker_process():
return True
# Otherwise, we need to check if the corresponding field of the train session
# is None or not. If it is not None, we add the field to the log record.
# If it is None, it means the process is the train driver created from Tune.
setattr(record, TrainLogKey.WORLD_SIZE, _get_session().world_size)
setattr(record, TrainLogKey.WORLD_RANK, _get_session().world_rank)
setattr(record, TrainLogKey.LOCAL_WORLD_SIZE, _get_session().local_world_size)
setattr(record, TrainLogKey.LOCAL_RANK, _get_session().local_rank)
setattr(record, TrainLogKey.NODE_RANK, _get_session().node_rank)
if _get_session().world_rank != 0:
setattr(record, TrainLogKey.HIDE, True)
return True


class SessionFileHandler(logging.Handler):
"""A handler that writes to a log file in the Ray session directory.

The Ray session directory isn't available until Ray is initialized, so this handler
lazily creates the file handler when you emit a log record.

Args:
filename: The name of the log file. The file is created in the 'logs' directory
of the Ray session directory.
"""

def __init__(self, filename: str):
super().__init__()
self._filename = filename
self._handler = None
self._formatter = None
self._path = None

def emit(self, record):
if self._handler is None:
self._try_create_handler()
if self._handler is not None:
self._handler.emit(record)

def setFormatter(self, fmt: logging.Formatter) -> None:
if self._handler is not None:
self._handler.setFormatter(fmt)
self._formatter = fmt

def _try_create_handler(self):
assert self._handler is None

log_directory = get_log_directory()
if log_directory is None:
return

os.makedirs(log_directory, exist_ok=True)

self._path = os.path.join(log_directory, self._filename)
self._handler = logging.FileHandler(self._path)
if self._formatter is not None:
self._handler.setFormatter(self._formatter)


def configure_logging() -> None:
"""Configure the Python logger named 'ray.train'.

This function loads the configuration YAML specified by the "RAY_TRAIN_LOG_CONFIG_PATH"
environment variable. If the variable isn't set, this function loads the default
DEFAULT_LOG_CONFIG_JSON_STRING defined in this module.

If "RAY_TRAIN_LOG_ENCODING" is set to "JSON", JSON log encoding is enabled when
using the default logging config.
"""

def _load_logging_config(config_path: str):
with open(config_path) as file:
config = yaml.safe_load(file)
return config

# Dynamically configure the logger based on the environment variables.
ray_train_log_encoding = os.environ.get(LOG_ENCODING_ENV)
ray_train_log_config_path = os.environ.get(LOG_CONFIG_PATH_ENV)

if ray_train_log_config_path is not None:
config = _load_logging_config(ray_train_log_config_path)
else:
config = DEFAULT_LOG_CONFIG_JSON_STRING
if (
ray_train_log_encoding is not None
and ray_train_log_encoding.upper() == DEFAULT_JSON_LOG_ENCODING_FORMAT
):
for logger in config["loggers"].values():
justinvyu marked this conversation as resolved.
logger["handlers"].remove("console_text")
logger["handlers"].append("console_json")

logging.config.dictConfig(config)

# After configuring the logger, warn if RAY_TRAIN_LOG_CONFIG_PATH is used together
# with RAY_TRAIN_LOG_ENCODING, because the two are not supported together.
if ray_train_log_config_path is not None and ray_train_log_encoding is not None:
logger = logging.getLogger("ray.train")
logger.warning(
f"Using {LOG_ENCODING_ENV} is not supported with "
f"{LOG_CONFIG_PATH_ENV}."
"If you are already specifying a custom config yaml file, "
"please configure your desired encoding in the yaml file as well."
)


def get_log_directory() -> Optional[str]:
"""Return the directory where Ray Train writes log files.

If Ray isn't initialized, this function returns ``None``.
"""
global_node = ray._private.worker._global_node
if global_node is None:
return None

session_dir = global_node.get_session_dir_path()
return os.path.join(session_dir, "logs", "ray-train")
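
For orientation, here is a minimal usage sketch of the behavior configured above. It assumes the env var names introduced in this PR (RAY_TRAIN_LOG_ENCODING, RAY_TRAIN_LOG_CONFIG_PATH) and that configure_logging() runs when ray.train is imported; the "ray.train.my_app" logger name is a placeholder, not part of the diff.

import logging
import os

# Assumption: set these before importing ray.train, since configure_logging()
# runs at import time in this revision.
os.environ["RAY_TRAIN_LOG_ENCODING"] = "JSON"  # switch console output to the JSON handler
# os.environ["RAY_TRAIN_LOG_CONFIG_PATH"] = "/path/to/custom_logging.yaml"  # or point to a full dictConfig YAML

import ray.train  # noqa: E402  (import triggers configure_logging())

# Loggers under the "ray.train" hierarchy propagate to the configured handlers.
logger = logging.getLogger("ray.train.my_app")
logger.info("training run configured")
# With the default config, DEBUG records always reach ray-train.log in the
# session directory; they appear on the console only in JSON mode.
logger.debug("verbose details")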
8 changes: 8 additions & 0 deletions python/ray/train/constants.py
@@ -86,13 +86,21 @@ def _get_ray_train_session_dir() -> str:
# Defaults to 0
RAY_TRAIN_ENABLE_STATE_TRACKING = "RAY_TRAIN_ENABLE_STATE_TRACKING"

# Env. variable to specify the encoding of the file logs when using the default config.
LOG_ENCODING_ENV = "RAY_TRAIN_LOG_ENCODING"

# Env. variable to specify the logging config path; defaults are used if not set.
LOG_CONFIG_PATH_ENV = "RAY_TRAIN_LOG_CONFIG_PATH"

Contributor: Can you move all these constants into the logging.py file? I'd rather keep this module isolated and have fewer changes in train/* code.

Contributor Author: Why do we want to make this module isolated from the other parts?

Contributor Author: I moved the default configuration JSON string and the default encoding format variable into logging.py, but kept the two env var name variables in constants.py. I think it makes more sense to keep all the user-controlled env var names in constants.py.

# NOTE: When adding a new environment variable, please track it in this list.
TRAIN_ENV_VARS = {
ENABLE_DETAILED_AUTOFILLED_METRICS_ENV,
ENABLE_SHARE_CUDA_VISIBLE_DEVICES_ENV,
ENABLE_SHARE_NEURON_CORES_ACCELERATOR_ENV,
TRAIN_PLACEMENT_GROUP_TIMEOUT_S_ENV,
TRAIN_ENABLE_WORKER_SPREAD_ENV,
LOG_ENCODING_ENV,
LOG_CONFIG_PATH_ENV,
RAY_CHDIR_TO_TRIAL_DIR,
RAY_TRAIN_COUNT_PREEMPTION_AS_FAILURE,
RAY_TRAIN_ENABLE_STATE_TRACKING,
3 changes: 1 addition & 2 deletions python/ray/train/data_parallel_trainer.py
@@ -1,5 +1,4 @@
import logging
import uuid
from typing import Any, Callable, Dict, List, Optional, Type, Union

import ray
@@ -444,7 +443,7 @@ def training_loop(self) -> None:
driver_ip=ray.util.get_node_ip_address(),
driver_node_id=ray.get_runtime_context().get_node_id(),
experiment_name=session.get_experiment_name(),
run_id=uuid.uuid4().hex,
run_id=session.get_run_id(),
Contributor Author: Just for my own understanding, this training_loop function is part of the tuner.fit() call that runs in a Tune process. Therefore, session.get_run_id() will actually get the run_id from a Tune process, even though this session is imported from ray.train._internal. The session is actually initialized inside function_trainable.py, which is defined under tune/trainable/function_trainable.py. cc @justinvyu @matthewdeng

Contributor: That's right, the training_loop here is the Train driver logic that runs inside the tune.FunctionTrainable. So the "session" refers to the Ray Tune session, not the Ray Train worker session.

)

backend_executor = self._backend_executor_cls(
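
To close the loop on the run_id discussion above, here is a hypothetical end-to-end sketch of how a worker-side record picks up the context added by TrainContextFilter. The trainer choice, worker count, and logger name are placeholders, not part of this diff.

import logging

from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

logger = logging.getLogger("ray.train.my_app")


def train_func(config):
    # Emitted from a worker process: TrainContextFilter, as written in this
    # revision, is expected to attach run_id plus the world/local/node rank
    # fields to this record, and to mark non-rank-zero records with hide=True
    # so the console filter can drop them.
    logger.info("starting training loop")


trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=2),
)
result = trainer.fit()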