Skip to content

Commit

Permalink
Merge pull request #72 from txn2/v1.9.x
Browse files Browse the repository at this point in the history
V1.9.x improved support for healess services
  • Loading branch information
cjimti authored Aug 8, 2019
2 parents 77c6a2d + 716cb05 commit 99e6407
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 90 deletions.
23 changes: 20 additions & 3 deletions cmd/kubefwd/kubefwd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ limitations under the License.
package main

import (
"log"
"os"

"bytes"
"fmt"
"os"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/txn2/kubefwd/cmd/kubefwd/services"
)
Expand Down Expand Up @@ -54,8 +54,25 @@ func newRootCmd() *cobra.Command {
return cmd
}

type LogOutputSplitter struct{}

func (splitter *LogOutputSplitter) Write(p []byte) (n int, err error) {
if bytes.Contains(p, []byte("level=error")) || bytes.Contains(p, []byte("level=warn")) {
return os.Stderr.Write(p)
}
return os.Stdout.Write(p)
}

func main() {

log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
ForceColors: true,
TimestampFormat: "15:04:05",
})

log.SetOutput(&LogOutputSplitter{})

log.Print(` _ _ __ _`)
log.Print(`| | ___ _| |__ ___ / _|_ ____| |`)
log.Print(`| |/ / | | | '_ \ / _ \ |_\ \ /\ / / _ |`)
Expand Down
171 changes: 88 additions & 83 deletions cmd/kubefwd/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,20 @@ package services

