Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: cleanup store/streaming/constructor.go #14044

Merged
merged 2 commits into from
Nov 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 65 additions & 30 deletions store/streaming/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,36 @@ import (
serverTypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/spf13/cast"
)

// ServiceConstructor is used to construct a streaming service
type ServiceConstructor func(opts serverTypes.AppOptions, keys []types.StoreKey, marshaller codec.BinaryCodec) (baseapp.StreamingService, error)
type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, codec.BinaryCodec) (baseapp.StreamingService, error)

// ServiceType enum for specifying the type of StreamingService
type ServiceType int

const (
Unknown ServiceType = iota
File
// add more in the future
)

// ServiceTypeFromString returns the streaming.ServiceType corresponding to the provided name
// Streaming option keys
const (
OptStreamersFilePrefix = "streamers.file.prefix"
OptStreamersFileWriteDir = "streamers.file.write_dir"
OptStoreStreamers = "store.streamers"
)

// ServiceTypeFromString returns the streaming.ServiceType corresponding to the
// provided name.
func ServiceTypeFromString(name string) ServiceType {
switch strings.ToLower(name) {
case "file", "f":
return File

default:
return Unknown
}
Expand All @@ -41,48 +50,71 @@ func (sst ServiceType) String() string {
switch sst {
case File:
return "file"

default:
return "unknown"
}
}

// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors
// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to
// streaming.ServiceConstructors types.
var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{
File: NewFileStreamingService,
}

// NewServiceConstructor returns the streaming.ServiceConstructor corresponding to the provided name
// NewServiceConstructor returns the streaming.ServiceConstructor corresponding
// to the provided name.
func NewServiceConstructor(name string) (ServiceConstructor, error) {
ssType := ServiceTypeFromString(name)
if ssType == Unknown {
return nil, fmt.Errorf("unrecognized streaming service name %s", name)
}

if constructor, ok := ServiceConstructorLookupTable[ssType]; ok && constructor != nil {
return constructor, nil
}

return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String())
}

// NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService
func NewFileStreamingService(opts serverTypes.AppOptions, keys []types.StoreKey, marshaller codec.BinaryCodec) (baseapp.StreamingService, error) {
filePrefix := cast.ToString(opts.Get("streamers.file.prefix"))
fileDir := cast.ToString(opts.Get("streamers.file.write_dir"))
// NewFileStreamingService is the streaming.ServiceConstructor function for
// creating a FileStreamingService.
func NewFileStreamingService(
opts serverTypes.AppOptions,
keys []types.StoreKey,
marshaller codec.BinaryCodec,
) (baseapp.StreamingService, error) {
filePrefix := cast.ToString(opts.Get(OptStreamersFilePrefix))
fileDir := cast.ToString(opts.Get(OptStreamersFileWriteDir))

return file.NewStreamingService(fileDir, filePrefix, keys, marshaller)
}

// LoadStreamingServices is a function for loading StreamingServices onto the BaseApp using the provided AppOptions, codec, and keys
// It returns the WaitGroup and quit channel used to synchronize with the streaming services and any error that occurs during the setup
func LoadStreamingServices(bApp *baseapp.BaseApp, appOpts serverTypes.AppOptions, appCodec codec.BinaryCodec, keys map[string]*types.KVStoreKey) ([]baseapp.StreamingService, *sync.WaitGroup, error) {
// LoadStreamingServices is a function for loading StreamingServices onto the
// BaseApp using the provided AppOptions, codec, and keys. It returns the
// WaitGroup and quit channel used to synchronize with the streaming services
// and any error that occurs during the setup.
func LoadStreamingServices(
bApp *baseapp.BaseApp,
appOpts serverTypes.AppOptions,
appCodec codec.BinaryCodec,
keys map[string]*types.KVStoreKey,
) ([]baseapp.StreamingService, *sync.WaitGroup, error) {
// waitgroup and quit channel for optional shutdown coordination of the streaming service(s)
wg := new(sync.WaitGroup)

// configure state listening capabilities using AppOptions
streamers := cast.ToStringSlice(appOpts.Get("store.streamers"))
streamers := cast.ToStringSlice(appOpts.Get(OptStoreStreamers))
activeStreamers := make([]baseapp.StreamingService, 0, len(streamers))

for _, streamerName := range streamers {
var exposeStoreKeys []types.StoreKey

// get the store keys allowed to be exposed for this streaming service
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName)))
var exposeStoreKeys []types.StoreKey
if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys

// if list contains '*', expose all store keys
if sdk.SliceContains(exposeKeyStrs, "*") {
exposeStoreKeys = make([]types.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
Expand All @@ -95,43 +127,46 @@ func LoadStreamingServices(bApp *baseapp.BaseApp, appOpts serverTypes.AppOptions
}
}
}
if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything

if len(exposeStoreKeys) == 0 {
continue
}
// get the constructor for this streamer name

constructor, err := NewServiceConstructor(streamerName)
if err != nil {
// close any services we may have already spun up before hitting the error on this one
// Close any services we may have already spun up before hitting the error
// on this one.
for _, activeStreamer := range activeStreamers {
activeStreamer.Close()
}

return nil, nil, err
}
// generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose

// Generate the streaming service using the constructor, appOptions, and the
// StoreKeys we want to expose.
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
if err != nil {
// close any services we may have already spun up before hitting the error on this one
// Close any services we may have already spun up before hitting the error
// on this one.
for _, activeStreamer := range activeStreamers {
activeStreamer.Close()
}

return nil, nil, err
}

// register the streaming service with the BaseApp
bApp.SetStreamingService(streamingService)

// kick off the background streaming service loop
streamingService.Stream(wg)

// add to the list of active streamers
activeStreamers = append(activeStreamers, streamingService)
}
// if there are no active streamers, activeStreamers is empty (len == 0) and the waitGroup is not waiting on anything
return activeStreamers, wg, nil
}

func exposeAll(list []string) bool {
for _, ele := range list {
if ele == "*" {
return true
}
}
return false
// If there are no active streamers, activeStreamers is empty (len == 0) and
// the waitGroup is not waiting on anything.
return activeStreamers, wg, nil
}
15 changes: 14 additions & 1 deletion types/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"fmt"
"time"

"github.com/cosmos/cosmos-sdk/types/kv"
log "github.com/tendermint/tendermint/libs/log"

"github.com/cosmos/cosmos-sdk/types/kv"
)

// SortedJSON takes any JSON and returns it sorted by keys. Also, all white-spaces
Expand Down Expand Up @@ -141,3 +142,15 @@ func LogDeferred(logger log.Logger, f func() error) {
logger.Error(err.Error())
}
}

// SliceContains implements a generic function for checking if a slice contains
// a certain value.
func SliceContains[T comparable](elements []T, v T) bool {
for _, s := range elements {
if v == s {
return true
}
}

return false
}