Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace channel with concurrency #154

Merged
merged 8 commits into from
Aug 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/kubescape/opa-utils v0.0.257
github.com/kubescape/storage v0.0.8
github.com/mitchellh/mapstructure v1.5.0
github.com/panjf2000/ants/v2 v2.8.1
github.com/stretchr/testify v1.8.3
go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.37.0
go.opentelemetry.io/otel v1.16.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,8 @@ github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3I
github.com/opencontainers/image-spec v1.1.0-rc3 h1:fzg1mXZFj8YdPeNkRXMg+zb88BFV0Ys52cJydRwBkb8=
github.com/opencontainers/image-spec v1.1.0-rc3/go.mod h1:X4pATf0uXsnn3g5aiGIsVnJBR4mxhKzfwmvK/B2NTm8=
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
github.com/panjf2000/ants/v2 v2.8.1 h1:C+n/f++aiW8kHCExKlpX6X+okmxKXP7DWLutxuAPuwQ=
github.com/panjf2000/ants/v2 v2.8.1/go.mod h1:KIBmYG9QQX5U2qzFP/yQJaq/nSb6rahS9iEHkrCMgM8=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI=
github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ=
Expand Down
4 changes: 2 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func main() {

isReadinessReady = true

go mainHandler.HandleWatchers(ctx)
// wait for requests to come from the websocket or from the REST API
mainHandler.HandleRequest(ctx)
go mainHandler.HandleCommandResponse(ctx)
mainHandler.HandleWatchers(ctx)

}

Expand Down
2 changes: 1 addition & 1 deletion mainhandler/handlecommandresponse.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (mainHandler *MainHandler) createInsertCommandsResponseThreadPool() {
}
}

