Skip to content

Commit

Permalink
util: added logs for slow gRPC calls
Browse files Browse the repository at this point in the history
This commit adds a gRPC middleware that logs calls that
keep running after their deadline.

Adds --logslowopinterval cmdline argument to pass the log rate.

Signed-off-by: Robert Vasek <robert.vasek@clyso.com>
  • Loading branch information
Robert Vasek authored and mergify[bot] committed Sep 20, 2024
1 parent 56d08e1 commit 7a727c2
Show file tree
Hide file tree
Showing 8 changed files with 114 additions and 14 deletions.
5 changes: 5 additions & 0 deletions cmd/cephcsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ func init() {
"path of prometheus endpoint where metrics will be available")
flag.DurationVar(&conf.PollTime, "polltime", time.Second*pollTime, "time interval in seconds between each poll")
flag.DurationVar(&conf.PoolTimeout, "timeout", time.Second*probeTimeout, "probe timeout in seconds")
flag.DurationVar(
&conf.LogSlowOpInterval,
"logslowopinterval",
time.Second*30,
"how often to inform about slow gRPC calls")

flag.UintVar(
&conf.RbdHardMaxCloneDepth,
Expand Down
8 changes: 6 additions & 2 deletions internal/cephfs/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ func (fs *Driver) Run(conf *util.Config) {
NS: fs.ns,
GS: fs.cs,
}
server.Start(conf.Endpoint, srv)
server.Start(conf.Endpoint, srv, csicommon.MiddlewareServerOptionConfig{
LogSlowOpInterval: conf.LogSlowOpInterval,
})

if conf.EnableProfiling {
go util.StartMetricsServer(conf)
Expand Down Expand Up @@ -230,7 +232,9 @@ func (fs *Driver) setupCSIAddonsServer(conf *util.Config) error {
}

// start the server, this does not block, it runs a new go-routine
err = fs.cas.Start()
err = fs.cas.Start(csicommon.MiddlewareServerOptionConfig{
LogSlowOpInterval: conf.LogSlowOpInterval,
})
if err != nil {
return fmt.Errorf("failed to start CSI-Addons server: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/csi-addons/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,9 @@ func (cas *CSIAddonsServer) RegisterService(svc CSIAddonsService) {
// Start creates the internal gRPC server, and registers the CSIAddonsServices.
// The internal gRPC server is started in it's own go-routine when no error is
// returned.
func (cas *CSIAddonsServer) Start() error {
func (cas *CSIAddonsServer) Start(middlewareConfig csicommon.MiddlewareServerOptionConfig) error {
// create the gRPC server and register services
cas.server = grpc.NewServer(csicommon.NewMiddlewareServerOption())
cas.server = grpc.NewServer(csicommon.NewMiddlewareServerOption(middlewareConfig))

for _, svc := range cas.services {
svc.RegisterService(cas.server)
Expand Down
18 changes: 13 additions & 5 deletions internal/csi-common/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// NonBlockingGRPCServer defines Non blocking GRPC server interfaces.
type NonBlockingGRPCServer interface {
// Start services at the endpoint
Start(endpoint string, srv Servers)
Start(endpoint string, srv Servers, middlewareConfig MiddlewareServerOptionConfig)
// Waits for the service to stop
Wait()
// Stops the service gracefully
Expand Down Expand Up @@ -60,9 +60,13 @@ type nonBlockingGRPCServer struct {
}

// Start start service on endpoint.
func (s *nonBlockingGRPCServer) Start(endpoint string, srv Servers) {
func (s *nonBlockingGRPCServer) Start(
endpoint string,
srv Servers,
middlewareConfig MiddlewareServerOptionConfig,
) {
s.wg.Add(1)
go s.serve(endpoint, srv)
go s.serve(endpoint, srv, middlewareConfig)
}

// Wait blocks until the WaitGroup counter.
Expand All @@ -80,7 +84,11 @@ func (s *nonBlockingGRPCServer) ForceStop() {
s.server.Stop()
}

func (s *nonBlockingGRPCServer) serve(endpoint string, srv Servers) {
func (s *nonBlockingGRPCServer) serve(
endpoint string,
srv Servers,
middlewareConfig MiddlewareServerOptionConfig,
) {
proto, addr, err := parseEndpoint(endpoint)
if err != nil {
klog.Fatal(err.Error())
Expand All @@ -98,7 +106,7 @@ func (s *nonBlockingGRPCServer) serve(endpoint string, srv Servers) {
klog.Fatalf("Failed to listen: %v", err)
}

server := grpc.NewServer(NewMiddlewareServerOption())
server := grpc.NewServer(NewMiddlewareServerOption(middlewareConfig))
s.server = server

if srv.IS != nil {
Expand Down
77 changes: 75 additions & 2 deletions internal/csi-common/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"runtime/debug"
"strings"
"sync/atomic"
"time"

"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
Expand Down Expand Up @@ -108,10 +109,35 @@ func NewGroupControllerServiceCapability(ctrlCap csi.GroupControllerServiceCapab
}
}

// MiddlewareServerOptionConfig contains configuration parameters
// that are passed to the respective middleware interceptors that
// are instantiated when starting gRPC servers.
type MiddlewareServerOptionConfig struct {
LogSlowOpInterval time.Duration
}

// NewMiddlewareServerOption creates a new grpc.ServerOption that configures a
// common format for log messages and other gRPC related handlers.
func NewMiddlewareServerOption() grpc.ServerOption {
middleWare := []grpc.UnaryServerInterceptor{contextIDInjector, logGRPC, panicHandler}
func NewMiddlewareServerOption(config MiddlewareServerOptionConfig) grpc.ServerOption {
middleWare := []grpc.UnaryServerInterceptor{
contextIDInjector,
logGRPC,
}

if config.LogSlowOpInterval > 0 {
middleWare = append(middleWare, func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
return logSlowGRPC(
config.LogSlowOpInterval, ctx, req, info, handler,
)
})
}

middleWare = append(middleWare, panicHandler)

return grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(middleWare...))
}
Expand Down Expand Up @@ -250,6 +276,53 @@ func logGRPC(
return resp, err
}

func logSlowGRPC(
logInterval time.Duration,
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
handlerFinished := make(chan struct{})
callStartTime := time.Now()

// Ticks at a logInterval rate and logs a slow-call message until handler finishes.
// This is called once the handler outlives its context, see below.
doLogSlowGRPC := func() {
ticker := time.NewTicker(logInterval)
defer ticker.Stop()

for {
select {
case t := <-ticker.C:
timePassed := t.Sub(callStartTime).Truncate(time.Second)
log.ExtendedLog(ctx,
"Slow GRPC call %s (%s)", info.FullMethod, timePassed)
log.TraceLog(ctx,
"Slow GRPC request: %s", protosanitizer.StripSecrets(req))
case <-handlerFinished:
return
}
}
}

go func() {
select {
case <-ctx.Done():
// The call (most likely) outlived its context. Start logging slow messages.
doLogSlowGRPC()
case <-handlerFinished:
// The call finished, exit.
return
}
}()

resp, err := handler(ctx, req)
close(handlerFinished)

return resp, err
}

//nolint:nonamedreturns // named return used to send recovered panic error.
func panicHandler(
ctx context.Context,
Expand Down
5 changes: 4 additions & 1 deletion internal/nfs/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,10 @@ func (fs *Driver) Run(conf *util.Config) {
srv.CS = controller.NewControllerServer(cd)
}

server.Start(conf.Endpoint, srv)
server.Start(conf.Endpoint, srv, csicommon.MiddlewareServerOptionConfig{
LogSlowOpInterval: conf.LogSlowOpInterval,
})

if conf.EnableProfiling {
go util.StartMetricsServer(conf)
log.DebugLogMsg("Registering profiling handler")
Expand Down
8 changes: 6 additions & 2 deletions internal/rbd/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,9 @@ func (r *Driver) Run(conf *util.Config) {
CS: r.cs,
NS: r.ns,
}
s.Start(conf.Endpoint, srv)
s.Start(conf.Endpoint, srv, csicommon.MiddlewareServerOptionConfig{
LogSlowOpInterval: conf.LogSlowOpInterval,
})

r.startProfiling(conf)

Expand Down Expand Up @@ -233,7 +235,9 @@ func (r *Driver) setupCSIAddonsServer(conf *util.Config) error {
}

// start the server, this does not block, it runs a new go-routine
err = r.cas.Start()
err = r.cas.Start(csicommon.MiddlewareServerOptionConfig{
LogSlowOpInterval: conf.LogSlowOpInterval,
})
if err != nil {
return fmt.Errorf("failed to start CSI-Addons server: %w", err)
}
Expand Down
3 changes: 3 additions & 0 deletions internal/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ type Config struct {
MetricsPort int // TCP port for liveness/grpc metrics requests
PollTime time.Duration // time interval in seconds between each poll
PoolTimeout time.Duration // probe timeout in seconds
// Log interval for slow GRPC calls. Calls that outlive their context deadline
// are considered slow.
LogSlowOpInterval time.Duration

EnableProfiling bool // flag to enable profiling
IsControllerServer bool // if set to true start provisioner server
Expand Down

0 comments on commit 7a727c2

Please sign in to comment.