Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce memory consumption for Kubernetes source configuration #1425

Merged
merged 3 commits into from
Apr 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ func run(ctx context.Context) (err error) {
scheduler := source.NewScheduler(ctx, logger, conf, sourcePluginDispatcher, schedulerChan)
err = scheduler.Start(ctx)
if err != nil {
return reportFatalError("while starting source plugin event dispatcher: %w", err)
return reportFatalError("while starting source plugin event dispatcher", err)
}

if conf.Plugins.IncomingWebhook.Enabled {
Expand Down
2 changes: 0 additions & 2 deletions internal/source/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ func NewDispatcher(log logrus.FieldLogger, clusterName string, notifiers map[str
}

// Dispatch starts a given plugin, watches for incoming events and calling all notifiers to dispatch received event.
// Once we will have the gRPC contract established with proper Cloud Event schema, we should move also this logic here:
// https://github.com/kubeshop/botkube/blob/525c737956ff820a09321879284037da8bf5d647/pkg/controller/controller.go#L200-L253
func (d *Dispatcher) Dispatch(dispatch PluginDispatch) error {
log := d.log.WithFields(logrus.Fields{
"pluginName": dispatch.pluginName,
Expand Down
70 changes: 70 additions & 0 deletions internal/source/kubernetes/bg_processor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package kubernetes

import (
"context"
"sync"
"time"

"github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
)

// backgroundProcessor is responsible for running background processes.
type backgroundProcessor struct {
mu sync.RWMutex
cancelCtxFn func()
startTime time.Time

errGroup *errgroup.Group
}

// newBackgroundProcessor creates new background processor.
func newBackgroundProcessor() *backgroundProcessor {
return &backgroundProcessor{}
}

// StartTime returns the start time of the background processor.
func (b *backgroundProcessor) StartTime() time.Time {
b.mu.RLock()
defer b.mu.RUnlock()
return b.startTime
}

// Run starts the background processes.
func (b *backgroundProcessor) Run(parentCtx context.Context, fns []func(ctx context.Context)) {
b.mu.Lock()
defer b.mu.Unlock()

b.startTime = time.Now()
ctx, cancelFn := context.WithCancel(parentCtx)
b.cancelCtxFn = cancelFn

errGroup, errGroupCtx := errgroup.WithContext(ctx)
b.errGroup = errGroup

for _, fn := range fns {
fn := fn
errGroup.Go(func() error {
fn(errGroupCtx)
return nil
})
}
}

// StopAndWait stops the background processes and waits for them to finish.
func (b *backgroundProcessor) StopAndWait(log logrus.FieldLogger) error {
b.mu.Lock()
defer b.mu.Unlock()

if b.cancelCtxFn != nil {
log.Debug("Cancelling context of the background processor...")
b.cancelCtxFn()
}

if b.errGroup == nil {
return nil
}

log.Debug("Waiting for background processor to finish...")
return b.errGroup.Wait()
}
93 changes: 93 additions & 0 deletions internal/source/kubernetes/configuration_store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package kubernetes

import (
"fmt"
"sync"

"github.com/kubeshop/botkube/pkg/maputil"
)

// configurationStore stores all source configurations in a thread-safe way.
type configurationStore struct {
store map[string]SourceConfig
storeByKubeconfig map[string]map[string]struct{}

lock sync.RWMutex
}

// newConfigurations creates new empty configurationStore instance.
func newConfigurations() *configurationStore {
return &configurationStore{
store: make(map[string]SourceConfig),
storeByKubeconfig: make(map[string]map[string]struct{}),
}
}

// Store stores SourceConfig in a thread-safe way.
func (c *configurationStore) Store(sourceName string, cfg SourceConfig) {
c.lock.Lock()
defer c.lock.Unlock()

key := c.keyForStore(sourceName, cfg.isInteractivitySupported)

c.store[key] = cfg

kubeConfigKey := string(cfg.kubeConfig)
if _, ok := c.storeByKubeconfig[kubeConfigKey]; !ok {
c.storeByKubeconfig[kubeConfigKey] = make(map[string]struct{})
}
c.storeByKubeconfig[kubeConfigKey][key] = struct{}{}
}

// Get returns SourceConfig by a key.
func (c *configurationStore) Get(sourceKey string) (SourceConfig, bool) {
c.lock.RLock()
defer c.lock.RUnlock()
val, ok := c.store[sourceKey]
return val, ok
}

// GetSystemConfig returns system Source Config.
// The system config is used for getting system (plugin-wide) logger and informer resync period.
func (c *configurationStore) GetSystemConfig() (SourceConfig, bool) {
c.lock.RLock()
defer c.lock.RUnlock()

sortedKeys := maputil.SortKeys(c.store)
if len(sortedKeys) == 0 {
return SourceConfig{}, false
}

return c.store[sortedKeys[0]], true
}

// Len returns number of stored SourceConfigs.
func (c *configurationStore) Len() int {
c.lock.RLock()
defer c.lock.RUnlock()
return len(c.store)
}

// CloneByKubeconfig returns a copy of the underlying map of source configurations grouped by kubeconfigs.
func (c *configurationStore) CloneByKubeconfig() map[string]map[string]SourceConfig {
c.lock.RLock()
defer c.lock.RUnlock()

var out = make(map[string]map[string]SourceConfig)
for kubeConfig, srcIndex := range c.storeByKubeconfig {
if out[kubeConfig] == nil {
out[kubeConfig] = make(map[string]SourceConfig)
}

for srcKey := range srcIndex {
out[kubeConfig][srcKey] = c.store[srcKey]
}
}

return out
}

// keyForStore returns a key for storing configuration in the store.
func (c *configurationStore) keyForStore(sourceName string, isInteractivitySupported bool) string {
return fmt.Sprintf("%s/%t", sourceName, isInteractivitySupported)
}
3 changes: 1 addition & 2 deletions internal/source/kubernetes/filterengine/filterengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func New(log logrus.FieldLogger) *DefaultFilterEngine {
func (f *DefaultFilterEngine) Run(ctx context.Context, event event.Event) event.Event {
f.log.Debug("Running registered filters")
filters := f.RegisteredFilters()
f.log.Debugf("registered filters: %+v", filters)

for _, filter := range filters {
if !filter.Enabled {
Expand All @@ -59,7 +58,7 @@ func (f *DefaultFilterEngine) Run(ctx context.Context, event event.Event) event.

err := filter.Run(ctx, &event)
if err != nil {
f.log.Errorf("while running filter %q: %w", filter.Name(), err)
f.log.Errorf("while running filter %q: %s", filter.Name(), err.Error())
}
f.log.Debugf("ran filter name: %q, event was skipped: %t", filter.Name(), event.Skip)
}
Expand Down
Loading
Loading