Skip to content

Commit

Permalink
gangplank: take two on ssh port forwarding
Browse files Browse the repository at this point in the history
This time we'll go ahead and use the remote chosen port locally as
well. This is because the minio bucket is consulted by the local running
process for required artifacts before even starting a pod on the remote
(i.e. why start the pod until the artifacts exist that it needs?). If
they use different port numbers then we'd have to track them separately
and behave differently whether we're running in the remote podman cosa or
in the local cosa. Let's just use the same port locally as well.

To achieve this goal we submerge the starting of the minio server into
the ssh port forwarding code so we can detect the port before we start
the server and set up the proxy.
  • Loading branch information
dustymabe committed Aug 20, 2021
1 parent 075ca9e commit 4086e49
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 50 deletions.
44 changes: 20 additions & 24 deletions gangplank/internal/ocp/bc.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,26 @@ binary build interface.`)

// Start minio after all the setup. Each directory is an implicit
// bucket and files, are implicit keys.
if err := m.start(ctx); err != nil {
return fmt.Errorf("failed to start Minio: %w", err)
//
// Job Control:
// terminate channel: uses to tell workFunctions to ceases
// errorCh channel: workFunctions report errors through this channel
// when a error is recieved over the channel, a terminate is signaled
// sig channel: this watches for sigterm and interrupts, which will
// signal a terminate. (i.e. sigterm or crtl-c)
//
// The go-routine will run until it recieves a terminate itself.
//
errorCh := make(chan error)
terminate := make(chan bool)
if m.overSSH == nil {
if err := m.start(ctx); err != nil {
return fmt.Errorf("failed to start Minio: %w", err)
}
} else {
if err := m.startMinioAndForwardOverSSH(ctx, terminate, errorCh); err != nil {
return fmt.Errorf("failed to start Minio: %w", err)
}
}
defer m.Kill()

Expand Down Expand Up @@ -377,28 +395,6 @@ binary build interface.`)
}
sort.Ints(order)

/*
Job Control:
terminate channel: uses to tell workFunctions to ceases
errorCh channel: workFunctions report errors through this channel
when a error is recieved over the channel, a terminate is signaled
sig channel: this watches for sigterm and interrupts, which will
signal a terminate. (i.e. sigterm or crtl-c)
The go-routine will run until it recieves a terminate itself.
*/

errorCh := make(chan error)

// Terminate is used to tell all go-routines to end
terminate := make(chan bool)

