Skip to content

Commit

Permalink
feat: Add VM Preemption Example
Browse files Browse the repository at this point in the history
Closes: vmware-samples#696
Signed-off-by: Michael Gasch <mgasch@vmware.com>
  • Loading branch information
Michael Gasch committed Nov 23, 2021
1 parent 7afd759 commit c85abea
Show file tree
Hide file tree
Showing 11 changed files with 2,153 additions and 0 deletions.
14 changes: 14 additions & 0 deletions examples/knative/go/kn-go-preemption/.ko.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
builds:
- id: function
# dir: .
# main: .
env:
- GOPRIVATE=*.vmware.com
flags:
- -tags
- netgo
ldflags:
- -s -w
- -extldflags "-static"
- -X main.buildCommit={{.Env.KO_COMMIT}}
- -X main.buildTag={{.Env.KO_TAG}}
322 changes: 322 additions & 0 deletions examples/knative/go/kn-go-preemption/README.md

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions examples/knative/go/kn-go-preemption/function.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
apiVersion: serving.knative.dev/v1
kind: Service
metadata:
name: kn-go-preemption
labels:
app: veba-ui
workflow: vsphere-preemption
spec:
template:
metadata:
annotations:
autoscaling.knative.dev/maxScale: "1"
autoscaling.knative.dev/minScale: "1"
spec:
containers:
- image: us.gcr.io/daisy-284300/veba/kn-go-preemption:1.0
env:
- name: TEMPORAL_URL
value: "FILL-ME-IN"
- name: TEMPORAL_NAMESPACE
value: "FILL-ME-IN"
- name: TEMPORAL_TASKQUEUE
value: "FILL-ME-IN"
- name: VSPHERE_PREEMPTION_TAG
value: "FILL-ME-IN"
- name: VSPHERE_ALARM_NAME
value: "FILL-ME-IN"
- name: DEBUG
value: "true"
---
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: vsphere-preemption
labels:
app: veba-ui
workflow: vsphere-preemption
spec:
broker: default
filter:
attributes:
subject: AlarmStatusChangedEvent
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: kn-go-preemption
62 changes: 62 additions & 0 deletions examples/knative/go/kn-go-preemption/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
module github.com/vmware-samples/vcenter-event-broker-appliance/examples/go/kn-go-preemption

go 1.17

require (
github.com/cloudevents/sdk-go/v2 v2.5.0
github.com/embano1/vsphere-preemption v0.1.0
github.com/hashicorp/go-multierror v1.0.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/stretchr/testify v1.7.0
github.com/vmware/govmomi v0.27.0
go.temporal.io/api v1.5.0
go.temporal.io/sdk v1.10.0
go.uber.org/zap v1.19.1
gotest.tools/v3 v3.0.3
knative.dev/pkg v0.0.0-20211018141937-a34efd6b409d
)

require (
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/blendle/zapdriver v1.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/go-logr/logr v0.4.0 // indirect
github.com/gogo/googleapis v1.4.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/gogo/status v1.1.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.6 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pborman/uuid v1.2.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/robfig/cron v1.2.0 // indirect
github.com/stretchr/objx v0.3.0 // indirect
github.com/twmb/murmur3 v1.1.6 // indirect
github.com/uber-go/tally v3.4.2+incompatible // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/net v0.0.0-20210928044308-7d9f5e0b762b // indirect
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0 // indirect
google.golang.org/grpc v1.41.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
k8s.io/api v0.22.2 // indirect
k8s.io/apimachinery v0.22.2 // indirect
k8s.io/klog/v2 v2.9.0 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.1.2 // indirect
)
1,108 changes: 1,108 additions & 0 deletions examples/knative/go/kn-go-preemption/go.sum

Large diffs are not rendered by default.

230 changes: 230 additions & 0 deletions examples/knative/go/kn-go-preemption/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package main

import (
"context"
"net/http"
"strings"
"time"

ce "github.com/cloudevents/sdk-go/v2"
preemption "github.com/embano1/vsphere-preemption"
errs "github.com/hashicorp/go-multierror"
"github.com/kelseyhightower/envconfig"
"github.com/vmware/govmomi/vim25/types"
filterpb "go.temporal.io/api/filter/v1"
"go.temporal.io/api/workflow/v1"
"go.temporal.io/api/workflowservice/v1"
sdk "go.temporal.io/sdk/client"
"go.uber.org/zap"
"knative.dev/pkg/logging"
)

const (
maxLastWorkflows = 10 // retrieve up to workflows for cancellation
wfExecutionTimeout = time.Hour * 24
)

type envConfig struct {
// Temporal settings
Address string `envconfig:"TEMPORAL_URL" required:"true"`
Namespace string `envconfig:"TEMPORAL_NAMESPACE" required:"true"`
Queue string `envconfig:"TEMPORAL_TASKQUEUE" required:"true"`

// vsphere settings
Tag string `envconfig:"VSPHERE_PREEMPTION_TAG" required:"true"` // vsphere tag
AlarmName string `envconfig:"VSPHERE_ALARM_NAME" required:"true"` // vsphere alarm name

// Knative settings (injected)
Sink string `envconfig:"K_SINK" required:"false"` // via sinkbinding (optional)
Port int `envconfig:"PORT" required:"true"`

Debug bool `envconfig:"DEBUG" default:"false"`
}

type client struct {
tc sdk.Client

address string // temporal address
namespace string // temporal namespace
queue string // temporal queue
tag string // identifies preemptible vms
alarmName string // identifies alarm to trigger workflow
sink string // K_SINK binding injection
}