func (mainHandler *MainHandler) handleCommandResponse(ctx context.Context) {
func (mainHandler *MainHandler) HandleCommandResponse(ctx context.Context) {
mainHandler.createInsertCommandsResponseThreadPool()
for {
data := <-*mainHandler.commandResponseChannel.commandResponseChannel
Expand Down
80 changes: 46 additions & 34 deletions mainhandler/handlerequests.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/operator/utils"
"github.com/kubescape/operator/watcher"
"github.com/panjf2000/ants/v2"
"go.opentelemetry.io/otel"

"github.com/armosec/armoapi-go/identifiers"
Expand All @@ -28,17 +29,17 @@ import (
)

type MainHandler struct {
sessionObj *chan utils.SessionObj // TODO: wrap chan with struct for mutex support
eventWorkerPool *ants.PoolWithFunc
k8sAPI *k8sinterface.KubernetesApi
commandResponseChannel *commandResponseChannelData
}

type ActionHandler struct {
k8sAPI *k8sinterface.KubernetesApi
reporter reporterlib.IReporter
wlid string
command apis.Command
reporter reporterlib.IReporter
k8sAPI *k8sinterface.KubernetesApi
commandResponseChannel *commandResponseChannelData
wlid string
}

type waitFunc func()
Expand All @@ -64,11 +65,17 @@ func NewMainHandler(sessionObj *chan utils.SessionObj, k8sAPI *k8sinterface.Kube

commandResponseChannel := make(chan *CommandResponseData, 100)
limitedGoRoutinesCommandResponseChannel := make(chan *timerData, 10)
return &MainHandler{
sessionObj: sessionObj,
mainHandler := &MainHandler{
k8sAPI: k8sAPI,
commandResponseChannel: &commandResponseChannelData{commandResponseChannel: &commandResponseChannel, limitedGoRoutinesCommandResponseChannel: &limitedGoRoutinesCommandResponseChannel},
}
pool, _ := ants.NewPoolWithFunc(utils.ConcurrencyWorkers, func(i interface{}) {
j := i.(utils.Job)

mainHandler.HandleRequest(j)
})
mainHandler.eventWorkerPool = pool
return mainHandler
}

// CreateWebSocketHandler Create ws-handler obj
Expand Down Expand Up @@ -116,15 +123,15 @@ func (mainHandler *MainHandler) HandleWatchers(ctx context.Context) {
mainHandler.insertCommandsToChannel(ctx, commandsList)

// start watching
go watchHandler.PodWatch(ctx, mainHandler.sessionObj)
go watchHandler.SBOMWatch(ctx, mainHandler.sessionObj)
go watchHandler.SBOMFilteredWatch(ctx, mainHandler.sessionObj)
go watchHandler.VulnerabilityManifestWatch(ctx, mainHandler.sessionObj)
go watchHandler.PodWatch(ctx, mainHandler.eventWorkerPool)
go watchHandler.SBOMWatch(ctx, mainHandler.eventWorkerPool)
go watchHandler.SBOMFilteredWatch(ctx, mainHandler.eventWorkerPool)
go watchHandler.VulnerabilityManifestWatch(ctx, mainHandler.eventWorkerPool)
}

func (mainHandler *MainHandler) insertCommandsToChannel(ctx context.Context, commandsList []*apis.Command) {
for _, cmd := range commandsList {
utils.AddCommandToChannel(ctx, cmd, mainHandler.sessionObj)
utils.AddCommandToChannel(ctx, cmd, mainHandler.eventWorkerPool)
}
}

Expand All @@ -137,40 +144,40 @@ func buildScanCommandForWorkload(ctx context.Context, wlid string, mapContainerT
}

// HandlePostmanRequest Parse received commands and run the command
func (mainHandler *MainHandler) HandleRequest(ctx context.Context) []error {
func (mainHandler *MainHandler) HandleRequest(j utils.Job) {

ctx := j.Context()
sessionObj := j.Obj()

// recover
defer func() {
if err := recover(); err != nil {
logger.L().Ctx(ctx).Fatal("recover in HandleRequest", helpers.Interface("error", err))
}
}()

go mainHandler.handleCommandResponse(ctx)
for {
sessionObj := <-*mainHandler.sessionObj
ctx, span := otel.Tracer("").Start(ctx, string(sessionObj.Command.CommandName))
ctx, span := otel.Tracer("").Start(ctx, string(sessionObj.Command.CommandName))

// the all user experience depends on this line(the user/backend must get the action name in order to understand the job report)
sessionObj.Reporter.SetActionName(string(sessionObj.Command.CommandName))
// the all user experience depends on this line(the user/backend must get the action name in order to understand the job report)
sessionObj.Reporter.SetActionName(string(sessionObj.Command.CommandName))

isToItemizeScopeCommand := sessionObj.Command.WildWlid != "" || sessionObj.Command.WildSid != "" || len(sessionObj.Command.Designators) > 0
switch sessionObj.Command.CommandName {
case apis.TypeRunKubescape, apis.TypeRunKubescapeJob, apis.TypeSetKubescapeCronJob, apis.TypeDeleteKubescapeCronJob, apis.TypeUpdateKubescapeCronJob:
isToItemizeScopeCommand = false
isToItemizeScopeCommand := sessionObj.Command.WildWlid != "" || sessionObj.Command.WildSid != "" || len(sessionObj.Command.Designators) > 0
switch sessionObj.Command.CommandName {
case apis.TypeRunKubescape, apis.TypeRunKubescapeJob, apis.TypeSetKubescapeCronJob, apis.TypeDeleteKubescapeCronJob, apis.TypeUpdateKubescapeCronJob:
isToItemizeScopeCommand = false

case apis.TypeSetVulnScanCronJob, apis.TypeDeleteVulnScanCronJob, apis.TypeUpdateVulnScanCronJob:
isToItemizeScopeCommand = false
}
case apis.TypeSetVulnScanCronJob, apis.TypeDeleteVulnScanCronJob, apis.TypeUpdateVulnScanCronJob:
isToItemizeScopeCommand = false
}

if isToItemizeScopeCommand {
mainHandler.HandleScopedRequest(ctx, &sessionObj) // this might be a heavy action, do not send to a goroutine
} else {
// handle requests
mainHandler.HandleSingleRequest(ctx, &sessionObj)
}
span.End()
close(sessionObj.ErrChan)
if isToItemizeScopeCommand {
mainHandler.HandleScopedRequest(ctx, &sessionObj) // this might be a heavy action, do not send to a goroutine
} else {
// handle requests
mainHandler.HandleSingleRequest(ctx, &sessionObj)
}
span.End()
close(sessionObj.ErrChan)
}

func (mainHandler *MainHandler) HandleSingleRequest(ctx context.Context, sessionObj *utils.SessionObj) {
Expand Down Expand Up @@ -335,7 +342,12 @@ func (mainHandler *MainHandler) StartupTriggerActions(ctx context.Context, actio
waitFunc := isActionNeedToWait(actions[index])
waitFunc()
sessionObj := utils.NewSessionObj(ctx, &actions[index], "Websocket", "", uuid.NewString(), 1)
*mainHandler.sessionObj <- *sessionObj
l := utils.Job{}
l.SetContext(ctx)
l.SetObj(*sessionObj)
if err := mainHandler.eventWorkerPool.Invoke(l); err != nil {
logger.L().Ctx(ctx).Error("Failed to invoke job", helpers.String("wlid", actions[index].GetID()), helpers.String("command", fmt.Sprintf("%v", actions[index].CommandName)), helpers.String("args", fmt.Sprintf("%v", actions[index].Args)), helpers.Error(err))
}
}(i)
}
}
Expand Down
1 change: 1 addition & 0 deletions utils/environmentvariables.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ const (
PortEnvironmentVariable = "PORT"
CleanUpDelayEnvironmentVariable = "CLEANUP_DELAY"
TriggerSecurityFrameworkEnvironmentVariable = "TRIGGER_SECURITY_FRAMEWORK"
ConcurrencyEnvironmentVariable = "WORKER_CONCURRENCY"
)
53 changes: 53 additions & 0 deletions utils/globalvar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package utils

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestParseIntEnvVar(t *testing.T) {
tests := []struct {
name string
varName string
varValue string
defaultValue int
expected int
expectError bool
}{
{
name: "environment variable not set",
varName: "TEST_ENV_VAR_NOT_SET",
defaultValue: 10,
expected: 10,
expectError: false,
},
{
name: "environment variable set with valid integer",
varName: "TEST_ENV_VAR_VALID",
varValue: "20",
defaultValue: 10,
expected: 20,
expectError: false,
},
{
name: "environment variable set with invalid integer",
varName: "TEST_ENV_VAR_INVALID",
varValue: "invalid",
defaultValue: 10,
expected: 10,
expectError: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Setenv(tt.varName, tt.varValue)
defer t.Setenv(tt.varName, "")
Comment on lines +45 to +46
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick(non-blocking): there’s no need to use defer testing.T.Setenv rolls back your change at the end of test automatically


val, err := parseIntEnvVar(tt.varName, tt.defaultValue)
assert.Equalf(t, tt.expectError, err != nil, "unexpected error: %v", err)
assert.Equalf(t, tt.expected, val, "unexpected value: %v", val)
})
}
}
16 changes: 16 additions & 0 deletions utils/globalvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
Namespace string = "default" // default namespace
RestAPIPort string = "4002" // default port
CleanUpRoutineInterval time.Duration = 10 * time.Minute
ConcurrencyWorkers int = 3
TriggerSecurityFramework bool = false
)

Expand Down Expand Up @@ -59,6 +60,21 @@ func LoadEnvironmentVariables(ctx context.Context) (err error) {
CleanUpRoutineInterval = dur
}
}
ConcurrencyWorkers, _ = parseIntEnvVar(ConcurrencyEnvironmentVariable, ConcurrencyWorkers)

return nil
}

func parseIntEnvVar(varName string, defaultValue int) (int, error) {
varValue, exists := os.LookupEnv(varName)
if !exists || varValue == "" {
return defaultValue, nil
}

intValue, err := strconv.Atoi(varValue)
if err != nil {
return defaultValue, fmt.Errorf("failed to parse %s env var as int: %w", varName, err)
}

return intValue, nil
}
23 changes: 23 additions & 0 deletions utils/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package utils

import (
"context"

"github.com/armosec/armoapi-go/apis"
reporterlib "github.com/armosec/logger-go/system-reports/datastructures"
)
Expand All @@ -18,3 +20,24 @@ type CredStruct struct {
Password string `json:"password"`
Customer string `json:"customer"`
}

type Job struct {
ctx context.Context
sessionObj SessionObj
}

func (j *Job) Context() context.Context {
return j.ctx
}

func (j *Job) Obj() SessionObj {
return j.sessionObj
}

func (j *Job) SetContext(ctx context.Context) {
j.ctx = ctx
}

func (j *Job) SetObj(sessionObj SessionObj) {
j.sessionObj = sessionObj
}
7 changes: 5 additions & 2 deletions utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/google/uuid"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/panjf2000/ants/v2"
core1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -64,10 +65,12 @@ func ExtractImageID(imageID string) string {
return strings.TrimPrefix(imageID, dockerPullableURN)
}

func AddCommandToChannel(ctx context.Context, cmd *apis.Command, channel *chan SessionObj) {
func AddCommandToChannel(ctx context.Context, cmd *apis.Command, workerPool *ants.PoolWithFunc) {
logger.L().Ctx(ctx).Info("Triggering scan for", helpers.String("wlid", cmd.Wlid), helpers.String("command", fmt.Sprintf("%v", cmd.CommandName)), helpers.String("args", fmt.Sprintf("%v", cmd.Args)))
newSessionObj := NewSessionObj(ctx, cmd, "Websocket", "", uuid.NewString(), 1)
*channel <- *newSessionObj
if err := workerPool.Invoke(Job{ctx: ctx, sessionObj: *newSessionObj}); err != nil {
logger.L().Ctx(ctx).Error("Failed to invoke job", helpers.String("wlid", cmd.Wlid), helpers.String("command", fmt.Sprintf("%v", cmd.CommandName)), helpers.String("args", fmt.Sprintf("%v", cmd.Args)), helpers.Error(err))
}
}

func ExtractContainersToImageIDsFromPod(pod *core1.Pod) map[string]string {
Expand Down
Loading