Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: make block_call as abstractmethod; merge block_checkcall #484

Merged
merged 4 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions dpdispatcher/base_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,67 @@
def check_finish(self, proc):
raise NotImplementedError("abstract method")

def block_checkcall(self, cmd, asynchronously=False):
"""Run command with arguments. Wait for command to complete.

Parameters
----------
cmd : str
The command to run.
asynchronously : bool, optional, default=False
Run command asynchronously. If True, `nohup` will be used to run the command.

Returns
-------
stdin
standard inout
stdout
standard output
stderr
standard error

Raises
------
RuntimeError
when the return code is not zero
"""
if asynchronously:
cmd = f"nohup {cmd} >/dev/null &"
exit_status, stdin, stdout, stderr = self.block_call(cmd)
if exit_status != 0:
raise RuntimeError(
"Get error code %d in calling %s with job: %s . message: %s"
% (
exit_status,
cmd,
self.submission.submission_hash,
stderr.read().decode("utf-8"),
)
)
return stdin, stdout, stderr

@abstractmethod
def block_call(self, cmd):
"""Run command with arguments. Wait for command to complete.

Parameters
----------
cmd : str
The command to run.

Returns
-------
exit_status
exit code
stdin
standard inout
stdout
standard output
stderr
standard error
"""
return exit_status, stdin, stdout, stderr

Check failure on line 135 in dpdispatcher/base_context.py

View workflow job for this annotation

GitHub Actions / pyright

"exit_status" is not defined (reportUndefinedVariable)

Check failure on line 135 in dpdispatcher/base_context.py

View workflow job for this annotation

GitHub Actions / pyright

"stdin" is not defined (reportUndefinedVariable)

Check failure on line 135 in dpdispatcher/base_context.py

View workflow job for this annotation

GitHub Actions / pyright

"stdout" is not defined (reportUndefinedVariable)

Check failure on line 135 in dpdispatcher/base_context.py

View workflow job for this annotation

GitHub Actions / pyright

"stderr" is not defined (reportUndefinedVariable)
njzjz marked this conversation as resolved.
Show resolved Hide resolved

@classmethod
def machine_arginfo(cls) -> Argument:
"""Generate the machine arginfo.
Expand Down
5 changes: 5 additions & 0 deletions dpdispatcher/contexts/dp_cloud_server_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@ def machine_subfields(cls) -> List[Argument]:
)
]

def block_call(self, cmd):
raise RuntimeError(
"Unsupported method. You may use an unsupported combination of the machine and the context."
)


DpCloudServerContext = BohriumContext
LebesgueContext = BohriumContext
Expand Down
5 changes: 5 additions & 0 deletions dpdispatcher/contexts/hdfs_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,3 +244,8 @@ def write_file(self, fname, write_str):

def read_file(self, fname):
return HDFS.read_hdfs_file(os.path.join(self.remote_root, fname))

def block_call(self, cmd):
raise RuntimeError(
"Unsupported method. You may use an unsupported combination of the machine and the context."
)
17 changes: 0 additions & 17 deletions dpdispatcher/contexts/lazy_local_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,23 +112,6 @@ def download(
# else:
# raise RuntimeError('do not find download file ' + fname)

def block_checkcall(self, cmd):
# script_dir = os.path.join(self.local_root, self.submission.work_base)
# os.chdir(script_dir)
proc = sp.Popen(
cmd, cwd=self.local_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE
)
o, e = proc.communicate()
stdout = SPRetObj(o)
stderr = SPRetObj(e)
code = proc.returncode
if code != 0:
raise RuntimeError(
"Get error code %d in locally calling %s with job: %s ",
(code, cmd, self.submission.submission_hash),
)
return None, stdout, stderr

def block_call(self, cmd):
proc = sp.Popen(
cmd, cwd=self.local_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE
Expand Down
15 changes: 0 additions & 15 deletions dpdispatcher/contexts/local_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,21 +288,6 @@ def download(
# no nothing in the case of linked files
pass

def block_checkcall(self, cmd):
proc = sp.Popen(
cmd, cwd=self.remote_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE
)
o, e = proc.communicate()
stdout = SPRetObj(o)
stderr = SPRetObj(e)
code = proc.returncode
if code != 0:
raise RuntimeError(
f"Get error code {code} in locally calling {cmd} with job: {self.submission.submission_hash}"
f"\nStandard error: {stderr}"
)
return None, stdout, stderr

def block_call(self, cmd):
proc = sp.Popen(
cmd, cwd=self.remote_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE
Expand Down
5 changes: 5 additions & 0 deletions dpdispatcher/contexts/openapi_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,3 +258,8 @@ def _clean_backup(self, local_root, keep_backup=True):
dir_to_be_removed = os.path.join(local_root, "backup")
if os.path.exists(dir_to_be_removed):
shutil.rmtree(dir_to_be_removed)

def block_call(self, cmd):
raise RuntimeError(
"Unsupported method. You may use an unsupported combination of the machine and the context."
)
35 changes: 0 additions & 35 deletions dpdispatcher/contexts/ssh_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -767,41 +767,6 @@ def download(
tar_compress=self.remote_profile.get("tar_compress", None),
)

def block_checkcall(self, cmd, asynchronously=False, stderr_whitelist=None):
"""Run command with arguments. Wait for command to complete. If the return code
was zero then return, otherwise raise RuntimeError.

Parameters
----------
cmd : str
The command to run.
asynchronously : bool, optional, default=False
Run command asynchronously. If True, `nohup` will be used to run the command.
stderr_whitelist : list of str, optional, default=None
If not None, the stderr will be checked against the whitelist. If the stderr
contains any of the strings in the whitelist, the command will be considered
successful.
"""
assert self.remote_root is not None
self.ssh_session.ensure_alive()
if asynchronously:
cmd = f"nohup {cmd} >/dev/null &"
stdin, stdout, stderr = self.ssh_session.exec_command(
(f"cd {shlex.quote(self.remote_root)} ;") + cmd
)
exit_status = stdout.channel.recv_exit_status()
if exit_status != 0:
raise RuntimeError(
"Get error code %d in calling %s through ssh with job: %s . message: %s"
% (
exit_status,
cmd,
self.submission.submission_hash,
stderr.read().decode("utf-8"),
)
)
return stdin, stdout, stderr

def block_call(self, cmd):
assert self.remote_root is not None
self.ssh_session.ensure_alive()
Expand Down
Loading