Skip to content

Commit

Permalink
MOD:add a simple kubefwd mainPipeline test, add the labelselector to …
Browse files Browse the repository at this point in the history
…listWatcher, and add update to readme
  • Loading branch information
calmkart committed Nov 19, 2019
1 parent a0cd781 commit f4398bf
Show file tree
Hide file tree
Showing 5 changed files with 415 additions and 121 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ Check out the [releases](https://github.com/txn2/kubefwd/releases) section on Gi
## Usage

Forward all services for the namespace `the-project`. Kubefwd finds the first Pod associated with each Kubernetes service found in the Namespace and port forwards it based on the Service spec to a local IP address and port. A domain name is added to your /etc/hosts file pointing to the local IP.
### Update
Forwarding of headlesss Service is currently supported, Kubefwd forward all Pods for headless service;

At the same time, the namespace-level service monitoring is supported. When a new service is created or the old service is deleted under the namespace, kubefwd can automatically start/end forwarding.
```bash
sudo kubefwd svc -n the-project
```
Expand Down
5 changes: 5 additions & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ docker exec the-project curl -s elasticsearch:9200
## 用法

转发namespace `the-project`下的所有服务。 Kubefwd找到Kubernetess集群中,该namespace下对应的Service端口匹配的第一个Pod,并将其转发到本地IP地址和端口。同时service的域名将被添加到本地的 hosts文件中。

### 更新
当前已支持headlesss Service的转发,Kubefwd将转发所有headlesss Service的Pod;

同时支持namespace级服务监听,当namespace下有新Service创建或旧Service删除时,Kubefwd能够自动完成转发/结束转发;
```bash
sudo kubefwd svc -n the-project
```
Expand Down
241 changes: 123 additions & 118 deletions cmd/kubefwd/services/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ var exitOnFail bool
var verbose bool
var domain string
var AllPortForwardOpts []*fwdport.PortForwardOpts
var lock sync.Mutex

func init() {

Expand Down Expand Up @@ -78,12 +77,15 @@ var Cmd = &cobra.Command{
" kubefwd svc -n default -n the-project\n" +
" kubefwd svc -n default -d internal.example.com\n" +
" kubefwd svc -n the-project -x prod-cluster\n",
Run: func(cmd *cobra.Command, args []string) {
Run: runCmd,
}

func runCmd(cmd *cobra.Command, args []string) {

hasRoot, err := utils.CheckRoot()
hasRoot, err := utils.CheckRoot()

if !hasRoot {
fmt.Printf(`
if !hasRoot {
fmt.Printf(`
This program requires superuser privileges to run. These
privileges are required to add IP address aliases to your
loopback interface. Superuser privileges are also needed
Expand All @@ -94,145 +96,144 @@ Try:
- Running a shell with administrator rights (Windows)
`)
if err != nil {
log.Fatalf("Root check failure: %s", err.Error())
}
return
if err != nil {
log.Fatalf("Root check failure: %s", err.Error())
}
return
}

log.Println("Press [Ctrl-C] to stop forwarding.")
log.Println("'cat /etc/hosts' to see all host entries.")
log.Println("Press [Ctrl-C] to stop forwarding.")
log.Println("'cat /etc/hosts' to see all host entries.")

hostFile, err := txeh.NewHostsDefault()
if err != nil {
log.Fatalf("Hostfile error: %s", err.Error())
}
hostFile, err := txeh.NewHostsDefault()
if err != nil {
log.Fatalf("Hostfile error: %s", err.Error())
}

log.Printf("Loaded hosts file %s\n", hostFile.ReadFilePath)
log.Printf("Loaded hosts file %s\n", hostFile.ReadFilePath)

msg, err := fwdhost.BackupHostFile(hostFile)
if err != nil {
log.Fatalf("Error backing up hostfile: %s\n", err.Error())
}
msg, err := fwdhost.BackupHostFile(hostFile)
if err != nil {
log.Fatalf("Error backing up hostfile: %s\n", err.Error())
}

log.Printf("Hostfile management: %s", msg)
log.Printf("Hostfile management: %s", msg)

// if sudo -E is used and the KUBECONFIG environment variable is set
// it's easy to merge with kubeconfig files in env automatic.
// if KUBECONFIG is blank, ToRawKubeConfigLoader() will use the
// default kubeconfig file in $HOME/.kube/config
cfgFilePath := ""
// if sudo -E is used and the KUBECONFIG environment variable is set
// it's easy to merge with kubeconfig files in env automatic.
// if KUBECONFIG is blank, ToRawKubeConfigLoader() will use the
// default kubeconfig file in $HOME/.kube/config
cfgFilePath := ""

// if we set the option --kubeconfig, It will have a higher priority
// than KUBECONFIG environment. so it will override the KubeConfig options.
flagCfgFilePath := cmd.Flag("kubeconfig").Value.String()
if flagCfgFilePath != "" {
cfgFilePath = flagCfgFilePath
}
// if we set the option --kubeconfig, It will have a higher priority
// than KUBECONFIG environment. so it will override the KubeConfig options.
flagCfgFilePath := cmd.Flag("kubeconfig").Value.String()
if flagCfgFilePath != "" {
cfgFilePath = flagCfgFilePath
}

// create a ConfigGetter
configGetter := fwdcfg.NewConfigGetter()
// build the ClientConfig
rawConfig, err := configGetter.GetClientConfig(cfgFilePath)
if err != nil {
log.Fatalf("Error in get rawConfig: %s\n", err.Error())
}
// create a ConfigGetter
configGetter := fwdcfg.NewConfigGetter()
// build the ClientConfig
rawConfig, err := configGetter.GetClientConfig(cfgFilePath)
if err != nil {
log.Fatalf("Error in get rawConfig: %s\n", err.Error())
}

// labels selector to filter services
// see: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
selector := cmd.Flag("selector").Value.String()
listOptions := metav1.ListOptions{}
if selector != "" {
listOptions.LabelSelector = selector
}
// labels selector to filter services
// see: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/
selector := cmd.Flag("selector").Value.String()
listOptions := metav1.ListOptions{}
if selector != "" {
listOptions.LabelSelector = selector
}

// if no namespaces were specified, check config then
// explicitly set one to "default"
if len(namespaces) < 1 {
namespaces = []string{"default"}
x := rawConfig.CurrentContext
// use the first context if specified
if len(contexts) > 0 {
x = contexts[0]
}
// if no namespaces were specified, check config then
// explicitly set one to "default"
if len(namespaces) < 1 {
namespaces = []string{"default"}
x := rawConfig.CurrentContext
// use the first context if specified
if len(contexts) > 0 {
x = contexts[0]
}

for ctxName, ctxConfig := range rawConfig.Contexts {
if ctxName == x {
if ctxConfig.Namespace != "" {
log.Printf("Using namespace %s from current context %s.", ctxConfig.Namespace, ctxName)
namespaces = []string{ctxConfig.Namespace}
break
}
for ctxName, ctxConfig := range rawConfig.Contexts {
if ctxName == x {
if ctxConfig.Namespace != "" {
log.Printf("Using namespace %s from current context %s.", ctxConfig.Namespace, ctxName)
namespaces = []string{ctxConfig.Namespace}
break
}
}
}
}

// ipC is the class C for the local IP address
// increment this for each cluster
// ipD is the class D for the local IP address
// increment this for each service in each cluster
ipC := 27
ipD := 1
// ipC is the class C for the local IP address
// increment this for each cluster
// ipD is the class D for the local IP address
// increment this for each service in each cluster
ipC := 27
ipD := 1

wg := &sync.WaitGroup{}
wg := &sync.WaitGroup{}

stopListenCh := make(chan struct{})
defer close(stopListenCh)
stopListenCh := make(chan struct{})
defer close(stopListenCh)

// if no context override
if len(contexts) < 1 {
contexts = append(contexts, rawConfig.CurrentContext)
// if no context override
if len(contexts) < 1 {
contexts = append(contexts, rawConfig.CurrentContext)
}

for i, ctx := range contexts {
// k8s REST config
restConfig, err := configGetter.GetRestConfig(cfgFilePath, ctx)
if err != nil {
log.Fatalf("Error generating REST configuration: %s\n", err.Error())
}

for i, ctx := range contexts {
// k8s REST config
restConfig, err := configGetter.GetRestConfig(cfgFilePath, ctx)
if err != nil {
log.Fatalf("Error generating REST configuration: %s\n", err.Error())
}
// create the k8s clientSet
clientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
log.Fatalf("Error creating k8s clientSet: %s\n", err.Error())
}

// create the k8s clientSet
clientSet, err := kubernetes.NewForConfig(restConfig)
if err != nil {
log.Fatalf("Error creating k8s clientSet: %s\n", err.Error())
}
// create the k8s RESTclient
restClient, err := configGetter.GetRESTClient()
if err != nil {
log.Fatalf("Error creating k8s RestClient: %s\n", err.Error())
}

// create the k8s RESTclient
restClient, err := configGetter.GetRESTClient()
if err != nil {
log.Fatalf("Error creating k8s RestClient: %s\n", err.Error())
for ii, namespace := range namespaces {
// ShortName field only use short name for the first namespace and context
fwdServiceOpts := FwdServiceOpts{
Wg: wg,
ClientSet: clientSet,
Context: ctx,
Namespace: namespace,
ListOptions: listOptions,
Hostfile: &fwdport.HostFileWithLock{Hosts: hostFile},
ClientConfig: restConfig,
RESTClient: restClient,
ShortName: i < 1 && ii < 1,
Remote: i > 0,
IpC: byte(ipC),
IpD: ipD,
ExitOnFail: exitOnFail,
Domain: domain,
}
go fwdServiceOpts.StartListen(stopListenCh)

for ii, namespace := range namespaces {
// ShortName field only use short name for the first namespace and context
fwdServiceOpts := FwdServiceOpts{
Wg: wg,
ClientSet: clientSet,
Context: ctx,
Namespace: namespace,
ListOptions: listOptions,
Hostfile: &fwdport.HostFileWithLock{Hosts: hostFile},
ClientConfig: restConfig,
RESTClient: restClient,
ShortName: i < 1 && ii < 1,
Remote: i > 0,
IpC: byte(ipC),
IpD: ipD,
ExitOnFail: exitOnFail,
Domain: domain,
}
go fwdServiceOpts.StartListen(stopListenCh)

ipC = ipC + 1
}
ipC = ipC + 1
}
}

time.Sleep(2 * time.Second)
time.Sleep(2 * time.Second)

wg.Wait()
wg.Wait()

log.Printf("Done...\n")
},
log.Printf("Done...\n")
}

type FwdServiceOpts struct {
Expand All @@ -254,7 +255,11 @@ type FwdServiceOpts struct {

func (opts *FwdServiceOpts) StartListen(stopListenCh <-chan struct{}) {

watchlist := cache.NewListWatchFromClient(opts.RESTClient, "services", v1.NamespaceDefault, fields.Everything())
optionsModifier := func(options *metav1.ListOptions) {
options.FieldSelector = fields.Everything().String()
options.LabelSelector = opts.ListOptions.LabelSelector
}
watchlist := cache.NewFilteredListWatchFromClient(opts.RESTClient, "services", v1.NamespaceDefault, optionsModifier)
_, controller := cache.NewInformer(watchlist, &v1.Service{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: opts.AddServiceHandler,
DeleteFunc: opts.DeleteServiceHandler,
Expand Down Expand Up @@ -350,7 +355,7 @@ func (opts *FwdServiceOpts) ForwardService(svcName string, svcNamespace string)

func (opts *FwdServiceOpts) UnForwardService(svcName string, svcNamespace string) {

lock.Lock()
utils.Lock.Lock()
// search for the PortForwardOpts if the svc should be unForward.
// stop the PortForward and threadSafe delete the PortForward obj.
for i := 0; i < len(AllPortForwardOpts); i++ {
Expand All @@ -361,7 +366,7 @@ func (opts *FwdServiceOpts) UnForwardService(svcName string, svcNamespace string
i--
}
}
lock.Unlock()
utils.Lock.Unlock()
return
}

Expand Down
Loading

0 comments on commit f4398bf

Please sign in to comment.