Skip to content

Commit

Permalink
perf(admin): use singleflight to handle Admin's RPCs (#482)
Browse files Browse the repository at this point in the history
### What this PR does

This PR makes the admin server handle RPCs by using singleflight. It helps the admin to reduce redundant computation, especially read operations.

I am only altering read operations. However, it is possible to refactor the admin server to eliminate the use of a single big lock.
  • Loading branch information
ijsong authored Jul 31, 2023
2 parents ebca873 + c231888 commit 1a6a96d
Show file tree
Hide file tree
Showing 2 changed files with 148 additions and 4 deletions.
108 changes: 104 additions & 4 deletions internal/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ import (
"github.com/pkg/errors"
"go.uber.org/multierr"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/kakao/varlog/internal/admin/sfgkey"
"github.com/kakao/varlog/internal/admin/snwatcher"
"github.com/kakao/varlog/pkg/rpc/interceptors/logging"
"github.com/kakao/varlog/pkg/rpc/interceptors/otelgrpc"
Expand All @@ -43,6 +45,7 @@ type Admin struct {

// single large lock
mu sync.RWMutex
sfg singleflight.Group
muLogStreamStatus [numLogStreamMutex]sync.Mutex

snw *snwatcher.StorageNodeWatcher
Expand Down Expand Up @@ -168,13 +171,24 @@ func (adm *Admin) Close() (err error) {
func (adm *Admin) Metadata(ctx context.Context) (*varlogpb.MetadataDescriptor, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()

return adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
}

func (adm *Admin) getStorageNode(ctx context.Context, snid types.StorageNodeID) (*admpb.StorageNodeMetadata, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()

snm, err, _ := adm.sfg.Do(sfgkey.GetStorageNodeKey(snid), func() (interface{}, error) {
return adm.getStorageNodeInternal(ctx, snid)
})
if err != nil {
return nil, err
}
return snm.(*admpb.StorageNodeMetadata), nil
}

func (adm *Admin) getStorageNodeInternal(ctx context.Context, snid types.StorageNodeID) (*admpb.StorageNodeMetadata, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "get storage node: %s", err.Error())
Expand Down Expand Up @@ -221,6 +235,16 @@ func (adm *Admin) listStorageNodes(ctx context.Context) ([]admpb.StorageNodeMeta
adm.mu.RLock()
defer adm.mu.RUnlock()

snms, err, _ := adm.sfg.Do(sfgkey.ListStorageNodesKey(), func() (interface{}, error) {
return adm.listStorageNodesInternal(ctx)
})
if err != nil {
return nil, err
}
return snms.([]admpb.StorageNodeMetadata), nil
}

func (adm *Admin) listStorageNodesInternal(ctx context.Context) ([]admpb.StorageNodeMetadata, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "list storage nodes: %s", err.Error())
Expand Down Expand Up @@ -354,6 +378,19 @@ func (adm *Admin) unregisterStorageNode(ctx context.Context, snid types.StorageN
}

func (adm *Admin) getTopic(ctx context.Context, tpid types.TopicID) (*varlogpb.TopicDescriptor, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()

td, err, _ := adm.sfg.Do(sfgkey.GetTopicKey(tpid), func() (interface{}, error) {
return adm.getTopicInternal(ctx, tpid)
})
if err != nil {
return nil, err
}
return td.(*varlogpb.TopicDescriptor), nil
}

func (adm *Admin) getTopicInternal(ctx context.Context, tpid types.TopicID) (*varlogpb.TopicDescriptor, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "get topic: %s", err.Error())
Expand All @@ -366,9 +403,19 @@ func (adm *Admin) getTopic(ctx context.Context, tpid types.TopicID) (*varlogpb.T
}

func (adm *Admin) listTopics(ctx context.Context) ([]varlogpb.TopicDescriptor, error) {
adm.mu.Lock()
defer adm.mu.Unlock()
adm.mu.RLock()
defer adm.mu.RUnlock()

tds, err, _ := adm.sfg.Do(sfgkey.ListTopicsKey(), func() (interface{}, error) {
return adm.listTopicsInternal(ctx)
})
if err != nil {
return nil, err
}
return tds.([]varlogpb.TopicDescriptor), nil
}

func (adm *Admin) listTopicsInternal(ctx context.Context) ([]varlogpb.TopicDescriptor, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "list topics: %s", err.Error())
Expand Down Expand Up @@ -422,6 +469,19 @@ func (adm *Admin) unregisterTopic(ctx context.Context, tpid types.TopicID) error
}

func (adm *Admin) getLogStream(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (*varlogpb.LogStreamDescriptor, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()

lsd, err, _ := adm.sfg.Do(sfgkey.GetLogStreamKey(tpid, lsid), func() (interface{}, error) {
return adm.getLogStreamInternal(ctx, tpid, lsid)
})
if err != nil {
return nil, err
}
return lsd.(*varlogpb.LogStreamDescriptor), nil
}

func (adm *Admin) getLogStreamInternal(ctx context.Context, tpid types.TopicID, lsid types.LogStreamID) (*varlogpb.LogStreamDescriptor, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "get log stream: %s", err.Error())
Expand All @@ -444,6 +504,19 @@ func (adm *Admin) getLogStream(ctx context.Context, tpid types.TopicID, lsid typ
}

func (adm *Admin) listLogStreams(ctx context.Context, tpid types.TopicID) ([]varlogpb.LogStreamDescriptor, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()

lsds, err, _ := adm.sfg.Do(sfgkey.ListLogStreamsKey(tpid), func() (interface{}, error) {
return adm.listLogStreamsInternal(ctx, tpid)
})
if err != nil {
return nil, err
}
return lsds.([]varlogpb.LogStreamDescriptor), nil
}

func (adm *Admin) listLogStreamsInternal(ctx context.Context, tpid types.TopicID) ([]varlogpb.LogStreamDescriptor, error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil {
return nil, status.Errorf(codes.Unavailable, "list log streams: %s", err.Error())
Expand All @@ -467,9 +540,34 @@ func (adm *Admin) listLogStreams(ctx context.Context, tpid types.TopicID) ([]var
}

func (adm *Admin) describeTopic(ctx context.Context, tpid types.TopicID) (td varlogpb.TopicDescriptor, lsds []varlogpb.LogStreamDescriptor, err error) {
adm.mu.Lock()
defer adm.mu.Unlock()
adm.mu.RLock()
defer adm.mu.RUnlock()

type result struct {
td varlogpb.TopicDescriptor
lsds []varlogpb.LogStreamDescriptor
}

iface, err, _ := adm.sfg.Do(sfgkey.DescribeTopicKey(tpid), func() (interface{}, error) {
td, lsds, err := adm.describeTopicInternal(ctx, tpid)
if err != nil {
return nil, err
}
return &result{
td: td,
lsds: lsds,
}, nil
})
if err != nil {
return
}

res := iface.(*result)
return res.td, res.lsds, nil

}

func (adm *Admin) describeTopicInternal(ctx context.Context, tpid types.TopicID) (td varlogpb.TopicDescriptor, lsds []varlogpb.LogStreamDescriptor, err error) {
md, err := adm.mrmgr.ClusterMetadataView().ClusterMetadata(ctx)
if err != nil || len(md.Topics) == 0 {
return
Expand Down Expand Up @@ -906,6 +1004,7 @@ func (adm *Admin) syncInternal(ctx context.Context, tpid types.TopicID, lsid typ
func (adm *Admin) trim(ctx context.Context, tpid types.TopicID, lastGLSN types.GLSN) ([]admpb.TrimResult, error) {
adm.mu.Lock()
defer adm.mu.Unlock()

return adm.snmgr.Trim(ctx, tpid, lastGLSN)
}

Expand Down Expand Up @@ -948,6 +1047,7 @@ func (adm *Admin) listMetadataRepositoryNodes(ctx context.Context) ([]varlogpb.M
func (adm *Admin) mrInfos(ctx context.Context) (*mrpb.ClusterInfo, error) {
adm.mu.RLock()
defer adm.mu.RUnlock()

return adm.mrmgr.GetClusterInfo(ctx)
}

Expand Down
44 changes: 44 additions & 0 deletions internal/admin/sfgkey/keys.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package sfgkey

import (
"github.com/kakao/varlog/pkg/types"
)

const (
delimiter = "_"
keyGetStorageNode = "getsn"
keyListStorageNodes = "listsns"
keyGetTopic = "gettp"
keyListTopics = "listtps"
keyGetLogStream = "getls"
keyListLogStreams = "listlss"
keyDescribeTopic = "desctp"
)

func GetStorageNodeKey(snid types.StorageNodeID) string {
return keyGetStorageNode + delimiter + snid.String()
}

func ListStorageNodesKey() string {
return keyListStorageNodes
}

func GetTopicKey(tpid types.TopicID) string {
return keyGetTopic + delimiter + tpid.String()
}

func ListTopicsKey() string {
return keyListTopics
}

func GetLogStreamKey(tpid types.TopicID, lsid types.LogStreamID) string {
return keyGetLogStream + delimiter + tpid.String() + delimiter + lsid.String()
}

func ListLogStreamsKey(tpid types.TopicID) string {
return keyListLogStreams + delimiter + tpid.String()
}

func DescribeTopicKey(tpid types.TopicID) string {
return keyDescribeTopic + delimiter + tpid.String()
}

0 comments on commit 1a6a96d

Please sign in to comment.