Skip to content

Commit

Permalink
Remove gRPC tap server listener from controller (#3276)
Browse files Browse the repository at this point in the history
### Summary

As an initial attempt to secure the connection from clients to the gRPC tap
server on the tap Pod, the tap `addr` only listened on localhost.

As @adleong pointed out in #3257, this was not actually secure because the
inbound proxy would establish a connection to localhost anyway.

This change removes the gRPC tap server listener and changes `TapByResource`
requests to interface with the server object directly.

From this, we know that all `TapByResourceRequests` have gone through the tap
APIServer and have thus been authorized by RBAC.

### Details

[NewAPIServer](https://github.com/linkerd/linkerd2/blob/ef90e0184f238cbe79987a84f36d4eb91cbcda46/controller/tap/apiserver.go#L25-L26) now takes a [GRPCTapServer](https://github.com/linkerd/linkerd2/blob/f6362dfa805de9a009188014256ecd66e7dc3bfc/controller/tap/server.go#L33-L34) instead of a `pb.TapClient` so that
`TapByResource` requests can interact directly with the [TapByResource](https://github.com/linkerd/linkerd2/blob/f6362dfa805de9a009188014256ecd66e7dc3bfc/controller/tap/server.go#L49-L50) method.

`GRPCTapServer.TapByResource` now makes a private [grpcTapServer](https://github.com/linkerd/linkerd2/blob/ef90e0184f238cbe79987a84f36d4eb91cbcda46/controller/tap/handlers.go#L373-L374) that satisfies
the [tap.TapServer](https://godoc.org/github.com/linkerd/linkerd2/controller/gen/controller/tap#TapServer) interface. Because this interface is satisfied, we can interact
with the tap server methods without spawning an additional listener.

Signed-off-by: Kevin Leimkuhler <kleimkuhler@icloud.com>
  • Loading branch information
kleimkuhler authored Aug 16, 2019
1 parent 6567206 commit c9c41e2
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 86 deletions.
32 changes: 5 additions & 27 deletions controller/cmd/tap/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package main

import (
"context"
"crypto/tls"
"flag"
"net"
"os"
"os/signal"
"syscall"
Expand All @@ -17,7 +17,6 @@ import (
)

func main() {
addr := flag.String("addr", ":8088", "address to serve on")
apiServerAddr := flag.String("apiserver-addr", ":8089", "address to serve the apiserver on")
metricsAddr := flag.String("metrics-addr", ":9998", "address to serve scrapable metrics on")
kubeConfigPath := flag.String("kubeconfig", "", "path to kube config")
Expand Down Expand Up @@ -48,42 +47,21 @@ func main() {
log.Fatalf("Failed to initialize K8s API: %s", err)
}

server, lis, err := tap.NewServer(*addr, *tapPort, *controllerNamespace, k8sAPI)
if err != nil {
log.Fatal(err.Error())
}

_, port, err := net.SplitHostPort(lis.Addr().String())
if err != nil {
log.Fatal(err.Error())
}

// TODO: remove the network hop in favor of APIServer calling TapByResource
// directly.
tapClient, cc, err := tap.NewClient("127.0.0.1:" + port)
if err != nil {
log.Fatal(err.Error())
}
defer cc.Close()
grpcTapServer := tap.NewGrpcTapServer(*tapPort, *controllerNamespace, k8sAPI)

// TODO: make this configurable for local development
cert, err := tls.LoadX509KeyPair(*tlsCertPath, *tlsKeyPath)
if err != nil {
log.Fatal(err.Error())
}

apiServer, apiLis, err := tap.NewAPIServer(*apiServerAddr, cert, k8sAPI, tapClient, *disableCommonNames)
apiServer, apiLis, err := tap.NewAPIServer(*apiServerAddr, cert, k8sAPI, grpcTapServer, *disableCommonNames)
if err != nil {
log.Fatal(err.Error())
}

k8sAPI.Sync() // blocks until caches are synced

go func() {
log.Infof("starting gRPC server on %s", *addr)
server.Serve(lis)
}()

go func() {
log.Infof("starting APIServer on %s", *apiServerAddr)
apiServer.ServeTLS(apiLis, "", "")
Expand All @@ -93,6 +71,6 @@ func main() {

<-stop

log.Infof("shutting down gRPC server on %s", *addr)
server.GracefulStop()
log.Infof("shutting down APIServer on %s", *apiServerAddr)
apiServer.Shutdown(context.Background())
}
12 changes: 9 additions & 3 deletions controller/tap/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"net/http"

"github.com/julienschmidt/httprouter"
pb "github.com/linkerd/linkerd2/controller/gen/controller/tap"
"github.com/linkerd/linkerd2/controller/gen/controller/tap"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/prometheus"
"github.com/sirupsen/logrus"
Expand All @@ -23,7 +23,13 @@ type apiServer struct {
}

// NewAPIServer creates a new server that implements the Tap APIService.
func NewAPIServer(addr string, cert tls.Certificate, k8sAPI *k8s.API, client pb.TapClient, disableCommonNames bool) (*http.Server, net.Listener, error) {
func NewAPIServer(
addr string,
cert tls.Certificate,
k8sAPI *k8s.API,
grpcTapServer tap.TapServer,
disableCommonNames bool,
) (*http.Server, net.Listener, error) {
clientCAPem, allowedNames, usernameHeader, groupHeader, err := apiServerAuth(k8sAPI)
if err != nil {
return nil, nil, err
Expand All @@ -43,7 +49,7 @@ func NewAPIServer(addr string, cert tls.Certificate, k8sAPI *k8s.API, client pb.
k8sAPI: k8sAPI,
usernameHeader: usernameHeader,
groupHeader: groupHeader,
tapClient: client,
grpcTapServer: grpcTapServer,
log: log,
}

Expand Down
4 changes: 3 additions & 1 deletion controller/tap/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ data:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}

_, _, err = NewAPIServer("localhost:0", tls.Certificate{}, k8sAPI, nil, false)
fakeGrpcServer := newGRPCTapServer(4190, "controller-ns", k8sAPI)

_, _, err = NewAPIServer("localhost:0", tls.Certificate{}, k8sAPI, fakeGrpcServer, false)
if !reflect.DeepEqual(err, exp.err) {
t.Errorf("NewAPIServer returned unexpected error: %s, expected: %s", err, exp.err)
}
Expand Down
63 changes: 39 additions & 24 deletions controller/tap/handlers.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package tap

import (
"context"
"encoding/json"
"fmt"
"net/http"
Expand All @@ -16,6 +17,7 @@ import (
"github.com/linkerd/linkerd2/pkg/tap"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/version"
Expand All @@ -25,7 +27,7 @@ type handler struct {
k8sAPI *k8s.API
usernameHeader string
groupHeader string
tapClient pb.TapClient
grpcTapServer pb.TapServer
log *logrus.Entry
}

Expand Down Expand Up @@ -167,34 +169,13 @@ func (h *handler) handleTap(w http.ResponseWriter, req *http.Request, p httprout
return
}

client, err := h.tapClient.TapByResource(req.Context(), &tapReq)
serverStream := serverStream{w: flushableWriter, req: req, log: h.log}
err = h.grpcTapServer.TapByResource(&tapReq, &serverStream)
if err != nil {
h.log.Error(err)
protohttp.WriteErrorToHTTPResponse(flushableWriter, err)
return
}

for {
select {
case <-req.Context().Done():
h.log.Debug("Received Done context in Tap Stream")
return
default:
event, err := client.Recv()
if err != nil {
h.log.Errorf("Error receiving from tap client: %s", err)
protohttp.WriteErrorToHTTPResponse(flushableWriter, err)
return
}
err = protohttp.WriteProtoToHTTPResponse(flushableWriter, event)
if err != nil {
h.log.Errorf("Error writing proto to HTTP Response: %s", err)
protohttp.WriteErrorToHTTPResponse(flushableWriter, err)
return
}
flushableWriter.Flush()
}
}
}

// GET (not found)
Expand Down Expand Up @@ -363,3 +344,37 @@ func renderJSONError(w http.ResponseWriter, err error, status int) {
w.WriteHeader(status)
w.Write(rsp)
}

// serverStream provides functionality that satisfies the
// tap.Tap_TapByResourceServer interface. This allows the tap APIServer to call
// GRPCTapServer.TapByResource() directly, rather than making the request to an
// actual gRPC server over the network.
//
// TODO: Share this code with streamServer and destinationServer in
// http_server.go.
type serverStream struct {
// w is the HTTP response writer that Send writes each tap event to and
// flushes after every successful write.
w protohttp.FlushableResponseWriter
// req is the originating HTTP request; Context() returns its context.
req *http.Request
// log is used by Send to record failures writing to the HTTP response.
log *logrus.Entry
}

// The methods below satisfy the grpc.ServerStream interface. Tap events are
// written to the HTTP response by Send, so the header, trailer, and raw
// message methods are no-ops here. They use pointer receivers so that every
// method on serverStream shares the same receiver type as Send.

// SetHeader is a no-op and always returns nil.
func (s *serverStream) SetHeader(metadata.MD) error { return nil }

// SendHeader is a no-op and always returns nil.
func (s *serverStream) SendHeader(metadata.MD) error { return nil }

// SetTrailer is a no-op.
func (s *serverStream) SetTrailer(metadata.MD) {}

// Context returns the context of the HTTP request backing this stream.
func (s *serverStream) Context() context.Context { return s.req.Context() }

// SendMsg is a no-op and always returns nil.
func (s *serverStream) SendMsg(interface{}) error { return nil }

// RecvMsg is a no-op and always returns nil.
func (s *serverStream) RecvMsg(interface{}) error { return nil }

// Send satisfies the tap.Tap_TapByResourceServer interface. It writes the tap
// event m to the HTTP response and flushes the writer. If the write fails, the
// error is logged, written to the HTTP response, and returned.
//
// NOTE(review): handleTap also logs and writes the error returned by
// TapByResource; if that call propagates the error returned here, the failure
// is reported twice — confirm against TapByResource.
func (s *serverStream) Send(m *public.TapEvent) error {
	if writeErr := protohttp.WriteProtoToHTTPResponse(s.w, m); writeErr != nil {
		s.log.Errorf("Error writing proto to HTTP Response: %s", writeErr)
		protohttp.WriteErrorToHTTPResponse(s.w, writeErr)
		return writeErr
	}

	s.w.Flush()
	return nil
}
52 changes: 22 additions & 30 deletions controller/tap/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"io"
"net"
"time"

httpPb "github.com/linkerd/linkerd2-proxy-api/go/http_types"
Expand All @@ -30,23 +29,24 @@ const requireIDHeader = "l5d-require-id"
const podIPIndex = "ip"
const defaultMaxRps = 100.0

type (
server struct {
tapPort uint
k8sAPI *k8s.API
controllerNamespace string
}
)
// GRPCTapServer describes the gRPC server implementing pb.TapServer
type GRPCTapServer struct {
// tapPort is the port appended to a target address when tapProxy dials it.
tapPort uint
// k8sAPI provides the pod informer whose index podForIP queries by IP.
k8sAPI *k8s.API
// controllerNamespace is the namespace value supplied to the constructor.
controllerNamespace string
}

var (
tapInterval = 1 * time.Second
)

func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error {
// Tap is deprecated, use TapByResource. It ignores its arguments and always
// returns a codes.Unimplemented status error.
func (s *GRPCTapServer) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error {
	const deprecationMsg = "Tap is deprecated, use TapByResource"
	return status.Error(codes.Unimplemented, deprecationMsg)
}

func (s *server) TapByResource(req *public.TapByResourceRequest, stream pb.Tap_TapByResourceServer) error {
// TapByResource taps all resources matched by the request object.
func (s *GRPCTapServer) TapByResource(req *public.TapByResourceRequest, stream pb.Tap_TapByResourceServer) error {
if req == nil {
return status.Error(codes.InvalidArgument, "TapByResource received nil TapByResourceRequest")
}
Expand Down Expand Up @@ -247,7 +247,7 @@ func destinationLabels(resource *public.Resource) map[string]string {
// of maxRps * 1s at most once per 1s window. If this limit is reached in
// less than 1s, we sleep until the end of the window before calling Observe
// again.
func (s *server) tapProxy(ctx context.Context, maxRps float32, match *proxy.ObserveRequest_Match, addr string, events chan *public.TapEvent) {
func (s *GRPCTapServer) tapProxy(ctx context.Context, maxRps float32, match *proxy.ObserveRequest_Match, addr string, events chan *public.TapEvent) {
tapAddr := fmt.Sprintf("%s:%d", addr, s.tapPort)
log.Infof("Establishing tap on %s", tapAddr)
conn, err := grpc.DialContext(ctx, tapAddr, grpc.WithInsecure())
Expand Down Expand Up @@ -298,7 +298,7 @@ func (s *server) tapProxy(ctx context.Context, maxRps float32, match *proxy.Obse
}
}

func (s *server) translateEvent(orig *proxy.TapEvent) *public.TapEvent {
func (s *GRPCTapServer) translateEvent(orig *proxy.TapEvent) *public.TapEvent {
direction := func(orig proxy.TapEvent_ProxyDirection) public.TapEvent_ProxyDirection {
switch orig {
case proxy.TapEvent_INBOUND:
Expand Down Expand Up @@ -454,31 +454,23 @@ func (s *server) translateEvent(orig *proxy.TapEvent) *public.TapEvent {
return ev
}

// NewServer creates a new gRPC Tap server
func NewServer(
addr string,
// NewGrpcTapServer creates a new gRPC Tap server
func NewGrpcTapServer(
tapPort uint,
controllerNamespace string,
k8sAPI *k8s.API,
) (*grpc.Server, net.Listener, error) {
) *GRPCTapServer {
k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{podIPIndex: indexPodByIP})

lis, err := net.Listen("tcp", addr)
if err != nil {
return nil, nil, err
}

s, _ := newGRPCTapServer(tapPort, controllerNamespace, k8sAPI)

return s, lis, nil
return newGRPCTapServer(tapPort, controllerNamespace, k8sAPI)
}

func newGRPCTapServer(
tapPort uint,
controllerNamespace string,
k8sAPI *k8s.API,
) (*grpc.Server, *server) {
srv := &server{
) *GRPCTapServer {
srv := &GRPCTapServer{
tapPort: tapPort,
k8sAPI: k8sAPI,
controllerNamespace: controllerNamespace,
Expand All @@ -487,7 +479,7 @@ func newGRPCTapServer(
s := prometheus.NewGrpcServer()
pb.RegisterTapServer(s, srv)

return s, srv
return srv
}

func indexPodByIP(obj interface{}) ([]string, error) {
Expand All @@ -503,7 +495,7 @@ func indexPodByIP(obj interface{}) ([]string, error) {
//
// Since errors encountered while hydrating metadata are non-fatal and result
// only in missing labels, any errors are logged at the WARN level.
func (s *server) hydrateEventLabels(ev *public.TapEvent) {
func (s *GRPCTapServer) hydrateEventLabels(ev *public.TapEvent) {
err := s.hydrateIPLabels(ev.GetSource().GetIp(), ev.GetSourceMeta().GetLabels())
if err != nil {
log.Warnf("error hydrating source labels: %s", err)
Expand All @@ -523,7 +515,7 @@ func (s *server) hydrateEventLabels(ev *public.TapEvent) {

// hydrateIPLabels attempts to determine the metadata labels for `ip` and, if
// successful, adds them to `labels`.
func (s *server) hydrateIPLabels(ip *public.IPAddress, labels map[string]string) error {
func (s *GRPCTapServer) hydrateIPLabels(ip *public.IPAddress, labels map[string]string) error {
pod, err := s.podForIP(ip)
switch {
case err != nil:
Expand Down Expand Up @@ -552,7 +544,7 @@ func (s *server) hydrateIPLabels(ip *public.IPAddress, labels map[string]string)
//
// If no pods were found for the provided IP address, it returns nil. Errors are
// returned only in the event of an error indexing the pods list.
func (s *server) podForIP(ip *public.IPAddress) (*corev1.Pod, error) {
func (s *GRPCTapServer) podForIP(ip *public.IPAddress) (*corev1.Pod, error) {
ipStr := addr.PublicIPToString(ip)
objs, err := s.k8sAPI.Pod().Informer().GetIndexer().ByIndex(podIPIndex, ipStr)

Expand Down
2 changes: 1 addition & 1 deletion controller/tap/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ status:
t.Fatalf("Invalid port: %s", port)
}

_, fakeGrpcServer := newGRPCTapServer(uint(tapPort), "controller-ns", k8sAPI)
fakeGrpcServer := newGRPCTapServer(uint(tapPort), "controller-ns", k8sAPI)

k8sAPI.Sync()

Expand Down

0 comments on commit c9c41e2

Please sign in to comment.