Skip to content

Commit

Permalink
Print the status of finished jobs (#2801)
Browse files Browse the repository at this point in the history
* `neuro run`, `neuro attach`, `neuro logs`, `neuro exec` in non-quiet
  mode now print a line with the status of the finished job. This helps to
  notice jobs terminated by the OOM Killer and cancelled jobs.
* `neuro attach` to a finished job now prints its status instead of
  "Job is running ..."
* Job start progress now correctly handles statuses SUSPENDED,
  CANCELLED, UNKNOWN.
  • Loading branch information
serhiy-storchaka authored Jul 29, 2022
1 parent d61909e commit 05a4d12
Show file tree
Hide file tree
Showing 35 changed files with 338 additions and 145 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.D/2800.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Commands `neuro run`, `neuro logs`, `neuro attach` and `neuro exec` in non-quiet mode now print details for cancelled and failed jobs. Also improved other indications of the job status.
133 changes: 81 additions & 52 deletions neuro-cli/src/neuro_cli/ael.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from prompt_toolkit.keys import Keys
from prompt_toolkit.output import Output, create_output
from prompt_toolkit.shortcuts import PromptSession
from rich.markup import escape as rich_escape

from neuro_sdk import (
JobDescription,
Expand All @@ -37,7 +38,10 @@
log = logging.getLogger(__name__)


JOB_STARTED = "[dim]===== Job is running, press Ctrl-C to detach/kill =====[/dim]"
JOB_STARTED_NEURO_HAS_TTY = (
"[green]√[/green] "
"[dim]===== Job is running, press Ctrl-C to detach/kill =====[/dim]"
)

JOB_STARTED_NEURO_HAS_NO_TTY = (
"[dim]===== Job is running, press Ctrl-C to detach =====[/dim]"
Expand Down Expand Up @@ -66,13 +70,15 @@ class InterruptAction(enum.Enum):
class AttachHelper:
attach_ready: bool
log_printed: bool
job_started_msg: str
write_sem: asyncio.Semaphore
quiet: bool
action: InterruptAction

def __init__(self, *, quiet: bool) -> None:
self.attach_ready = False
self.log_printed = False
self.job_started_msg = ""
self.write_sem = asyncio.Semaphore()
self.quiet = quiet
self.action = InterruptAction.NOTHING
Expand Down Expand Up @@ -109,7 +115,10 @@ async def process_logs(
if helper.attach_ready:
return
async with helper.write_sem:
helper.log_printed = True
if not helper.log_printed:
if not root.quiet:
root.print(helper.job_started_msg, markup=True)
helper.log_printed = True
sys.stdout.write(txt)
sys.stdout.flush()
else:
Expand All @@ -128,6 +137,10 @@ async def process_exec(
finally:
root.soft_reset_tty()

if not root.quiet:
status = await root.client.jobs.status(job)
print_job_result(root, status)

sys.exit(exit_code)


Expand Down Expand Up @@ -278,16 +291,14 @@ async def _process_attach_single_try(
root, job.id, logs, cluster_name=job.cluster_name
)

with JobStopProgress.create(
console=root.console,
quiet=root.quiet,
) as progress:
if action == InterruptAction.KILL:
if action == InterruptAction.KILL:
with JobStopProgress.create(root.console, quiet=root.quiet) as progress:
progress.kill(job)
sys.exit(128 + signal.SIGINT)
elif action == InterruptAction.DETACH:
sys.exit(128 + signal.SIGINT)
elif action == InterruptAction.DETACH:
with JobStopProgress.create(root.console, quiet=root.quiet) as progress:
progress.detach(job)
sys.exit(0)
sys.exit(0)
except ResourceNotFound:
# Container already stopped, so we can ignore such error.
pass
Expand Down Expand Up @@ -315,28 +326,25 @@ async def _process_attach_single_try(
# The class pins the current time in its constructor,
# that's why we need to initialize
# it AFTER the disconnection from attached session.
with JobStopProgress.create(
console=root.console,
quiet=root.quiet,
) as progress:
while job.status == JobStatus.RUNNING:
await asyncio.sleep(0.2)
job = await root.client.jobs.status(job.id)
with JobStopProgress.create(root.console, quiet=root.quiet) as progress:
while not job.status.is_finished:
if not progress.step(job):
sys.exit(EX_IOERR)
if job.status == JobStatus.FAILED:
sys.exit(job.history.exit_code or EX_PLATFORMERROR)
await asyncio.sleep(0.2)
job = await root.client.jobs.status(job.id)
progress.end(job)
if job.status == JobStatus.FAILED:
sys.exit(job.history.exit_code or EX_PLATFORMERROR)
else:
sys.exit(job.history.exit_code)


async def _attach_tty(
root: Root, job: str, logs: bool, *, cluster_name: Optional[str]
) -> InterruptAction:
if not root.quiet:
root.print(JOB_STARTED_TTY, markup=True)

loop = asyncio.get_event_loop()
helper = AttachHelper(quiet=root.quiet)
helper.job_started_msg = JOB_STARTED_TTY

stdout = create_output()
h, w = stdout.get_size()
Expand All @@ -357,6 +365,7 @@ async def _attach_tty(
if status.status is not JobStatus.RUNNING:
# Job is finished
await logs_printer
print_job_result(root, status)
if status.status == JobStatus.FAILED:
sys.exit(status.history.exit_code or EX_PLATFORMERROR)
else:
Expand Down Expand Up @@ -484,33 +493,22 @@ async def _process_stdout_tty(
else:
txt = decoder.decode(chunk.data)
async with helper.write_sem:
if not helper.quiet and not helper.attach_ready:
# Print header to stdout only,
# logs are printed to stdout and never to
# stderr (but logs printing is stopped by
# helper.attach_ready = True regardless
# what stream had receive text in attached mode.
if helper.log_printed:
s = ATTACH_STARTED_AFTER_LOGS
if root.tty:
s = "[green]√[/green] " + s
root.print(s, markup=True)
helper.attach_ready = True
if not helper.attach_ready:
await _print_header(root, helper)
helper.attach_ready = True
stdout.write_raw(txt)
stdout.flush()


async def _attach_non_tty(
root: Root, job: str, logs: bool, *, cluster_name: Optional[str]
) -> InterruptAction:
if not root.quiet:
s = JOB_STARTED_NEURO_HAS_NO_TTY
if root.tty:
s = "[green]√[/green] " + JOB_STARTED
root.print(s, markup=True)

loop = asyncio.get_event_loop()
helper = AttachHelper(quiet=root.quiet)
if root.tty:
helper.job_started_msg = JOB_STARTED_NEURO_HAS_TTY
else:
helper.job_started_msg = JOB_STARTED_NEURO_HAS_NO_TTY

if logs:
logs_printer = loop.create_task(
Expand All @@ -527,6 +525,7 @@ async def _attach_non_tty(
if status.history.exit_code is not None:
# Wait for logs printing finish before exit
await logs_printer
print_job_result(root, status)
sys.exit(status.history.exit_code)

input_task = None
Expand Down Expand Up @@ -580,18 +579,9 @@ async def _process_stdout_non_tty(
async def _write(fileno: int, txt: str) -> None:
f = streams[fileno]
async with helper.write_sem:
if not helper.quiet and not helper.attach_ready:
# Print header to stdout only,
# logs are printed to stdout and never to
# stderr (but logs printing is stopped by
# helper.attach_ready = True regardless
# what stream had receive text in attached mode.
if helper.log_printed:
s = ATTACH_STARTED_AFTER_LOGS
if root.tty:
s = "[green]√[/green] " + s
root.print(s, markup=True)
helper.attach_ready = True
if not helper.attach_ready:
await _print_header(root, helper)
helper.attach_ready = True
f.write(txt)
f.flush()

Expand All @@ -608,6 +598,23 @@ async def _write(fileno: int, txt: str) -> None:
await _write(chunk.fileno, txt)


async def _print_header(root: Root, helper: AttachHelper) -> None:
    """Print the one-time header shown before attached output starts.

    Nothing is printed when ``helper.quiet`` is set or when
    ``helper.attach_ready`` is already true.  Otherwise:

    * if log output was already printed (``helper.log_printed``), the
      ``ATTACH_STARTED_AFTER_LOGS`` banner is printed, prefixed with a
      green check mark when ``root.tty`` is true;
    * if no log output was printed yet, ``helper.job_started_msg`` is
      printed instead, unless ``root.quiet`` is set.

    This function does not modify ``helper``; the callers visible in this
    file set ``helper.attach_ready`` themselves after calling it.
    """
    if helper.quiet or helper.attach_ready:
        return

    # Note carried over from the original implementation: the header is
    # meant to go to stdout only; logs are printed to stdout and never to
    # stderr (but log printing is stopped by helper.attach_ready = True
    # regardless of which stream received text in attached mode).
    if not helper.log_printed:
        if not root.quiet:
            root.print(helper.job_started_msg, markup=True)
        return

    banner = ATTACH_STARTED_AFTER_LOGS
    if root.tty:
        banner = "[green]√[/green] " + banner
    root.print(banner, markup=True)


def _create_interruption_dialog() -> PromptSession[InterruptAction]:
bindings = KeyBindings()

Expand Down Expand Up @@ -701,3 +708,25 @@ async def _cancel_attach_output(root: Root, output_task: "asyncio.Task[Any]") ->
if ex and isinstance(ex, StdStreamError):
return
await root.cancel_with_logging(output_task)


def print_job_result(root: Root, job: JobDescription) -> None:
    """Print a one-line summary of how *job* ended.

    * ``SUCCEEDED`` -- printed only when ``root.verbosity > 0``.
    * ``CANCELLED`` -- printed when ``root.verbosity >= 0``; the history
      reason (markup-escaped) is appended in parentheses when present.
    * ``FAILED`` -- printed when ``root.verbosity >= 0``; the history
      reason is appended the same way.

    Any other status prints nothing.  When ``root.tty`` is true the line
    is prefixed with a coloured mark (green check for succeeded and
    cancelled jobs, red cross for failed ones).
    """
    status = job.status

    if status == JobStatus.SUCCEEDED:
        if not root.verbosity > 0:
            return
        mark = "[green]√[/green] "
        line = f"Job [b]{job.id}[/b] finished successfully"
        with_reason = False
    elif status == JobStatus.CANCELLED:
        if not root.verbosity >= 0:
            return
        mark = "[green]√[/green] "
        line = f"Job [b]{job.id}[/b] was cancelled"
        with_reason = True
    elif status == JobStatus.FAILED:
        if not root.verbosity >= 0:
            return
        mark = "[red]×[/red] "
        line = f"Job [b]{job.id}[/b] failed"
        with_reason = True
    else:
        # Non-terminal or unknown statuses produce no output.
        return

    if root.tty:
        line = mark + line
    if with_reason and job.history.reason:
        # The reason is free text; escape it so it is not parsed as markup.
        line += f" ({rich_escape(job.history.reason)})"
    root.print(line, markup=True)
64 changes: 46 additions & 18 deletions neuro-cli/src/neuro_cli/formatters/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ def _get_status_reason_message(self, job: JobDescription) -> str:
return ""

def _get_status_description_message(self, job: JobDescription) -> str:
description = job.history.description or ""
description = job.history.description
if description:
return f"({description})"
return ""
Expand Down Expand Up @@ -599,13 +599,13 @@ def begin(self, job: JobDescription) -> None:
def step(self, job: JobDescription) -> None:
new_time = self.time_factory()
dt = new_time - self._time
if job.status == JobStatus.PENDING:
if job.status.is_pending:
msg = Text("-", "yellow")
elif job.status == JobStatus.FAILED:
msg = Text("×", "red")
else:
# RUNNING or SUCCEDED
elif job.status in (JobStatus.RUNNING, JobStatus.SUCCEEDED):
msg = Text("√", "green")
else:
# FAILED or CANCELLED or UNKNOWN
msg = Text("×", "red")

msg = Text.assemble(msg, " Status: ", fmt_status(job.status))
reason = self._get_status_reason_message(job)
Expand Down Expand Up @@ -635,7 +635,7 @@ def end(self, job: JobDescription) -> None:
self._prev = empty
self._live_render.set_renderable(empty)

if job.status != JobStatus.FAILED:
if not job.status.is_finished:
http_url = job.http_url
if http_url:
out.append(f"{yes()} [b]Http URL[/b]: {rich_escape(str(http_url))}")
Expand Down Expand Up @@ -739,6 +739,9 @@ def step(self, job: JobDescription) -> bool:
def tick(self, job: JobDescription) -> None:
pass

def end(self, job: JobDescription) -> None:
pass

def timeout(self, job: JobDescription) -> None:
pass

Expand Down Expand Up @@ -790,18 +793,29 @@ def kill(self, job: JobDescription) -> None:
]
)

def end(self, job: JobDescription) -> None:
    """Render the final outcome line for *job* in the live display.

    A ``SUCCEEDED`` job gets a "finished successfully" line, a
    ``CANCELLED`` job a "was cancelled" line, and every other status is
    reported as failed.  For the cancelled and failed cases the history
    reason, markup-escaped, is appended in parentheses when it is set.
    """
    status = job.status
    if status == JobStatus.SUCCEEDED:
        msg = yes() + f" Job [b]{job.id}[/b] finished successfully"
    else:
        if status == JobStatus.CANCELLED:
            msg = yes() + f" Job [b]{job.id}[/b] was cancelled"
        else:
            msg = no() + f" Job [b]{job.id}[/b] failed"
        # Only non-successful outcomes carry a reason suffix.
        if job.history.reason:
            msg += f" ({rich_escape(job.history.reason)})"

    renderable = Text.from_markup(msg)
    self._live_render.set_renderable(renderable)
    with self._console:
        self._console.print(Control())

def tick(self, job: JobDescription) -> None:
new_time = self.time_factory()
dt = new_time - self._time

if job.status == JobStatus.RUNNING:
msg = (
"[yellow]-[/yellow]"
+ f" Wait for stop {next(self._spinner)} [{dt:.1f} sec]"
)
else:
msg = yes() + f" Job [b]{job.id}[/b] stopped"

msg = (
"[yellow]-[/yellow]"
+ f" Wait for stop {next(self._spinner)} [{dt:.1f} sec]"
)
self._live_render.set_renderable(Text.from_markup(msg))
with self._console:
self._console.print(Control())
Expand Down Expand Up @@ -853,16 +867,30 @@ class StreamJobStopProgress(JobStopProgress):
def __init__(self, console: Console) -> None:
super().__init__()
self._console = console
self._console.print("Wait for stopping")
self._first = True

def detach(self, job: JobDescription) -> None:
pass

def kill(self, job: JobDescription) -> None:
self._console.print("Job was killed")

def end(self, job: JobDescription) -> None:
    """Print a plain-text outcome line for a cancelled or failed job.

    ``CANCELLED`` prints "Job was cancelled" and ``FAILED`` prints
    "Job failed"; in both cases the history reason is appended in
    parentheses when it is set.  Any other status prints nothing.
    """
    status = job.status
    if status == JobStatus.CANCELLED:
        msg = "Job was cancelled"
    elif status == JobStatus.FAILED:
        msg = "Job failed"
    else:
        return

    if job.history.reason:
        msg += f" ({job.history.reason})"
    self._console.print(msg)

def tick(self, job: JobDescription) -> None:
pass
if self._first:
self._console.print("Wait for stopping")
self._first = False

def timeout(self, job: JobDescription) -> None:
self._console.print("")
Expand Down
Loading

0 comments on commit 05a4d12

Please sign in to comment.