Skip to content

Commit

Permalink
optimize engine cache checker
Browse files Browse the repository at this point in the history
  • Loading branch information
DuodenumL committed Jan 19, 2022
1 parent 0f54c24 commit aa94e3e
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 60 deletions.
2 changes: 1 addition & 1 deletion core.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func serve(c *cli.Context) error {
defer cancel()

// start engine cache checker
go factory.EngineCacheChecker(ctx, config.ConnectionTimeout)
go factory.EngineCacheChecker(ctx, config)

<-ctx.Done()

Expand Down
86 changes: 64 additions & 22 deletions engine/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/projecteru2/core/engine"
"github.com/projecteru2/core/engine/docker"
"github.com/projecteru2/core/engine/fake"
"github.com/projecteru2/core/engine/mocks/fakeengine"
"github.com/projecteru2/core/engine/systemd"
"github.com/projecteru2/core/engine/virt"
Expand Down Expand Up @@ -36,9 +37,21 @@ func getEngineCacheKey(endpoint, ca, cert, key string) string {
return endpoint + "-" + utils.SHA256(fmt.Sprintf(":%v:%v:%v", ca, cert, key))[:8]
}

type engineParams struct {
nodename string
endpoint string
ca string
cert string
key string
}

func (ep engineParams) getCacheKey() string {
return getEngineCacheKey(ep.endpoint, ep.ca, ep.cert, ep.key)
}

// EngineCacheChecker checks if the engine in cache is available
func EngineCacheChecker(ctx context.Context, timeout time.Duration) {
log.Info("[EngineCacheChecker] starts")
func EngineCacheChecker(ctx context.Context, config types.Config) {
log.Info("[EngineCacheChecker] starts %v")
defer log.Info("[EngineCacheChecker] ends")
for {
select {
Expand All @@ -47,25 +60,43 @@ func EngineCacheChecker(ctx context.Context, timeout time.Duration) {
default:
}

keysToRemove := []string{}
keysToCheck.Range(func(key, _ interface{}) bool {
cacheKey := key.(string)
client := engineCache.Get(cacheKey)
if client == nil {
keysToRemove = append(keysToRemove, cacheKey)
return true
}
if err := validateEngine(ctx, client, timeout); err != nil {
log.Errorf(ctx, "[GetEngineFromCache] engine %v is unavailable, will be removed from cache, err: %v", cacheKey, err)
keysToRemove = append(keysToRemove, cacheKey)
keysToRemove := make(chan engineParams, config.MaxConcurrency)
pool := utils.NewGoroutinePool(int(config.MaxConcurrency))
go func() {
for key := range keysToRemove {
engineCache.Delete(key.getCacheKey())
keysToCheck.Delete(key)
}
}()

keysToCheck.Range(func(key, _ interface{}) bool {
pool.Go(ctx, func() {
params := key.(engineParams)
cacheKey := params.getCacheKey()
client := engineCache.Get(cacheKey)
if client == nil {
keysToRemove <- params
return
}
if _, ok := client.(*fake.Engine); ok {
engineCache.Delete(cacheKey)
if _, err := GetEngine(ctx, config, params.nodename, params.endpoint, params.ca, params.key, params.cert); err != nil {
log.Errorf(ctx, "[EngineCacheChecker] engine %v is still unavailable, err: %v", cacheKey, err)
}
return
}
if err := validateEngine(ctx, client, config.ConnectionTimeout); err != nil {
log.Errorf(ctx, "[EngineCacheChecker] engine %v is unavailable, will be replaced with a fake engine, err: %v", cacheKey, err)
engineCache.Set(cacheKey, &fake.Engine{DefaultErr: err})
}
return
})
return true
})
for _, key := range keysToRemove {
engineCache.Delete(key)
keysToCheck.Delete(key)
}
time.Sleep(timeout)

pool.Wait(ctx)
close(keysToRemove)
time.Sleep(config.ConnectionTimeout)
}
}

Expand Down Expand Up @@ -95,12 +126,20 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
}

defer func() {
if err == nil && client != nil {
cacheKey := getEngineCacheKey(endpoint, ca, cert, key)
cacheKey := getEngineCacheKey(endpoint, ca, cert, key)
if err == nil {
engineCache.Set(cacheKey, client)
keysToCheck.Store(cacheKey, struct{}{})
log.Infof(ctx, "[GetEngine] store engine %v in cache", cacheKey)
} else {
engineCache.Set(cacheKey, &fake.Engine{DefaultErr: err})
log.Infof(ctx, "[GetEngine] store fake engine %v in cache", cacheKey)
}
keysToCheck.Store(engineParams{
endpoint: endpoint,
ca: ca,
cert: cert,
key: key,
}, struct{}{})
}()

prefix, err := getEnginePrefix(endpoint)
Expand All @@ -111,7 +150,10 @@ func GetEngine(ctx context.Context, config types.Config, nodename, endpoint, ca,
if !ok {
return nil, types.ErrNotSupport
}
if client, err = e(ctx, config, nodename, endpoint, ca, cert, key); err != nil {
utils.WithTimeout(ctx, config.ConnectionTimeout, func(ctx context.Context) {
client, err = e(ctx, config, nodename, endpoint, ca, cert, key)
})
if err != nil {
return nil, err
}
if err = validateEngine(ctx, client, config.ConnectionTimeout); err != nil {
Expand Down
71 changes: 36 additions & 35 deletions engine/fake/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,100 +7,101 @@ import (

enginetypes "github.com/projecteru2/core/engine/types"
coresource "github.com/projecteru2/core/source"
"github.com/projecteru2/core/types"
)

// Engine to replace nil engine
type Engine struct{}
type Engine struct {
DefaultErr error
}

// Info .
func (f *Engine) Info(ctx context.Context) (*enginetypes.Info, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// Ping .
func (f *Engine) Ping(ctx context.Context) error {
return types.ErrNilEngine
return f.DefaultErr
}

// Execute .
func (f *Engine) Execute(ctx context.Context, ID string, config *enginetypes.ExecConfig) (result string, stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) {
return "", nil, nil, nil, types.ErrNilEngine
return "", nil, nil, nil, f.DefaultErr
}

// ExecResize .
func (f *Engine) ExecResize(ctx context.Context, ID, result string, height, width uint) (err error) {
return types.ErrNilEngine
return f.DefaultErr
}

// ExecExitCode .
func (f *Engine) ExecExitCode(ctx context.Context, ID, result string) (int, error) {
return 0, types.ErrNilEngine
return 0, f.DefaultErr
}

// NetworkConnect .
func (f *Engine) NetworkConnect(ctx context.Context, network, target, ipv4, ipv6 string) ([]string, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// NetworkDisconnect .
func (f *Engine) NetworkDisconnect(ctx context.Context, network, target string, force bool) error {
return types.ErrNilEngine
return f.DefaultErr
}

// NetworkList .
func (f *Engine) NetworkList(ctx context.Context, drivers []string) ([]*enginetypes.Network, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// ImageList .
func (f *Engine) ImageList(ctx context.Context, image string) ([]*enginetypes.Image, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// ImageRemove .
func (f *Engine) ImageRemove(ctx context.Context, image string, force, prune bool) ([]string, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// ImagesPrune .
func (f *Engine) ImagesPrune(ctx context.Context) error {
return types.ErrNilEngine
return f.DefaultErr
}

// ImagePull .
func (f *Engine) ImagePull(ctx context.Context, ref string, all bool) (io.ReadCloser, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// ImagePush .
func (f *Engine) ImagePush(ctx context.Context, ref string) (io.ReadCloser, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// ImageBuild .
func (f *Engine) ImageBuild(ctx context.Context, input io.Reader, refs []string) (io.ReadCloser, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// ImageBuildCachePrune .
func (f *Engine) ImageBuildCachePrune(ctx context.Context, all bool) (uint64, error) {
return 0, types.ErrNilEngine
return 0, f.DefaultErr
}

// ImageLocalDigests .
func (f *Engine) ImageLocalDigests(ctx context.Context, image string) ([]string, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// ImageRemoteDigest .
func (f *Engine) ImageRemoteDigest(ctx context.Context, image string) (string, error) {
return "", types.ErrNilEngine
return "", f.DefaultErr
}

// ImageBuildFromExist .
func (f *Engine) ImageBuildFromExist(ctx context.Context, ID string, refs []string, user string) (string, error) {
return "", types.ErrNilEngine
return "", f.DefaultErr
}

// BuildRefs .
Expand All @@ -110,75 +111,75 @@ func (f *Engine) BuildRefs(ctx context.Context, opts *enginetypes.BuildRefOption

// BuildContent .
func (f *Engine) BuildContent(ctx context.Context, scm coresource.Source, opts *enginetypes.BuildContentOptions) (string, io.Reader, error) {
return "", nil, types.ErrNilEngine
return "", nil, f.DefaultErr
}

// VirtualizationCreate .
func (f *Engine) VirtualizationCreate(ctx context.Context, opts *enginetypes.VirtualizationCreateOptions) (*enginetypes.VirtualizationCreated, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// VirtualizationResourceRemap .
func (f *Engine) VirtualizationResourceRemap(ctx context.Context, options *enginetypes.VirtualizationRemapOptions) (<-chan enginetypes.VirtualizationRemapMessage, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// VirtualizationCopyTo .
func (f *Engine) VirtualizationCopyTo(ctx context.Context, ID, target string, content []byte, uid, gid int, mode int64) error {
return types.ErrNilEngine
return f.DefaultErr
}

// VirtualizationStart .
func (f *Engine) VirtualizationStart(ctx context.Context, ID string) error {
return types.ErrNilEngine
return f.DefaultErr
}

// VirtualizationStop .
func (f *Engine) VirtualizationStop(ctx context.Context, ID string, gracefulTimeout time.Duration) error {
return types.ErrNilEngine
return f.DefaultErr
}

// VirtualizationRemove .
func (f *Engine) VirtualizationRemove(ctx context.Context, ID string, volumes, force bool) error {
return types.ErrNilEngine
return f.DefaultErr
}

// VirtualizationInspect .
func (f *Engine) VirtualizationInspect(ctx context.Context, ID string) (*enginetypes.VirtualizationInfo, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// VirtualizationLogs .
func (f *Engine) VirtualizationLogs(ctx context.Context, opts *enginetypes.VirtualizationLogStreamOptions) (stdout, stderr io.ReadCloser, err error) {
return nil, nil, types.ErrNilEngine
return nil, nil, f.DefaultErr
}

// VirtualizationAttach .
func (f *Engine) VirtualizationAttach(ctx context.Context, ID string, stream, openStdin bool) (stdout, stderr io.ReadCloser, stdin io.WriteCloser, err error) {
return nil, nil, nil, types.ErrNilEngine
return nil, nil, nil, f.DefaultErr
}

// VirtualizationResize .
func (f *Engine) VirtualizationResize(ctx context.Context, ID string, height, width uint) error {
return types.ErrNilEngine
return f.DefaultErr
}

// VirtualizationWait .
func (f *Engine) VirtualizationWait(ctx context.Context, ID, state string) (*enginetypes.VirtualizationWaitResult, error) {
return nil, types.ErrNilEngine
return nil, f.DefaultErr
}

// VirtualizationUpdateResource .
func (f *Engine) VirtualizationUpdateResource(ctx context.Context, ID string, opts *enginetypes.VirtualizationResource) error {
return types.ErrNilEngine
return f.DefaultErr
}

// VirtualizationCopyFrom .
func (f *Engine) VirtualizationCopyFrom(ctx context.Context, ID, path string) (content []byte, uid, gid int, mode int64, _ error) {
return nil, 0, 0, 0, types.ErrNilEngine
return nil, 0, 0, 0, f.DefaultErr
}

// ResourceValidate .
func (f *Engine) ResourceValidate(ctx context.Context, cpu float64, cpumap map[string]int64, memory, storage int64) error {
return types.ErrNilEngine
return f.DefaultErr
}
2 changes: 1 addition & 1 deletion store/etcdv3/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ func (m *Mercury) doGetNodes(ctx context.Context, kvs []*mvccpb.KeyValue, labels
return nil, err
}
node.Init()
node.Engine = &fake.Engine{}
node.Engine = &fake.Engine{DefaultErr: types.ErrNilEngine}
if utils.FilterWorkload(node.Labels, labels) {
allNodes = append(allNodes, node)
}
Expand Down
2 changes: 1 addition & 1 deletion store/redis/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (r *Rediaron) doGetNodes(ctx context.Context, kvs map[string]string, labels
return nil, err
}
node.Init()
node.Engine = &fake.Engine{}
node.Engine = &fake.Engine{DefaultErr: types.ErrNilEngine}
if utils.FilterWorkload(node.Labels, labels) {
allNodes = append(allNodes, node)
}
Expand Down

0 comments on commit aa94e3e

Please sign in to comment.