Skip to content

Commit

Permalink
[release] stream the full anyscale log to buildkite
Browse files Browse the repository at this point in the history
Signed-off-by: can <can@anyscale.com>
  • Loading branch information
can-anyscale committed Sep 24, 2024
1 parent d2982b7 commit e489cb1
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 6 deletions.
7 changes: 6 additions & 1 deletion release/ray_release/buildkite/step.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,12 @@ def get_step(

step = copy.deepcopy(DEFAULT_STEP_TEMPLATE)

cmd = ["./release/run_release_test.sh", test["name"]]
cmd = [
"./release/run_release_test.sh",
test["name"],
"--log-streaming-limit",
"100",
]

for file in test_collection_file or []:
cmd += ["--test-collection-file", file]
Expand Down
4 changes: 3 additions & 1 deletion release/ray_release/cluster_manager/cluster_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
add_tags_to_aws_config,
RELEASE_AWS_RESOURCE_TYPES_TO_TRACK_FOR_BILLING,
)
from ray_release.anyscale_util import get_project_name
from ray_release.anyscale_util import get_project_name, LAST_LOGS_LENGTH
from ray_release.config import DEFAULT_AUTOSUSPEND_MINS, DEFAULT_MAXIMUM_UPTIME_MINS
from ray_release.test import Test
from ray_release.exception import CloudInfoError
Expand All @@ -24,13 +24,15 @@ def __init__(
project_id: str,
sdk: Optional["AnyscaleSDK"] = None,
smoke_test: bool = False,
log_streaming_limit: int = LAST_LOGS_LENGTH,
):
self.sdk = sdk or get_anyscale_sdk()

self.test = test
self.smoke_test = smoke_test
self.project_id = project_id
self.project_name = get_project_name(self.project_id, self.sdk)
self.log_streaming_limit = log_streaming_limit

self.cluster_name = (
f"{test.get_name()}{'-smoke-test' if smoke_test else ''}_{int(time.time())}"
Expand Down
6 changes: 5 additions & 1 deletion release/ray_release/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from typing import Optional, List, Tuple

from ray_release.alerts.handle import handle_result, require_result
from ray_release.anyscale_util import get_cluster_name
from ray_release.anyscale_util import get_cluster_name, LAST_LOGS_LENGTH
from ray_release.buildkite.output import buildkite_group, buildkite_open_last
from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.cluster_manager.full import FullClusterManager
Expand Down Expand Up @@ -75,6 +75,7 @@ def _load_test_configuration(
smoke_test: bool = False,
no_terminate: bool = False,
test_definition_root: Optional[str] = None,
log_streaming_limit: int = LAST_LOGS_LENGTH,
) -> Tuple[ClusterManager, CommandRunner, str]:
logger.info(f"Test config: {test}")

Expand Down Expand Up @@ -129,6 +130,7 @@ def _load_test_configuration(
test,
anyscale_project,
smoke_test=smoke_test,
log_streaming_limit=log_streaming_limit,
)
command_runner = command_runner_cls(
cluster_manager,
Expand Down Expand Up @@ -390,6 +392,7 @@ def run_release_test(
cluster_env_id: Optional[str] = None,
no_terminate: bool = False,
test_definition_root: Optional[str] = None,
log_streaming_limit: int = LAST_LOGS_LENGTH,
) -> Result:
old_wd = os.getcwd()
start_time = time.monotonic()
Expand All @@ -407,6 +410,7 @@ def run_release_test(
smoke_test,
no_terminate,
test_definition_root,
log_streaming_limit,
)
buildkite_group(":nut_and_bolt: Setting up cluster environment")
(
Expand Down
8 changes: 6 additions & 2 deletions release/ray_release/job_manager/anyscale_job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
CreateProductionJob,
HaJobStates,
)
from ray_release.anyscale_util import LAST_LOGS_LENGTH, get_cluster_name
from ray_release.anyscale_util import get_cluster_name
from ray_release.cluster_manager.cluster_manager import ClusterManager
from ray_release.exception import (
CommandTimeout,
Expand Down Expand Up @@ -273,7 +273,11 @@ def _get_ray_logs(self) -> str:
"""
Obtain the last few logs
"""
return anyscale.job.get_logs(id=self.job_id, max_lines=LAST_LOGS_LENGTH)
if self.cluster_manager.log_streaming_limit == -1:
return anyscale.job.get_logs(id=self.job_id)
return anyscale.job.get_logs(
id=self.job_id, max_lines=self.cluster_manager.log_streaming_limit
)

def get_last_logs(self):
if not self.job_id:
Expand Down
9 changes: 9 additions & 0 deletions release/ray_release/scripts/run_release_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from ray_release.reporter.ray_test_db import RayTestDBReporter
from ray_release.reporter.log import LogReporter
from ray_release.result import Result
from ray_release.anyscale_util import LAST_LOGS_LENGTH


@click.command()
Expand Down Expand Up @@ -89,6 +90,12 @@
type=str,
help="Root of the test definition files. Default is the root of the repo.",
)
@click.option(
"--log-streaming-limit",
default=LAST_LOGS_LENGTH,
type=int,
help="Limit of log streaming in number of lines. Set to -1 to stream all logs.",
)
def main(
test_name: str,
test_collection_file: Tuple[str],
Expand All @@ -100,6 +107,7 @@ def main(
global_config: str = "oss_config.yaml",
no_terminate: bool = False,
test_definition_root: Optional[str] = None,
log_streaming_limit: int = LAST_LOGS_LENGTH,
):
global_config_file = os.path.join(
os.path.dirname(__file__), "..", "configs", global_config
Expand Down Expand Up @@ -157,6 +165,7 @@ def main(
cluster_env_id=cluster_env_id,
no_terminate=no_terminate,
test_definition_root=test_definition_root,
log_streaming_limit=log_streaming_limit,
)
return_code = result.return_code
except ReleaseTestError as e:
Expand Down
9 changes: 8 additions & 1 deletion release/ray_release/tests/test_glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,14 @@ def __init__(
project_id: str,
sdk=None,
smoke_test: bool = False,
log_streaming_limit: int = 100,
):
super(MockClusterManager, self).__init__(
test_name, project_id, this_sdk, smoke_test=smoke_test
test_name,
project_id,
this_sdk,
smoke_test=smoke_test,
log_streaming_limit=log_streaming_limit,
)
self.return_dict = this_cluster_manager_return
this_instances["cluster_manager"] = self
Expand Down Expand Up @@ -237,6 +242,7 @@ def _run(self, result: Result, **kwargs):
test=self.test,
anyscale_project=self.anyscale_project,
result=result,
log_streaming_limit=1000,
**kwargs
)

Expand Down Expand Up @@ -473,6 +479,7 @@ def testAlertFails(self):

self.assertEqual(result.return_code, ExitCode.COMMAND_ALERT.value)
self.assertEqual(result.status, "error")
self.assertEqual(self.instances["cluster_manager"].log_streaming_limit, 1000)

# Ensure cluster was terminated
self.assertGreaterEqual(self.sdk.call_counter["terminate_cluster"], 1)
Expand Down

0 comments on commit e489cb1

Please sign in to comment.