Skip to content
This repository has been archived by the owner on Sep 19, 2022. It is now read-only.

Commit

Permalink
fix: retry to connect to pod on failure
Browse files Browse the repository at this point in the history
Retries to establish the SHH connection to the pod on failure. Also
dials with context, allowing the dial to be interrupted by CTRL+C.
  • Loading branch information
fischor committed Sep 4, 2021
1 parent a9572d3 commit 62f7cbb
Showing 1 changed file with 75 additions and 33 deletions.
108 changes: 75 additions & 33 deletions internal/command/tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,12 +163,12 @@ func (o *TunnelOptions) Run(ctx context.Context, graceCh <-chan struct{}) error
}
klog.V(3).Infof("Created service \"%s\".", service.GetObjectMeta().GetName())
defer interruptcontext.DoGraceful(ctx, func() {
klog.V(2).Infof("Deleting service %s ...", service.Name)
klog.V(2).Infof("Cleanup: deleting service %s ...", service.Name)
deletePolicy := metav1.DeletePropagationForeground
deleteOptions := metav1.DeleteOptions{PropagationPolicy: &deletePolicy}
err := serviceClient.Delete(ctx, service.Name, deleteOptions)
if err != nil {
klog.Warningf("error deleting service: %v", err)
klog.Warningf("Cleanup: error deleting service: %v", err)
}
})

Expand All @@ -195,12 +195,12 @@ func (o *TunnelOptions) Run(ctx context.Context, graceCh <-chan struct{}) error
}
klog.V(3).Infof("Created pod \"%s\".", service.GetObjectMeta().GetName())
defer interruptcontext.DoGraceful(ctx, func() {
klog.V(2).Infof("Deleting pod %s ...", pod.Name)
klog.V(2).Infof("Cleanup: deleting pod %s ...", pod.Name)
deletePolicy := metav1.DeletePropagationForeground
deleteOptions := metav1.DeleteOptions{PropagationPolicy: &deletePolicy}
err := podClient.Delete(ctx, pod.Name, deleteOptions)
if err != nil {
klog.Warningf("error deleting pod: %v. That pod probably still runs. You can use kubetnl cleanup to clean up all resources created by kubetnl.", err)
klog.Warningf("tunnel Cleanup: error deleting pod: %v. That pod probably still runs. You can use kubetnl cleanup to clean up all resources created by kubetnl.", err)
}
})

Expand Down Expand Up @@ -239,10 +239,9 @@ func (o *TunnelOptions) Run(ctx context.Context, graceCh <-chan struct{}) error
return interruptcontext.Interrupted
default:
}

