Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: make block_call as abstractmethod; merge block_checkcall #484

Merged
merged 4 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion dpdispatcher/base_context.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from abc import ABCMeta, abstractmethod
from typing import List, Tuple
from typing import Any, List, Tuple

from dargs import Argument

Expand Down Expand Up @@ -73,6 +73,66 @@
def check_finish(self, proc):
raise NotImplementedError("abstract method")

def block_checkcall(self, cmd, asynchronously=False) -> Tuple[Any, Any, Any]:
"""Run command with arguments. Wait for command to complete.

Parameters
----------
cmd : str
The command to run.
asynchronously : bool, optional, default=False
Run command asynchronously. If True, `nohup` will be used to run the command.

Returns
-------
stdin
standard inout
stdout
standard output
stderr
standard error

Raises
------
RuntimeError
when the return code is not zero
"""
if asynchronously:
cmd = f"nohup {cmd} >/dev/null &"
exit_status, stdin, stdout, stderr = self.block_call(cmd)
if exit_status != 0:
raise RuntimeError(

Check warning on line 104 in dpdispatcher/base_context.py

View check run for this annotation

Codecov / codecov/patch

dpdispatcher/base_context.py#L100-L104

Added lines #L100 - L104 were not covered by tests
"Get error code %d in calling %s with job: %s . message: %s"
% (
exit_status,
cmd,
self.submission.submission_hash,
stderr.read().decode("utf-8"),
)
)
return stdin, stdout, stderr

Check warning on line 113 in dpdispatcher/base_context.py

View check run for this annotation

Codecov / codecov/patch

dpdispatcher/base_context.py#L113

Added line #L113 was not covered by tests

@abstractmethod
def block_call(self, cmd) -> Tuple[int, Any, Any, Any]:
"""Run command with arguments. Wait for command to complete.

Parameters
----------
cmd : str
The command to run.

Returns
-------
exit_status
exit code
stdin
standard inout
stdout
standard output
stderr
standard error
"""

@classmethod
def machine_arginfo(cls) -> Argument:
"""Generate the machine arginfo.
Expand Down
5 changes: 5 additions & 0 deletions dpdispatcher/contexts/dp_cloud_server_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@
)
]

def block_call(self, cmd):
raise RuntimeError(

Check warning on line 339 in dpdispatcher/contexts/dp_cloud_server_context.py

View check run for this annotation

Codecov / codecov/patch

dpdispatcher/contexts/dp_cloud_server_context.py#L339

Added line #L339 was not covered by tests
"Unsupported method. You may use an unsupported combination of the machine and the context."
)


DpCloudServerContext = BohriumContext
LebesgueContext = BohriumContext
Expand Down
5 changes: 5 additions & 0 deletions dpdispatcher/contexts/hdfs_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,8 @@

def read_file(self, fname):
return HDFS.read_hdfs_file(os.path.join(self.remote_root, fname))

def block_call(self, cmd):
raise RuntimeError(

Check warning on line 249 in dpdispatcher/contexts/hdfs_context.py

View check run for this annotation

Codecov / codecov/patch

dpdispatcher/contexts/hdfs_context.py#L249

Added line #L249 was not covered by tests
"Unsupported method. You may use an unsupported combination of the machine and the context."
)
17 changes: 0 additions & 17 deletions dpdispatcher/contexts/lazy_local_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,6 @@ def download(
# else:
# raise RuntimeError('do not find download file ' + fname)

def block_checkcall(self, cmd):
# script_dir = os.path.join(self.local_root, self.submission.work_base)
# os.chdir(script_dir)
proc = sp.Popen(
cmd, cwd=self.local_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE
)
o, e = proc.communicate()
stdout = SPRetObj(o)
stderr = SPRetObj(e)
code = proc.returncode
if code != 0:
raise RuntimeError(
"Get error code %d in locally calling %s with job: %s ",
(code, cmd, self.submission.submission_hash),
)
return None, stdout, stderr

def block_call(self, cmd):
proc = sp.Popen(
cmd, cwd=self.local_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE
Expand Down
15 changes: 0 additions & 15 deletions dpdispatcher/contexts/local_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,21 +288,6 @@ def download(
# no nothing in the case of linked files
pass

def block_checkcall(self, cmd):
proc = sp.Popen(
cmd, cwd=self.remote_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE
)
o, e = proc.communicate()
stdout = SPRetObj(o)
stderr = SPRetObj(e)
code = proc.returncode
if code != 0:
raise RuntimeError(
f"Get error code {code} in locally calling {cmd} with job: {self.submission.submission_hash}"
f"\nStandard error: {stderr}"
)
return None, stdout, stderr

def block_call(self, cmd):
proc = sp.Popen(
cmd, cwd=self.remote_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE
Expand Down
5 changes: 5 additions & 0 deletions dpdispatcher/contexts/openapi_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,8 @@
dir_to_be_removed = os.path.join(local_root, "backup")
if os.path.exists(dir_to_be_removed):
shutil.rmtree(dir_to_be_removed)

def block_call(self, cmd):
raise RuntimeError(

Check warning on line 263 in dpdispatcher/contexts/openapi_context.py

View check run for this annotation

Codecov / codecov/patch

dpdispatcher/contexts/openapi_context.py#L263

Added line #L263 was not covered by tests
"Unsupported method. You may use an unsupported combination of the machine and the context."
)
35 changes: 0 additions & 35 deletions dpdispatcher/contexts/ssh_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,41 +767,6 @@ def download(
tar_compress=self.remote_profile.get("tar_compress", None),
)

def block_checkcall(self, cmd, asynchronously=False, stderr_whitelist=None):
"""Run command with arguments. Wait for command to complete. If the return code
was zero then return, otherwise raise RuntimeError.

Parameters
----------
cmd : str
The command to run.
asynchronously : bool, optional, default=False
Run command asynchronously. If True, `nohup` will be used to run the command.
stderr_whitelist : list of str, optional, default=None
If not None, the stderr will be checked against the whitelist. If the stderr
contains any of the strings in the whitelist, the command will be considered
successful.
"""
assert self.remote_root is not None
self.ssh_session.ensure_alive()
if asynchronously:
cmd = f"nohup {cmd} >/dev/null &"
stdin, stdout, stderr = self.ssh_session.exec_command(
(f"cd {shlex.quote(self.remote_root)} ;") + cmd
)
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
raise RuntimeError(
"Get error code %d in calling %s through ssh with job: %s . message: %s"
% (
exit_status,
cmd,
self.submission.submission_hash,
stderr.read().decode("utf-8"),
)
)
return stdin, stdout, stderr

def block_call(self, cmd):
assert self.remote_root is not None
self.ssh_session.ensure_alive()
Expand Down
Loading