import (
"fmt"
"log"
"os"
"path/filepath"
"strconv"
"sync"

"github.com/txn2/txeh"

"github.com/txn2/kubefwd/pkg/fwdcfg"
"github.com/txn2/kubefwd/pkg/fwdhost"
"github.com/txn2/kubefwd/pkg/fwdnet"
"github.com/txn2/kubefwd/pkg/fwdport"
"github.com/txn2/kubefwd/pkg/fwdpub"
"github.com/txn2/kubefwd/pkg/utils"
"github.com/txn2/txeh"

log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -48,9 +47,10 @@ var verbose bool
var domain string

func init() {

// override error output from k8s.io/apimachinery/pkg/util/runtime
runtime.ErrorHandlers[0] = func(err error) {
log.Printf("Runtime error: %s", err.Error())
log.Errorf("Runtime error: %s", err.Error())
}

cfgFilePath := ""
Expand Down Expand Up @@ -250,7 +250,7 @@ func fwdServices(opts FwdServiceOpts) error {
return err
}
if len(services.Items) < 1 {
log.Printf("WARNING: No services found for namespace %s.\n", opts.Namespace)
log.Warnf("WARNING: No services found for namespace %s.\n", opts.Namespace)
return nil
}

Expand All @@ -266,129 +266,134 @@ func fwdServices(opts FwdServiceOpts) error {
selector := mapToSelectorStr(svc.Spec.Selector)

if selector == "" {
log.Printf("WARNING: No backing pods for service %s in %s on cluster %s.\n", svc.Name, svc.Namespace, svc.ClusterName)
log.Warnf("WARNING: No backing pods for service %s in %s on cluster %s.\n", svc.Name, svc.Namespace, svc.ClusterName)

continue
}

pods, err := opts.ClientSet.CoreV1().Pods(svc.Namespace).List(metav1.ListOptions{LabelSelector: selector})

if err != nil {
log.Printf("WARNING: No pods found for %s: %s\n", selector, err.Error())
log.Warnf("WARNING: No pods found for %s: %s\n", selector, err.Error())

// TODO: try again after a time

continue
}

if len(pods.Items) < 1 {
log.Printf("WARNING: No pods returned for service %s in %s on cluster %s.\n", svc.Name, svc.Namespace, svc.ClusterName)
log.Warnf("WARNING: No pods returned for service %s in %s on cluster %s.\n", svc.Name, svc.Namespace, svc.ClusterName)

// TODO: try again after a time

continue
}

if svc.Spec.ClusterIP != "None" {
pods.Items = pods.Items[:1]
}
podLoop := func(pods []v1.Pod, podName bool) {
for _, pod := range pods {

for _, pod := range pods.Items {
podPort := ""
svcName := ""

podPort := ""
svcName := ""
localIp, dInc, err := fwdnet.ReadyInterface(127, 1, opts.IpC, d, podPort)
d = dInc

localIp, dInc, err := fwdnet.ReadyInterface(127, 1, opts.IpC, d, podPort)
d = dInc
for _, port := range svc.Spec.Ports {

for _, port := range svc.Spec.Ports {
podPort = port.TargetPort.String()
localPort := strconv.Itoa(int(port.Port))

podPort = port.TargetPort.String()
localPort := strconv.Itoa(int(port.Port))
if _, err := strconv.Atoi(podPort); err != nil {
// search a pods containers for the named port
if namedPodPort, ok := portSearch(podPort, pod.Spec.Containers); ok == true {
podPort = namedPodPort
}
}

if _, err := strconv.Atoi(podPort); err != nil {
// search a pods containers for the named port
if namedPodPort, ok := portSearch(podPort, pod.Spec.Containers); ok == true {
podPort = namedPodPort
_, err = opts.ClientSet.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
if err != nil {
log.Warnf("WARNING: Error getting pod: %s\n", err.Error())
break
}
}

_, err = opts.ClientSet.CoreV1().Pods(pod.Namespace).Get(pod.Name, metav1.GetOptions{})
if err != nil {
log.Printf("WARNING: Error getting pod: %s\n", err.Error())
break
}
serviceHostName := svc.Name

serviceHostName := svc.Name
if podName {
serviceHostName = pod.Name + "." + serviceHostName
}

if svc.Spec.ClusterIP == "None" {
serviceHostName = pod.Name + "." + serviceHostName
}
svcName = serviceHostName

svcName = serviceHostName
if opts.ShortName != true {
serviceHostName = serviceHostName + "." + pod.Namespace
}

if opts.ShortName != true {
serviceHostName = serviceHostName + "." + pod.Namespace
}
if opts.Domain != "" {
if verbose {
log.Printf("Using domain %s in generated hostnames", opts.Domain)
}
serviceHostName = serviceHostName + "." + opts.Domain
}

if opts.Domain != "" {
if verbose {
log.Printf("Using domain %s in generated hostnames", opts.Domain)
if opts.Remote {
serviceHostName = fmt.Sprintf("%s.svc.cluster.%s", serviceHostName, opts.Context)
}
serviceHostName = serviceHostName + "." + opts.Domain
}

if opts.Remote {
serviceHostName = fmt.Sprintf("%s.svc.cluster.%s", serviceHostName, opts.Context)
}
if verbose {
log.Printf("Resolving: %s%s to %s\n",
svc.Name,
serviceHostName,
localIp.String(),
)
}

if verbose {
log.Printf("Resolving: %s%s to %s\n",
svc.Name,
log.Printf("Forwarding: %s:%d to pod %s:%s\n",
serviceHostName,
localIp.String(),
port.Port,
pod.Name,
podPort,
)
}

log.Printf("Forwarding: %s:%d to pod %s:%s\n",
serviceHostName,
port.Port,
pod.Name,
podPort,
)

pfo := &fwdport.PortForwardOpts{
Out: publisher,
Config: opts.ClientConfig,
ClientSet: opts.ClientSet,
Context: opts.Context,
Namespace: pod.Namespace,
Service: svcName,
PodName: pod.Name,
PodPort: podPort,
LocalIp: localIp,
LocalPort: localPort,
Hostfile: opts.Hostfile,
ShortName: opts.ShortName,
Remote: opts.Remote,
ExitOnFail: exitOnFail,
Domain: domain,
}

opts.Wg.Add(1)
go func() {
err := fwdport.PortForward(pfo)
if err != nil {
log.Printf("ERROR: %s", err.Error())
pfo := &fwdport.PortForwardOpts{
Out: publisher,
Config: opts.ClientConfig,
ClientSet: opts.ClientSet,
Context: opts.Context,
Namespace: pod.Namespace,
Service: svcName,
PodName: pod.Name,
PodPort: podPort,
LocalIp: localIp,
LocalPort: localPort,
Hostfile: opts.Hostfile,
ShortName: opts.ShortName,
Remote: opts.Remote,
ExitOnFail: exitOnFail,
Domain: domain,
}

log.Printf("Stopped forwarding %s in %s.", pfo.Service, pfo.Namespace)
opts.Wg.Add(1)
go func() {
err := fwdport.PortForward(pfo)
if err != nil {
log.Printf("ERROR: %s", err.Error())
}

opts.Wg.Done()
}()
log.Printf("Stopped forwarding %s in %s.", pfo.Service, pfo.Namespace)

opts.Wg.Done()
}()

}
}
}

podLoop(pods.Items[:1], false)

if svc.Spec.ClusterIP == "None" {
podLoop(pods.Items, true)
}

}

return nil
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ require (
github.com/elazarl/goproxy v0.0.0-20181111060418-2ce16c963a8a // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/prometheus/common v0.0.0-20190124163007-cfeb6f9992ff
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.3
github.com/txn2/txeh v1.1.0
gopkg.in/yaml.v2 v2.2.2
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ github.com/prometheus/common v0.0.0-20190124163007-cfeb6f9992ff/go.mod h1:TNfzLD
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/sirupsen/logrus v1.2.0 h1:juTguoYk5qI21pwyTXY3B3Y5cOTH3ZUyZCg1v/mihuo=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v0.0.0-20181223182923-24fa6976df40 h1:+fBLXG9122bARNDiwUt7b1hnca/mLnUxXlH7ChuFKPQ=
Expand Down Expand Up @@ -118,6 +120,8 @@ golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5 h1:mzjBh+S5frKOsOBobWIMAbXav
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313 h1:pczuHS43Cp2ktBEEmLwScxgjWsBSzdaQiKzUyf3DTTc=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894 h1:Cz4ceDQGXuKRnVBDTS23GTn/pU5OE2C0WrNTOYK1Uuc=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.1-0.20181227161524-e6919f6577db h1:6/JqlYfC1CCaLnGceQTI+sDGhC9UBSPAsBqI0Gun6kU=
Expand Down
8 changes: 4 additions & 4 deletions pkg/portforward/portforward.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
headers.Set(v1.PortForwardRequestIDHeader, strconv.Itoa(requestID))
errorStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
runtime.HandleError(fmt.Errorf("error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err))
runtime.HandleError(fmt.Errorf("error creating error stream for %s -> %s: %v", conn.LocalAddr(), conn.RemoteAddr(), err))
return
}
// we're not writing to this stream
Expand All @@ -300,9 +300,9 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
message, err := ioutil.ReadAll(errorStream)
switch {
case err != nil:
errorChan <- fmt.Errorf("error reading from error stream for port %d -> %d: %v", port.Local, port.Remote, err)
errorChan <- fmt.Errorf("error reading from error stream for port %s -> %s: %v", conn.LocalAddr(), conn.RemoteAddr(), err)
case len(message) > 0:
errorChan <- fmt.Errorf("an error occurred forwarding %d -> %d: %v", port.Local, port.Remote, string(message))
errorChan <- fmt.Errorf("an error occurred forwarding %s -> %s: %v", conn.LocalAddr(), conn.RemoteAddr(), string(message))
}
close(errorChan)
}()
Expand All @@ -311,7 +311,7 @@ func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
headers.Set(v1.StreamType, v1.StreamTypeData)
dataStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err))
runtime.HandleError(fmt.Errorf("error creating forwarding stream for port %s -> %s: %v", conn.LocalAddr(), conn.RemoteAddr(), err))
return
}

Expand Down

0 comments on commit 99e6407

Please sign in to comment.