func newClient(_ context.Context, logger *zap.Logger) (*client, error) {
var env envConfig
if err := envconfig.Process("", &env); err != nil {
return nil, err
}

if env.Sink == "" {
logger.Info("K_SINK variable not set")
}

tc, err := sdk.NewClient(sdk.Options{
HostPort: env.Address,
Namespace: env.Namespace,
Logger: preemption.NewZapAdapter(logger),
})
if err != nil {
return nil, err
}

c := client{
tc: tc,
address: env.Address,
namespace: env.Namespace,
queue: env.Queue,
tag: env.Tag,
alarmName: env.AlarmName,
sink: env.Sink,
}
return &c, nil
}

func (c *client) handler(ctx context.Context, event ce.Event) error {
logger := logging.FromContext(ctx).With("eventID", event.ID())
logger.Debugw("received event", "event", event.String())

var ae types.AlarmStatusChangedEvent
err := event.DataAs(&ae)
if err != nil {
logger.Warnw("get alarm event data", zap.Error(err))
return ce.NewHTTPResult(http.StatusBadRequest, "event payload must be valid AlarmStatusChangedEvent")
}

if ae.Alarm.Name != c.alarmName {
logger.Debugw("alarm event name does not match, skipping event", "incomingAlarmName", ae.Alarm.Name, "definedAlarmName", c.alarmName)
return nil
}

changedFrom := strings.ToLower(ae.From)
changedTo := strings.ToLower(ae.To)

// check whether this was a valid AlarmStatusChangedEvent and not other
// (inherited) AlarmEvent type
if changedTo == "" || changedFrom == "" {
logger.Warn("event is not of type AlarmStatusChangedEvent")
return ce.NewHTTPResult(http.StatusBadRequest, "event payload must be valid AlarmStatusChangedEvent")
}

criticality := func() preemption.Criticality {
switch changedTo {
case "yellow":
return preemption.CriticalityMedium
case "red":
return preemption.CriticalityHigh
default:
// treat any other (even unset) value as LOW
return preemption.CriticalityLow
}
}

// retrieve in-progress workflows to avoid multiple executions (best effort
// due to concurrent function executions and lack of conditional workflow
// execution)
running, err := c.getRunningWorkflows(ctx)
if err != nil {
// just warn and continue
logger.Warnw("get running workflows", zap.Error(err))
}

if isAlarmRaising(changedFrom, changedTo) {
logger.Infow("alarm level is raising", "from", changedFrom, "to", changedTo)
logger.Infow("triggering preemption")
if err = c.triggerPreemption(ctx, event, criticality()); err != nil {
logger.Errorw("trigger preemption", zap.Error(err))
return ce.NewHTTPResult(http.StatusInternalServerError, "failed to run preemption")
}
return nil
}

logger.Infow("alarm level is decreasing", "from", changedFrom, "to", changedTo)
logger.Infow("canceling any running workflows", "running", len(running))

if err = c.cancelRunningWorkflows(ctx, running); err != nil {
// just log and return successfully (no retry)
logger.Warnw("cancel running workflows", zap.Error(err))
}
return nil
}

func isAlarmRaising(changedFrom string, changedTo string) bool {
return (changedFrom != "red" && changedTo == "yellow") || changedTo == "red"
}

func (c *client) getRunningWorkflows(ctx context.Context) ([]*workflow.WorkflowExecutionInfo, error) {
filter := workflowservice.ListOpenWorkflowExecutionsRequest_TypeFilter{
TypeFilter: &filterpb.WorkflowTypeFilter{
Name: preemption.WorkflowName,
},
}
wfs, err := c.tc.ListOpenWorkflow(ctx, &workflowservice.ListOpenWorkflowExecutionsRequest{
Namespace: c.namespace,
MaximumPageSize: maxLastWorkflows,
Filters: &filter,
})
if err != nil {
return nil, err
}

return wfs.Executions, nil
}

func (c *client) triggerPreemption(ctx context.Context, e ce.Event, criticality preemption.Criticality) error {
req := preemption.WorkflowRequest{
Tag: c.tag,
Event: e,
Criticality: criticality,
ReplyTo: c.sink,
}

options := sdk.StartWorkflowOptions{
ID: c.alarmName,
TaskQueue: c.queue,
WorkflowExecutionTimeout: wfExecutionTimeout,
// WorkflowIDReusePolicy:
// enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, // multiple
// executions handled in workflow
WorkflowExecutionErrorWhenAlreadyStarted: false,
}

// fire and forget
logging.FromContext(ctx).Infow(
"executing workflow",
zap.String("workflow", preemption.WorkflowName),
zap.String("eventID", e.ID()),
zap.String("queue", c.queue),
zap.String("tag", c.tag),
zap.String("criticality", string(criticality)),
zap.String("sink", c.sink),
zap.String("alarmName", c.alarmName),
zap.String("event", e.String()),
)

// alarm name is used as the workflow name and a new workflow is started
// unless it is already running
if _, err := c.tc.SignalWithStartWorkflow(ctx, c.alarmName, preemption.SignalChannel, req, options, preemption.WorkflowName); err != nil {
return err
}
return nil
}

func (c *client) cancelRunningWorkflows(ctx context.Context, running []*workflow.WorkflowExecutionInfo) error {
var cancelErrs error
for _, wf := range running {
id := wf.Execution.WorkflowId
runID := wf.Execution.RunId
logging.FromContext(ctx).Debugw("canceling workflow", zap.String("ID", id), zap.String("runID", runID))
if err := c.tc.CancelWorkflow(ctx, id, runID); err != nil {
cancelErrs = errs.Append(cancelErrs, err)
}
}

return cancelErrs
}

func (c *client) close() {
c.tc.Close()
}
Loading

0 comments on commit c85abea

Please sign in to comment.