Skip to content

Commit

Permalink
Merge pull request #70 from kakao/admin-grpc-logger
Browse files Browse the repository at this point in the history
chore(admin): add grpc logging middleware
  • Loading branch information
ijsong authored Aug 26, 2022
2 parents 751d142 + 74ff8f4 commit 4cdbb9e
Show file tree
Hide file tree
Showing 38 changed files with 2,213 additions and 1,555 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ bin/benchmark
bin/varlogcli
bin/stress
bin/varlogsn
bin/varlogadm

# Python
*.pyc
Expand All @@ -61,7 +62,7 @@ cmake-build-debug/
.DS_Store

# ctags
tags
/tags

# k8s
deploy/k8s-experiment/dev/kustomization.yaml
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/gofuzz v1.2.0
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4
github.com/pkg/errors v0.9.1
github.com/smartystreets/assertions v1.13.0
github.com/smartystreets/goconvey v1.7.2
Expand Down
3 changes: 3 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB7
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 h1:z53tR0945TRRQO/fLEVPI6SMv7ZflF0TEaTAoU7tOzg=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
Expand Down Expand Up @@ -342,6 +343,7 @@ github.com/klauspost/compress v1.9.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0
github.com/klauspost/compress v1.11.7 h1:0hzRabrMN4tSTvMfnL3SCv1ZGeAP23ynzodBgaHeMeg=
github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
Expand Down Expand Up @@ -441,6 +443,7 @@ github.com/shirou/gopsutil/v3 v3.21.9 h1:Vn4MUz2uXhqLSiCbGFRc0DILbMVLAY92DSkT8bs
github.com/shirou/gopsutil/v3 v3.21.9/go.mod h1:YWp/H8Qs5fVmf17v7JNZzA0mPJ+mS2e9JdiUF9LlKzQ=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo=
Expand Down
12 changes: 11 additions & 1 deletion internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
"time"

"github.com/gogo/protobuf/proto"
grpcmiddleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpczap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap"
grpcctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags"
"github.com/pkg/errors"
"go.uber.org/multierr"
"go.uber.org/zap"
Expand Down Expand Up @@ -68,11 +71,18 @@ func New(ctx context.Context, opts ...Option) (*Admin, error) {
return nil, err
}

grpcServer := grpc.NewServer(
grpcmiddleware.WithUnaryServerChain(
grpcctxtags.UnaryServerInterceptor(),
grpczap.UnaryServerInterceptor(cfg.logger),
),
)

