Skip to content

Commit

Permalink
feat: add cloud grpc method for server notifications
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Sukhin <vladislav@kubeshop.io>
  • Loading branch information
vsukhin committed Nov 27, 2024
1 parent 07ce308 commit 9c03fe4
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 74 deletions.
25 changes: 13 additions & 12 deletions cmd/api-server/commons/commons.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,18 +282,19 @@ func ReadDefaultExecutors(cfg *config.Config) (executors []testkube.ExecutorDeta

func ReadProContext(ctx context.Context, cfg *config.Config, grpcClient cloud.TestKubeCloudAPIClient) config.ProContext {
proContext := config.ProContext{
APIKey: cfg.TestkubeProAPIKey,
URL: cfg.TestkubeProURL,
TLSInsecure: cfg.TestkubeProTLSInsecure,
WorkerCount: cfg.TestkubeProWorkerCount,
LogStreamWorkerCount: cfg.TestkubeProLogStreamWorkerCount,
WorkflowNotificationsWorkerCount: cfg.TestkubeProWorkflowNotificationsWorkerCount,
SkipVerify: cfg.TestkubeProSkipVerify,
EnvID: cfg.TestkubeProEnvID,
OrgID: cfg.TestkubeProOrgID,
Migrate: cfg.TestkubeProMigrate,
ConnectionTimeout: cfg.TestkubeProConnectionTimeout,
DashboardURI: cfg.TestkubeDashboardURI,
APIKey: cfg.TestkubeProAPIKey,
URL: cfg.TestkubeProURL,
TLSInsecure: cfg.TestkubeProTLSInsecure,
WorkerCount: cfg.TestkubeProWorkerCount,
LogStreamWorkerCount: cfg.TestkubeProLogStreamWorkerCount,
WorkflowNotificationsWorkerCount: cfg.TestkubeProWorkflowNotificationsWorkerCount,
WorkflowServiceNotificationsWorkerCount: cfg.TestkubeProWorkflowServiceNotificationsWorkerCount,
SkipVerify: cfg.TestkubeProSkipVerify,
EnvID: cfg.TestkubeProEnvID,
OrgID: cfg.TestkubeProOrgID,
Migrate: cfg.TestkubeProMigrate,
ConnectionTimeout: cfg.TestkubeProConnectionTimeout,
DashboardURI: cfg.TestkubeDashboardURI,
}

if cfg.TestkubeProAPIKey == "" || grpcClient == nil {
Expand Down
17 changes: 17 additions & 0 deletions cmd/api-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,22 @@ func main() {
}
return notifications.Channel(), nil
}
getTestWorkflowServiceNotificationsStream := func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error) {
execution, err := testWorkflowResultsRepository.Get(ctx, executionID)
if err != nil {
return nil, err
}
notifications := executionWorker.Notifications(ctx, fmt.Sprintf("%s-%s-%d", execution.Id, serviceName, serviceIndex), executionworkertypes.NotificationsOptions{
Hints: executionworkertypes.Hints{
Namespace: execution.Namespace,
ScheduledAt: common.Ptr(execution.ScheduledAt),
},
})
if notifications.Err() != nil {
return nil, notifications.Err()
}
return notifications.Channel(), nil
}
getDeprecatedLogStream := func(ctx context.Context, executionID string) (chan output.Output, error) {
return nil, errors.New("deprecated features have been disabled")
}
Expand All @@ -337,6 +353,7 @@ func main() {
grpcClient,
getDeprecatedLogStream,
getTestWorkflowNotificationsStream,
getTestWorkflowServiceNotificationsStream,
clusterId,
cfg.TestkubeClusterName,
features,
Expand Down
85 changes: 43 additions & 42 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,48 +40,49 @@ type Config struct {
LogsStorage string `envconfig:"LOGS_STORAGE" default:""`
WorkflowStorage string `envconfig:"WORKFLOW_STORAGE" default:"crd"`
// WhitelistedContainers is a list of containers from which logs should be collected.
WhitelistedContainers []string `envconfig:"WHITELISTED_CONTAINERS" default:"init,logs,scraper"`
NatsEmbedded bool `envconfig:"NATS_EMBEDDED" default:"false"`
NatsEmbeddedStoreDir string `envconfig:"NATS_EMBEDDED_STORE_DIR" default:"/app/nats"`
NatsURI string `envconfig:"NATS_URI" default:"nats://localhost:4222"`
NatsSecure bool `envconfig:"NATS_SECURE" default:"false"`
NatsSkipVerify bool `envconfig:"NATS_SKIP_VERIFY" default:"false"`
NatsCertFile string `envconfig:"NATS_CERT_FILE" default:""`
NatsKeyFile string `envconfig:"NATS_KEY_FILE" default:""`
NatsCAFile string `envconfig:"NATS_CA_FILE" default:""`
NatsConnectTimeout time.Duration `envconfig:"NATS_CONNECT_TIMEOUT" default:"5s"`
JobServiceAccountName string `envconfig:"JOB_SERVICE_ACCOUNT_NAME" default:""`
JobTemplateFile string `envconfig:"JOB_TEMPLATE_FILE" default:""`
DisableTestTriggers bool `envconfig:"DISABLE_TEST_TRIGGERS" default:"false"`
TestkubeDefaultExecutors string `envconfig:"TESTKUBE_DEFAULT_EXECUTORS" default:""`
TestkubeEnabledExecutors string `envconfig:"TESTKUBE_ENABLED_EXECUTORS" default:""`
TestkubeTemplateJob string `envconfig:"TESTKUBE_TEMPLATE_JOB" default:""`
TestkubeContainerTemplateJob string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_JOB" default:""`
TestkubeContainerTemplateScraper string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_SCRAPER" default:""`
TestkubeContainerTemplatePVC string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_PVC" default:""`
TestkubeTemplateSlavePod string `envconfig:"TESTKUBE_TEMPLATE_SLAVE_POD" default:""`
TestkubeConfigDir string `envconfig:"TESTKUBE_CONFIG_DIR" default:"config"`
TestkubeAnalyticsEnabled bool `envconfig:"TESTKUBE_ANALYTICS_ENABLED" default:"false"`
TestkubeReadonlyExecutors bool `envconfig:"TESTKUBE_READONLY_EXECUTORS" default:"false"`
TestkubeNamespace string `envconfig:"TESTKUBE_NAMESPACE" default:"testkube"`
TestkubeProAPIKey string `envconfig:"TESTKUBE_PRO_API_KEY" default:""`
TestkubeProURL string `envconfig:"TESTKUBE_PRO_URL" default:""`
TestkubeProTLSInsecure bool `envconfig:"TESTKUBE_PRO_TLS_INSECURE" default:"false"`
TestkubeProWorkerCount int `envconfig:"TESTKUBE_PRO_WORKER_COUNT" default:"50"`
TestkubeProLogStreamWorkerCount int `envconfig:"TESTKUBE_PRO_LOG_STREAM_WORKER_COUNT" default:"25"`
TestkubeProWorkflowNotificationsWorkerCount int `envconfig:"TESTKUBE_PRO_WORKFLOW_NOTIFICATIONS_STREAM_WORKER_COUNT" default:"25"`
TestkubeProSkipVerify bool `envconfig:"TESTKUBE_PRO_SKIP_VERIFY" default:"false"`
TestkubeProEnvID string `envconfig:"TESTKUBE_PRO_ENV_ID" default:""`
TestkubeProOrgID string `envconfig:"TESTKUBE_PRO_ORG_ID" default:""`
TestkubeProMigrate string `envconfig:"TESTKUBE_PRO_MIGRATE" default:"false"`
TestkubeProConnectionTimeout int `envconfig:"TESTKUBE_PRO_CONNECTION_TIMEOUT" default:"10"`
TestkubeProCertFile string `envconfig:"TESTKUBE_PRO_CERT_FILE" default:""`
TestkubeProKeyFile string `envconfig:"TESTKUBE_PRO_KEY_FILE" default:""`
TestkubeProTLSSecret string `envconfig:"TESTKUBE_PRO_TLS_SECRET" default:""`
TestkubeProRunnerCustomCASecret string `envconfig:"TESTKUBE_PRO_RUNNER_CUSTOM_CA_SECRET" default:""`
TestkubeWatcherNamespaces string `envconfig:"TESTKUBE_WATCHER_NAMESPACES" default:""`
TestkubeRegistry string `envconfig:"TESTKUBE_REGISTRY" default:""`
TestkubePodStartTimeout time.Duration `envconfig:"TESTKUBE_POD_START_TIMEOUT" default:"30m"`
WhitelistedContainers []string `envconfig:"WHITELISTED_CONTAINERS" default:"init,logs,scraper"`
NatsEmbedded bool `envconfig:"NATS_EMBEDDED" default:"false"`
NatsEmbeddedStoreDir string `envconfig:"NATS_EMBEDDED_STORE_DIR" default:"/app/nats"`
NatsURI string `envconfig:"NATS_URI" default:"nats://localhost:4222"`
NatsSecure bool `envconfig:"NATS_SECURE" default:"false"`
NatsSkipVerify bool `envconfig:"NATS_SKIP_VERIFY" default:"false"`
NatsCertFile string `envconfig:"NATS_CERT_FILE" default:""`
NatsKeyFile string `envconfig:"NATS_KEY_FILE" default:""`
NatsCAFile string `envconfig:"NATS_CA_FILE" default:""`
NatsConnectTimeout time.Duration `envconfig:"NATS_CONNECT_TIMEOUT" default:"5s"`
JobServiceAccountName string `envconfig:"JOB_SERVICE_ACCOUNT_NAME" default:""`
JobTemplateFile string `envconfig:"JOB_TEMPLATE_FILE" default:""`
DisableTestTriggers bool `envconfig:"DISABLE_TEST_TRIGGERS" default:"false"`
TestkubeDefaultExecutors string `envconfig:"TESTKUBE_DEFAULT_EXECUTORS" default:""`
TestkubeEnabledExecutors string `envconfig:"TESTKUBE_ENABLED_EXECUTORS" default:""`
TestkubeTemplateJob string `envconfig:"TESTKUBE_TEMPLATE_JOB" default:""`
TestkubeContainerTemplateJob string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_JOB" default:""`
TestkubeContainerTemplateScraper string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_SCRAPER" default:""`
TestkubeContainerTemplatePVC string `envconfig:"TESTKUBE_CONTAINER_TEMPLATE_PVC" default:""`
TestkubeTemplateSlavePod string `envconfig:"TESTKUBE_TEMPLATE_SLAVE_POD" default:""`
TestkubeConfigDir string `envconfig:"TESTKUBE_CONFIG_DIR" default:"config"`
TestkubeAnalyticsEnabled bool `envconfig:"TESTKUBE_ANALYTICS_ENABLED" default:"false"`
TestkubeReadonlyExecutors bool `envconfig:"TESTKUBE_READONLY_EXECUTORS" default:"false"`
TestkubeNamespace string `envconfig:"TESTKUBE_NAMESPACE" default:"testkube"`
TestkubeProAPIKey string `envconfig:"TESTKUBE_PRO_API_KEY" default:""`
TestkubeProURL string `envconfig:"TESTKUBE_PRO_URL" default:""`
TestkubeProTLSInsecure bool `envconfig:"TESTKUBE_PRO_TLS_INSECURE" default:"false"`
TestkubeProWorkerCount int `envconfig:"TESTKUBE_PRO_WORKER_COUNT" default:"50"`
TestkubeProLogStreamWorkerCount int `envconfig:"TESTKUBE_PRO_LOG_STREAM_WORKER_COUNT" default:"25"`
TestkubeProWorkflowNotificationsWorkerCount int `envconfig:"TESTKUBE_PRO_WORKFLOW_NOTIFICATIONS_STREAM_WORKER_COUNT" default:"25"`
TestkubeProWorkflowServiceNotificationsWorkerCount int `envconfig:"TESTKUBE_PRO_WORKFLOW_SERVICE_NOTIFICATIONS_STREAM_WORKER_COUNT" default:"25"`
TestkubeProSkipVerify bool `envconfig:"TESTKUBE_PRO_SKIP_VERIFY" default:"false"`
TestkubeProEnvID string `envconfig:"TESTKUBE_PRO_ENV_ID" default:""`
TestkubeProOrgID string `envconfig:"TESTKUBE_PRO_ORG_ID" default:""`
TestkubeProMigrate string `envconfig:"TESTKUBE_PRO_MIGRATE" default:"false"`
TestkubeProConnectionTimeout int `envconfig:"TESTKUBE_PRO_CONNECTION_TIMEOUT" default:"10"`
TestkubeProCertFile string `envconfig:"TESTKUBE_PRO_CERT_FILE" default:""`
TestkubeProKeyFile string `envconfig:"TESTKUBE_PRO_KEY_FILE" default:""`
TestkubeProTLSSecret string `envconfig:"TESTKUBE_PRO_TLS_SECRET" default:""`
TestkubeProRunnerCustomCASecret string `envconfig:"TESTKUBE_PRO_RUNNER_CUSTOM_CA_SECRET" default:""`
TestkubeWatcherNamespaces string `envconfig:"TESTKUBE_WATCHER_NAMESPACES" default:""`
TestkubeRegistry string `envconfig:"TESTKUBE_REGISTRY" default:""`
TestkubePodStartTimeout time.Duration `envconfig:"TESTKUBE_POD_START_TIMEOUT" default:"30m"`
// TestkubeImageCredentialsCacheTTL is the duration for which the image pull credentials should be cached provided as a Go duration string.
// If set to 0, the cache is disabled.
TestkubeImageCredentialsCacheTTL time.Duration `envconfig:"TESTKUBE_IMAGE_CREDENTIALS_CACHE_TTL" default:"30m"`
Expand Down
25 changes: 13 additions & 12 deletions internal/config/procontext.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package config

type ProContext struct {
APIKey string
URL string
TLSInsecure bool
WorkerCount int
LogStreamWorkerCount int
WorkflowNotificationsWorkerCount int
SkipVerify bool
EnvID string
OrgID string
Migrate string
ConnectionTimeout int
DashboardURI string
APIKey string
URL string
TLSInsecure bool
WorkerCount int
LogStreamWorkerCount int
WorkflowNotificationsWorkerCount int
WorkflowServiceNotificationsWorkerCount int
SkipVerify bool
EnvID string
OrgID string
Migrate string
ConnectionTimeout int
DashboardURI string
}
28 changes: 23 additions & 5 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ type Agent struct {
testWorkflowNotificationsResponseBuffer chan *cloud.TestWorkflowNotificationsResponse
testWorkflowNotificationsFunc func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error)

testWorkflowServiceNotificationsWorkerCount int
testWorkflowServiceNotificationsRequestBuffer chan *cloud.TestWorkflowServiceNotificationsRequest
testWorkflowServiceNotificationsResponseBuffer chan *cloud.TestWorkflowServiceNotificationsResponse
testWorkflowServiceNotificationsFunc func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error)

events chan testkube.Event
sendTimeout time.Duration
receiveTimeout time.Duration
Expand All @@ -73,6 +78,7 @@ func NewAgent(logger *zap.SugaredLogger,
client cloud.TestKubeCloudAPIClient,
logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error),
workflowNotificationsFunc func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error),
workflowServiceNotificationsFunc func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error),
clusterID string,
clusterName string,
features featureflags.FeatureFlags,
Expand All @@ -99,11 +105,16 @@ func NewAgent(logger *zap.SugaredLogger,
testWorkflowNotificationsRequestBuffer: make(chan *cloud.TestWorkflowNotificationsRequest, bufferSizePerWorker*proContext.WorkflowNotificationsWorkerCount),
testWorkflowNotificationsResponseBuffer: make(chan *cloud.TestWorkflowNotificationsResponse, bufferSizePerWorker*proContext.WorkflowNotificationsWorkerCount),
testWorkflowNotificationsFunc: workflowNotificationsFunc,
clusterID: clusterID,
clusterName: clusterName,
features: features,
proContext: proContext,
dockerImageVersion: dockerImageVersion,
testWorkflowServiceNotificationsWorkerCount: proContext.WorkflowServiceNotificationsWorkerCount,
testWorkflowServiceNotificationsRequestBuffer: make(chan *cloud.TestWorkflowServiceNotificationsRequest, bufferSizePerWorker*proContext.WorkflowServiceNotificationsWorkerCount),
testWorkflowServiceNotificationsResponseBuffer: make(chan *cloud.TestWorkflowServiceNotificationsResponse, bufferSizePerWorker*proContext.WorkflowServiceNotificationsWorkerCount),
testWorkflowServiceNotificationsFunc: workflowServiceNotificationsFunc,

clusterID: clusterID,
clusterName: clusterName,
features: features,
proContext: proContext,
dockerImageVersion: dockerImageVersion,
}, nil
}

