Skip to content

Commit

Permalink
cmd/kg/*: sub command peer validation webhook
Browse files Browse the repository at this point in the history
This commit adds a sub command `webhook` to Kilo.
It will start a https web server that answeres request from a Kubernetes
API server to validate updates and creations of Kilo peers.

Signed-off-by: leonnicolas <leonloechner@gmx.de>
  • Loading branch information
leonnicolas committed Aug 24, 2021
1 parent aca32ab commit 032a8e3
Show file tree
Hide file tree
Showing 12 changed files with 2,828 additions and 54 deletions.
2 changes: 1 addition & 1 deletion cmd/kg/handlers.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 the Kilo authors
// Copyright 2021 the Kilo authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
124 changes: 71 additions & 53 deletions cmd/kg/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2019 the Kilo authors
// Copyright 2021 the Kilo authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -85,7 +85,8 @@ var cmd = &cobra.Command{
It runs on every node of a cluster,
setting up the public and private keys for the VPN
as well as the necessary rules to route packets between locations.`,
RunE: runRoot,
PreRunE: preRun,
RunE: runRoot,
}

var (
Expand All @@ -102,61 +103,47 @@ var (
iface string
listen string
local bool
logLevel string
master string
mtu uint
topologyLabel string
port uint
subnet string
resyncPeriod time.Duration
printVersion bool

printVersion bool
logLevel string

logger log.Logger
registry *prometheus.Registry
)

func init() {
cmd.PersistentFlags().StringVar(&backend, "backend", k8s.Backend, fmt.Sprintf("The backend for the mesh. Possible values: %s", availableBackends))
cmd.PersistentFlags().BoolVar(&cleanUpIface, "clean-up-interface", false, "Should Kilo delete its interface when it shuts down?")
cmd.PersistentFlags().BoolVar(&createIface, "create-interface", true, "Should kilo create an interface on startup?")
cmd.PersistentFlags().BoolVar(&cni, "cni", true, "Should Kilo manage the node's CNI configuration?")
cmd.PersistentFlags().StringVar(&cniPath, "cni-path", mesh.DefaultCNIPath, "Path to CNI config.")
cmd.PersistentFlags().StringVar(&compatibility, "compatibility", "", fmt.Sprintf("Should Kilo run in compatibility mode? Possible values: %s", availableCompatibilities))
cmd.PersistentFlags().StringVar(&encapsulate, "encapsulate", string(encapsulation.Always), fmt.Sprintf("When should Kilo encapsulate packets within a location? Possible values: %s", availableEncapsulations))
cmd.PersistentFlags().StringVar(&granularity, "mesh-granularity", string(mesh.LogicalGranularity), fmt.Sprintf("The granularity of the network mesh to create. Possible values: %s", availableGranularities))
cmd.PersistentFlags().StringVar(&kubeconfig, "kubeconfig", "", "Path to kubeconfig.")
cmd.PersistentFlags().StringVar(&hostname, "hostname", "", "Hostname of the node on which this process is running.")
cmd.PersistentFlags().StringVar(&iface, "interface", mesh.DefaultKiloInterface, "Name of the Kilo interface to use; if it does not exist, it will be created.")
cmd.PersistentFlags().StringVar(&listen, "listen", ":1107", "The address at which to listen for health and metrics.")
cmd.PersistentFlags().BoolVar(&local, "local", true, "Should Kilo manage routes within a location?")
cmd.PersistentFlags().StringVar(&logLevel, "log-level", logLevelInfo, fmt.Sprintf("Log level to use. Possible values: %s", availableLogLevels))
cmd.PersistentFlags().StringVar(&master, "master", "", "The address of the Kubernetes API server (overrides any value in kubeconfig).")
cmd.PersistentFlags().UintVar(&mtu, "mtu", wireguard.DefaultMTU, "The MTU of the WireGuard interface created by Kilo.")
cmd.PersistentFlags().StringVar(&topologyLabel, "topology-label", k8s.RegionLabelKey, "Kubernetes node label used to group nodes into logical locations.")
cmd.PersistentFlags().UintVar(&port, "port", mesh.DefaultKiloPort, "The port over which WireGuard peers should communicate.")
cmd.PersistentFlags().StringVar(&subnet, "subnet", mesh.DefaultKiloSubnet.String(), "CIDR from which to allocate addresses for WireGuard interfaces.")
cmd.PersistentFlags().DurationVar(&resyncPeriod, "resync-period", 30*time.Second, "How often should the Kilo controllers reconcile?")
cmd.Flags().StringVar(&backend, "backend", k8s.Backend, fmt.Sprintf("The backend for the mesh. Possible values: %s", availableBackends))
cmd.Flags().BoolVar(&cleanUpIface, "clean-up-interface", false, "Should Kilo delete its interface when it shuts down?")
cmd.Flags().BoolVar(&createIface, "create-interface", true, "Should kilo create an interface on startup?")
cmd.Flags().BoolVar(&cni, "cni", true, "Should Kilo manage the node's CNI configuration?")
cmd.Flags().StringVar(&cniPath, "cni-path", mesh.DefaultCNIPath, "Path to CNI config.")
cmd.Flags().StringVar(&compatibility, "compatibility", "", fmt.Sprintf("Should Kilo run in compatibility mode? Possible values: %s", availableCompatibilities))
cmd.Flags().StringVar(&encapsulate, "encapsulate", string(encapsulation.Always), fmt.Sprintf("When should Kilo encapsulate packets within a location? Possible values: %s", availableEncapsulations))
cmd.Flags().StringVar(&granularity, "mesh-granularity", string(mesh.LogicalGranularity), fmt.Sprintf("The granularity of the network mesh to create. Possible values: %s", availableGranularities))
cmd.Flags().StringVar(&kubeconfig, "kubeconfig", "", "Path to kubeconfig.")
cmd.Flags().StringVar(&hostname, "hostname", "", "Hostname of the node on which this process is running.")
cmd.Flags().StringVar(&iface, "interface", mesh.DefaultKiloInterface, "Name of the Kilo interface to use; if it does not exist, it will be created.")
cmd.Flags().StringVar(&listen, "listen", ":1107", "The address at which to listen for health and metrics.")
cmd.Flags().BoolVar(&local, "local", true, "Should Kilo manage routes within a location?")
cmd.Flags().StringVar(&master, "master", "", "The address of the Kubernetes API server (overrides any value in kubeconfig).")
cmd.Flags().UintVar(&mtu, "mtu", wireguard.DefaultMTU, "The MTU of the WireGuard interface created by Kilo.")
cmd.Flags().StringVar(&topologyLabel, "topology-label", k8s.RegionLabelKey, "Kubernetes node label used to group nodes into logical locations.")
cmd.Flags().UintVar(&port, "port", mesh.DefaultKiloPort, "The port over which WireGuard peers should communicate.")
cmd.Flags().StringVar(&subnet, "subnet", mesh.DefaultKiloSubnet.String(), "CIDR from which to allocate addresses for WireGuard interfaces.")
cmd.Flags().DurationVar(&resyncPeriod, "resync-period", 30*time.Second, "How often should the Kilo controllers reconcile?")

cmd.PersistentFlags().BoolVar(&printVersion, "version", false, "Print version and exit")
cmd.PersistentFlags().StringVar(&logLevel, "log-level", logLevelInfo, fmt.Sprintf("Log level to use. Possible values: %s", availableLogLevels))
}

// Main is the principal function for the binary, wrapped only by `main` for convenience.
func runRoot(_ *cobra.Command, _ []string) error {
if printVersion {
fmt.Println(version.Version)
return nil
}

_, s, err := net.ParseCIDR(subnet)
if err != nil {
return fmt.Errorf("failed to parse %q as CIDR: %v", subnet, err)
}

if hostname == "" {
var err error
hostname, err = os.Hostname()
if hostname == "" || err != nil {
return errors.New("failed to determine hostname")
}
}

logger := log.NewJSONLogger(log.NewSyncWriter(os.Stdout))
func preRun(_ *cobra.Command, _ []string) error {
logger = log.NewJSONLogger(log.NewSyncWriter(os.Stdout))
switch logLevel {
case logLevelAll:
logger = level.NewFilter(logger, level.AllowAll())
Expand All @@ -176,6 +163,35 @@ func runRoot(_ *cobra.Command, _ []string) error {
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)

registry = prometheus.NewRegistry()
registry.MustRegister(
prometheus.NewGoCollector(),
prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}),
)

return nil
}

// runRoot is the principal function for the binary.
func runRoot(_ *cobra.Command, _ []string) error {
if printVersion {
fmt.Println(version.Version)
return nil
}

_, s, err := net.ParseCIDR(subnet)
if err != nil {
return fmt.Errorf("failed to parse %q as CIDR: %v", subnet, err)
}

if hostname == "" {
var err error
hostname, err = os.Hostname()
if hostname == "" || err != nil {
return errors.New("failed to determine hostname")
}
}

e := encapsulation.Strategy(encapsulate)
switch e {
case encapsulation.Never:
Expand Down Expand Up @@ -221,20 +237,15 @@ func runRoot(_ *cobra.Command, _ []string) error {
return fmt.Errorf("failed to create Kilo mesh: %v", err)
}

r := prometheus.NewRegistry()
r.MustRegister(
prometheus.NewGoCollector(),
prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}),
)
m.RegisterMetrics(r)
m.RegisterMetrics(registry)

var g run.Group
{
// Run the HTTP server.
mux := http.NewServeMux()
mux.HandleFunc("/health", healthHandler)
mux.Handle("/graph", &graphHandler{m, gr, hostname, s})
mux.Handle("/metrics", promhttp.HandlerFor(r, promhttp.HandlerOpts{}))
mux.Handle("/graph", &graphHandler{m, gr, &hostname, s})
mux.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
l, err := net.Listen("tcp", listen)
if err != nil {
return fmt.Errorf("failed to listen on %s: %v", listen, err)
Expand Down Expand Up @@ -286,7 +297,14 @@ func runRoot(_ *cobra.Command, _ []string) error {
return g.Run()
}

var versionCmd = &cobra.Command{
Use: "version",
Short: "Print the version and exit.",
Run: func(_ *cobra.Command, _ []string) { fmt.Println(version.Version) },
}

func main() {
cmd.AddCommand(webhookCmd, versionCmd)
if err := cmd.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
Expand Down
189 changes: 189 additions & 0 deletions cmd/kg/webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
// Copyright 2021 the Kilo authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net/http"
"os"

"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
kilo "github.com/squat/kilo/pkg/k8s/apis/kilo/v1alpha1"
"github.com/squat/kilo/pkg/version"
v1 "k8s.io/api/admission/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
)

var webhookCmd = &cobra.Command{
Use: "webhook",
PreRunE: func(c *cobra.Command, a []string) error {
if c.HasParent() {
return c.Parent().PreRunE(c, a)
}
return nil
},
Short: "webhook starts a https server to validate updates and creations of Kilo peers.",
Run: webhook,
}

var (
certPath string
keyPath string
metricsAddr string
listenAddr string
)

func init() {
webhookCmd.Flags().StringVar(&certPath, "cert-file", "", "file path to certificat file")
webhookCmd.Flags().StringVar(&keyPath, "key-file", "", "file path to key file")
webhookCmd.Flags().StringVar(&metricsAddr, "metrics-address", ":1107", "The metrics server will be listening to that address with port\ne.g. 172.0.0.1:9090")
webhookCmd.Flags().StringVar(&listenAddr, "listen", ":8443", "The webhook server will be listening to that address")
}

var deserializer = serializer.NewCodecFactory(runtime.NewScheme()).UniversalDeserializer()

var (
validationCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "https_admission_requests_total",
Help: "The number of received admission reviews requests",
},
[]string{"operation", "response"},
)
errorCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Name: "errors_total",
Help: "The total number of errors",
},
)
)

func validationHandler(w http.ResponseWriter, r *http.Request) {
level.Debug(logger).Log("msg", "handling request", "source", r.RemoteAddr)
body, err := ioutil.ReadAll(r.Body)
if err != nil {
errorCounter.Inc()
level.Error(logger).Log("err", "failed to parse body from incoming request", "source", r.RemoteAddr)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

var admissionReview v1.AdmissionReview

contentType := r.Header.Get("Content-Type")
if contentType != "application/json" {
errorCounter.Inc()
msg := fmt.Sprintf("Content-Type=%s, expect application/json", contentType)
level.Error(logger).Log("err", msg)
http.Error(w, msg, http.StatusBadRequest)
return
}

response := v1.AdmissionReview{}

_, gvk, err := deserializer.Decode(body, nil, &admissionReview)
if err != nil {
errorCounter.Inc()
msg := fmt.Sprintf("Request could not be decoded: %v", err)
level.Error(logger).Log("err", msg)
http.Error(w, msg, http.StatusBadRequest)
return
}
if *gvk != v1.SchemeGroupVersion.WithKind("AdmissionReview") {
errorCounter.Inc()
level.Error(logger).Log("err", "only api v1 is supported")
http.Error(w, "only api v1 is supported", http.StatusBadRequest)
return
} else {
response.SetGroupVersionKind(*gvk)
response.Response = &v1.AdmissionResponse{
UID: admissionReview.Request.UID,
}
}

rawExtension := admissionReview.Request.Object
var peer kilo.Peer

if err = json.Unmarshal(rawExtension.Raw, &peer); err != nil {
errorCounter.Inc()
msg := fmt.Sprintf("could not unmarshal extension to peer spec: %v:", err)
log.Println(msg)
level.Error(logger).Log("err", msg)
http.Error(w, msg, http.StatusBadRequest)
return
}

if err := peer.Validate(); err == nil {
validationCounter.With(prometheus.Labels{"operation": string(admissionReview.Request.Operation), "response": "allowed"}).Inc()
response.Response.Allowed = true
} else {
validationCounter.With(prometheus.Labels{"operation": string(admissionReview.Request.Operation), "response": "denied"}).Inc()
response.Response.Result = &metav1.Status{
Message: err.Error(),
}
}

res, err := json.Marshal(response)
if err != nil {
errorCounter.Inc()
msg := fmt.Sprintf("failed to marshal response: %v", err)
level.Error(logger).Log("err", msg)
http.Error(w, msg, http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(res); err != nil {
level.Error(logger).Log("err", err, "msg", "failed to write response")
}
}

func webhook(_ *cobra.Command, _ []string) {
if printVersion {
fmt.Println(version.Version)
os.Exit(0)
}
registry.MustRegister(
errorCounter,
validationCounter,
)
mm := http.NewServeMux()
mm.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))

exit := make(chan error, 1)
go func() {
exit <- http.ListenAndServe(metricsAddr, mm)
}()

mux := http.NewServeMux()
mux.HandleFunc("/validate", validationHandler)
server := &http.Server{
Addr: listenAddr,
Handler: mux,
}
go func() {
exit <- server.ListenAndServeTLS(certPath, keyPath)
}()
e := <-exit
level.Error(logger).Log("err", e.Error(), "msg", "shutting down")
}
Loading

0 comments on commit 032a8e3

Please sign in to comment.