Skip to content

Commit

Permalink
Remove code duplication of #40
Browse files Browse the repository at this point in the history
  • Loading branch information
Radovan Zvoncek authored and adejanovski committed May 20, 2020
1 parent c545395 commit e95a0e7
Showing 1 changed file with 54 additions and 92 deletions.
146 changes: 54 additions & 92 deletions cstar/remote_paramiko.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,55 +101,13 @@ def run_job(self, file, jobid, timeout=None, env={}):
self.write_command(wrapper, "%s/wrapper" % (dir,))

cmd = """
cd %s
nohup ./wrapper
""" % (self.escape(dir),)
cd %s
nohup ./wrapper
""" % (self.escape(dir),)

stdin, stdout, stderr = self.client.exec_command(cmd, timeout=timeout)
# get the shared channel for stdout/stderr/stdin
channel = stdout.channel

stdin.close()
# indicate that we're not going to write to that channel anymore
channel.shutdown_write()

# read stdout/stderr in order to prevent read block hangs
stdout_chunks = []
stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer)))
# chunked read to prevent stalls
while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
# stop if channel was closed prematurely, and there is no data in the buffers.
got_chunk = False
readq, _, _ = select.select([stdout.channel], [], [], timeout)
for c in readq:
if c.recv_ready():
stdout_chunks.append(stdout.channel.recv(len(c.in_buffer)))
got_chunk = True
if c.recv_stderr_ready():
# make sure to read stderr to prevent stall
stderr.channel.recv_stderr(len(c.in_stderr_buffer))
got_chunk = True
'''
1) make sure that there are at least 2 cycles with no data in the input buffers in order to not exit too early (i.e. cat on a >200k file).
2) if no data arrived in the last loop, check if we already received the exit code
3) check if input buffers are empty
4) exit the loop
'''
if not got_chunk \
and stdout.channel.exit_status_ready() \
and not stderr.channel.recv_stderr_ready() \
and not stdout.channel.recv_ready():
# indicate that we're not going to read from this channel anymore
stdout.channel.shutdown_read()
# close the channel
stdout.channel.close()
break # exit as remote side is finished and our bufferes are empty

# close all the pseudofiles
stdout.close()
stderr.close()
_, _, _ = self._read_results(stdin, stdout, stderr, timeout)

stdout.channel.recv_exit_status()
real_output = self.read_file(dir + "/stdout")
real_error = self.read_file(dir + "/stderr")
real_status = int(self.read_file(dir + "/status"))
Expand All @@ -165,54 +123,10 @@ def run(self, argv):
self._connect()
cmd = " ".join(self.escape(s) for s in argv)
stdin, stdout, stderr = self.client.exec_command(cmd)
# get the shared channel for stdout/stderr/stdin
channel = stdout.channel

# we do not need stdin.
stdin.close()
# indicate that we're not going to write to that channel anymore
channel.shutdown_write()

# read stdout/stderr in order to prevent read block hangs
stdout_chunks = []
stderr_chunks = []
stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer)))
# chunked read to prevent stalls
while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
# stop if channel was closed prematurely, and there is no data in the buffers.
got_chunk = False
readq, _, _ = select.select([stdout.channel], [], [], 30)
for c in readq:
if c.recv_ready():
stdout_chunks.append(stdout.channel.recv(len(c.in_buffer)))
got_chunk = True
if c.recv_stderr_ready():
# make sure to read stderr to prevent stall
stderr_chunks.append(stderr.channel.recv_stderr(len(c.in_stderr_buffer)))
got_chunk = True
'''
1) make sure that there are at least 2 cycles with no data in the input buffers in order to not exit too early (i.e. cat on a >200k file).
2) if no data arrived in the last loop, check if we already received the exit code
3) check if input buffers are empty
4) exit the loop
'''
if not got_chunk \
and stdout.channel.exit_status_ready() \
and not stderr.channel.recv_stderr_ready() \
and not stdout.channel.recv_ready():
# indicate that we're not going to read from this channel anymore
stdout.channel.shutdown_read()
# close the channel
stdout.channel.close()
break # exit as remote side is finished and our bufferes are empty

# close all the pseudofiles
stdout.close()
stderr.close()

status, stdout_chunks, stderr_chunks = self._read_results(stdin, stdout, stderr)
out = b''.join(stdout_chunks)
error = b''.join(stderr_chunks)
status = stdout.channel.recv_exit_status()

if status != 0:
err("Command %s failed with status %d on host %s" % (cmd, status, self.hostname))
else:
Expand All @@ -223,6 +137,54 @@ def run(self, argv):
self.client = None
raise BadSSHHost("SSH connection to host %s was reset" % (self.hostname,))

def _read_results(self, stdin, stdout, stderr, timeout=30):

# get the shared channel for stdout/stderr/stdin
channel = stdout.channel

stdin.close()
# indicate that we're not going to write to that channel anymore
channel.shutdown_write()

# read stdout/stderr in order to prevent read block hangs
stdout_chunks, stderr_chunks = [], []
stdout_chunks.append(stdout.channel.recv(len(stdout.channel.in_buffer)))
# chunked read to prevent stalls
while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready():
# stop if channel was closed prematurely, and there is no data in the buffers.
got_chunk = False
readq, _, _ = select.select([stdout.channel], [], [], timeout)
for c in readq:
if c.recv_ready():
stdout_chunks.append(stdout.channel.recv(len(c.in_buffer)))
got_chunk = True
if c.recv_stderr_ready():
# make sure to read stderr to prevent stall
stderr_chunks.append(stderr.channel.recv_stderr(len(c.in_stderr_buffer)))
got_chunk = True
'''
1) make sure that there are at least 2 cycles with no data in the input buffers in order to not exit too early (i.e. cat on a >200k file).
2) if no data arrived in the last loop, check if we already received the exit code
3) check if input buffers are empty
4) exit the loop
'''
if not got_chunk \
and stdout.channel.exit_status_ready() \
and not stderr.channel.recv_stderr_ready() \
and not stdout.channel.recv_ready():
# indicate that we're not going to read from this channel anymore
stdout.channel.shutdown_read()
# close the channel
stdout.channel.close()
break # exit as remote side is finished and our bufferes are empty

# close all the pseudofiles
stdout.close()
stderr.close()

status = stdout.channel.recv_exit_status()
return status, stdout_chunks, stderr_chunks

@staticmethod
def escape(input):
if _alnum_re.search(input):
Expand Down

0 comments on commit e95a0e7

Please sign in to comment.