pfwdReadyCh := make(chan struct{}) // Closed when portforwarding ready.
pfwdStopCh := make(chan struct{}, 1)
pfwdDoneCh := make(chan struct{}) // Closed when portforwarding exits.
pfwdReadyCh := make(chan struct{}) // Closed when portforwarding ready.
pfwdStopCh := make(chan struct{}, 1) // is never closed by k8sportforward
pfwdDoneCh := make(chan struct{}) // Closed when portforwarding exits.
go func() error {
// Do a portforwarding to the pods exposed SSH port.
req := o.ClientSet.CoreV1().RESTClient().Post().
Expand Down Expand Up @@ -274,7 +273,7 @@ func (o *TunnelOptions) Run(ctx context.Context, graceCh <-chan struct{}) error
defer interruptcontext.DoGraceful(ctx, func() {
close(pfwdStopCh)
<-pfwdDoneCh
klog.V(2).Infof("port-forwarding closed")
klog.V(2).Infof("Cleanup: port-forwarding closed")
})

select {
Expand All @@ -283,41 +282,58 @@ func (o *TunnelOptions) Run(ctx context.Context, graceCh <-chan struct{}) error
case <-graceCh:
return interruptcontext.Interrupted
case <-pfwdReadyCh:
klog.V(2).Infof("port-forwarding from :%d --> %d", o.LocalSSHPort, o.RemoteSSHPort)
// Note that having a ready pfwd just means that it is listening
// on o.LocalSSHPort.
klog.V(2).Infof("Listening to portforward connections from :%d --> %d", o.LocalSSHPort, o.RemoteSSHPort)
}

// TODO: need to do retries here. It seems the pod does not accept SSH
// connections also its ready.
<-time.After(2 * time.Second)
// HACK: Also the pods is in a ready state, the openssh server may not
// yet accept any connections. Also we are retrying to establish the
// SSH connection on failure, the k8sportforward library will log the
// error. To avoid having that error appearing what might confuse user,
// just add 1.5 seconds of delay here.
// TODO(fischor): Get rid of that HACK somewhat.
<-time.After(1500 * time.Millisecond)

// Establish SSH connection over the forwarded port.
sshConfig := &ssh.ClientConfig{
User: "user",
Auth: []ssh.AuthMethod{
ssh.Password("password"),
},
HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
// Accept all keys.
return nil
},
}
// Retry establishing the connection in case of failure every second.
sshAddr := fmt.Sprintf("localhost:%d", o.LocalSSHPort)
sshClient, err := ssh.Dial("tcp", sshAddr, sshConfig)
var sshClient *ssh.Client
var sshErr error
sshCtx, sshCancel := context.WithCancel(ctx)
defer sshCancel()
go func() {
<-graceCh
sshCancel()
// TODO(fischor): avoid goroutines leaks.
}()
sshAttempts := 0
err = wait.PollImmediateUntil(time.Second, func() (bool, error) {
sshAttempts++
sshClient, sshErr = sshDialContext(sshCtx, "tcp", sshAddr, o.sshConfig())
if sshErr != nil {
if sshAttempts > 3 {
fmt.Fprintf(o.Out, "failed to dial ssh: %v. Retrying...\n", sshErr)
}
klog.V(1).Infof("error dialing ssh (%s): %v", sshAddr, sshErr)
}
return sshErr == nil, nil

}, graceCh)
if err == wait.ErrWaitTimeout {
// Grace channel has been closed (after the first attempt to
// connect has been done).
return interruptcontext.Interrupted
}
if err != nil {
klog.Errorf("error dialing ssh (%s): %v", sshAddr, err)
// A non-retryable error from the shh connection has been returned.
return fmt.Errorf("error dialing ssh: %v", err)
}
klog.V(2).Infof("SSH connection (%s) ready", sshAddr)
defer interruptcontext.DoGraceful(ctx, func() {
sshClient.Close()
klog.V(2).Info("ssl connection (%s) closed", sshAddr)
klog.V(2).Info("Cleanup: ssh connection (%s) closed", sshAddr)
})
klog.V(2).Infof("ssl connection (%s) ready", sshAddr)

select {
case <-graceCh:
return interruptcontext.Interrupted
default:
}

// Setup tunnels.
var pairs []forwarderWithListener
Expand Down Expand Up @@ -377,6 +393,32 @@ func (o *TunnelOptions) Run(ctx context.Context, graceCh <-chan struct{}) error
return nil
}

func (o *TunnelOptions) sshConfig() *ssh.ClientConfig {
return &ssh.ClientConfig{
User: "user",
Auth: []ssh.AuthMethod{
ssh.Password("password"),
},
HostKeyCallback: func(hostname string, remote net.Addr, key ssh.PublicKey) error {
// Accept all keys.
return nil
},
}
}

func sshDialContext(ctx context.Context, network, addr string, config *ssh.ClientConfig) (*ssh.Client, error) {
d := net.Dialer{Timeout: config.Timeout}
conn, err := d.DialContext(ctx, network, addr)
if err != nil {
return nil, err
}
c, chans, reqs, err := ssh.NewClientConn(conn, addr, config)
if err != nil {
return nil, err
}
return ssh.NewClient(c, chans, reqs), nil
}

type forwarderWithListener struct {
f *portforward.Forwarder
l net.Listener
Expand Down

0 comments on commit 62f7cbb

Please sign in to comment.