Skip to content

Commit

Permalink
serve: Add a "local" server
Browse files Browse the repository at this point in the history
Define an interface for the observer server, and add a new implementation
LocalObserverServer. It ignores all the state change events and does not
connect to Cilium's monitor socket. It's meant to be started embeeded in
Cilium process so that:

1. the parser can access Cilium state directly without making a copy, and
2. the server can receive monitor payloads by registering a listener.

The observer server interface adds a new method HandleMonitorSocket() that
encapsulates the logic for connecting to the monitor socket and handling
monitor events instead of exposing the logic in cmd/serve.go.

Signed-off-by: Michi Mutsuzaki <michi@isovalent.com>
  • Loading branch information
michi-covalent committed Jan 27, 2020
1 parent 6ac8623 commit 624070a
Show file tree
Hide file tree
Showing 6 changed files with 494 additions and 228 deletions.
233 changes: 34 additions & 199 deletions cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,7 @@
package serve

import (
"bytes"
"encoding/gob"
"fmt"
"io"
"net"
"net/http"
_ "net/http/pprof" // a comment justifying it
Expand All @@ -28,20 +25,6 @@ import (
"syscall"
"time"

"github.com/cilium/cilium/pkg/defaults"
"github.com/cilium/cilium/pkg/monitor"
"github.com/cilium/cilium/pkg/monitor/agent/listener"
monitorAPI "github.com/cilium/cilium/pkg/monitor/api"
"github.com/cilium/cilium/pkg/monitor/payload"
"github.com/gogo/protobuf/types"
"github.com/google/gops/agent"
"github.com/spf13/cobra"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"

pb "github.com/cilium/hubble/api/v1/flow"
"github.com/cilium/hubble/api/v1/observer"
"github.com/cilium/hubble/pkg/api"
v1 "github.com/cilium/hubble/pkg/api/v1"
Expand All @@ -53,6 +36,12 @@ import (
"github.com/cilium/hubble/pkg/parser"
"github.com/cilium/hubble/pkg/server"
"github.com/cilium/hubble/pkg/servicecache"
"github.com/google/gops/agent"
"github.com/spf13/cobra"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
)

// New ...
Expand Down Expand Up @@ -84,7 +73,32 @@ func New(log *zap.Logger) *cobra.Command {
}()
}

err = Serve(log, listenClientUrls)
ciliumClient, err := client.NewClient()
if err != nil {
log.Fatal("failed to get Cilium client", zap.Error(err))
}
ipCache := ipcache.New()
fqdnCache := fqdncache.New()
serviceCache := servicecache.New()
endpoints := v1.NewEndpoints()
podGetter := &server.LegacyPodGetter{
PodGetter: ipCache,
EndpointGetter: endpoints,
}
payloadParser, err := parser.New(endpoints, ciliumClient, fqdnCache, podGetter, serviceCache)
if err != nil {
log.Fatal("failed to get parser", zap.Error(err))
}
s := server.NewServer(
ciliumClient,
endpoints,
ipCache,
fqdnCache,
serviceCache,
payloadParser,
int(maxFlows),
)
err = Serve(log, listenClientUrls, s)
if err != nil {
log.Fatal("", zap.Error(err))
}
Expand Down Expand Up @@ -215,41 +229,12 @@ func setupListeners(listenClientUrls []string) (listeners map[string]net.Listene

// Serve starts the GRPC server on the provided socketPath. If the port is non-zero, it listens
// to the TCP port instead of the unix domain socket.
func Serve(log *zap.Logger, listenClientUrls []string) error {
func Serve(log *zap.Logger, listenClientUrls []string, s server.Observer) error {
clientListeners, err := setupListeners(listenClientUrls)
if err != nil {
return err
}

ciliumClient, err := client.NewClient()
if err != nil {
return err
}

ipCache := ipcache.New()
fqdnCache := fqdncache.New()
serviceCache := servicecache.New()
endpoints := v1.NewEndpoints()
podGetter := &server.LegacyPodGetter{
PodGetter: ipCache,
EndpointGetter: endpoints,
}

payloadParser, err := parser.New(endpoints, ciliumClient, fqdnCache, podGetter, serviceCache)
if err != nil {
return err
}

s := server.NewServer(
ciliumClient,
endpoints,
ipCache,
fqdnCache,
serviceCache,
payloadParser,
int(maxFlows),
)

serverStart = time.Now()
go s.Start()

Expand Down Expand Up @@ -285,30 +270,7 @@ func Serve(log *zap.Logger, listenClientUrls []string) error {

setupSigHandler()
fmt.Printf("Press Ctrl-C to quit\n")

// On EOF, retry
// On other errors, exit
// always wait connTimeout when retrying
for ; ; time.Sleep(api.ConnectionTimeout) {
conn, version, err := openMonitorSock()
if err != nil {
log.Error("Cannot open monitor serverSocketPath", zap.Error(err))
return err
}

err = consumeMonitorEvents(s, conn, version)
switch {
case err == nil:
// no-op

case err == io.EOF, err == io.ErrUnexpectedEOF:
log.Warn("connection closed", zap.Error(err))
continue

default:
log.Fatal("decoding error", zap.Error(err))
}
}
return s.HandleMonitorSocket(nodeName)
}

func setupSigHandler() {
Expand All @@ -321,130 +283,3 @@ func setupSigHandler() {
}
}()
}

// openMonitorSock attempts to open a version specific monitor serverSocketPath It
// returns a connection, with a version, or an error.
func openMonitorSock() (conn net.Conn, version listener.Version, err error) {
errors := make([]string, 0)

// try the 1.2 serverSocketPath
conn, err = net.Dial("unix", defaults.MonitorSockPath1_2)
if err == nil {
return conn, listener.Version1_2, nil
}
errors = append(errors, defaults.MonitorSockPath1_2+": "+err.Error())

return nil, listener.VersionUnsupported, fmt.Errorf("cannot find or open a supported node-monitor serverSocketPath. %s", strings.Join(errors, ","))
}

// eventParseFunc is a convenience function type used as a version-specific
// parser of monitor events
type eventParserFunc func() (*pb.Payload, error)

// getMonitorParser constructs and returns an eventParserFunc. It is
// appropriate for the monitor API version passed in.
func getMonitorParser(conn net.Conn, version listener.Version) (parser eventParserFunc, err error) {
switch version {
case listener.Version1_2:
var (
pl payload.Payload
dec = gob.NewDecoder(conn)
)
// This implements the newer 1.2 API. Each listener maintains its own gob
// session, and type information is only ever sent once.
return func() (*pb.Payload, error) {
if err := pl.DecodeBinary(dec); err != nil {
return nil, err
}
b := make([]byte, len(pl.Data))
copy(b, pl.Data)

// TODO: Eventually, the monitor will add these timestaps to events.
// For now, we add them in hubble server.
grpcPl := &pb.Payload{
Data: b,
CPU: int32(pl.CPU),
Lost: pl.Lost,
Type: pb.EventType(pl.Type),
Time: types.TimestampNow(),
HostName: nodeName,
}
return grpcPl, nil
}, nil

default:
return nil, fmt.Errorf("unsupported version %s", version)
}
}

// consumeMonitorEvents handles and prints events on a monitor connection. It
// calls getMonitorParsed to construct a monitor-version appropriate parser.
// It closes conn on return, and returns on error, including io.EOF
func consumeMonitorEvents(s *server.ObserverServer, conn net.Conn, version listener.Version) error {
defer conn.Close()
ch := s.GetEventsChannel()
endpointEvents := s.GetEndpointEventsChannel()

dnsAdd := s.GetLogRecordNotifyChannel()

ipCacheEvents := make(chan monitorAPI.AgentNotify, 100)
s.StartMirroringIPCache(ipCacheEvents)

serviceEvents := make(chan monitorAPI.AgentNotify, 100)
s.StartMirroringServiceCache(serviceEvents)

getParsedPayload, err := getMonitorParser(conn, version)
if err != nil {
return err
}

for {
pl, err := getParsedPayload()
if err != nil {
return err
}

ch <- pl
// we don't expect to have many MessageTypeAgent so we
// can "decode" this messages as they come.
switch pl.Data[0] {
case monitorAPI.MessageTypeAgent:
buf := bytes.NewBuffer(pl.Data[1:])
dec := gob.NewDecoder(buf)

an := monitorAPI.AgentNotify{}
if err := dec.Decode(&an); err != nil {
fmt.Printf("Error while decoding agent notification message: %s\n", err)
continue
}
switch an.Type {
case monitorAPI.AgentNotifyEndpointCreated,
monitorAPI.AgentNotifyEndpointRegenerateSuccess,
monitorAPI.AgentNotifyEndpointDeleted:
endpointEvents <- an
case monitorAPI.AgentNotifyIPCacheUpserted,
monitorAPI.AgentNotifyIPCacheDeleted:
ipCacheEvents <- an
case monitorAPI.AgentNotifyServiceUpserted,
monitorAPI.AgentNotifyServiceDeleted:
serviceEvents <- an
}
case monitorAPI.MessageTypeAccessLog:
// TODO re-think the way this is being done. We are dissecting/
// TypeAccessLog messages here *and* when we are dumping
// them into JSON.
buf := bytes.NewBuffer(pl.Data[1:])
dec := gob.NewDecoder(buf)

lr := monitor.LogRecordNotify{}

if err := dec.Decode(&lr); err != nil {
fmt.Printf("Error while decoding access log message type: %s\n", err)
continue
}
if lr.DNS != nil {
dnsAdd <- lr
}
}
}
}
2 changes: 1 addition & 1 deletion pkg/server/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,6 @@ func TestObserverServer_consumeLogRecordNotifyChannel(t *testing.T) {
}
go s.consumeLogRecordNotifyChannel()

s.GetLogRecordNotifyChannel() <- lr
s.getLogRecordNotifyChannel() <- lr
wg.Wait()
}
8 changes: 4 additions & 4 deletions pkg/server/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func TestObserverServer_EndpointAddEvent(t *testing.T) {
}
go s.consumeEndpointEvents()

s.GetEndpointEventsChannel() <- monitorAPI.AgentNotify{
s.getEndpointEventsChannel() <- monitorAPI.AgentNotify{
Type: monitorAPI.AgentNotifyEndpointCreated,
Text: string(ecnMarshal),
}
Expand All @@ -377,7 +377,7 @@ func TestObserverServer_EndpointAddEvent(t *testing.T) {
}
go s.consumeEndpointEvents()

s.GetEndpointEventsChannel() <- monitorAPI.AgentNotify{
s.getEndpointEventsChannel() <- monitorAPI.AgentNotify{
Type: monitorAPI.AgentNotifyEndpointCreated,
Text: string(ecnMarshal),
}
Expand Down Expand Up @@ -418,7 +418,7 @@ func TestObserverServer_EndpointDeleteEvent(t *testing.T) {
}
go s.consumeEndpointEvents()

s.GetEndpointEventsChannel() <- monitorAPI.AgentNotify{
s.getEndpointEventsChannel() <- monitorAPI.AgentNotify{
Type: monitorAPI.AgentNotifyEndpointDeleted,
Text: string(ednMarshal),
}
Expand Down Expand Up @@ -482,7 +482,7 @@ func TestObserverServer_EndpointRegenEvent(t *testing.T) {
}
go s.consumeEndpointEvents()

s.GetEndpointEventsChannel() <- monitorAPI.AgentNotify{
s.getEndpointEventsChannel() <- monitorAPI.AgentNotify{
Type: monitorAPI.AgentNotifyEndpointRegenerateSuccess,
Text: string(ednMarshal),
}
Expand Down
Loading

0 comments on commit 624070a

Please sign in to comment.