Skip to content

Commit

Permalink
Support structured and contextual logging
Browse files Browse the repository at this point in the history
  • Loading branch information
bells17 committed May 2, 2024
1 parent f82f9de commit ca88d98
Show file tree
Hide file tree
Showing 14 changed files with 1,066 additions and 87 deletions.
6 changes: 6 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,9 @@ all:
go build `go list ./... | grep -v 'vendor'`

include release-tools/build.make

# Check contextual logging.
.PHONY: logcheck
test: logcheck
logcheck:
hack/verify-logcheck.sh
68 changes: 43 additions & 25 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -73,21 +75,21 @@ func SetMaxGRPCLogLength(characterCount int) {
//
// For other connections, the default behavior from gRPC is used and
// loss of connection is not detected reliably.
func Connect(address string, metricsManager metrics.CSIMetricsManager, options ...Option) (*grpc.ClientConn, error) {
func Connect(ctx context.Context, address string, metricsManager metrics.CSIMetricsManager, options ...Option) (*grpc.ClientConn, error) {
// Prepend default options
options = append([]Option{WithTimeout(time.Second * 30)}, options...)
if metricsManager != nil {
options = append([]Option{WithMetrics(metricsManager)}, options...)
}
return connect(address, options)
return connect(ctx, address, options)
}

// ConnectWithoutMetrics behaves exactly like Connect except no metrics are recorded.
//
// Deprecated: Prefer using Connect with `nil` as the metricsManager.
func ConnectWithoutMetrics(address string, options ...Option) (*grpc.ClientConn, error) {
func ConnectWithoutMetrics(ctx context.Context, address string, options ...Option) (*grpc.ClientConn, error) {
// Prepend default options
options = append([]Option{WithTimeout(time.Second * 30)}, options...)
return connect(address, options)
return connect(ctx, address, options)
}

// Option is the type of all optional parameters for Connect.
Expand All @@ -97,27 +99,33 @@ type Option func(o *options)
// connection got lost. If that callback returns true, the connection
// is reestablished. Otherwise the connection is left as it is and
// all future gRPC calls using it will fail with status.Unavailable.
func OnConnectionLoss(reconnect func() bool) Option {
func OnConnectionLoss(reconnect func(context.Context) bool) Option {
return func(o *options) {
o.reconnect = reconnect
}
}

// ExitOnConnectionLoss returns a callback for OnConnectionLoss() that writes
// an error to /dev/termination-log and exits.
func ExitOnConnectionLoss() func() bool {
return func() bool {
func ExitOnConnectionLoss() func(context.Context) bool {
return func(ctx context.Context) bool {
terminationMsg := "Lost connection to CSI driver, exiting"
if err := os.WriteFile(terminationLogPath, []byte(terminationMsg), 0644); err != nil {
klog.Errorf("%s: %s", terminationLogPath, err)
klog.FromContext(ctx).Error(err, "Failed to write a message to the termination logfile", "terminationLogPath", terminationLogPath)
}
klog.Exit(terminationMsg)
klog.FromContext(ctx).Error(nil, terminationMsg)
klog.FlushAndExit(klog.ExitFlushTimeout, 1)
// Not reached.
return false
}
}

// WithTimeout adds a configurable timeout on the gRPC calls.
// Note that this timeout also prevents all attempts to reconnect
// because it uses context.WithTimeout internally.
//
// For more details, see https://github.com/grpc/grpc-go/issues/133
// and https://github.com/kubernetes-csi/csi-lib-utils/pull/149#discussion_r1574707477
func WithTimeout(timeout time.Duration) Option {
return func(o *options) {
o.timeout = timeout
Expand All @@ -139,30 +147,36 @@ func WithOtelTracing() Option {
}

type options struct {
reconnect func() bool
reconnect func(context.Context) bool
timeout time.Duration
metricsManager metrics.CSIMetricsManager
enableOtelTracing bool
}

// connect is the internal implementation of Connect. It has more options to enable testing.
func connect(
ctx context.Context,
address string,
connectOptions []Option) (*grpc.ClientConn, error) {
logger := klog.FromContext(ctx)
var o options
for _, option := range connectOptions {
option(&o)
}

bc := backoff.DefaultConfig
bc.MaxDelay = time.Second
dialOptions := []grpc.DialOption{
grpc.WithInsecure(), // Don't use TLS, it's usually local Unix domain socket in a container.
grpc.WithBackoffMaxDelay(time.Second), // Retry every second after failure.
grpc.WithTransportCredentials(insecure.NewCredentials()), // Don't use TLS, it's usually local Unix domain socket in a container.
grpc.WithConnectParams(grpc.ConnectParams{Backoff: bc}), // Retry every second after failure.
grpc.WithBlock(), // Block until connection succeeds.
grpc.WithIdleTimeout(time.Duration(0)), // Never close connection because of inactivity.
}

if o.timeout > 0 {
dialOptions = append(dialOptions, grpc.WithTimeout(o.timeout))
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, o.timeout)
defer cancel()
}

interceptors := []grpc.UnaryClientInterceptor{LogGRPC}
Expand All @@ -186,20 +200,25 @@ func connect(
lostConnection := false
reconnect := true

dialOptions = append(dialOptions, grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
dialOptions = append(dialOptions, grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
logger := klog.FromContext(ctx)
if haveConnected && !lostConnection {
// We have detected a loss of connection for the first time. Decide what to do...
// Record this once. TODO (?): log at regular time intervals.
klog.Errorf("Lost connection to %s.", address)
logger.Error(nil, "Lost connection", "address", address)
// Inform caller and let it decide? Default is to reconnect.
if o.reconnect != nil {
reconnect = o.reconnect()
reconnect = o.reconnect(ctx)
}
lostConnection = true
}
if !reconnect {
return nil, errors.New("connection lost, reconnecting disabled")
}
var timeout time.Duration
if deadline, ok := ctx.Deadline(); ok {
timeout = time.Until(deadline)
}
conn, err := net.DialTimeout("unix", address[len(unixPrefix):], timeout)
if err == nil {
// Connection reestablished.
Expand All @@ -212,14 +231,14 @@ func connect(
return nil, errors.New("OnConnectionLoss callback only supported for unix:// addresses")
}

klog.V(5).Infof("Connecting to %s", address)
logger.V(5).Info("Connecting", "address", address)

// Connect in background.
var conn *grpc.ClientConn
var err error
ready := make(chan bool)
go func() {
conn, err = grpc.Dial(address, dialOptions...)
conn, err = grpc.DialContext(ctx, address, dialOptions...)
close(ready)
}()

Expand All @@ -231,7 +250,7 @@ func connect(
for {
select {
case <-ticker.C:
klog.Warningf("Still connecting to %s", address)
logger.Info("Still connecting", "address", address)

case <-ready:
return conn, err
Expand All @@ -241,15 +260,14 @@ func connect(

// LogGRPC is a gRPC unary interceptor that logs CSI messages at level 5. Secrets are stripped from the logged request and response.
func LogGRPC(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
klog.V(5).Infof("GRPC call: %s", method)
klog.V(5).Infof("GRPC request: %s", protosanitizer.StripSecrets(req))
logger := klog.FromContext(ctx)
logger.V(5).Info("GRPC call", "method", method, "request", protosanitizer.StripSecrets(req))
err := invoker(ctx, method, req, reply, cc, opts...)
cappedStr := protosanitizer.StripSecrets(reply).String()
if maxLogChar > 0 && len(cappedStr) > maxLogChar {
cappedStr = cappedStr[:maxLogChar] + fmt.Sprintf(" [response body too large, log capped to %d chars]", maxLogChar)
}
klog.V(5).Infof("GRPC response: %s", cappedStr)
klog.V(5).Infof("GRPC error: %v", err)
logger.V(5).Info("GRPC response", "response", cappedStr, "err", err)
return err
}

Expand Down Expand Up @@ -286,14 +304,14 @@ func (cmm ExtendedCSIMetricsManager) RecordMetricsClientInterceptor(
if additionalInfo != nil {
additionalInfoVal, ok := additionalInfo.(AdditionalInfo)
if !ok {
klog.Errorf("Failed to record migrated status, cannot convert additional info %v", additionalInfo)
klog.FromContext(ctx).Error(nil, "Failed to record migrated status, cannot convert additional info", "additionalInfo", additionalInfo)
return err
}
migrated = additionalInfoVal.Migrated
}
cmmv, metricsErr := cmm.WithLabelValues(map[string]string{metrics.LabelMigrated: migrated})
if metricsErr != nil {
klog.Errorf("Failed to record migrated status, error: %v", metricsErr)
klog.FromContext(ctx).Error(metricsErr, "Failed to record migrated status")
} else {
cmmBase = cmmv
}
Expand Down
Loading

0 comments on commit ca88d98

Please sign in to comment.