Expand Down Expand Up @@ -151,6 +162,13 @@ func (ag *Agent) run(ctx context.Context) (err error) {
return ag.runTestWorkflowNotificationsWorker(groupCtx, ag.testWorkflowNotificationsWorkerCount)
})

g.Go(func() error {
return ag.runTestWorkflowServiceNotificationsLoop(groupCtx)
})
g.Go(func() error {
return ag.runTestWorkflowServiceNotificationsWorker(groupCtx, ag.testWorkflowServiceNotificationsWorkerCount)
})

err = g.Wait()

return err
Expand Down
9 changes: 8 additions & 1 deletion pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ func TestCommandExecution(t *testing.T) {

var logStreamFunc func(ctx context.Context, executionID string) (chan output.Output, error)
var workflowNotificationsStreamFunc func(ctx context.Context, executionID string) (<-chan testkube.TestWorkflowExecutionNotification, error)
var workflowServiceNotificationsStreamFunc func(ctx context.Context, executionID, serviceName string, serviceIndex int) (<-chan testkube.TestWorkflowExecutionNotification, error)

logger, _ := zap.NewDevelopment()
proContext := config.ProContext{APIKey: "api-key", WorkerCount: 5, LogStreamWorkerCount: 5, WorkflowNotificationsWorkerCount: 5}
agent, err := agent.NewAgent(logger.Sugar(), m, grpcClient, logStreamFunc, workflowNotificationsStreamFunc, "", "", featureflags.FeatureFlags{}, &proContext, "")
agent, err := agent.NewAgent(logger.Sugar(), m, grpcClient, logStreamFunc, workflowNotificationsStreamFunc, workflowServiceNotificationsStreamFunc, "", "", featureflags.FeatureFlags{}, &proContext, "")
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -97,6 +98,12 @@ func (cs *CloudServer) GetTestWorkflowNotificationsStream(srv cloud.TestKubeClou
return nil
}

func (cs *CloudServer) GetTestWorkflowServiceNotificationsStream(srv cloud.TestKubeCloudAPI_GetTestWorkflowServiceNotificationsStreamServer) error {
<-cs.ctx.Done()

return nil
}

func (cs *CloudServer) ExecuteAsync(srv cloud.TestKubeCloudAPI_ExecuteAsyncServer) error {
md, ok := metadata.FromIncomingContext(srv.Context())
if !ok {
Expand Down
Loading

0 comments on commit 9c03fe4

Please sign in to comment.