From 0dfb7446302cdcb08cb991019885f28e313dd67c Mon Sep 17 00:00:00 2001 From: leonnicolas <60091705+leonnicolas@users.noreply.github.com> Date: Fri, 8 Apr 2022 13:42:13 +0200 Subject: [PATCH] kgctl connect (#269) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * kgctl connect Use kgctl connect to connect your laptop to a cluster. Signed-off-by: leonnicolas * cmd/kgctl: finish connect command This commit fixes some bugs and finishes the implementation of the `kgctl connect` command. Signed-off-by: Lucas Servén Marín * e2e: add tests for kgctl connect Signed-off-by: Lucas Servén Marín * docs: add documentation for `kgctl connect` Signed-off-by: Lucas Servén Marín * pkg/mesh: move peer route generation to mesh Signed-off-by: Lucas Servén Marín Co-authored-by: Lucas Servén Marín --- Makefile | 2 +- cmd/kgctl/connect_linux.go | 374 +++++++++++++++++++++++++++++++++++++ cmd/kgctl/connect_other.go | 35 ++++ cmd/kgctl/graph.go | 12 +- cmd/kgctl/main.go | 20 +- cmd/kgctl/showconf.go | 30 +-- docs/kgctl.md | 40 +++- e2e/kgctl.sh | 17 ++ pkg/mesh/routes.go | 68 +++++++ pkg/mesh/topology.go | 5 +- 10 files changed, 569 insertions(+), 34 deletions(-) create mode 100644 cmd/kgctl/connect_linux.go create mode 100644 cmd/kgctl/connect_other.go create mode 100644 e2e/kgctl.sh diff --git a/Makefile b/Makefile index b5448bb2..4b66bd97 100644 --- a/Makefile +++ b/Makefile @@ -209,7 +209,7 @@ $(BASH_UNIT): chmod +x $@ e2e: container $(KIND_BINARY) $(KUBECTL_BINARY) $(BASH_UNIT) bin/$(OS)/$(ARCH)/kgctl - KILO_IMAGE=$(IMAGE):$(ARCH)-$(VERSION) KIND_BINARY=$(KIND_BINARY) KUBECTL_BINARY=$(KUBECTL_BINARY) KGCTL_BINARY=$(shell pwd)/bin/$(OS)/$(ARCH)/kgctl $(BASH_UNIT) $(BASH_UNIT_FLAGS) ./e2e/setup.sh ./e2e/full-mesh.sh ./e2e/location-mesh.sh ./e2e/multi-cluster.sh ./e2e/handlers.sh ./e2e/teardown.sh + KILO_IMAGE=$(IMAGE):$(ARCH)-$(VERSION) KIND_BINARY=$(KIND_BINARY) KUBECTL_BINARY=$(KUBECTL_BINARY) KGCTL_BINARY=$(shell pwd)/bin/$(OS)/$(ARCH)/kgctl 
$(BASH_UNIT) $(BASH_UNIT_FLAGS) ./e2e/setup.sh ./e2e/full-mesh.sh ./e2e/location-mesh.sh ./e2e/multi-cluster.sh ./e2e/handlers.sh ./e2e/kgctl.sh ./e2e/teardown.sh header: .header @HEADER=$$(cat .header); \ diff --git a/cmd/kgctl/connect_linux.go b/cmd/kgctl/connect_linux.go new file mode 100644 index 00000000..42496fcf --- /dev/null +++ b/cmd/kgctl/connect_linux.go @@ -0,0 +1,374 @@ +// Copyright 2022 the Kilo authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux +// +build linux + +package main + +import ( + "context" + "errors" + "fmt" + "net" + "os" + "sort" + "strings" + "syscall" + "time" + + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/oklog/run" + "github.com/spf13/cobra" + "golang.zx2c4.com/wireguard/wgctrl" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/squat/kilo/pkg/iproute" + "github.com/squat/kilo/pkg/k8s/apis/kilo/v1alpha1" + "github.com/squat/kilo/pkg/mesh" + "github.com/squat/kilo/pkg/route" + "github.com/squat/kilo/pkg/wireguard" +) + +var ( + logLevel string + connectOpts struct { + allowedIP net.IPNet + allowedIPs []net.IPNet + privateKey string + cleanUp bool + mtu uint + resyncPeriod time.Duration + interfaceName string + persistentKeepalive int + } +) + +func takeIPNet(_ net.IP, i *net.IPNet, err error) *net.IPNet { + if err != nil { + panic(err) + } + return i +} + +func 
connect() *cobra.Command { + availableLogLevels = strings.Join([]string{ // Build the list first: the --log-level help text below formats it immediately. + logLevelAll, + logLevelDebug, + logLevelInfo, + logLevelWarn, + logLevelError, + logLevelNone, + }, ", ") + + cmd := &cobra.Command{ + Use: "connect", + Args: cobra.ExactArgs(1), + RunE: runConnect, + Short: "connect to a Kilo cluster as a peer over WireGuard", + SilenceUsage: true, + } + cmd.Flags().IPNetVarP(&connectOpts.allowedIP, "allowed-ip", "a", *takeIPNet(net.ParseCIDR("10.10.10.10/32")), "Allowed IP of the peer.") + cmd.Flags().StringSliceVar(&allowedIPs, "allowed-ips", []string{}, "Additional allowed IPs of the cluster, e.g. the service CIDR.") + cmd.Flags().StringVar(&logLevel, "log-level", logLevelInfo, fmt.Sprintf("Log level to use. Possible values: %s", availableLogLevels)) + cmd.Flags().StringVar(&connectOpts.privateKey, "private-key", "", "Path to an existing WireGuard private key file.") + cmd.Flags().BoolVar(&connectOpts.cleanUp, "clean-up", true, "Should Kilo clean up the routes and interface when it shuts down?") + cmd.Flags().UintVar(&connectOpts.mtu, "mtu", uint(1420), "The MTU for the WireGuard interface.") + cmd.Flags().DurationVar(&connectOpts.resyncPeriod, "resync-period", 30*time.Second, "How often should Kilo reconcile?") + cmd.Flags().StringVarP(&connectOpts.interfaceName, "interface", "i", mesh.DefaultKiloInterface, "Name of the Kilo interface to use; if it does not exist, it will be created.") + cmd.Flags().IntVar(&connectOpts.persistentKeepalive, "persistent-keepalive", 10, "How often should WireGuard send keepalives? 
Setting to 0 will disable sending keepalives.") + + return cmd +} + +func runConnect(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + logger := log.NewJSONLogger(log.NewSyncWriter(os.Stdout)) + switch logLevel { + case logLevelAll: + logger = level.NewFilter(logger, level.AllowAll()) + case logLevelDebug: + logger = level.NewFilter(logger, level.AllowDebug()) + case logLevelInfo: + logger = level.NewFilter(logger, level.AllowInfo()) + case logLevelWarn: + logger = level.NewFilter(logger, level.AllowWarn()) + case logLevelError: + logger = level.NewFilter(logger, level.AllowError()) + case logLevelNone: + logger = level.NewFilter(logger, level.AllowNone()) + default: + return fmt.Errorf("log level %s unknown; possible values are: %s", logLevel, availableLogLevels) + } + logger = log.With(logger, "ts", log.DefaultTimestampUTC) + logger = log.With(logger, "caller", log.DefaultCaller) + peerName := args[0] + + for i := range allowedIPs { + _, aip, err := net.ParseCIDR(allowedIPs[i]) + if err != nil { + return err + } + connectOpts.allowedIPs = append(connectOpts.allowedIPs, *aip) + } + + var privateKey wgtypes.Key + var err error + if connectOpts.privateKey == "" { + privateKey, err = wgtypes.GeneratePrivateKey() + if err != nil { + return fmt.Errorf("failed to generate private key: %w", err) + } + } else { + raw, err := os.ReadFile(connectOpts.privateKey) + if err != nil { + return fmt.Errorf("failed to read private key: %w", err) + } + privateKey, err = wgtypes.ParseKey(string(raw)) + if err != nil { + return fmt.Errorf("failed to parse private key: %w", err) + } + } + publicKey := privateKey.PublicKey() + level.Info(logger).Log("msg", "generated public key", "key", publicKey) + + if _, err := opts.kc.KiloV1alpha1().Peers().Get(ctx, peerName, 
metav1.GetOptions{}); apierrors.IsNotFound(err) { + peer := &v1alpha1.Peer{ + ObjectMeta: metav1.ObjectMeta{ + Name: peerName, + }, + Spec: v1alpha1.PeerSpec{ + AllowedIPs: []string{connectOpts.allowedIP.String()}, + PersistentKeepalive: connectOpts.persistentKeepalive, + PublicKey: publicKey.String(), + }, + } + if _, err := opts.kc.KiloV1alpha1().Peers().Create(ctx, peer, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("failed to create peer: %w", err) + } + level.Info(logger).Log("msg", "created peer", "peer", peerName) + if connectOpts.cleanUp { + defer func() { + ctxWithTimeout, cancelWithTimeout := context.WithTimeout(context.Background(), 10*time.Second) + defer cancelWithTimeout() + if err := opts.kc.KiloV1alpha1().Peers().Delete(ctxWithTimeout, peerName, metav1.DeleteOptions{}); err != nil { + level.Error(logger).Log("err", fmt.Sprintf("failed to delete peer: %v", err)) + } else { + level.Info(logger).Log("msg", "deleted peer", "peer", peerName) + } + }() + } + + } else if err != nil { + return fmt.Errorf("failed to get peer: %w", err) + } + + iface, _, err := wireguard.New(connectOpts.interfaceName, connectOpts.mtu) + if err != nil { + return fmt.Errorf("failed to create wg interface: %w", err) + } + level.Info(logger).Log("msg", "created WireGuard interface", "name", connectOpts.interfaceName, "index", iface) + + table := route.NewTable() + if connectOpts.cleanUp { + defer cleanUp(iface, table, logger) + } + + if err := iproute.SetAddress(iface, &connectOpts.allowedIP); err != nil { + return err + } + level.Info(logger).Log("msg", "set IP address of WireGuard interface", "IP", connectOpts.allowedIP.String()) + + if err := iproute.Set(iface, true); err != nil { + return err + } + + var g run.Group + g.Add(run.SignalHandler(ctx, syscall.SIGINT, syscall.SIGTERM)) + + { + g.Add( + func() error { + errCh, err := table.Run(ctx.Done()) + if err != nil { + return fmt.Errorf("failed to watch for route table updates: %w", err) + } + for { + select { + 
case err, ok := <-errCh: + if ok { + level.Error(logger).Log("err", err.Error()) + } else { + return nil + } + case <-ctx.Done(): + return nil + } + } + }, + func(err error) { + cancel() + var serr run.SignalError + if ok := errors.As(err, &serr); ok { + level.Debug(logger).Log("msg", "received signal", "signal", serr.Signal.String(), "err", err.Error()) + } else { + level.Error(logger).Log("msg", "received error", "err", err.Error()) + } + }, + ) + } + { + g.Add( + func() error { + level.Info(logger).Log("msg", "starting syncer") + for { + if err := sync(table, peerName, privateKey, iface, logger); err != nil { + level.Error(logger).Log("msg", "failed to sync", "err", err.Error()) + } + select { + case <-time.After(connectOpts.resyncPeriod): + case <-ctx.Done(): + return nil + } + } + }, func(err error) { + cancel() + var serr run.SignalError + if ok := errors.As(err, &serr); ok { + level.Debug(logger).Log("msg", "received signal", "signal", serr.Signal.String(), "err", err.Error()) + } else { + level.Error(logger).Log("msg", "received error", "err", err.Error()) + } + }) + } + + err = g.Run() + var serr run.SignalError + if ok := errors.As(err, &serr); ok { + return nil + } + return err +} + +func cleanUp(iface int, t *route.Table, logger log.Logger) { // Best-effort teardown: every failure is logged and the remaining steps still run. + if err := iproute.Set(iface, false); err != nil { + level.Error(logger).Log("err", fmt.Sprintf("failed to set WireGuard interface down: %v", err)) + } + if err := iproute.RemoveInterface(iface); err != nil { + level.Error(logger).Log("err", fmt.Sprintf("failed to remove WireGuard interface: %v", err)) + } + if err := t.CleanUp(); err != nil { + level.Error(logger).Log("err", fmt.Sprintf("failed to clean up routes: %v", err)) // Log takes key/value pairs, not a printf format string. + } + + return +} + +func sync(table *route.Table, peerName string, privateKey wgtypes.Key, iface int, logger log.Logger) error { + ns, err := opts.backend.Nodes().List() + if err != nil { + return fmt.Errorf("failed to list nodes: %w", err) + } + for _, n := range ns { + _, err := n.Endpoint.UDPAddr(true) + if 
err != nil { + return err + } + } + ps, err := opts.backend.Peers().List() + if err != nil { + return fmt.Errorf("failed to list peers: %w", err) + } + // Obtain the Granularity by looking at the annotation of the first node. + if opts.granularity, err = determineGranularity(opts.granularity, ns); err != nil { + return fmt.Errorf("failed to determine granularity: %w", err) + } + var hostname string + var subnet *net.IPNet + nodes := make(map[string]*mesh.Node) + var nodeNames []string + for _, n := range ns { + if n.Ready() { + nodes[n.Name] = n + hostname = n.Name + nodeNames = append(nodeNames, n.Name) + } + if n.WireGuardIP != nil && subnet == nil { + subnet = n.WireGuardIP + } + } + if len(nodes) == 0 { + return errors.New("did not find any valid Kilo nodes in the cluster") + } + if subnet == nil { + return errors.New("did not find a valid Kilo subnet on any node") + } + subnet.IP = subnet.IP.Mask(subnet.Mask) + sort.Strings(nodeNames) + nodes[nodeNames[0]].AllowedLocationIPs = append(nodes[nodeNames[0]].AllowedLocationIPs, connectOpts.allowedIPs...) 
+ peers := make(map[string]*mesh.Peer) + for _, p := range ps { + if p.Ready() { + peers[p.Name] = p + } + } + if _, ok := peers[peerName]; !ok { + return fmt.Errorf("did not find any peer named %q in the cluster", peerName) + } + + t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, opts.port, wgtypes.Key{}, subnet, *peers[peerName].PersistentKeepaliveInterval, logger) + if err != nil { + return fmt.Errorf("failed to create topology: %w", err) + } + conf := t.PeerConf(peerName) + conf.PrivateKey = &privateKey + conf.ListenPort = &opts.port + + wgClient, err := wgctrl.New() + if err != nil { + return err + } + defer wgClient.Close() + + current, err := wgClient.Device(connectOpts.interfaceName) + if err != nil { + return err + } + + var equal bool + var diff string + equal, diff = conf.Equal(current) + if !equal { + // If the key is empty, then it's the first time we are running + // so don't bother printing a diff. + if current.PrivateKey != [wgtypes.KeyLen]byte{} { + level.Info(logger).Log("msg", "WireGuard configurations are different", "diff", diff) + } + level.Debug(logger).Log("msg", "setting WireGuard config", "config", conf.WGConfig()) + if err := wgClient.ConfigureDevice(connectOpts.interfaceName, conf.WGConfig()); err != nil { + return err + } + } + + if err := table.Set(t.PeerRoutes(peerName, iface, connectOpts.allowedIPs)); err != nil { + return fmt.Errorf("failed to update route table: %w", err) + } + + return nil +} diff --git a/cmd/kgctl/connect_other.go b/cmd/kgctl/connect_other.go new file mode 100644 index 00000000..b9e98ffe --- /dev/null +++ b/cmd/kgctl/connect_other.go @@ -0,0 +1,35 @@ +// Copyright 2022 the Kilo authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !linux +// +build !linux + +package main + +import ( + "errors" + + "github.com/spf13/cobra" +) + +func connect() *cobra.Command { // Stub for non-Linux builds: the command is registered but always returns an error when run. + cmd := &cobra.Command{ + Use: "connect", + Short: "not supported on this OS", + RunE: func(_ *cobra.Command, _ []string) error { + return errors.New("this command is not supported on this OS") + }, + } + return cmd +} diff --git a/cmd/kgctl/graph.go b/cmd/kgctl/graph.go index 46a6b12d..87539c95 100644 --- a/cmd/kgctl/graph.go +++ b/cmd/kgctl/graph.go @@ -34,15 +34,15 @@ func graph() *cobra.Command { func runGraph(_ *cobra.Command, _ []string) error { ns, err := opts.backend.Nodes().List() if err != nil { - return fmt.Errorf("failed to list nodes: %v", err) + return fmt.Errorf("failed to list nodes: %w", err) } ps, err := opts.backend.Peers().List() if err != nil { - return fmt.Errorf("failed to list peers: %v", err) + return fmt.Errorf("failed to list peers: %w", err) } // Obtain the Granularity by looking at the annotation of the first node. 
- if opts.granularity, err = optainGranularity(opts.granularity, ns); err != nil { - return fmt.Errorf("failed to obtain granularity: %w", err) + if opts.granularity, err = determineGranularity(opts.granularity, ns); err != nil { + return fmt.Errorf("failed to determine granularity: %w", err) } var hostname string @@ -69,11 +69,11 @@ func runGraph(_ *cobra.Command, _ []string) error { } t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, 0, wgtypes.Key{}, subnet, nodes[hostname].PersistentKeepalive, nil) if err != nil { - return fmt.Errorf("failed to create topology: %v", err) + return fmt.Errorf("failed to create topology: %w", err) } g, err := t.Dot() if err != nil { - return fmt.Errorf("failed to generate graph: %v", err) + return fmt.Errorf("failed to generate graph: %w", err) } fmt.Println(g) return nil diff --git a/cmd/kgctl/main.go b/cmd/kgctl/main.go index 97184940..aa0a316f 100644 --- a/cmd/kgctl/main.go +++ b/cmd/kgctl/main.go @@ -62,6 +62,7 @@ var ( opts struct { backend mesh.Backend granularity mesh.Granularity + kc kiloclient.Interface port int } backend string @@ -81,29 +82,29 @@ func runRoot(_ *cobra.Command, _ []string) error { case mesh.FullGranularity: case mesh.AutoGranularity: default: - return fmt.Errorf("mesh granularity %v unknown; posible values are: %s", granularity, availableGranularities) + return fmt.Errorf("mesh granularity %s unknown; posible values are: %s", granularity, availableGranularities) } switch backend { case k8s.Backend: config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { - return fmt.Errorf("failed to create Kubernetes config: %v", err) + return fmt.Errorf("failed to create Kubernetes config: %w", err) } c := kubernetes.NewForConfigOrDie(config) - kc := kiloclient.NewForConfigOrDie(config) + opts.kc = kiloclient.NewForConfigOrDie(config) ec := apiextensions.NewForConfigOrDie(config) - opts.backend = k8s.New(c, kc, ec, topologyLabel, log.NewNopLogger()) + opts.backend = k8s.New(c, 
opts.kc, ec, topologyLabel, log.NewNopLogger()) default: - return fmt.Errorf("backend %v unknown; posible values are: %s", backend, availableBackends) + return fmt.Errorf("backend %s unknown; posible values are: %s", backend, availableBackends) } if err := opts.backend.Nodes().Init(make(chan struct{})); err != nil { - return fmt.Errorf("failed to initialize node backend: %v", err) + return fmt.Errorf("failed to initialize node backend: %w", err) } if err := opts.backend.Peers().Init(make(chan struct{})); err != nil { - return fmt.Errorf("failed to initialize peer backend: %v", err) + return fmt.Errorf("failed to initialize peer backend: %w", err) } return nil } @@ -130,6 +131,7 @@ func main() { for _, subCmd := range []*cobra.Command{ graph(), showConf(), + connect(), } { cmd.AddCommand(subCmd) } @@ -140,7 +142,7 @@ func main() { } } -func optainGranularity(gr mesh.Granularity, ns []*mesh.Node) (mesh.Granularity, error) { +func determineGranularity(gr mesh.Granularity, ns []*mesh.Node) (mesh.Granularity, error) { if gr == mesh.AutoGranularity { if len(ns) == 0 { return gr, errors.New("could not get any nodes") @@ -150,7 +152,7 @@ func optainGranularity(gr mesh.Granularity, ns []*mesh.Node) (mesh.Granularity, case mesh.LogicalGranularity: case mesh.FullGranularity: default: - return ret, fmt.Errorf("mesh granularity %v is not supported", opts.granularity) + return ret, fmt.Errorf("mesh granularity %s is not supported", opts.granularity) } return ret, nil } diff --git a/cmd/kgctl/showconf.go b/cmd/kgctl/showconf.go index 26700464..1b6b1b61 100644 --- a/cmd/kgctl/showconf.go +++ b/cmd/kgctl/showconf.go @@ -83,7 +83,7 @@ func runShowConf(c *cobra.Command, args []string) error { case outputFormatYAML: showConfOpts.serializer = json.NewYAMLSerializer(json.DefaultMetaFactory, peerCreatorTyper{}, peerCreatorTyper{}) default: - return fmt.Errorf("output format %v unknown; posible values are: %s", showConfOpts.output, availableOutputFormats) + return fmt.Errorf("output 
format %s unknown; posible values are: %s", showConfOpts.output, availableOutputFormats) } for i := range allowedIPs { _, aip, err := net.ParseCIDR(allowedIPs[i]) @@ -116,15 +116,15 @@ func showConfPeer() *cobra.Command { func runShowConfNode(_ *cobra.Command, args []string) error { ns, err := opts.backend.Nodes().List() if err != nil { - return fmt.Errorf("failed to list nodes: %v", err) + return fmt.Errorf("failed to list nodes: %w", err) } ps, err := opts.backend.Peers().List() if err != nil { - return fmt.Errorf("failed to list peers: %v", err) + return fmt.Errorf("failed to list peers: %w", err) } // Obtain the Granularity by looking at the annotation of the first node. - if opts.granularity, err = optainGranularity(opts.granularity, ns); err != nil { - return fmt.Errorf("failed to obtain granularity: %w", err) + if opts.granularity, err = determineGranularity(opts.granularity, ns); err != nil { + return fmt.Errorf("failed to determine granularity: %w", err) } hostname := args[0] subnet := mesh.DefaultKiloSubnet @@ -154,7 +154,7 @@ func runShowConfNode(_ *cobra.Command, args []string) error { t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, int(opts.port), wgtypes.Key{}, subnet, nodes[hostname].PersistentKeepalive, nil) if err != nil { - return fmt.Errorf("failed to create topology: %v", err) + return fmt.Errorf("failed to create topology: %w", err) } var found bool @@ -172,7 +172,7 @@ func runShowConfNode(_ *cobra.Command, args []string) error { if !showConfOpts.asPeer { c, err := t.Conf().Bytes() if err != nil { - return fmt.Errorf("failed to generate configuration: %v", err) + return fmt.Errorf("failed to generate configuration: %w", err) } _, err = os.Stdout.Write(c) return err @@ -202,7 +202,7 @@ func runShowConfNode(_ *cobra.Command, args []string) error { Peers: []wireguard.Peer{*p}, }).Bytes() if err != nil { - return fmt.Errorf("failed to generate configuration: %v", err) + return fmt.Errorf("failed to generate configuration: %w", 
err) } _, err = os.Stdout.Write(c) return err @@ -213,15 +213,15 @@ func runShowConfNode(_ *cobra.Command, args []string) error { func runShowConfPeer(_ *cobra.Command, args []string) error { ns, err := opts.backend.Nodes().List() if err != nil { - return fmt.Errorf("failed to list nodes: %v", err) + return fmt.Errorf("failed to list nodes: %w", err) } ps, err := opts.backend.Peers().List() if err != nil { - return fmt.Errorf("failed to list peers: %v", err) + return fmt.Errorf("failed to list peers: %w", err) } // Obtain the Granularity by looking at the annotation of the first node. - if opts.granularity, err = optainGranularity(opts.granularity, ns); err != nil { - return fmt.Errorf("failed to obtain granularity: %w", err) + if opts.granularity, err = determineGranularity(opts.granularity, ns); err != nil { + return fmt.Errorf("failed to determine granularity: %w", err) } var hostname string subnet := mesh.DefaultKiloSubnet @@ -257,12 +257,12 @@ func runShowConfPeer(_ *cobra.Command, args []string) error { } t, err := mesh.NewTopology(nodes, peers, opts.granularity, hostname, mesh.DefaultKiloPort, wgtypes.Key{}, subnet, pka, nil) if err != nil { - return fmt.Errorf("failed to create topology: %v", err) + return fmt.Errorf("failed to create topology: %w", err) } if !showConfOpts.asPeer { c, err := t.PeerConf(peer).Bytes() if err != nil { - return fmt.Errorf("failed to generate configuration: %v", err) + return fmt.Errorf("failed to generate configuration: %w", err) } _, err = os.Stdout.Write(c) return err @@ -286,7 +286,7 @@ func runShowConfPeer(_ *cobra.Command, args []string) error { Peers: []wireguard.Peer{*p}, }).Bytes() if err != nil { - return fmt.Errorf("failed to generate configuration: %v", err) + return fmt.Errorf("failed to generate configuration: %w", err) } _, err = os.Stdout.Write(c) return err diff --git a/docs/kgctl.md b/docs/kgctl.md index 6e54fbcf..f991da47 100644 --- a/docs/kgctl.md +++ b/docs/kgctl.md @@ -54,9 +54,47 @@ arkade get kgctl 
|Command|Syntax|Description| |----|----|-------| +|[connect](#connect)|`kgctl connect <peer-name> [flags]`|Connect the host to the cluster, setting up the required interfaces, routes, and keys.| |[graph](#graph)|`kgctl graph [flags]`|Produce a graph in GraphViz format representing the topology of the cluster.| -|[showconf](#showconf)|`kgctl showconf ( node \| peer ) NAME [flags]`|Show the WireGuard configuration for a node or peer in the mesh.| +|[showconf](#showconf)|`kgctl showconf ( node \| peer ) <name> [flags]`|Show the WireGuard configuration for a node or peer in the mesh.| +### connect + +The `connect` command configures the local host as a WireGuard Peer of the cluster and applies all of the necessary networking configuration to connect to the cluster. +As long as the process is running, it will watch the cluster for changes and automatically manage the configuration for new or updated Peers and Nodes. +If the given Peer name does not exist in the cluster, the command will register a new Peer and generate the necessary WireGuard keys. +When the command exits, all of the configuration, including newly registered Peers, is cleaned up. + +Example: + +```shell +PEER_NAME=laptop +SERVICECIDR=10.43.0.0/16 +kgctl connect $PEER_NAME --allowed-ips $SERVICECIDR +``` + +The local host is now connected to the cluster and all IPs from the cluster and any registered Peers are fully routable. +When combined with the `--clean-up=false` flag, the configuration produced by the command is persistent and will remain in effect even after the process is stopped. + +With the service CIDR of the cluster routable from the local host, Kubernetes DNS names can now be resolved by the cluster DNS provider. 
+For example, the following snippet could be used to resolve the clusterIP of the Kubernetes API: +```shell +dig @$(kubectl get service -n kube-system kube-dns -o=jsonpath='{.spec.clusterIP}') kubernetes.default.svc.cluster.local +short +# > 10.43.0.1 +``` + +For convenience, the cluster DNS provider's IP address can be configured as the local host's DNS server, making Kubernetes DNS names easily resolvable. +For example, if using `systemd-resolved`, the following snippet could be used: +```shell +systemd-resolve --interface kilo0 --set-dns $(kubectl get service -n kube-system kube-dns -o=jsonpath='{.spec.clusterIP}') --set-domain cluster.local +# Now all lookups for DNS names ending in `.cluster.local` will be routed over the `kilo0` interface to the cluster DNS provider. +dig kubernetes.default.svc.cluster.local +short +# > 10.43.0.1 +``` + +> **Note**: The `connect` command is currently only supported on Linux. + +> **Note**: The `connect` command requires the `CAP_NET_ADMIN` capability in order to configure the host's networking stack; unprivileged users will need to use `sudo` or similar tools. ### graph diff --git a/e2e/kgctl.sh b/e2e/kgctl.sh new file mode 100644 index 00000000..752607cf --- /dev/null +++ b/e2e/kgctl.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# shellcheck disable=SC1091 +. 
lib.sh + +setup_suite() { + # shellcheck disable=SC2016 + block_until_ready_by_name kube-system kilo-userspace + _kubectl wait pod -l app.kubernetes.io/name=adjacency --for=condition=Ready --timeout 3m +} + +test_connect() { + local PEER=test + local ALLOWED_IP=10.5.0.1/32 + docker run -d --name="$PEER" --rm --network=host --cap-add=NET_ADMIN -v "$KGCTL_BINARY":/kgctl -v "$PWD/$KUBECONFIG":/kubeconfig --entrypoint=/kgctl alpine --kubeconfig /kubeconfig connect "$PEER" --allowed-ip "$ALLOWED_IP" + assert "retry 10 5 '' check_ping --local" "should be able to ping Pods from host" + docker stop "$PEER" +} diff --git a/pkg/mesh/routes.go b/pkg/mesh/routes.go index 8d70c274..2c0cf248 100644 --- a/pkg/mesh/routes.go +++ b/pkg/mesh/routes.go @@ -235,6 +235,74 @@ func (t *Topology) Routes(kiloIfaceName string, kiloIface, privIface, tunlIface return routes, rules } +// PeerRoutes generates a slice of routes and rules for a given peer in the Topology. +func (t *Topology) PeerRoutes(name string, kiloIface int, additionalAllowedIPs []net.IPNet) ([]*netlink.Route, []*netlink.Rule) { + var routes []*netlink.Route + var rules []*netlink.Rule + for _, segment := range t.segments { + for i := range segment.cidrs { + // Add routes to the Pod CIDRs of nodes in other segments. + routes = append(routes, &netlink.Route{ + Dst: segment.cidrs[i], + Flags: int(netlink.FLAG_ONLINK), + Gw: segment.wireGuardIP, + LinkIndex: kiloIface, + Protocol: unix.RTPROT_STATIC, + }) + } + for i := range segment.privateIPs { + // Add routes to the private IPs of nodes in other segments. + routes = append(routes, &netlink.Route{ + Dst: oneAddressCIDR(segment.privateIPs[i]), + Flags: int(netlink.FLAG_ONLINK), + Gw: segment.wireGuardIP, + LinkIndex: kiloIface, + Protocol: unix.RTPROT_STATIC, + }) + } + // Add routes for the allowed location IPs of all segments. 
+ for i := range segment.allowedLocationIPs { + routes = append(routes, &netlink.Route{ + Dst: &segment.allowedLocationIPs[i], + Flags: int(netlink.FLAG_ONLINK), + Gw: segment.wireGuardIP, + LinkIndex: kiloIface, + Protocol: unix.RTPROT_STATIC, + }) + } + routes = append(routes, &netlink.Route{ + Dst: oneAddressCIDR(segment.wireGuardIP), + LinkIndex: kiloIface, + Protocol: unix.RTPROT_STATIC, + }) + } + // Add routes for the allowed IPs of peers. + for _, peer := range t.peers { + // Don't add routes to ourselves. + if peer.Name == name { + continue + } + for i := range peer.AllowedIPs { + routes = append(routes, &netlink.Route{ + Dst: &peer.AllowedIPs[i], + LinkIndex: kiloIface, + Protocol: unix.RTPROT_STATIC, + }) + } + } + for i := range additionalAllowedIPs { + routes = append(routes, &netlink.Route{ + Dst: &additionalAllowedIPs[i], + Flags: int(netlink.FLAG_ONLINK), + Gw: t.segments[0].wireGuardIP, + LinkIndex: kiloIface, + Protocol: unix.RTPROT_STATIC, + }) + } + + return routes, rules +} + func encapsulateRoute(route *netlink.Route, encapsulate encapsulation.Strategy, subnet *net.IPNet, tunlIface int) *netlink.Route { if encapsulate == encapsulation.Always || (encapsulate == encapsulation.CrossSubnet && !subnet.Contains(route.Gw)) { route.LinkIndex = tunlIface diff --git a/pkg/mesh/topology.go b/pkg/mesh/topology.go index 297dcbf4..c7aaff7c 100644 --- a/pkg/mesh/topology.go +++ b/pkg/mesh/topology.go @@ -65,6 +65,7 @@ type Topology struct { logger log.Logger } +// segment represents one logical unit in the topology that is united by one common WireGuard IP. 
type segment struct { allowedIPs []net.IPNet endpoint *wireguard.Endpoint @@ -376,7 +377,7 @@ func (t *Topology) PeerConf(name string) *wireguard.Conf { PresharedKey: psk, PublicKey: s.key, }, - Endpoint: s.endpoint, + Endpoint: t.updateEndpoint(s.endpoint, s.key, &s.persistentKeepalive), } c.Peers = append(c.Peers, peer) } @@ -390,7 +391,7 @@ func (t *Topology) PeerConf(name string) *wireguard.Conf { PersistentKeepaliveInterval: pka, PublicKey: t.peers[i].PublicKey, }, - Endpoint: t.peers[i].Endpoint, + Endpoint: t.updateEndpoint(t.peers[i].Endpoint, t.peers[i].PublicKey, t.peers[i].PersistentKeepaliveInterval), } c.Peers = append(c.Peers, peer) }