cm := &Admin{
config: cfg,
lsidGen: logStreamIDGen,
tpidGen: topicIDGen,
server: grpc.NewServer(),
server: grpcServer,
healthServer: health.NewServer(),
}
cm.snw, err = snwatcher.New(append(
Expand Down
164 changes: 48 additions & 116 deletions internal/admin/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package admin

import (
"context"
"fmt"

pbtypes "github.com/gogo/protobuf/types"
"go.uber.org/zap"
"google.golang.org/grpc/codes"

"github.com/kakao/varlog/pkg/types"
Expand All @@ -19,18 +17,6 @@ type server struct {

var _ vmspb.ClusterManagerServer = (*server)(nil)

type handler func(ctx context.Context, req interface{}) (rsp interface{}, err error)

func (s *server) withTelemetry(ctx context.Context, spanName string, req interface{}, fn handler) (rsp interface{}, err error) {
rsp, err = fn(ctx, req)
if err == nil {
s.admin.logger.Debug(spanName, zap.Stringer("request", req.(fmt.Stringer)), zap.Stringer("response", rsp.(fmt.Stringer)))
} else {
s.admin.logger.Error(spanName, zap.Stringer("request", req.(fmt.Stringer)), zap.Error(err))
}
return rsp, err
}

func (s *server) GetStorageNode(ctx context.Context, req *vmspb.GetStorageNodeRequest) (*vmspb.GetStorageNodeResponse, error) {
snm, err := s.admin.getStorageNode(ctx, req.StorageNodeID)
if err != nil {
Expand Down Expand Up @@ -106,89 +92,55 @@ func (s *server) ListLogStreams(ctx context.Context, req *vmspb.ListLogStreamsRe
}

func (s *server) DescribeTopic(ctx context.Context, req *vmspb.DescribeTopicRequest) (*vmspb.DescribeTopicResponse, error) {
rspI, err := s.withTelemetry(ctx, "varlog.vmspb.ClusterManagerDeprecated/DescribeTopic", req,
func(ctx context.Context, _ interface{}) (interface{}, error) {
td, lsds, err := s.admin.describeTopic(ctx, req.TopicID)
return &vmspb.DescribeTopicResponse{
Topic: td,
LogStreams: lsds,
}, err
},
)
return rspI.(*vmspb.DescribeTopicResponse), verrors.ToStatusErrorWithCode(err, codes.Unavailable)
td, lsds, err := s.admin.describeTopic(ctx, req.TopicID)
if err != nil {
return nil, verrors.ToStatusErrorWithCode(err, codes.Unavailable)
}
return &vmspb.DescribeTopicResponse{
Topic: td,
LogStreams: lsds,
}, nil
}

func (s *server) AddLogStream(ctx context.Context, req *vmspb.AddLogStreamRequest) (*vmspb.AddLogStreamResponse, error) {
rspI, err := s.withTelemetry(ctx, "varlog.vmspb.ClusterManagerDeprecated/AddLogStream", req,
func(ctx context.Context, _ interface{}) (interface{}, error) {
logStreamDesc, err := s.admin.addLogStream(ctx, req.GetTopicID(), req.GetReplicas())
return &vmspb.AddLogStreamResponse{LogStream: logStreamDesc}, err
},
)
return rspI.(*vmspb.AddLogStreamResponse), verrors.ToStatusErrorWithCode(err, codes.Unavailable)
logStreamDesc, err := s.admin.addLogStream(ctx, req.GetTopicID(), req.GetReplicas())
if err != nil {
return nil, verrors.ToStatusErrorWithCode(err, codes.Unavailable)
}
return &vmspb.AddLogStreamResponse{LogStream: logStreamDesc}, nil
}

func (s *server) UnregisterLogStream(ctx context.Context, req *vmspb.UnregisterLogStreamRequest) (*vmspb.UnregisterLogStreamResponse, error) {
rspI, err := s.withTelemetry(ctx, "varlog.vmspb.ClusterManagerDeprecated/UnregisterLogStream", req,
func(ctx context.Context, _ interface{}) (interface{}, error) {
err := s.admin.unregisterLogStream(ctx, req.GetTopicID(), req.GetLogStreamID())
return &vmspb.UnregisterLogStreamResponse{}, err
},
)
return rspI.(*vmspb.UnregisterLogStreamResponse), verrors.ToStatusError(err)
err := s.admin.unregisterLogStream(ctx, req.GetTopicID(), req.GetLogStreamID())
return &vmspb.UnregisterLogStreamResponse{}, verrors.ToStatusError(err)
}

func (s *server) RemoveLogStreamReplica(ctx context.Context, req *vmspb.RemoveLogStreamReplicaRequest) (*vmspb.RemoveLogStreamReplicaResponse, error) {
rspI, err := s.withTelemetry(ctx, "varlog.vmspb.ClusterManagerDeprecated/RemoveLogStreamReplica", req,
func(ctx context.Context, _ interface{}) (interface{}, error) {
err := s.admin.removeLogStreamReplica(ctx, req.GetStorageNodeID(), req.GetTopicID(), req.GetLogStreamID())
return &vmspb.RemoveLogStreamReplicaResponse{}, err
},
)
return rspI.(*vmspb.RemoveLogStreamReplicaResponse), verrors.ToStatusError(err)
err := s.admin.removeLogStreamReplica(ctx, req.GetStorageNodeID(), req.GetTopicID(), req.GetLogStreamID())
return &vmspb.RemoveLogStreamReplicaResponse{}, verrors.ToStatusError(err)
}

func (s *server) UpdateLogStream(ctx context.Context, req *vmspb.UpdateLogStreamRequest) (*vmspb.UpdateLogStreamResponse, error) {
rspI, err := s.withTelemetry(ctx, "varlog.vmspb.ClusterManagerDeprecated/UpdateLogStream", req,
func(ctx context.Context, _ interface{}) (interface{}, error) {
lsdesc, err := s.admin.updateLogStream(ctx, req.GetLogStreamID(), req.GetPoppedReplica(), req.GetPushedReplica())
return &vmspb.UpdateLogStreamResponse{LogStream: lsdesc}, err
},
)
return rspI.(*vmspb.UpdateLogStreamResponse), verrors.ToStatusError(err)
lsdesc, err := s.admin.updateLogStream(ctx, req.GetLogStreamID(), req.GetPoppedReplica(), req.GetPushedReplica())
return &vmspb.UpdateLogStreamResponse{LogStream: lsdesc}, verrors.ToStatusError(err)
}

func (s *server) Seal(ctx context.Context, req *vmspb.SealRequest) (*vmspb.SealResponse, error) {
rspI, err := s.withTelemetry(ctx, "varlog.vmspb.ClusterManagerDeprecated/Seal", req,
func(ctx context.Context, _ interface{}) (interface{}, error) {
lsmetas, sealedGLSN, err := s.admin.seal(ctx, req.GetTopicID(), req.GetLogStreamID())
return &vmspb.SealResponse{
LogStreams: lsmetas,
SealedGLSN: sealedGLSN,
}, err
},
)
return rspI.(*vmspb.SealResponse), verrors.ToStatusError(err)
lsmetas, sealedGLSN, err := s.admin.seal(ctx, req.GetTopicID(), req.GetLogStreamID())
return &vmspb.SealResponse{
LogStreams: lsmetas,
SealedGLSN: sealedGLSN,
}, verrors.ToStatusError(err)
}

func (s *server) Sync(ctx context.Context, req *vmspb.SyncRequest) (*vmspb.SyncResponse, error) {
rspI, err := s.withTelemetry(ctx, "varlog.vmspb.ClusterManagerDeprecated/Sync", req,
func(ctx context.Context, _ interface{}) (interface{}, error) {
status, err := s.admin.sync(ctx, req.GetTopicID(), req.GetLogStreamID(), req.GetSrcStorageNodeID(), req.GetDstStorageNodeID())
return &vmspb.SyncResponse{Status: status}, err
},
)
return rspI.(*vmspb.SyncResponse), verrors.ToStatusError(err)
status, err := s.admin.sync(ctx, req.GetTopicID(), req.GetLogStreamID(), req.GetSrcStorageNodeID(), req.GetDstStorageNodeID())
return &vmspb.SyncResponse{Status: status}, verrors.ToStatusError(err)
}

func (s *server) Unseal(ctx context.Context, req *vmspb.UnsealRequest) (*vmspb.UnsealResponse, error) {
rspI, err := s.withTelemetry(ctx, "varlog.vmspb.ClusterManagerDeprecated/Unseal", req,
func(ctx context.Context, _ interface{}) (interface{}, error) {
lsdesc, err := s.admin.unseal(ctx, req.GetTopicID(), req.GetLogStreamID())
return &vmspb.UnsealResponse{LogStream: lsdesc}, err
},
)
return rspI.(*vmspb.UnsealResponse), verrors.ToStatusError(err)
lsdesc, err := s.admin.unseal(ctx, req.GetTopicID(), req.GetLogStreamID())
return &vmspb.UnsealResponse{LogStream: lsdesc}, verrors.ToStatusError(err)
}

func (s *server) GetMetadataRepositoryNode(ctx context.Context, req *vmspb.GetMetadataRepositoryNodeRequest) (*vmspb.GetMetadataRepositoryNodeResponse, error) {
Expand All @@ -202,25 +154,20 @@ func (s *server) ListMetadataRepositoryNodes(ctx context.Context, _ *vmspb.ListM
}

func (s *server) GetMRMembers(ctx context.Context, req *pbtypes.Empty) (*vmspb.GetMRMembersResponse, error) {
rspI, err := s.withTelemetry(ctx, "varlog.vmspb.ClusterManagerDeprecated/GetMRMembers", req,
func(ctx context.Context, _ interface{}) (interface{}, error) {
var rsp *vmspb.GetMRMembersResponse
mrInfo, err := s.admin.mrInfos(ctx)
if err != nil {
return rsp, err
}
rsp = &vmspb.GetMRMembersResponse{
Leader: mrInfo.Leader,
ReplicationFactor: mrInfo.ReplicationFactor,
Members: make(map[types.NodeID]string, len(mrInfo.Members)),
}
for nodeID, m := range mrInfo.Members {
rsp.Members[nodeID] = m.Peer
}
return rsp, nil
},
)
return rspI.(*vmspb.GetMRMembersResponse), verrors.ToStatusError(err)
var rsp *vmspb.GetMRMembersResponse
mrInfo, err := s.admin.mrInfos(ctx)
if err != nil {
return rsp, verrors.ToStatusError(err)
}
rsp = &vmspb.GetMRMembersResponse{
Leader: mrInfo.Leader,
ReplicationFactor: mrInfo.ReplicationFactor,
Members: make(map[types.NodeID]string, len(mrInfo.Members)),
}
for nodeID, m := range mrInfo.Members {
rsp.Members[nodeID] = m.Peer
}
return rsp, nil
}

func (s *server) AddMetadataRepositoryNode(ctx context.Context, req *vmspb.AddMetadataRepositoryNodeRequest) (*vmspb.AddMetadataRepositoryNodeResponse, error) {
Expand All @@ -229,13 +176,8 @@ func (s *server) AddMetadataRepositoryNode(ctx context.Context, req *vmspb.AddMe
}

func (s *server) AddMRPeer(ctx context.Context, req *vmspb.AddMRPeerRequest) (*vmspb.AddMRPeerResponse, error) {
rspI, err := s.withTelemetry(ctx, "varlog.vmspb.ClusterManagerDeprecated/AddMRPeer", req,
func(ctx context.Context, _ interface{}) (interface{}, error) {
nodeID, err := s.admin.addMRPeer(ctx, req.RaftURL, req.RPCAddr)
return &vmspb.AddMRPeerResponse{NodeID: nodeID}, err
},
)
return rspI.(*vmspb.AddMRPeerResponse), verrors.ToStatusError(err)
nodeID, err := s.admin.addMRPeer(ctx, req.RaftURL, req.RPCAddr)
return &vmspb.AddMRPeerResponse{NodeID: nodeID}, verrors.ToStatusError(err)
}

func (s *server) DeleteMetadataRepositoryNode(ctx context.Context, req *vmspb.DeleteMetadataRepositoryNodeRequest) (*vmspb.DeleteMetadataRepositoryNodeResponse, error) {
Expand All @@ -244,21 +186,11 @@ func (s *server) DeleteMetadataRepositoryNode(ctx context.Context, req *vmspb.De
}

func (s *server) RemoveMRPeer(ctx context.Context, req *vmspb.RemoveMRPeerRequest) (*vmspb.RemoveMRPeerResponse, error) {
rspI, err := s.withTelemetry(ctx, "varlog.vmspb.ClusterManagerDeprecated/RemoveMRPeer", req,
func(ctx context.Context, _ interface{}) (interface{}, error) {
err := s.admin.removeMRPeer(ctx, req.RaftURL)
return &vmspb.RemoveMRPeerResponse{}, err
},
)
return rspI.(*vmspb.RemoveMRPeerResponse), verrors.ToStatusError(err)
err := s.admin.removeMRPeer(ctx, req.RaftURL)
return &vmspb.RemoveMRPeerResponse{}, verrors.ToStatusError(err)
}

func (s *server) Trim(ctx context.Context, req *vmspb.TrimRequest) (*vmspb.TrimResponse, error) {
rspI, err := s.withTelemetry(ctx, "varlog.vmspb.ClusterManagerDeprecated/Trim", req,
func(ctx context.Context, _ interface{}) (interface{}, error) {
res, err := s.admin.trim(ctx, req.TopicID, req.LastGLSN)
return &vmspb.TrimResponse{Results: res}, err
},
)
return rspI.(*vmspb.TrimResponse), verrors.ToStatusError(err)
res, err := s.admin.trim(ctx, req.TopicID, req.LastGLSN)
return &vmspb.TrimResponse{Results: res}, verrors.ToStatusError(err)
}
2 changes: 1 addition & 1 deletion scripts/grpcui_admin.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ grpcui \
--import-path ${scriptdir}/../vendor \
--import-path ${scriptdir}/../proto \
--import-path $GOPATH/src \
--proto vmspb/vms.proto \
--proto vmspb/admin.proto \
$@
Loading

0 comments on commit 4cdbb9e

Please sign in to comment.