Skip to content

Commit

Permalink
[7X] Skip ssh if segments are on the same host with coordinator. (#16…
Browse files Browse the repository at this point in the history
…812)

We can skip ssh connections if segments are on the same host with the
coordinator. We can save some time when executing commands. E.g., with
this patch, we don't ssh connection when creating the demo cluster and
the creation time can be reduced from 18s to 10s on my laptop. Besides,
we can skip setting up ssh keys for deploying the demo cluster.
  • Loading branch information
jnihal authored and yihong0618 committed Dec 3, 2024
1 parent a1e9e2d commit 0aad1ca
Showing 1 changed file with 9 additions and 7 deletions.
16 changes: 9 additions & 7 deletions gpMgmt/bin/gppylib/commands/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ def __init__(self, stdin):
self.stdin = stdin
pass

def execute(self, cmd, wait=True, pickled=False):
def execute(self, cmd, wait=True, pickled=False, start_new_session=False):
# prepend env. variables from ExcecutionContext.propagate_env_map
# e.g. Given {'FOO': 1, 'BAR': 2}, we'll produce "FOO=1 BAR=2 ..."

Expand All @@ -453,6 +453,7 @@ def execute(self, cmd, wait=True, pickled=False):
# actual command executed, but the shell that command string runs under.
self.proc = gpsubprocess.Popen(cmd.cmdStr, env=None, shell=True,
executable='/bin/bash',
start_new_session=start_new_session,
stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE, close_fds=True)
Expand Down Expand Up @@ -490,7 +491,7 @@ def __init__(self, targetHost, stdin, gphome=None):
else:
self.gphome = GPHOME

def execute(self, cmd, pickled=False):
def execute(self, cmd, pickled=False, start_new_session=False):
# prepend env. variables from ExcecutionContext.propagate_env_map
# e.g. Given {'FOO': 1, 'BAR': 2}, we'll produce "FOO=1 BAR=2 ..."
self.__class__.trail.add(self.targetHost)
Expand All @@ -506,8 +507,8 @@ def execute(self, cmd, pickled=False):
"{targethost} \"{gphome} {cmdstr}\"".format(targethost=self.targetHost,
gphome=". %s/greenplum_path.sh;" % self.gphome,
cmdstr=cmd.cmdStr)
LocalExecutionContext.execute(self, cmd, pickled=pickled)
if (cmd.get_results().stderr.startswith('ssh_exchange_identification: Connection closed by remote host')):
LocalExecutionContext.execute(self, cmd, pickled=pickled, start_new_session=start_new_session)
if (cmd.get_stderr().startswith('ssh_exchange_identification: Connection closed by remote host')):
self.__retry(cmd, 0, pickled)
pass

Expand All @@ -528,14 +529,15 @@ class Command(object):
exec_context = None
propagate_env_map = {} # specific environment variables for this command instance

def __init__(self, name, cmdStr, ctxt=LOCAL, remoteHost=None, stdin=None, gphome=None, pickled=False):
def __init__(self, name, cmdStr, ctxt=LOCAL, remoteHost=None, stdin=None, gphome=None, pickled=False, start_new_session=False):
self.name = name
self.cmdStr = cmdStr
self.exec_context = createExecutionContext(ctxt, remoteHost, stdin=stdin,
gphome=gphome)
self.remoteHost = remoteHost
self.logger = gplog.get_default_logger()
self.pickled = pickled
self.start_new_session = start_new_session

def __str__(self):
if self.results:
Expand All @@ -546,12 +548,12 @@ def __str__(self):
# Start a process that will execute the command but don't wait for
# it to complete. Return the Popen object instead.
def runNoWait(self):
self.exec_context.execute(self, wait=False, pickled=self.pickled)
self.exec_context.execute(self, wait=False, pickled=self.pickled, start_new_session=self.start_new_session)
return self.exec_context.proc

def run(self, validateAfter=False):
self.logger.debug("Running Command: %s" % self.cmdStr)
self.exec_context.execute(self, pickled=self.pickled)
self.exec_context.execute(self, pickled=self.pickled, start_new_session=self.start_new_session)

if validateAfter:
self.validate()
Expand Down

0 comments on commit 0aad1ca

Please sign in to comment.