diff --git a/.github/workflows/go-checks.yaml b/.github/workflows/go-checks.yaml index 9c35293..5260910 100644 --- a/.github/workflows/go-checks.yaml +++ b/.github/workflows/go-checks.yaml @@ -15,7 +15,8 @@ on: env: GO_VERSION: '1.19' - GOLANGCI_VERSION: 'v1.49.0' + # Keep this in sync with build/lint.sh + GOLANGCI_VERSION: '1.49.0' USE_BUILD_CONTAINER: '1' jobs: @@ -51,10 +52,11 @@ jobs: with: go-version: ${{ env.GO_VERSION }} + # Use this action instead of running golangci directly because it can comment on pr. - name: Lint uses: golangci/golangci-lint-action@v3 with: - version: ${{ env.GOLANGCI_VERSION }} + version: v${{ env.GOLANGCI_VERSION }} - name: Check Diff run: make checkdiff diff --git a/.github/workflows/unit-test.yaml b/.github/workflows/unit-test.yaml index e2331e0..ec62871 100644 --- a/.github/workflows/unit-test.yaml +++ b/.github/workflows/unit-test.yaml @@ -15,7 +15,6 @@ on: env: GO_VERSION: '1.19' - GOLANGCI_VERSION: 'v1.47.2' jobs: detect-noop: @@ -74,8 +73,7 @@ jobs: - name: Upload coverage report uses: codecov/codecov-action@v3 with: - # This is a public repo, so token is not required. - # token: ${{ secrets.CODECOV_TOKEN }} + token: ${{ secrets.CODECOV_TOKEN }} files: ./cover.out flags: unittests name: codecov-umbrella diff --git a/.golangci.yaml b/.golangci.yaml index 5c7fb03..aadab3d 100644 --- a/.golangci.yaml +++ b/.golangci.yaml @@ -10,6 +10,9 @@ run: - "e2e" - "bin" + skip-dirs-use-default: false + + output: # colored-line-number|line-number|json|tab|checkstyle|code-climate, default is "colored-line-number" format: colored-line-number diff --git a/build/lint.sh b/build/lint.sh index 7614f1b..2477235 100755 --- a/build/lint.sh +++ b/build/lint.sh @@ -26,57 +26,36 @@ if [ -f "bin/golangci-lint" ]; then GOLANGCI="bin/golangci-lint" fi -function print_download_help() { - echo "You can install golangci-lint v${GOLANGCI_VERSION} by running:" 1>&2 - echo " curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(pwd)/bin v${GOLANGCI_VERSION}" 1>&2 - echo "By default, it will be installed in ./bin/golangci-lint so that it won't interfere with other versions (if any)." 1>&2 +function print_install_help() { + echo "Automatic installation failed, you can install golangci-lint v${GOLANGCI_VERSION} manually by running:" + echo " curl -sSfL \"https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh\" | sh -s -- -b \"$(pwd)/bin\" v${GOLANGCI_VERSION}" + echo "It will be installed to \"$(pwd)/bin/golangci-lint\" so that it won't interfere with existing versions (if any)." + exit 1 +} + +function install_golangci() { + echo "Installing golangci-lint v${GOLANGCI_VERSION} ..." + echo "It will be installed to \"$(pwd)/bin/golangci-lint\" so that it won't interfere with existing versions (if any)." + curl -sSfL "https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh" | + sh -s -- -b "$(pwd)/bin" v${GOLANGCI_VERSION} || print_install_help } if ! ${GOLANGCI} version >/dev/null 2>&1; then echo "You don't have golangci-lint installed." 2>&1 - print_download_help - exit 1 + install_golangci + $0 "$@" + exit fi CURRENT_GOLANGCI_VERSION="$(${GOLANGCI} version 2>&1)" CURRENT_GOLANGCI_VERSION="${CURRENT_GOLANGCI_VERSION#*version }" CURRENT_GOLANGCI_VERSION="${CURRENT_GOLANGCI_VERSION% built*}" -function greaterver() { - if [[ $1 == $2 ]]; then - return 0 - fi - local IFS=. - local i ver1=($1) ver2=($2) - # fill empty fields in ver1 with zeros - for ((i = ${#ver1[@]}; i < ${#ver2[@]}; i++)); do - ver1[i]=0 - done - for ((i = 0; i < ${#ver1[@]}; i++)); do - if [[ -z ${ver2[i]} ]]; then - # fill empty fields in ver2 with zeros - ver2[i]=0 - fi - if ((10#${ver1[i]} > 10#${ver2[i]})); then - return 0 - fi - if ((10#${ver1[i]} < 10#${ver2[i]})); then - return 2 - fi - done - return 0 -} - -if ! greaterver "${CURRENT_GOLANGCI_VERSION}" "${GOLANGCI_VERSION}"; then - echo "golangci-lint version is too low." 1>&2 - echo "You have v${CURRENT_GOLANGCI_VERSION}, but we need at least v${GOLANGCI_VERSION}" 1>&2 - print_download_help - exit 1 -fi - if [ "${CURRENT_GOLANGCI_VERSION}" != "${GOLANGCI_VERSION}" ]; then - echo "Warning: you have golangci-lint v${CURRENT_GOLANGCI_VERSION}, but we want v${GOLANGCI_VERSION}" 1>&2 - print_download_help + echo "You have golangci-lint v${CURRENT_GOLANGCI_VERSION} installed, but we want v${GOLANGCI_VERSION}" 1>&2 + install_golangci + $0 "$@" + exit fi echo "# Running golangci-lint v${CURRENT_GOLANGCI_VERSION}..." diff --git a/examples/conf-cronjob.yaml b/examples/conf-cronjob.yaml new file mode 100644 index 0000000..bc67e85 --- /dev/null +++ b/examples/conf-cronjob.yaml @@ -0,0 +1,9 @@ +triggers: + - source: + type: cronjob + properties: + schedule: "* * * * *" + timeZone: "Asia/Shanghai" # Optional + filter: "" + action: + # TODO: add your action here diff --git a/go.mod b/go.mod index a1dd8b9..867e2e0 100644 --- a/go.mod +++ b/go.mod @@ -9,11 +9,11 @@ require ( github.com/kubevela/pkg v1.8.1-0.20230411071527-ac5fa22727f7 github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/pkg/errors v0.9.1 + github.com/robfig/cron/v3 v3.0.1 github.com/sirupsen/logrus v1.9.0 github.com/spf13/cobra v1.6.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.8.0 - golang.org/x/sync v0.1.0 golang.org/x/time v0.3.0 k8s.io/api v0.26.3 k8s.io/apimachinery v0.26.3 @@ -93,6 +93,7 @@ require ( golang.org/x/crypto v0.4.0 // indirect golang.org/x/net v0.8.0 // indirect golang.org/x/oauth2 v0.0.0-20220722155238-128564f6959c // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.6.0 // indirect golang.org/x/term v0.6.0 // indirect golang.org/x/text v0.8.0 // indirect diff --git a/go.sum b/go.sum index c486652..f49d6b8 100644 --- a/go.sum +++ b/go.sum @@ -272,6 +272,8 @@ github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5mo= github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/protocolbuffers/txtpbfmt v0.0.0-20220428173112-74888fd59c2b h1:zd/2RNzIRkoGGMjE+YIsZ85CnDIz672JK2F3Zl4vux4= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= diff --git a/pkg/cmd/cmd.go b/pkg/cmd/cmd.go index 7b4510c..4231f9d 100644 --- a/pkg/cmd/cmd.go +++ b/pkg/cmd/cmd.go @@ -159,11 +159,15 @@ func runCli(cmd *cobra.Command, args []string) error { // Make this Source type exists. s, ok := sourceReg.Get(w.Source.Type) if !ok { - return fmt.Errorf("source type %s does not exist", w.Source.Type) + logger.Errorf("source type %s does not exist", w.Source.Type) + continue } + source := s.New() - if s, ok := instances[w.Source.Type]; ok { - source = s + if s.Singleton() { + if s, ok := instances[w.Source.Type]; ok { + source = s + } } // Create a EventHandler @@ -172,7 +176,8 @@ func runCli(cmd *cobra.Command, args []string) error { // Initialize Source, with user-provided prop and event handler err = source.Init(w.Source.Properties, eh) if err != nil { - return errors.Wrapf(err, "failed to initialize source %s", source.Type()) + logger.Errorf("failed to initialize source %s: %s", source.Type(), err) + continue } instances[w.Source.Type] = source @@ -181,8 +186,8 @@ func runCli(cmd *cobra.Command, args []string) error { for _, instance := range instances { err := instance.Run(ctx) if err != nil { - logger.Fatalf("source %s failed to run: %v", instance.Type(), err) - return err + logger.Errorf("source %s failed to run: %v", instance.Type(), err) + continue } } diff --git a/pkg/cmd/options.go b/pkg/cmd/options.go index 66f36a3..33cabdd 100644 --- a/pkg/cmd/options.go +++ b/pkg/cmd/options.go @@ -20,8 +20,9 @@ import ( "fmt" "time" - "github.com/kubevela/kube-trigger/pkg/executor" "github.com/sirupsen/logrus" + + "github.com/kubevela/kube-trigger/pkg/executor" ) type option struct { diff --git a/pkg/config/parser_test.go b/pkg/config/parser_test.go index 6d79a6a..cdc27b2 100644 --- a/pkg/config/parser_test.go +++ b/pkg/config/parser_test.go @@ -17,9 +17,10 @@ limitations under the License. package config import ( - "github.com/stretchr/testify/assert" "reflect" "testing" + + "github.com/stretchr/testify/assert" ) func TestNewFromFileOrDir(t *testing.T) { diff --git a/pkg/eventhandler/event_handler.go b/pkg/eventhandler/event_handler.go index a41dde1..1833de4 100644 --- a/pkg/eventhandler/event_handler.go +++ b/pkg/eventhandler/event_handler.go @@ -66,9 +66,10 @@ func NewFromConfig(ctx context.Context, cli client.Client, actionMeta v1alpha1.A // TODO: use handler to handle // Apply filters context := map[string]interface{}{ - "event": event, - "data": data, - "timestamp": time.Now().Format(time.RFC3339), + "sourceType": sourceType, + "event": event, + "data": data, + "timestamp": time.Now().Format(time.RFC3339), } kept, err := filter.ApplyFilter(ctx, context, filterMeta) if err != nil { diff --git a/pkg/executor/executor.go b/pkg/executor/executor.go index 84ccb00..a271ac3 100644 --- a/pkg/executor/executor.go +++ b/pkg/executor/executor.go @@ -23,7 +23,6 @@ import ( "time" "github.com/sirupsen/logrus" - "golang.org/x/sync/syncmap" "golang.org/x/time/rate" "k8s.io/client-go/util/workqueue" ) @@ -89,7 +88,7 @@ func New(c Config) (*Executor, error) { e.maxRetries = c.MaxJobRetries e.allowRetries = c.RetryJobAfterFailure e.wg = sync.WaitGroup{} - e.runningJobs = syncmap.Map{} + e.runningJobs = sync.Map{} // Create a rate limited queue, with a token bucket for overall limiting, // and exponential failure for per-item limiting. e.queue = workqueue.NewRateLimitingQueue( diff --git a/pkg/source/builtin/cronjob/config.go b/pkg/source/builtin/cronjob/config.go new file mode 100644 index 0000000..abcb09c --- /dev/null +++ b/pkg/source/builtin/cronjob/config.go @@ -0,0 +1,54 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cronjob + +import ( + "fmt" + "strings" +) + +// Config is the config for CronJob. +type Config struct { + Schedule string `json:"schedule"` + TimeZone string `json:"timeZone"` +} + +func (c *Config) String() string { + // When TZ is set in schedule, ignore timeZone, just use schedule as is. + // This is not the intended use case, but we want to support it. + if strings.Contains(c.Schedule, "TZ") { + return c.Schedule + } + + if c.TimeZone != "" { + // We don't check if the timezone is valid here. + // cron lib will do it. + return fmt.Sprintf("TZ=%s %s", c.TimeZone, c.Schedule) + } + + return c.Schedule +} + +func formatSchedule(c Config) string { + // When TZ is set in schedule, warn the user. This is not the intended use case. + // However, it should still work, so we can continue. + if strings.Contains(c.Schedule, "TZ") { + logger.Warnf("do NOT set 'TZ' in schedule, setting 'timeZone' is the preferred way. With 'TZ' set, any 'timeZone' setting will be ignored.") + } + + return c.String() +} diff --git a/pkg/source/builtin/cronjob/config_test.go b/pkg/source/builtin/cronjob/config_test.go new file mode 100644 index 0000000..8a11d10 --- /dev/null +++ b/pkg/source/builtin/cronjob/config_test.go @@ -0,0 +1,58 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cronjob + +import "testing" + +func TestFormatSchedule(t *testing.T) { + tests := []struct { + name string + schedule string + timeZone string + want string + }{ + { + name: "schedule_without_timezone", + schedule: "* * * * *", + timeZone: "", + want: "* * * * *", + }, + { + name: "schedule_with_timezone", + schedule: "* * * * *", + timeZone: "Asia/Shanghai", + want: "TZ=Asia/Shanghai * * * * *", + }, + { + name: "schedule_with_timezone_prefixed_unsupported_but_will_work", + schedule: "TZ=Asia/Shanghai * * * * *", + timeZone: "", + want: "TZ=Asia/Shanghai * * * * *", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Config{ + Schedule: tt.schedule, + TimeZone: tt.timeZone, + } + if got := formatSchedule(*c); got != tt.want { + t.Errorf("String() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/source/builtin/cronjob/cronjob.go b/pkg/source/builtin/cronjob/cronjob.go new file mode 100644 index 0000000..50d51c3 --- /dev/null +++ b/pkg/source/builtin/cronjob/cronjob.go @@ -0,0 +1,113 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cronjob + +import ( + "context" + "encoding/json" + + "github.com/pkg/errors" + "github.com/robfig/cron/v3" + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/kubevela/kube-trigger/pkg/eventhandler" + "github.com/kubevela/kube-trigger/pkg/source/types" +) + +func init() { + logger = logrus.WithField("source", cronJobType) +} + +var ( + logger *logrus.Entry + cronJobType = "cronjob" +) + +// CronJob triggers Actions on a schedule. +type CronJob struct { + config Config + cronRunner *cron.Cron +} + +var _ types.Source = &CronJob{} + +// New creates a new CronJob. +func (c *CronJob) New() types.Source { + return &CronJob{} +} + +// Init initializes the CronJob. +func (c *CronJob) Init(properties *runtime.RawExtension, eh eventhandler.EventHandler) error { + b, err := properties.MarshalJSON() + if err != nil { + return errors.Wrapf(err, "error when parsing properties for %s", c.Type()) + } + err = json.Unmarshal(b, &c.config) + if err != nil { + return errors.Wrapf(err, "error when parsing properties for %s", c.Type()) + } + + c.cronRunner = cron.New() + sched, err := cron.ParseStandard(formatSchedule(c.config)) + if err != nil { + return errors.Wrapf(err, "error when parsing schedule for %s", c.Type()) + } + c.cronRunner.Schedule(sched, cron.FuncJob(func() { + logger.Infof("schedule \"%s\" fired", c.config.String()) + e := Event{ + Config: c.config, + TimeFired: metav1.Now(), + } + err := eh(c.Type(), e, e) + if err != nil { + logger.Infof("calling event handler failed: %s", err) + } + })) + + return nil +} + +// Run starts the CronJob. +func (c *CronJob) Run(ctx context.Context) error { + go func() { + logger.Infof("cronjob \"%s\" started", c.config.String()) + c.cronRunner.Start() + <-ctx.Done() + logger.Infof("context cancelled, stoppping cronjob \"%s\"", c.config.String()) + c.cronRunner.Stop() + }() + + return nil +} + +// Type returns the type of the CronJob. +func (c *CronJob) Type() string { + return cronJobType +} + +// Singleton . +func (c *CronJob) Singleton() bool { + return false +} + +// Event is the context passed to Actions. +type Event struct { + Config `json:",inline"` + TimeFired metav1.Time `json:"timeFired"` +} diff --git a/pkg/source/builtin/cronjob/cronjob_test.go b/pkg/source/builtin/cronjob/cronjob_test.go new file mode 100644 index 0000000..041968d --- /dev/null +++ b/pkg/source/builtin/cronjob/cronjob_test.go @@ -0,0 +1,103 @@ +/* +Copyright 2023 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cronjob + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/robfig/cron/v3" + "github.com/stretchr/testify/assert" + "k8s.io/apimachinery/pkg/runtime" + + "github.com/kubevela/kube-trigger/pkg/eventhandler" +) + +func TestCronJob_Init(t *testing.T) { + tests := []struct { + name string + config Config + wantErr bool + }{ + { + name: "normal", + config: Config{ + Schedule: "* * * * *", + TimeZone: "Asia/Shanghai", + }, + wantErr: false, + }, + { + name: "invalid_schedule", + config: Config{ + Schedule: "0 0 0 0 0", + TimeZone: "", + }, + wantErr: true, + }, + { + name: "invalid_timezone", + config: Config{ + Schedule: "* * * * *", + TimeZone: "Nowhere/nowhere", + }, + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := (&CronJob{}).New() + re := &runtime.RawExtension{} + b, err := json.Marshal(tt.config) + if err != nil { + t.Fail() + } + err = re.UnmarshalJSON(b) + if err != nil { + t.Fail() + } + if err := c.Init(re, eventhandler.New()); (err != nil) != tt.wantErr { + t.Errorf("Init() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } + + t.Run("invalid_properties", func(t *testing.T) { + c := (&CronJob{}).New() + re := &runtime.RawExtension{ + Raw: []byte("this-is-not-valid"), + Object: nil, + } + err := c.Init(re, eventhandler.New()) + assert.Error(t, err) + }) +} + +func TestCronJob_Run(t *testing.T) { + c := CronJob{ + config: Config{}, + cronRunner: cron.New(), + } + ctx, cancel := context.WithCancel(context.TODO()) + go func() { + _ = c.Run(ctx) + }() + cancel() + time.Sleep(50 * time.Millisecond) +} diff --git a/pkg/source/builtin/k8sresourcewatcher/controller/controller.go b/pkg/source/builtin/k8sresourcewatcher/controller/controller.go index 51fe144..4ad51e0 100644 --- a/pkg/source/builtin/k8sresourcewatcher/controller/controller.go +++ b/pkg/source/builtin/k8sresourcewatcher/controller/controller.go @@ -114,6 +114,7 @@ func newResourceController(ctx context.Context, logger *logrus.Entry, informer c var newEvent types.InformerEvent var err error cluster, _ := multicluster.ClusterFrom(ctx) + //nolint:errcheck // no need to check err here informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { newEvent.Event = types.Event{ @@ -207,6 +208,7 @@ func (c *Controller) processNextItem() bool { meta := utils.GetObjectMetaData(newEvent.(types.InformerEvent).EventObj) err := c.processItem(newEvent.(types.InformerEvent)) + //nolint:gocritic // no need to use switch statement here if err == nil { // No error, reset the ratelimit counters c.queue.Forget(newEvent) diff --git a/pkg/source/builtin/k8sresourcewatcher/k8s_resource_watcher.go b/pkg/source/builtin/k8sresourcewatcher/k8s_resource_watcher.go index bda8bd1..2476f04 100644 --- a/pkg/source/builtin/k8sresourcewatcher/k8s_resource_watcher.go +++ b/pkg/source/builtin/k8sresourcewatcher/k8s_resource_watcher.go @@ -33,13 +33,14 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "github.com/kubevela/pkg/multicluster" + "github.com/kubevela/pkg/util/singleton" + "github.com/kubevela/kube-trigger/api/v1alpha1" "github.com/kubevela/kube-trigger/pkg/eventhandler" "github.com/kubevela/kube-trigger/pkg/source/builtin/k8sresourcewatcher/controller" "github.com/kubevela/kube-trigger/pkg/source/builtin/k8sresourcewatcher/types" sourcetypes "github.com/kubevela/kube-trigger/pkg/source/types" - "github.com/kubevela/pkg/multicluster" - "github.com/kubevela/pkg/util/singleton" ) func init() { @@ -50,11 +51,14 @@ func init() { } var ( + // MultiClusterConfigType . MultiClusterConfigType string ) const ( - TypeClusterGateway string = "cluster-gateway" + // TypeClusterGateway . + TypeClusterGateway string = "cluster-gateway" + // TypeClusterGatewaySecret . TypeClusterGatewaySecret string = "cluster-gateway-secret" clusterLabel string = "cluster.core.oam.dev/cluster-credential-type" @@ -65,6 +69,7 @@ const ( clusterEndpoint string = "endpoint" ) +// K8sResourceWatcher watches k8s resources. type K8sResourceWatcher struct { configs map[string]*types.Config eventHandlers map[string][]eventhandler.EventHandler @@ -73,6 +78,7 @@ type K8sResourceWatcher struct { var _ sourcetypes.Source = &K8sResourceWatcher{} +// New . func (w *K8sResourceWatcher) New() sourcetypes.Source { return &K8sResourceWatcher{ configs: make(map[string]*types.Config), @@ -80,6 +86,7 @@ func (w *K8sResourceWatcher) New() sourcetypes.Source { } } +// Parse . func (w *K8sResourceWatcher) Parse(properties *runtime.RawExtension) (*types.Config, error) { props, err := properties.MarshalJSON() if err != nil { @@ -93,6 +100,7 @@ func (w *K8sResourceWatcher) Parse(properties *runtime.RawExtension) (*types.Con return ctrlConf, nil } +// Init . func (w *K8sResourceWatcher) Init(properties *runtime.RawExtension, eh eventhandler.EventHandler) error { var err error @@ -119,6 +127,7 @@ func (w *K8sResourceWatcher) Init(properties *runtime.RawExtension, eh eventhand return nil } +// Run . func (w *K8sResourceWatcher) Run(ctx context.Context) error { clusterGetter, err := NewMultiClustersGetter(MultiClusterConfigType) if err != nil { @@ -143,14 +152,22 @@ func (w *K8sResourceWatcher) Run(ctx context.Context) error { return nil } +// Type . func (w *K8sResourceWatcher) Type() string { return v1alpha1.SourceTypeResourceWatcher } +// Singleton . +func (w *K8sResourceWatcher) Singleton() bool { + return true +} + +// MultiClustersGetter . type MultiClustersGetter interface { GetDynamicClientAndMapper(ctx context.Context, cluster string) (dynamic.Interface, meta.RESTMapper, error) } +// NewMultiClustersGetter new a MultiClustersGetter func NewMultiClustersGetter(typ string) (MultiClustersGetter, error) { config := ctrl.GetConfigOrDie() cli, err := client.New(config, client.Options{Scheme: scheme.Scheme}) diff --git a/pkg/source/builtin/k8sresourcewatcher/types/types.go b/pkg/source/builtin/k8sresourcewatcher/types/types.go index 3eedefb..1d0d8c1 100644 --- a/pkg/source/builtin/k8sresourcewatcher/types/types.go +++ b/pkg/source/builtin/k8sresourcewatcher/types/types.go @@ -33,6 +33,7 @@ type Config struct { Clusters []string `json:"clusters,omitempty"` } +// Key returns the identifier of a Config. func (c *Config) Key() string { var labels string if len(c.MatchingLabels) > 0 { @@ -43,6 +44,7 @@ func (c *Config) Key() string { return strings.Join([]string{c.APIVersion, c.Kind, c.Namespace, labels}, "-") } +// Merge merges two Configs. func (c *Config) Merge(new Config) { for _, event := range new.Events { if !slices.Contains(c.Events, event) { @@ -57,8 +59,10 @@ func (c *Config) Merge(new Config) { } } +// EventType is the type of the observed event. type EventType string +// EventTypes const ( EventTypeCreate EventType = "create" EventTypeUpdate EventType = "update" diff --git a/pkg/source/builtin/k8sresourcewatcher/utils/utils.go b/pkg/source/builtin/k8sresourcewatcher/utils/utils.go index 8d0b1cc..b7d6d52 100644 --- a/pkg/source/builtin/k8sresourcewatcher/utils/utils.go +++ b/pkg/source/builtin/k8sresourcewatcher/utils/utils.go @@ -20,6 +20,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +// GetObjectMetaData . func GetObjectMetaData(obj interface{}) metav1.Object { return obj.(metav1.Object) } diff --git a/pkg/source/registry/register.go b/pkg/source/registry/register.go index 07f2672..d827fcc 100644 --- a/pkg/source/registry/register.go +++ b/pkg/source/registry/register.go @@ -17,6 +17,7 @@ limitations under the License. package registry import ( + "github.com/kubevela/kube-trigger/pkg/source/builtin/cronjob" "github.com/kubevela/kube-trigger/pkg/source/builtin/k8sresourcewatcher" "github.com/kubevela/kube-trigger/pkg/source/types" ) @@ -24,6 +25,7 @@ import ( // RegisterBuiltinSources register builtin sources. func RegisterBuiltinSources(reg *Registry) { registerFromInstance(reg, &k8sresourcewatcher.K8sResourceWatcher{}) + registerFromInstance(reg, &cronjob.CronJob{}) } func registerFromInstance(reg *Registry, act types.Source) { diff --git a/pkg/source/types/types.go b/pkg/source/types/types.go index fa7c6fb..d1271fa 100644 --- a/pkg/source/types/types.go +++ b/pkg/source/types/types.go @@ -41,6 +41,13 @@ type Source interface { // Type returns the type of this Source. Name your source as something-doer, // instead of do-something. Type() string + + // Singleton defines if this type of Source will only be initialized once. + // For example, if Singleton is true, all Source's in config with the same + // type will share the same instance (New will be called only once) + // and Init will be called multiple times for each Source of the same type. + // Otherwise, each Source will have its own instance, i.e., each New for each Init. + Singleton() bool } // SourceMeta is what users type in their configurations, specifying what source