Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory consumption for Kubernetes source configuration #1425

Merged
merged 3 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions internal/source/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ func NewDispatcher(log logrus.FieldLogger, clusterName string, notifiers map[str
}

// Dispatch starts a given plugin, watches for incoming events and calling all notifiers to dispatch received event.
// Once we will have the gRPC contract established with proper Cloud Event schema, we should move also this logic here:
// https://github.com/kubeshop/botkube/blob/525c737956ff820a09321879284037da8bf5d647/pkg/controller/controller.go#L200-L253
func (d *Dispatcher) Dispatch(dispatch PluginDispatch) error {
log := d.log.WithFields(logrus.Fields{
"pluginName": dispatch.pluginName,
Expand Down
70 changes: 70 additions & 0 deletions internal/source/kubernetes/bg_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package kubernetes

import (
"context"
"sync"
"time"

"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

// backgroundProcessor is responsible for running background processes.
type backgroundProcessor struct {
mu sync.RWMutex
cancelCtxFn func()
startTime time.Time

errGroup *errgroup.Group
}

// newBackgroundProcessor creates new background processor.
func newBackgroundProcessor() *backgroundProcessor {
return &backgroundProcessor{}
}

// StartTime returns the start time of the background processor.
func (b *backgroundProcessor) StartTime() time.Time {
b.mu.RLock()
defer b.mu.RUnlock()
return b.startTime
}

// Run starts the background processes.
func (b *backgroundProcessor) Run(parentCtx context.Context, fns []func(ctx context.Context)) {
b.mu.Lock()
defer b.mu.Unlock()

b.startTime = time.Now()
ctx, cancelFn := context.WithCancel(parentCtx)
b.cancelCtxFn = cancelFn

errGroup, errGroupCtx := errgroup.WithContext(ctx)
b.errGroup = errGroup

for _, fn := range fns {
fn := fn
errGroup.Go(func() error {
fn(errGroupCtx)
return nil
})
}
}

// StopAndWait stops the background processes and waits for them to finish.
func (b *backgroundProcessor) StopAndWait(log logrus.FieldLogger) error {
b.mu.Lock()
defer b.mu.Unlock()

if b.cancelCtxFn != nil {
log.Debug("Cancelling context of the background processor...")
b.cancelCtxFn()
}

if b.errGroup == nil {
return nil
}

log.Debug("Waiting for background processor to finish...")
return b.errGroup.Wait()
}
91 changes: 91 additions & 0 deletions internal/source/kubernetes/configuration_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package kubernetes

import (
"fmt"
"sync"

"github.com/kubeshop/botkube/pkg/maputil"
)

// configurationStore stores all source configurations in a thread-safe way.
type configurationStore struct {
store map[string]SourceConfig
storeByKubeconfig map[string]map[string]SourceConfig

lock sync.RWMutex
}

// newConfigurations creates new empty configurationStore instance.
func newConfigurations() *configurationStore {
return &configurationStore{
store: make(map[string]SourceConfig),
storeByKubeconfig: make(map[string]map[string]SourceConfig),
}
}

// Store stores SourceConfig in a thread-safe way.
func (c *configurationStore) Store(sourceName string, cfg SourceConfig) {
c.lock.Lock()
defer c.lock.Unlock()

key := c.keyForStore(sourceName, cfg.isInteractivitySupported)

c.store[key] = cfg

kubeConfigKey := string(cfg.kubeConfig)
if _, ok := c.storeByKubeconfig[kubeConfigKey]; !ok {
c.storeByKubeconfig[kubeConfigKey] = make(map[string]SourceConfig)
}
c.storeByKubeconfig[kubeConfigKey][key] = cfg
}

// Get returns SourceConfig by a key.
func (c *configurationStore) Get(sourceKey string) (SourceConfig, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
val, ok := c.store[sourceKey]
return val, ok
}

// GetGlobal returns global SourceConfig.
func (c *configurationStore) GetGlobal() (SourceConfig, bool) {
c.lock.RLock()
defer c.lock.RUnlock()

sortedKeys := c.sortedKeys()
if len(sortedKeys) == 0 {
return SourceConfig{}, false
}

return c.store[sortedKeys[0]], true
}

// Len returns number of stored SourceConfigs.
func (c *configurationStore) Len() int {
c.lock.RLock()
defer c.lock.RUnlock()
return len(c.store)
}

// CloneByKubeconfig returns a copy of the underlying map of source configurations grouped by kubeconfigs.
func (c *configurationStore) CloneByKubeconfig() map[string]map[string]SourceConfig {
c.lock.RLock()
defer c.lock.RUnlock()

cloned := make(map[string]map[string]SourceConfig)
for k, v := range c.storeByKubeconfig {
cloned[k] = v
}

return cloned
}

func (c *configurationStore) sortedKeys() []string {
c.lock.RLock()
defer c.lock.RUnlock()
return maputil.SortKeys(c.store)
}

func (c *configurationStore) keyForStore(sourceName string, isInteractivitySupported bool) string {
return fmt.Sprintf("%s/%t", sourceName, isInteractivitySupported)
}
3 changes: 1 addition & 2 deletions internal/source/kubernetes/filterengine/filterengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func New(log logrus.FieldLogger) *DefaultFilterEngine {
func (f *DefaultFilterEngine) Run(ctx context.Context, event event.Event) event.Event {
f.log.Debug("Running registered filters")
filters := f.RegisteredFilters()
f.log.Debugf("registered filters: %+v", filters)

for _, filter := range filters {
if !filter.Enabled {
Expand All @@ -59,7 +58,7 @@ func (f *DefaultFilterEngine) Run(ctx context.Context, event event.Event) event.

err := filter.Run(ctx, &event)
if err != nil {
f.log.Errorf("while running filter %q: %w", filter.Name(), err)
f.log.Errorf("while running filter %q: %s", filter.Name(), err.Error())
}
f.log.Debugf("ran filter name: %q, event was skipped: %t", filter.Name(), event.Skip)
}
Expand Down
Loading
Loading