diff --git a/api/v1alpha1/k8sgpt_types.go b/api/v1alpha1/k8sgpt_types.go index c6995bfe..3643a1d5 100644 --- a/api/v1alpha1/k8sgpt_types.go +++ b/api/v1alpha1/k8sgpt_types.go @@ -36,11 +36,17 @@ type ExtraOptionsRef struct { Backstage *Backstage `json:"backstage,omitempty"` } +type WebhookRef struct { + // +kubebuilder:validation:Enum=slack + Type string `json:"type,omitempty"` + Endpoint string `json:"webhook,omitempty"` +} + // K8sGPTSpec defines the desired state of K8sGPT type K8sGPTSpec struct { // +kubebuilder:default:=openai // +kubebuilder:validation:Enum=openai;localai;azureopenai - Backend `json:"backend"` + Backend string `json:"backend"` BaseUrl string `json:"baseUrl,omitempty"` // +kubebuilder:default:=gpt-3.5-turbo Model string `json:"model,omitempty"` @@ -51,14 +57,13 @@ type K8sGPTSpec struct { NoCache bool `json:"noCache,omitempty"` Filters []string `json:"filters,omitempty"` ExtraOptions *ExtraOptionsRef `json:"extraOptions,omitempty"` + Sink *WebhookRef `json:"sink,omitempty"` } -type Backend string - const ( - OpenAI Backend = "openai" - AzureOpenAI Backend = "azureopenai" - LocalAI Backend = "localai" + OpenAI = "openai" + AzureOpenAI = "azureopenai" + LocalAI = "localai" ) // K8sGPTStatus defines the observed state of K8sGPT diff --git a/api/v1alpha1/result_types.go b/api/v1alpha1/result_types.go index c5734c95..3201c94b 100644 --- a/api/v1alpha1/result_types.go +++ b/api/v1alpha1/result_types.go @@ -32,7 +32,7 @@ type Sensitive struct { // ResultSpec defines the desired state of Result type ResultSpec struct { - Backend `json:"backend"` + Backend string `json:"backend"` Kind string `json:"kind"` Name string `json:"name"` Error []Failure `json:"error"` @@ -42,8 +42,8 @@ type ResultSpec struct { // ResultStatus defines the observed state of Result type ResultStatus struct { - // INSERT ADDITIONAL STATUS FIELD - define observed state of cluster - // Important: Run "make" to regenerate code after modifying this file + LifeCycle string `json:"lifecycle,omitempty"` + Webhook string `json:"webhook,omitempty"` } //+kubebuilder:object:root=true diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 660ac99f..1d307070 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -157,6 +157,11 @@ func (in *K8sGPTSpec) DeepCopyInto(out *K8sGPTSpec) { *out = new(ExtraOptionsRef) (*in).DeepCopyInto(*out) } + if in.Sink != nil { + in, out := &in.Sink, &out.Sink + *out = new(WebhookRef) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new K8sGPTSpec. @@ -309,3 +314,18 @@ func (in *Sensitive) DeepCopy() *Sensitive { in.DeepCopyInto(out) return out } + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WebhookRef) DeepCopyInto(out *WebhookRef) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WebhookRef. +func (in *WebhookRef) DeepCopy() *WebhookRef { + if in == nil { + return nil + } + out := new(WebhookRef) + in.DeepCopyInto(out) + return out +} diff --git a/chart/operator/templates/deployment.yaml b/chart/operator/templates/deployment.yaml index 484fdd4d..b6575099 100644 --- a/chart/operator/templates/deployment.yaml +++ b/chart/operator/templates/deployment.yaml @@ -77,6 +77,8 @@ spec: env: - name: KUBERNETES_CLUSTER_DOMAIN value: {{ quote .Values.kubernetesClusterDomain }} + - name: OPERATOR_SINK_WEBHOOK_TIMEOUT_SECONDS + value: {{ quote .Values.controllerManager.manager.sinkWebhookTimeout }} image: {{ .Values.controllerManager.manager.image.repository }}:{{ .Values.controllerManager.manager.image.tag | default .Chart.AppVersion }} livenessProbe: @@ -99,4 +101,4 @@ spec: securityContext: runAsNonRoot: true serviceAccountName: {{ include "chart.fullname" . }}-controller-manager - terminationGracePeriodSeconds: 10 \ No newline at end of file + terminationGracePeriodSeconds: 10 diff --git a/chart/operator/values.yaml b/chart/operator/values.yaml index 324c8a93..43ba2fab 100644 --- a/chart/operator/values.yaml +++ b/chart/operator/values.yaml @@ -29,6 +29,7 @@ controllerManager: cpu: 5m memory: 64Mi manager: + sinkWebhookTimeout: 30s containerSecurityContext: allowPrivilegeEscalation: false capabilities: diff --git a/config/crd/bases/core.k8sgpt.ai_k8sgpts.yaml b/config/crd/bases/core.k8sgpt.ai_k8sgpts.yaml index b67fce91..3104878a 100644 --- a/config/crd/bases/core.k8sgpt.ai_k8sgpts.yaml +++ b/config/crd/bases/core.k8sgpt.ai_k8sgpts.yaml @@ -72,6 +72,15 @@ spec: name: type: string type: object + sink: + properties: + type: + enum: + - slack + type: string + webhook: + type: string + type: object version: type: string required: diff --git a/config/crd/bases/core.k8sgpt.ai_results.yaml b/config/crd/bases/core.k8sgpt.ai_results.yaml index 3e4a8ff6..9d938573 100644 --- a/config/crd/bases/core.k8sgpt.ai_results.yaml +++ b/config/crd/bases/core.k8sgpt.ai_results.yaml @@ -80,6 +80,11 @@ spec: type: object status: description: ResultStatus defines the observed state of Result + properties: + lifecycle: + type: string + webhook: + type: string type: object type: object served: true diff --git a/controllers/k8sgpt_controller.go b/controllers/k8sgpt_controller.go index 64752205..47ec5f56 100644 --- a/controllers/k8sgpt_controller.go +++ b/controllers/k8sgpt_controller.go @@ -27,12 +27,11 @@ import ( kclient "github.com/k8sgpt-ai/k8sgpt-operator/pkg/client" "github.com/k8sgpt-ai/k8sgpt-operator/pkg/integrations" "github.com/k8sgpt-ai/k8sgpt-operator/pkg/resources" + "github.com/k8sgpt-ai/k8sgpt-operator/pkg/sinks" "github.com/k8sgpt-ai/k8sgpt-operator/pkg/utils" "github.com/prometheus/client_golang/prometheus" v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" @@ -71,6 +70,7 @@ type K8sGPTReconciler struct { client.Client Scheme *runtime.Scheme Integrations *integrations.Integrations + SinkClient *sinks.Client K8sGPTClient *kclient.Client // This is a map of clients for each deployment k8sGPTClients map[string]*kclient.Client @@ -202,29 +202,11 @@ func (r *K8sGPTReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr } // Parse the k8sgpt-deployment response into a list of results k8sgptNumberOfResults.Set(float64(len(response.Results))) - rawResults := make(map[string]corev1alpha1.Result) - for _, resultSpec := range response.Results { - resultSpec.Backend = k8sgptConfig.Spec.Backend - name := strings.ReplaceAll(resultSpec.Name, "-", "") - name = strings.ReplaceAll(name, "/", "") - result := corev1alpha1.Result{ - Spec: resultSpec, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: k8sgptConfig.Namespace, - }, - } - if k8sgptConfig.Spec.ExtraOptions != nil && k8sgptConfig.Spec.ExtraOptions.Backstage.Enabled { - backstageLabel, err := r.Integrations.BackstageLabel(resultSpec) - if err != nil { - k8sgptReconcileErrorCount.Inc() - return r.finishReconcile(err, false) - } - result.ObjectMeta.Labels = backstageLabel - } - rawResults[name] = result + rawResults, err := resources.MapResults(*r.Integrations, response.Results, *k8sgptConfig) + if err != nil { + k8sgptReconcileErrorCount.Inc() + return r.finishReconcile(err, false) } - // Prior to creating or updating any results we will delete any stale results that // no longer are relevent, we can do this by using the resultSpec composed name against // the custom resource name @@ -255,39 +237,65 @@ func (r *K8sGPTReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr // At this point we are able to loop through our rawResults and create them or update // them as needed for _, result := range rawResults { - // Check if the result already exists - var existingResult corev1alpha1.Result - err = r.Get(ctx, client.ObjectKey{Namespace: k8sgptConfig.Namespace, - Name: result.Name}, &existingResult) + operation, err := resources.CreateOrUpdateResult(ctx, r.Client, result) if err != nil { - // if the result doesn't exist, we will create it - if errors.IsNotFound(err) { - err = r.Create(ctx, &result) - if err != nil { + k8sgptReconcileErrorCount.Inc() + return r.finishReconcile(err, false) + + } + + // Update metrics + if operation == resources.CreatedResult { + k8sgptNumberOfResultsByType.With(prometheus.Labels{ + "kind": result.Spec.Kind, + "name": result.Name, + }).Inc() + } else if operation == resources.UpdatedResult { + fmt.Printf("Updated successfully %s \n", result.Name) + } + + } + + // We emit when result Status is not historical + // and when user configures a sink for the first time + latestResultList := &corev1alpha1.ResultList{} + if err := r.List(ctx, latestResultList); err != nil { + return r.finishReconcile(err, false) + } + if len(latestResultList.Items) == 0 { + return r.finishReconcile(nil, false) + } + sinkEnabled := k8sgptConfig.Spec.Sink != nil && k8sgptConfig.Spec.Sink.Type != "" && k8sgptConfig.Spec.Sink.Endpoint != "" + + var sinkType sinks.ISink + if sinkEnabled { + sinkType = sinks.NewSink(k8sgptConfig.Spec.Sink.Type) + sinkType.Configure(*k8sgptConfig, *r.SinkClient) + } + + for _, result := range latestResultList.Items { + var res corev1alpha1.Result + if err := r.Get(ctx, client.ObjectKey{Namespace: result.Namespace, Name: result.Name}, &res); err != nil { + return r.finishReconcile(err, false) + } + + if sinkEnabled { + if res.Status.LifeCycle != string(resources.NoOpResult) || res.Status.Webhook == "" { + if err := sinkType.Emit(res.Spec); err != nil { k8sgptReconcileErrorCount.Inc() return r.finishReconcile(err, false) - } else { - k8sgptNumberOfResultsByType.With(prometheus.Labels{ - "kind": result.Spec.Kind, - "name": result.Name, - }).Inc() } - } else { - k8sgptReconcileErrorCount.Inc() - return r.finishReconcile(err, false) + res.Status.Webhook = k8sgptConfig.Spec.Sink.Endpoint } } else { - // If the result already exists we will update it - existingResult.Spec = result.Spec - existingResult.Labels = result.Labels - err = r.Update(ctx, &existingResult) - if err != nil { - k8sgptReconcileErrorCount.Inc() - return r.finishReconcile(err, false) - } + // Remove the Webhook status from results + res.Status.Webhook = "" + } + if err := r.Status().Update(ctx, &res); err != nil { + k8sgptReconcileErrorCount.Inc() + return r.finishReconcile(err, false) } } - } return r.finishReconcile(nil, false) diff --git a/main.go b/main.go index 980e29ab..fdae8141 100644 --- a/main.go +++ b/main.go @@ -17,6 +17,7 @@ import ( "context" "flag" "os" + "time" // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. @@ -25,6 +26,7 @@ import ( corev1alpha1 "github.com/k8sgpt-ai/k8sgpt-operator/api/v1alpha1" "github.com/k8sgpt-ai/k8sgpt-operator/controllers" "github.com/k8sgpt-ai/k8sgpt-operator/pkg/integrations" + "github.com/k8sgpt-ai/k8sgpt-operator/pkg/sinks" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -93,10 +95,23 @@ func main() { os.Exit(1) } + timeout, exists := os.LookupEnv("OPERATOR_SINK_WEBHOOK_TIMEOUT_SECONDS") + if !exists { + timeout = "35s" + } + + sinkTimeout, err := time.ParseDuration(timeout) + if err != nil { + setupLog.Error(err, "unable to read webhook timeout value") + os.Exit(1) + } + sinkClient := sinks.NewClient(sinkTimeout) + if err = (&controllers.K8sGPTReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), Integrations: integration, + SinkClient: sinkClient, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "K8sGPT") os.Exit(1) diff --git a/pkg/resources/results.go b/pkg/resources/results.go new file mode 100644 index 00000000..97804538 --- /dev/null +++ b/pkg/resources/results.go @@ -0,0 +1,84 @@ +package resources + +import ( + "context" + "reflect" + "strings" + + "github.com/k8sgpt-ai/k8sgpt-operator/api/v1alpha1" + "github.com/k8sgpt-ai/k8sgpt-operator/pkg/integrations" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type ResultOperation string + +const ( + CreatedResult ResultOperation = "created" + UpdatedResult = "updated" + NoOpResult = "historical" +) + +func MapResults(i integrations.Integrations, resultsSpec []v1alpha1.ResultSpec, config v1alpha1.K8sGPT) (map[string]v1alpha1.Result, error) { + namespace := config.Namespace + backend := config.Spec.Backend + backstageEnabled := config.Spec.ExtraOptions != nil && config.Spec.ExtraOptions.Backstage.Enabled + rawResults := make(map[string]v1alpha1.Result) + for _, resultSpec := range resultsSpec { + name := strings.ReplaceAll(resultSpec.Name, "-", "") + name = strings.ReplaceAll(name, "/", "") + result := GetResult(resultSpec, name, namespace, backend) + if backstageEnabled { + backstageLabel, err := i.BackstageLabel(resultSpec) + if err != nil { + return nil, err + } + // add Backstage label + result.ObjectMeta.Labels = backstageLabel + } + + rawResults[name] = result + } + return rawResults, nil +} + +func GetResult(resultSpec v1alpha1.ResultSpec, name, namespace, backend string) v1alpha1.Result { + resultSpec.Backend = backend + return v1alpha1.Result{ + Spec: resultSpec, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + } +} +func CreateOrUpdateResult(ctx context.Context, c client.Client, res v1alpha1.Result) (ResultOperation, error) { + var existing v1alpha1.Result + if err := c.Get(ctx, client.ObjectKey{Namespace: res.Namespace, Name: res.Name}, &existing); err != nil { + if !errors.IsNotFound(err) { + return NoOpResult, err + } + if err := c.Create(ctx, &res); err != nil { + return NoOpResult, err + } + return CreatedResult, nil + } + if existing.Spec.Details == res.Spec.Details && reflect.DeepEqual(res.Labels, existing.Labels) { + existing.Status.LifeCycle = string(NoOpResult) + err := c.Status().Update(ctx, &existing) + return NoOpResult, err + } + + existing.Spec = res.Spec + existing.Labels = res.Labels + if err := c.Update(ctx, &existing); err != nil { + return NoOpResult, err + } + existing.Status.LifeCycle = string(UpdatedResult) + if err := c.Status().Update(ctx, &existing); err != nil { + return NoOpResult, err + } + + return UpdatedResult, nil +} diff --git a/pkg/sinks/sinkreporter.go b/pkg/sinks/sinkreporter.go new file mode 100644 index 00000000..33d41232 --- /dev/null +++ b/pkg/sinks/sinkreporter.go @@ -0,0 +1,36 @@ +package sinks + +import ( + "net/http" + "time" + + "github.com/k8sgpt-ai/k8sgpt-operator/api/v1alpha1" +) + +type ISink interface { + Configure(config v1alpha1.K8sGPT, c Client) + Emit(results v1alpha1.ResultSpec) error +} + +func NewSink(sinkType string) ISink { + switch sinkType { + case "slack": + return &SlackSink{} + //Introduce more Sink Providers + default: + return &SlackSink{} + } +} + +type Client struct { + hclient *http.Client +} + +func NewClient(timeout time.Duration) *Client { + client := &http.Client{ + Timeout: timeout, + } + return &Client{ + hclient: client, + } +} diff --git a/pkg/sinks/slack.go b/pkg/sinks/slack.go new file mode 100644 index 00000000..210389f3 --- /dev/null +++ b/pkg/sinks/slack.go @@ -0,0 +1,73 @@ +package sinks + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + + "github.com/k8sgpt-ai/k8sgpt-operator/api/v1alpha1" +) + +var _ ISink = (*SlackSink)(nil) + +type SlackSink struct { + Endpoint string + Client Client +} + +type SlackMessage struct { + Text string `json:"text"` + Attachments []Attachment `json:"attachments"` +} + +type Attachment struct { + Type string `json:"type"` + Text string `json:"text"` + Color string `json:"color"` + Title string `json:"title"` +} + +func buildSlackMessage(kind, name, details string) SlackMessage { + return SlackMessage{ + Text: fmt.Sprintf(">*K8sGPT analysis of the %s %s*", kind, name), + Attachments: []Attachment{ + Attachment{ + Type: "mrkdwn", + Text: details, + Color: "danger", + Title: "Report", + }, + }, + } +} + +func (s *SlackSink) Configure(config v1alpha1.K8sGPT, c Client) { + s.Endpoint = config.Spec.Sink.Endpoint + s.Client = c +} + +func (s *SlackSink) Emit(results v1alpha1.ResultSpec) error { + message := buildSlackMessage(results.Kind, results.Name, results.Details) + payload, err := json.Marshal(message) + if err != nil { + return err + } + req, err := http.NewRequest(http.MethodPost, s.Endpoint, bytes.NewBuffer(payload)) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + resp, err := s.Client.hclient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to send report: %s", resp.Status) + } + + return nil +}