Skip to content

Commit

Permalink
feat: add tty/raw mode, direct-exec & fix logging
Browse files Browse the repository at this point in the history
  • Loading branch information
jpts committed Aug 28, 2022
1 parent 6f6d7cb commit 384a861
Show file tree
Hide file tree
Showing 9 changed files with 559 additions and 649 deletions.
21 changes: 21 additions & 0 deletions LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
The MIT License (MIT)

Copyright © 2022 jpts

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
25 changes: 13 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,29 @@

A replacement for "kubectl exec" that works over WebSocket connections.

Kubernetes API server has support for exec over WebSockets, but it has yet to land in kubectl. This plugin is designed to be a stopgap until then!
The Kubernetes API server has support for exec over WebSockets, but it has yet to land in kubectl. Although [some](https://github.com/kubernetes/kubernetes/issues/89163) [proposals](https://github.com/kubernetes/enhancements/pull/3401) exist to add the functionality, they seem is quite far away from landing. This plugin is designed to be a stopgap until they do.

Usage:
```
A replacement for "kubectl exec" that works over WebSocket connections.
Usage:
execws <pod name> [--kubeconfig] [-n namespace] [-it] [-c container] <cmd> [flags]
execws <pod name> [options] <cmd>
Flags:
-c, --container string Container name
-h, --help help for execws
--kubeconfig string kubeconfig file (default is $HOME/.kube/config)
-n, --namespace string Override "default" namespace
-i, --stdin Pass stdin to container
-t, --tty Stdin is a TTY
-c, --container string Container name
-h, --help help for execws
--kubeconfig string kubeconfig file (default is $HOME/.kube/config)
-v, --loglevel int Set loglevel (default 2)
-n, --namespace string Set namespace
--no-sanity-check Don't make preflight request to ensure pod exists
--node-direct-exec Partially bypass the API server, by using the kubelet API
--node-direct-exec-ip string Node IP to use with direct-exec feature
-k, --skip-tls-verify Don't perform TLS certificate verifiation
-i, --stdin Pass stdin to container
-t, --tty Stdin is a TTY
```

### ToDo
* raw terminal mode
* correctly handle signals

### Acknowledgements

Work inspired by [rmohr/kubernetes-custom-exec](https://github.com/rmohr/kubernetes-custom-exec) and [kairen/websocket-exec](https://github.com/kairen/websocket-exec).
196 changes: 47 additions & 149 deletions cmd/exec.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,36 @@
package cmd

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"sync"

"github.com/gorilla/websocket"
"github.com/moby/term"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
)

type Options struct {
Command []string
Container string
Kconfig string
Namespace string
Object string
Pod string
Stdin bool
TTY bool
noSanityCheck bool
noTLSVerify bool
Command []string
Container string
Kconfig string
Namespace string
Object string
Pod string
Stdin bool
TTY bool
PodSpec corev1.PodSpec
noSanityCheck bool
noTLSVerify bool
directExec bool
directExecNodeIp string
}

var protocols = []string{
Expand All @@ -41,10 +40,13 @@ var protocols = []string{
"channel.k8s.io",
}

// https://github.com/kubernetes/kubernetes/blob/1a2f167d399b046bea6192df9e9b1ca7ac4f2365/staging/src/k8s.io/client-go/tools/remotecommand/remotecommand_websocket.go#L35
const (
stdin = iota
stdout
stderr
streamStdIn = 0
streamStdOut = 1
streamStdErr = 2
streamErr = 3
streamResize = 4
)

type cliSession struct {
Expand All @@ -53,18 +55,6 @@ type cliSession struct {
namespace string
}

type RoundTripCallback func(conn *websocket.Conn) error

type WebsocketRoundTripper struct {
Dialer *websocket.Dialer
Callback RoundTripCallback
}

type ApiServerError struct {
Reason string `json:"reason"`
Message string `json:"message"`
}

// prep the session
func (c *cliSession) prepConfig() error {
var cfg clientcmd.ClientConfig
Expand Down Expand Up @@ -107,15 +97,15 @@ func (c *cliSession) prepConfig() error {
return err
}

_, err = client.CoreV1().Pods(c.namespace).Get(context.TODO(), c.opts.Pod, metav1.GetOptions{})
res, err := client.CoreV1().Pods(c.namespace).Get(context.TODO(), c.opts.Pod, metav1.GetOptions{})
if err != nil {
return err
}
c.opts.PodSpec = res.Spec
}
return nil
}

// prep a http req
func (c *cliSession) prepExec() (*http.Request, error) {
u, err := url.Parse(c.clientConf.Host)
if err != nil {
Expand All @@ -128,28 +118,27 @@ func (c *cliSession) prepExec() (*http.Request, error) {
case "http":
u.Scheme = "ws"
default:
return nil, fmt.Errorf("Malformed URL %s", u.String())
return nil, errors.New("Cannot determine websocket scheme")
}

u.Path = fmt.Sprintf("/api/v1/namespaces/%s/pods/%s/exec", c.namespace, c.opts.Pod)
rawQuery := "stdout=true&stderr=true"
query := "stdout=true&stderr=true"
for _, c := range c.opts.Command {
rawQuery += "&command=" + c
query += "&command=" + c
}

if c.opts.Container != "" {
rawQuery += "&container=" + c.opts.Container
query += "&container=" + c.opts.Container
}

if c.opts.TTY {
rawQuery += "&tty=true"
klog.Warning("Raw terminal not supported yet, YMMV.")
query += "&tty=true"
}

if c.opts.Stdin {
rawQuery += "&stdin=true"
query += "&stdin=true"
}
u.RawQuery = rawQuery
u.RawQuery = query

req := &http.Request{
Method: http.MethodGet,
Expand All @@ -173,9 +162,24 @@ func (c *cliSession) doExec(req *http.Request) error {
Subprotocols: protocols,
}

initState := &TerminalState{}
if c.opts.TTY {
fd := os.Stdin.Fd()
if term.IsTerminal(fd) {
initState.Fd = fd
initState.StateBlob, err = term.SetRawTerminal(initState.Fd)
if err != nil {
return err
}
defer term.RestoreTerminal(initState.Fd, initState.StateBlob)
}
}

rt := &WebsocketRoundTripper{
Callback: WsCallback,
Dialer: dialer,
Callback: WsCallbackWrapper,
Dialer: dialer,
TermState: initState,
opts: c.opts,
}

rter, err := rest.HTTPWrappersForConfig(c.clientConf, rt)
Expand All @@ -190,109 +194,3 @@ func (c *cliSession) doExec(req *http.Request) error {
}
return nil
}

func (d *WebsocketRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
conn, resp, err := d.Dialer.Dial(r.URL.String(), r.Header)
if e, ok := err.(*net.OpError); ok {
return nil, fmt.Errorf("Error connecting to %s, %s", e.Addr, e.Err)
} else if err != nil {
return nil, err
} else if resp.StatusCode != 101 {
var msg ApiServerError
err := json.NewDecoder(resp.Body).Decode(&msg)
if err != nil {
return nil, errors.New("Error from server, unable to decode response")
}
return nil, fmt.Errorf("Error from server (%s): %s", msg.Reason, msg.Message)
}
defer conn.Close()
return resp, d.Callback(conn)
}

func WsCallback(ws *websocket.Conn) error {
errChan := make(chan error, 3)
var sendBuffer bytes.Buffer

wg := sync.WaitGroup{}
wg.Add(2)

// send
go func() {
defer wg.Done()
buf := make([]byte, 1025)
for {
n, err := os.Stdin.Read(buf[1:])
if err != nil {
errChan <- err
return
}

sendBuffer.Write(buf[1:n])
sendBuffer.Write([]byte{13, 10})
err = ws.WriteMessage(websocket.BinaryMessage, buf[:n+1])
if err != nil {
errChan <- err
return
}
}
}()

// recv
go func() {
defer wg.Done()
for {
msgType, buf, err := ws.ReadMessage()
if err != nil {
errChan <- err
return
}
if msgType != websocket.BinaryMessage {
errChan <- errors.New("Received unexpected websocket message")
return
}

if len(buf) > 1 {
var w io.Writer
switch buf[0] {
case stdout:
w = os.Stdout
case stderr:
w = os.Stderr
}

if w == nil {
continue
}

// ash terminal hack
b := bytes.Replace(buf[1:], []byte("\x1b\x5b\x36\x6e"), []byte(""), -1)
out := bytes.Replace(b, sendBuffer.Bytes(), []byte(""), -1)

_, err = w.Write(out)
if err != nil {
errChan <- err
return
}
}
sendBuffer.Reset()
}
}()

go func() {
wg.Wait()
close(errChan)
}()

for err := range errChan {
if e, ok := err.(*websocket.CloseError); ok {
klog.V(4).Infof("Closing websocket connection with error code %d, err: %s", e.Code, err)
}
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
return nil
} else if errors.Is(err, io.EOF) {
return nil
}
return err
}
return nil
}
Loading

0 comments on commit 384a861

Please sign in to comment.