diff --git a/dpdispatcher/base_context.py b/dpdispatcher/base_context.py index 408aecd6..0935e845 100644 --- a/dpdispatcher/base_context.py +++ b/dpdispatcher/base_context.py @@ -1,5 +1,5 @@ from abc import ABCMeta, abstractmethod -from typing import List, Tuple +from typing import Any, List, Tuple from dargs import Argument @@ -73,6 +73,66 @@ def read_file(self, fname): def check_finish(self, proc): raise NotImplementedError("abstract method") + def block_checkcall(self, cmd, asynchronously=False) -> Tuple[Any, Any, Any]: + """Run command with arguments. Wait for command to complete. + + Parameters + ---------- + cmd : str + The command to run. + asynchronously : bool, optional, default=False + Run command asynchronously. If True, `nohup` will be used to run the command. + + Returns + ------- + stdin + standard inout + stdout + standard output + stderr + standard error + + Raises + ------ + RuntimeError + when the return code is not zero + """ + if asynchronously: + cmd = f"nohup {cmd} >/dev/null &" + exit_status, stdin, stdout, stderr = self.block_call(cmd) + if exit_status != 0: + raise RuntimeError( + "Get error code %d in calling %s with job: %s . message: %s" + % ( + exit_status, + cmd, + self.submission.submission_hash, + stderr.read().decode("utf-8"), + ) + ) + return stdin, stdout, stderr + + @abstractmethod + def block_call(self, cmd) -> Tuple[int, Any, Any, Any]: + """Run command with arguments. Wait for command to complete. + + Parameters + ---------- + cmd : str + The command to run. + + Returns + ------- + exit_status + exit code + stdin + standard inout + stdout + standard output + stderr + standard error + """ + @classmethod def machine_arginfo(cls) -> Argument: """Generate the machine arginfo. diff --git a/dpdispatcher/contexts/dp_cloud_server_context.py b/dpdispatcher/contexts/dp_cloud_server_context.py index d9e7ac9e..4b92f1c9 100644 --- a/dpdispatcher/contexts/dp_cloud_server_context.py +++ b/dpdispatcher/contexts/dp_cloud_server_context.py @@ -335,6 +335,11 @@ def machine_subfields(cls) -> List[Argument]: ) ] + def block_call(self, cmd): + raise RuntimeError( + "Unsupported method. You may use an unsupported combination of the machine and the context." + ) + DpCloudServerContext = BohriumContext LebesgueContext = BohriumContext diff --git a/dpdispatcher/contexts/hdfs_context.py b/dpdispatcher/contexts/hdfs_context.py index 890d3cfa..4c0a3b72 100644 --- a/dpdispatcher/contexts/hdfs_context.py +++ b/dpdispatcher/contexts/hdfs_context.py @@ -244,3 +244,8 @@ def write_file(self, fname, write_str): def read_file(self, fname): return HDFS.read_hdfs_file(os.path.join(self.remote_root, fname)) + + def block_call(self, cmd): + raise RuntimeError( + "Unsupported method. You may use an unsupported combination of the machine and the context." + ) diff --git a/dpdispatcher/contexts/lazy_local_context.py b/dpdispatcher/contexts/lazy_local_context.py index 536c19ec..b6b35101 100644 --- a/dpdispatcher/contexts/lazy_local_context.py +++ b/dpdispatcher/contexts/lazy_local_context.py @@ -112,23 +112,6 @@ def download( # else: # raise RuntimeError('do not find download file ' + fname) - def block_checkcall(self, cmd): - # script_dir = os.path.join(self.local_root, self.submission.work_base) - # os.chdir(script_dir) - proc = sp.Popen( - cmd, cwd=self.local_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE - ) - o, e = proc.communicate() - stdout = SPRetObj(o) - stderr = SPRetObj(e) - code = proc.returncode - if code != 0: - raise RuntimeError( - "Get error code %d in locally calling %s with job: %s ", - (code, cmd, self.submission.submission_hash), - ) - return None, stdout, stderr - def block_call(self, cmd): proc = sp.Popen( cmd, cwd=self.local_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE diff --git a/dpdispatcher/contexts/local_context.py b/dpdispatcher/contexts/local_context.py index 905a8d6e..6ecbd64b 100644 --- a/dpdispatcher/contexts/local_context.py +++ b/dpdispatcher/contexts/local_context.py @@ -288,21 +288,6 @@ def download( # no nothing in the case of linked files pass - def block_checkcall(self, cmd): - proc = sp.Popen( - cmd, cwd=self.remote_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE - ) - o, e = proc.communicate() - stdout = SPRetObj(o) - stderr = SPRetObj(e) - code = proc.returncode - if code != 0: - raise RuntimeError( - f"Get error code {code} in locally calling {cmd} with job: {self.submission.submission_hash}" - f"\nStandard error: {stderr}" - ) - return None, stdout, stderr - def block_call(self, cmd): proc = sp.Popen( cmd, cwd=self.remote_root, shell=True, stdout=sp.PIPE, stderr=sp.PIPE diff --git a/dpdispatcher/contexts/openapi_context.py b/dpdispatcher/contexts/openapi_context.py index 16bc64b7..9251ce48 100644 --- a/dpdispatcher/contexts/openapi_context.py +++ b/dpdispatcher/contexts/openapi_context.py @@ -258,3 +258,8 @@ def _clean_backup(self, local_root, keep_backup=True): dir_to_be_removed = os.path.join(local_root, "backup") if os.path.exists(dir_to_be_removed): shutil.rmtree(dir_to_be_removed) + + def block_call(self, cmd): + raise RuntimeError( + "Unsupported method. You may use an unsupported combination of the machine and the context." + ) diff --git a/dpdispatcher/contexts/ssh_context.py b/dpdispatcher/contexts/ssh_context.py index 6e80c033..8537894d 100644 --- a/dpdispatcher/contexts/ssh_context.py +++ b/dpdispatcher/contexts/ssh_context.py @@ -767,41 +767,6 @@ def download( tar_compress=self.remote_profile.get("tar_compress", None), ) - def block_checkcall(self, cmd, asynchronously=False, stderr_whitelist=None): - """Run command with arguments. Wait for command to complete. If the return code - was zero then return, otherwise raise RuntimeError. - - Parameters - ---------- - cmd : str - The command to run. - asynchronously : bool, optional, default=False - Run command asynchronously. If True, `nohup` will be used to run the command. - stderr_whitelist : list of str, optional, default=None - If not None, the stderr will be checked against the whitelist. If the stderr - contains any of the strings in the whitelist, the command will be considered - successful. - """ - assert self.remote_root is not None - self.ssh_session.ensure_alive() - if asynchronously: - cmd = f"nohup {cmd} >/dev/null &" - stdin, stdout, stderr = self.ssh_session.exec_command( - (f"cd {shlex.quote(self.remote_root)} ;") + cmd - ) - exit_status = stdout.channel.recv_exit_status() - if exit_status != 0: - raise RuntimeError( - "Get error code %d in calling %s through ssh with job: %s . message: %s" - % ( - exit_status, - cmd, - self.submission.submission_hash, - stderr.read().decode("utf-8"), - ) - ) - return stdin, stdout, stderr - def block_call(self, cmd): assert self.remote_root is not None self.ssh_session.ensure_alive()