// If defined, startup SSH before any work begins
if m.overSSH != nil {
if err := m.forwardOverSSH(terminate, errorCh); err != nil {
return err
}
}
// Watch the channels for signals to terminate
errored := false
go func() {
Expand Down
17 changes: 11 additions & 6 deletions gangplank/internal/ocp/filer.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,14 +69,19 @@ func StartStandaloneMinioServer(ctx context.Context, srvDir, cfgFile string, ove
m.overSSH = overSSH
m.dir = srvDir

if err := m.start(ctx); err != nil {
return nil, err
}

if m.overSSH != nil {
// Start the minio server. If we're forwarding over SSH we'll call
// startMinioAndForwardOverSSH to start the minio server. because
// the port we use will be dynamically chosen based on the SSH
// connection.
if m.overSSH == nil {
if err := m.start(ctx); err != nil {
return nil, err
}
} else {
m.sshStopCh = make(chan bool, 1)
m.sshErrCh = make(chan error, 256)
if err := m.forwardOverSSH(m.sshStopCh, m.sshErrCh); err != nil {
err := m.startMinioAndForwardOverSSH(ctx, m.sshStopCh, m.sshErrCh)
if err != nil {
return nil, err
}
}
Expand Down
56 changes: 36 additions & 20 deletions gangplank/internal/ocp/ssh.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocp

import (
"context"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -119,8 +120,8 @@ func sshClient(user, host, port string, secure bool, identity string) (*ssh.Clie
)
}

// forwardOverSSH forwards the minio connection over SSH.
func (m *minioServer) forwardOverSSH(termCh termChan, errCh chan<- error) error {
// startMinioAndForwardOverSSH starts minio and forwards the connection over SSH.
func (m *minioServer) startMinioAndForwardOverSSH(ctx context.Context, termCh termChan, errCh chan<- error) error {
sshPort := 22
if m.overSSH.SSHPort != 0 {
sshPort = m.overSSH.SSHPort
Expand All @@ -132,12 +133,6 @@ func (m *minioServer) forwardOverSSH(termCh termChan, errCh chan<- error) error
"remote user": m.overSSH.User,
"port": sshport,
})
// Set the port to use for the proxied connection *once* here so
// it won't change during the course of the run. This is because
// below we change the m.Port to match the dynamically chosen
// remote port for the ssh forward but the server listening port
// will remain whatever the original value of m.Port is.
minioServerPort := m.Port

l.Info("Forwarding local port over SSH to remote host")

Expand All @@ -150,23 +145,44 @@ func (m *minioServer) forwardOverSSH(termCh termChan, errCh chan<- error) error
// dynamically chosen based on port availabilty on the remote. If
// we don't do this then multiple concurrent gangplank runs will fail
// because they'll try to use the same port.
remoteConn, err := client.Listen("tcp4", "127.0.0.1:")
if err != nil {
err = fmt.Errorf("%w: failed to open remote port over ssh for proxy", err)
return err
}
remoteSSHport, err := strconv.Atoi(strings.Split(remoteConn.Addr().String(), ":")[1])
if err != nil {
err = fmt.Errorf("%w: failed to parse remote ssh port from connection", err)
return err
var remoteConn net.Listener
var remoteSSHport int
// Loop until we've found a common port available locally and remote
for {
remoteConn, err = client.Listen("tcp4", "127.0.0.1:")
if err != nil {
err = fmt.Errorf("%w: failed to open remote port over ssh for proxy", err)
return err
}
remoteSSHport, err := strconv.Atoi(strings.Split(remoteConn.Addr().String(), ":")[1])
if err != nil {
err = fmt.Errorf("%w: failed to parse remote ssh port from connection", err)
return err
}
log.Infof("The SSH forwarding chose port %d on the remote host", remoteSSHport)

if getPortOrNext(remoteSSHport) == remoteSSHport {
break
}

log.Infof("Local Port %d is not available, selecting another port", remoteSSHport)
remoteConn.Close()
}
log.Infof("The SSH forwarding chose port %v on the remote host", remoteSSHport)
// Update m.Port in the minioServer definition so the miniocfg
// that gets passed to the remote specifies the correct port for
// the local connection there.
log.Infof("Changing remote local port (forward) from %v to %v", m.Port, remoteSSHport)
log.Infof("Changing minio port for local and remote (forward) from %v to %v",
m.Port, remoteSSHport)
m.Port = remoteSSHport

// Now that we know the port let's start the minio server. It's
// highly unlikely to have a port conflict here because we are
// running inside the cosa container where no other services are
// running/listening.
if err := m.start(ctx); err != nil {
return err
}

// copyIO is a blind copier that copies between source and destination
copyIO := func(src, dest net.Conn) {
defer src.Close() //nolint
Expand All @@ -176,7 +192,7 @@ func (m *minioServer) forwardOverSSH(termCh termChan, errCh chan<- error) error

// proxy is a helper function that connects the local port to the remoteClient
proxy := func(conn net.Conn) {
proxy, err := net.Dial("tcp4", fmt.Sprintf("127.0.0.1:%d", minioServerPort))
proxy, err := net.Dial("tcp4", fmt.Sprintf("127.0.0.1:%d", m.Port))
if err != nil {
err = fmt.Errorf("%w: failed to open local port for proxy", err)
errCh <- err
Expand Down

0 comments on commit 4086e49

Please sign in to comment.