Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adds general purpose event webhook #401

Merged
merged 12 commits into from
Jan 11, 2020
1 change: 1 addition & 0 deletions charts/flagger/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ Parameter | Description | Default
`slack.url` | Slack incoming webhook | None
`slack.channel` | Slack channel | None
`slack.user` | Slack username | `flagger`
`eventWebhook` | If set, Flagger will publish events to the given webhook | None
`msteams.url` | Microsoft Teams incoming webhook | None
`podMonitor.enabled` | if `true`, create a PodMonitor for [monitoring the metrics](https://docs.flagger.app/usage/monitoring#metrics) | `false`
`podMonitor.namespace` | the namespace where the PodMonitor is created | the same namespace
Expand Down
3 changes: 3 additions & 0 deletions charts/flagger/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ spec:
{{- if .Values.ingressAnnotationsPrefix }}
- -ingress-annotations-prefix={{ .Values.ingressAnnotationsPrefix }}
{{- end }}
{{- if .Values.eventWebhook }}
- -event-webhook={{ .Values.eventWebhook }}
{{- end }}
livenessProbe:
exec:
command:
Expand Down
3 changes: 3 additions & 0 deletions charts/flagger/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ slack:
# incoming webhook https://api.slack.com/incoming-webhooks
url:

# when specified, flagger will publish events to the provided webhook
eventWebhook: ""

msteams:
# MS Teams incoming webhook URL
url:
Expand Down
3 changes: 3 additions & 0 deletions cmd/flagger/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var (
slackURL string
slackUser string
slackChannel string
eventWebhook string
threadiness int
zapReplaceGlobals bool
zapEncoding string
Expand All @@ -67,6 +68,7 @@ func init() {
flag.StringVar(&slackURL, "slack-url", "", "Slack hook URL.")
flag.StringVar(&slackUser, "slack-user", "flagger", "Slack user name.")
flag.StringVar(&slackChannel, "slack-channel", "", "Slack channel.")
flag.StringVar(&eventWebhook, "event-webhook", "", "Webhook for publishing flagger events")
flag.StringVar(&msteamsURL, "msteams-url", "", "MS Teams incoming webhook URL.")
flag.IntVar(&threadiness, "threadiness", 2, "Worker concurrency.")
flag.BoolVar(&zapReplaceGlobals, "zap-replace-globals", false, "Whether to change the logging level of the global zap logger.")
Expand Down Expand Up @@ -200,6 +202,7 @@ func main() {
observerFactory,
meshProvider,
version.VERSION,
eventWebhook,
)

flaggerInformerFactory.Start(stopCh)
Expand Down
39 changes: 39 additions & 0 deletions docs/gitbook/usage/alerting.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,42 @@ Besides Slack, you can use Alertmanager to trigger alerts when a canary deployme
description: "Workload {{ $labels.name }} namespace {{ $labels.namespace }}"
```

### Event Webhook

Flagger can be configured to send event payloads to a specified webhook:

```bash
helm upgrade -i flagger flagger/flagger \
--set eventWebhook=https://example.com/flagger-canary-event-webhook
```

When configured, every action that Flagger takes during a canary deployment will be sent as JSON via an HTTP POST
request. The JSON payload has the following schema:

```json
{
"name": "string (canary name)",
"namespace": "string (canary namespace)",
"phase": "string (canary phase)",
"metadata": {
"eventMessage": "string (canary event message)",
"eventType": "string (canary event type)",
"timestamp": "string (unix timestamp ms)"
}
}
```

Example:

```json
{
"name": "podinfo",
"namespace": "default",
"phase": "Progressing",
"metadata": {
"eventMessage": "New revision detected! Scaling up podinfo.default",
"eventType": "Normal",
"timestamp": "1578607635167"
}
}
```
15 changes: 15 additions & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Controller struct {
routerFactory *router.Factory
observerFactory *metrics.Factory
meshProvider string
eventWebhook string
}

func NewController(
Expand All @@ -66,6 +67,7 @@ func NewController(
observerFactory *metrics.Factory,
meshProvider string,
version string,
eventWebhook string,
) *Controller {
logger.Debug("Creating event broadcaster")
flaggerscheme.AddToScheme(scheme.Scheme)
Expand Down Expand Up @@ -97,6 +99,7 @@ func NewController(
canaryFactory: canaryFactory,
routerFactory: routerFactory,
meshProvider: meshProvider,
eventWebhook: eventWebhook,
}

flaggerInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -244,19 +247,31 @@ func checkCustomResourceType(obj interface{}, logger *zap.SugaredLogger) (flagge
return *roll, true
}

func (c *Controller) sendEventToWebhook(r *flaggerv1.Canary, eventtype, template string, args []interface{}) {
if c.eventWebhook != "" {
err := CallEventWebhook(r, c.eventWebhook, fmt.Sprintf(template, args...), eventtype)
if err != nil {
c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Errorf("error sending event to webhook: %s", err)
}
}
}

func (c *Controller) recordEventInfof(r *flaggerv1.Canary, template string, args ...interface{}) {
c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Infof(template, args...)
c.eventRecorder.Event(r, corev1.EventTypeNormal, "Synced", fmt.Sprintf(template, args...))
c.sendEventToWebhook(r, corev1.EventTypeNormal, template, args)
}

func (c *Controller) recordEventErrorf(r *flaggerv1.Canary, template string, args ...interface{}) {
c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Errorf(template, args...)
c.eventRecorder.Event(r, corev1.EventTypeWarning, "Synced", fmt.Sprintf(template, args...))
c.sendEventToWebhook(r, corev1.EventTypeWarning, template, args)
}

func (c *Controller) recordEventWarningf(r *flaggerv1.Canary, template string, args ...interface{}) {
c.logger.With("canary", fmt.Sprintf("%s.%s", r.Name, r.Namespace)).Infof(template, args...)
c.eventRecorder.Event(r, corev1.EventTypeWarning, "Synced", fmt.Sprintf(template, args...))
c.sendEventToWebhook(r, corev1.EventTypeWarning, template, args)
}

func (c *Controller) sendNotification(cd *flaggerv1.Canary, message string, metadata bool, warn bool) {
Expand Down
14 changes: 10 additions & 4 deletions pkg/controller/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,12 @@ func (c *Controller) advanceCanary(name string, namespace string, skipLivenessCh
return
}

canaryPhaseFailed := cd.DeepCopy()
canaryPhaseFailed.Status.Phase = flaggerv1.CanaryPhaseFailed
c.recordEventWarningf(canaryPhaseFailed, "Canary failed! Scaling down %s.%s",
canaryPhaseFailed.Name, canaryPhaseFailed.Namespace)

c.recorder.SetWeight(cd, primaryWeight, canaryWeight)
c.recordEventWarningf(cd, "Canary failed! Scaling down %s.%s",
cd.Name, cd.Namespace)

// shutdown canary
if err := canaryController.Scale(cd, 0); err != nil {
Expand Down Expand Up @@ -645,9 +648,12 @@ func (c *Controller) checkCanaryStatus(canary *flaggerv1.Canary, canaryControlle
}

if shouldAdvance {
c.recordEventInfof(canary, "New revision detected! Scaling up %s.%s", canary.Spec.TargetRef.Name, canary.Namespace)
c.sendNotification(canary, "New revision detected, starting canary analysis.",
canaryPhaseProgressing := canary.DeepCopy()
canaryPhaseProgressing.Status.Phase = flaggerv1.CanaryPhaseProgressing
c.recordEventInfof(canaryPhaseProgressing, "New revision detected! Scaling up %s.%s", canaryPhaseProgressing.Spec.TargetRef.Name, canaryPhaseProgressing.Namespace)
c.sendNotification(canaryPhaseProgressing, "New revision detected, starting canary analysis.",
true, false)

if err := canaryController.ScaleFromZero(canary); err != nil {
c.recordEventErrorf(canary, "%v", err)
return false
Expand Down
63 changes: 45 additions & 18 deletions pkg/controller/webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,32 +7,22 @@ import (
"errors"
"fmt"
"io/ioutil"
"k8s.io/utils/clock"
"net/http"
"net/url"
"strconv"
"time"

flaggerv1 "github.com/weaveworks/flagger/pkg/apis/flagger/v1alpha3"
)

// CallWebhook does a HTTP POST to an external service and
// returns an error if the response status code is non-2xx
func CallWebhook(name string, namespace string, phase flaggerv1.CanaryPhase, w flaggerv1.CanaryWebhook) error {
payload := flaggerv1.CanaryWebhookPayload{
Name: name,
Namespace: namespace,
Phase: phase,
}

if w.Metadata != nil {
payload.Metadata = *w.Metadata
}

func callWebhook(webhook string, payload interface{}, timeout string) error {
payloadBin, err := json.Marshal(payload)
if err != nil {
return err
}

hook, err := url.Parse(w.URL)
hook, err := url.Parse(webhook)
if err != nil {
return err
}
Expand All @@ -44,16 +34,16 @@ func CallWebhook(name string, namespace string, phase flaggerv1.CanaryPhase, w f

req.Header.Set("Content-Type", "application/json")

if len(w.Timeout) < 2 {
w.Timeout = "10s"
if timeout == "" {
timeout = "10s"
}

timeout, err := time.ParseDuration(w.Timeout)
t, err := time.ParseDuration(timeout)
if err != nil {
return err
}

ctx, cancel := context.WithTimeout(req.Context(), timeout)
ctx, cancel := context.WithTimeout(req.Context(), t)
defer cancel()

r, err := http.DefaultClient.Do(req.WithContext(ctx))
Expand All @@ -73,3 +63,40 @@ func CallWebhook(name string, namespace string, phase flaggerv1.CanaryPhase, w f

return nil
}

// CallWebhook does a HTTP POST to an external service and
// returns an error if the response status code is non-2xx
func CallWebhook(name string, namespace string, phase flaggerv1.CanaryPhase, w flaggerv1.CanaryWebhook) error {
payload := flaggerv1.CanaryWebhookPayload{
Name: name,
Namespace: namespace,
Phase: phase,
}

if w.Metadata != nil {
payload.Metadata = *w.Metadata
}

if len(w.Timeout) < 2 {
w.Timeout = "10s"
}

return callWebhook(w.URL, payload, w.Timeout)
}

func CallEventWebhook(r *flaggerv1.Canary, webhook, message, eventtype string) error {
t := clock.RealClock{}.Now()

payload := flaggerv1.CanaryWebhookPayload{
Name: r.Name,
Namespace: r.Namespace,
Phase: r.Status.Phase,
Metadata: map[string]string{
"eventMessage": message,
"eventType": eventtype,
"timestamp": strconv.FormatInt(t.UnixNano()/1000000, 10),
},
}

return callWebhook(webhook, payload, "5s")
}
92 changes: 90 additions & 2 deletions pkg/controller/webhook_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package controller

import (
"encoding/json"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -20,7 +24,7 @@ func TestCallWebhook(t *testing.T) {
Metadata: &map[string]string{"key1": "val1"},
}

err := CallWebhook("podinfo", "default", flaggerv1.CanaryPhaseProgressing, hook)
err := CallWebhook("podinfo", v1.NamespaceDefault, flaggerv1.CanaryPhaseProgressing, hook)
if err != nil {
t.Fatal(err.Error())
}
Expand All @@ -36,7 +40,91 @@ func TestCallWebhook_StatusCode(t *testing.T) {
URL: ts.URL,
}

err := CallWebhook("podinfo", "default", flaggerv1.CanaryPhaseProgressing, hook)
err := CallWebhook("podinfo", v1.NamespaceDefault, flaggerv1.CanaryPhaseProgressing, hook)
if err == nil {
t.Errorf("Got no error wanted %v", http.StatusInternalServerError)
}
}

func TestCallEventWebhook(t *testing.T) {
canaryName := "podinfo"
canaryNamespace := v1.NamespaceDefault
canaryMessage := fmt.Sprintf("Starting canary analysis for %s.%s", canaryName, canaryNamespace)
canaryEventType := corev1.EventTypeNormal

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
d := json.NewDecoder(r.Body)

var payload flaggerv1.CanaryWebhookPayload

err := d.Decode(&payload)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}

if payload.Metadata["eventMessage"] != canaryMessage {
w.WriteHeader(http.StatusBadRequest)
return
}

if payload.Metadata["eventType"] != canaryEventType {
w.WriteHeader(http.StatusBadRequest)
return
}

if payload.Name != canaryName {
w.WriteHeader(http.StatusBadRequest)
return
}

if payload.Namespace != canaryNamespace {
w.WriteHeader(http.StatusBadRequest)
return
}

w.WriteHeader(http.StatusAccepted)
}))
defer ts.Close()

canary := &flaggerv1.Canary{
ObjectMeta: v1.ObjectMeta{
Name: canaryName,
Namespace: canaryNamespace,
},
Status: flaggerv1.CanaryStatus{
Phase: flaggerv1.CanaryPhaseProgressing,
},
}

err := CallEventWebhook(canary, ts.URL, canaryMessage, canaryEventType)
if err != nil {
t.Fatal(err.Error())
}
}

func TestCallEventWebhookStatusCode(t *testing.T) {
canaryName := "podinfo"
canaryNamespace := v1.NamespaceDefault
canaryMessage := fmt.Sprintf("Starting canary analysis for %s.%s", canaryName, canaryNamespace)
canaryEventType := corev1.EventTypeNormal

ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer ts.Close()

canary := &flaggerv1.Canary{
ObjectMeta: v1.ObjectMeta{
Name: canaryName,
Namespace: canaryNamespace,
},
Status: flaggerv1.CanaryStatus{
Phase: flaggerv1.CanaryPhaseProgressing,
},
}

err := CallEventWebhook(canary, ts.URL, canaryMessage, canaryEventType)
if err == nil {
t.Errorf("Got no error wanted %v", http.StatusInternalServerError)
}
Expand Down