Skip to content

Commit

Permalink
Extract Traffic Segment struct.
Browse files Browse the repository at this point in the history
Signed-off-by: Rodrigo Reis <rodrigo.gargravarr@gmail.com>
  • Loading branch information
gargravarr committed Sep 12, 2023
1 parent fd397b5 commit d647def
Show file tree
Hide file tree
Showing 6 changed files with 669 additions and 51 deletions.
40 changes: 28 additions & 12 deletions controller/stackset.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,13 +272,19 @@ func (c *StackSetController) collectResources(ctx context.Context) (map[types.UI
return stacksets, nil
}

func (c *StackSetController) collectIngresses(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
ingresses, err := c.client.NetworkingV1().Ingresses(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
func (c *StackSetController) collectIngresses(
ctx context.Context,
stacksets map[types.UID]*core.StackSetContainer,
) error {
ingresses, err := c.client.NetworkingV1().Ingresses(v1.NamespaceAll).List(
ctx,
metav1.ListOptions{},
)

if err != nil {
return fmt.Errorf("failed to list Ingresses: %v", err)
}

Items:
for _, i := range ingresses.Items {
ingress := i
if uid, ok := getOwnerUID(ingress.ObjectMeta); ok {
Expand All @@ -291,28 +297,36 @@ Items:
// stack ingress
for _, stackset := range stacksets {
if s, ok := stackset.StackContainers[uid]; ok {
if strings.HasSuffix(ingress.ObjectMeta.Name, core.SegmentSuffix) {
if strings.HasSuffix(
ingress.ObjectMeta.Name,
core.SegmentSuffix,
) {
// Traffic Segment
s.Resources.IngressSegment = &ingress
} else {
s.Resources.Ingress = &ingress
}
continue Items
break
}
}
}
}
return nil
}

func (c *StackSetController) collectRouteGroups(ctx context.Context, stacksets map[types.UID]*core.StackSetContainer) error {
routegroups, err := c.client.RouteGroupV1().RouteGroups(v1.NamespaceAll).List(ctx, metav1.ListOptions{})
func (c *StackSetController) collectRouteGroups(
ctx context.Context,
stacksets map[types.UID]*core.StackSetContainer,
) error {
rgs, err := c.client.RouteGroupV1().RouteGroups(v1.NamespaceAll).List(
ctx,
metav1.ListOptions{},
)
if err != nil {
return fmt.Errorf("failed to list RouteGroups: %v", err)
}

Items:
for _, rg := range routegroups.Items {
for _, rg := range rgs.Items {
routegroup := rg
if uid, ok := getOwnerUID(routegroup.ObjectMeta); ok {
// stackset routegroups
Expand All @@ -324,14 +338,16 @@ Items:
// stack routegroups
for _, stackset := range stacksets {
if s, ok := stackset.StackContainers[uid]; ok {
if strings.HasSuffix(routegroup.ObjectMeta.Name, core.SegmentSuffix) {
if strings.HasSuffix(
routegroup.ObjectMeta.Name,
core.SegmentSuffix,
) {
// Traffic Segment
s.Resources.RouteGroupSegment = &routegroup
} else {
s.Resources.RouteGroup = &routegroup
}
s.Resources.RouteGroup = &routegroup
continue Items
break
}
}
}
Expand Down
48 changes: 48 additions & 0 deletions controller/stackset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,54 @@ func TestCollectResources(t *testing.T) {
},
},
},
{
name: "controller collects Ingress segment",
stacksets: []zv1.StackSet{testStacksetA},
stacks: []zv1.Stack{testStackA1},
ingresses: []networking.Ingress{
{ObjectMeta: segmentStackOwned(testStackA1)},
},
expected: map[types.UID]*core.StackSetContainer{
testStacksetA.UID : {
StackSet: &testStacksetA,
StackContainers: map[types.UID]*core.StackContainer{
testStackA1.UID: {
Stack: &testStackA1,
Resources: core.StackResources{
IngressSegment: &networking.Ingress{
ObjectMeta: segmentStackOwned(testStackA1),
},
},
},
},
TrafficReconciler: &core.SimpleTrafficReconciler{},
},
},
},
{
name: "controller collects RouteGroup segment",
stacksets: []zv1.StackSet{testStacksetA},
stacks: []zv1.Stack{testStackA1},
routegroups: []rgv1.RouteGroup{
{ObjectMeta: segmentStackOwned(testStackA1)},
},
expected: map[types.UID]*core.StackSetContainer{
testStacksetA.UID : {
StackSet: &testStacksetA,
StackContainers: map[types.UID]*core.StackContainer{
testStackA1.UID: {
Stack: &testStackA1,
Resources: core.StackResources{
RouteGroupSegment: &rgv1.RouteGroup{
ObjectMeta: segmentStackOwned(testStackA1),
},
},
},
},
TrafficReconciler: &core.SimpleTrafficReconciler{},
},
},
},
{
name: "all resources are collected",
stacksets: []zv1.StackSet{testStacksetA, testStacksetB},
Expand Down
8 changes: 8 additions & 0 deletions controller/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
ssfake "github.com/zalando-incubator/stackset-controller/pkg/client/clientset/versioned/fake"
zi "github.com/zalando-incubator/stackset-controller/pkg/client/clientset/versioned/typed/zalando.org/v1"
ssunified "github.com/zalando-incubator/stackset-controller/pkg/clientset"
"github.com/zalando-incubator/stackset-controller/pkg/core"
apps "k8s.io/api/apps/v1"
autoscaling "k8s.io/api/autoscaling/v2"
v1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -177,6 +178,13 @@ func testStack(name, namespace string, uid types.UID, ownerStack zv1.StackSet) z
}
}

func segmentStackOwned(owner zv1.Stack) metav1.ObjectMeta{
meta := stackOwned(owner)
meta.Name += core.SegmentSuffix

return meta
}

func stackOwned(owner zv1.Stack) metav1.ObjectMeta {
return metav1.ObjectMeta{
Name: owner.Name,
Expand Down
138 changes: 135 additions & 3 deletions pkg/core/traffic.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,146 @@
package core

import (
"errors"
"fmt"
"math"
"regexp"
"sort"
"strconv"
"time"

rgv1 "github.com/szuecs/routegroup-client/apis/zalando.org/v1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/types"
)

type (
TrafficReconciler interface {
// Handle the traffic switching and/or scaling logic.
Reconcile(
stacks map[string]*StackContainer,
currentTimestamp time.Time,
) error
}

// TrafficSegment holds segment information for a stack, specified by its UID.
TrafficSegment struct {
id types.UID
lowerLimit float64
upperLimit float64
IngressSegment *networking.Ingress
RouteGroupSegment *rgv1.RouteGroup
}

// segmentList holds a sortable set of TrafficSegments.
segmentList []TrafficSegment
)

type TrafficReconciler interface {
// Handle the traffic switching and/or scaling logic.
Reconcile(stacks map[string]*StackContainer, currentTimestamp time.Time) error
var (
segmentRe = regexp.MustCompile(
`TrafficSegment\((?P<Low>.*?), (?P<High>.*?)\)`,
)
)

// NewTrafficSegment returns a new TrafficSegment based on the specified stack
// container.
func NewTrafficSegment(id types.UID, sc *StackContainer) (
*TrafficSegment,
error,
) {
res := &TrafficSegment{id: id}

if sc.Resources.IngressSegment != nil {
res.IngressSegment = sc.Resources.IngressSegment
predicates := res.IngressSegment.Annotations[IngressPredicateKey]
lowerLimit, upperLimit, err := getSegmentParams(predicates)
if err != nil {
return nil, err
}
res.lowerLimit, res.upperLimit = lowerLimit, upperLimit
}

if sc.Resources.RouteGroupSegment != nil {
res.RouteGroupSegment = sc.Resources.RouteGroupSegment

lowerLimit, upperLimit, err := getSegmentParams(
res.RouteGroupSegment.Spec.Routes[0].Predicates...,
)
if err != nil {
return nil, err
}

if res.IngressSegment != nil &&
(res.lowerLimit != lowerLimit || res.upperLimit != upperLimit) {

return nil, errors.New(
"mismatch in routegroup and ingress segment values",
)
}
res.lowerLimit, res.upperLimit = lowerLimit, upperLimit
}

return res, nil
}

// size returns the corresponding weight of the segment, in a decimal fraction.
func (t *TrafficSegment) weight() float64 {
return t.upperLimit - t.lowerLimit
}

// Len returns the slice length, as required by the sort Interface.
func (l segmentList) Len() int {
return len(l)
}

// Less reports wheter segment i comes before segment j
func (l segmentList) Less(i, j int) bool {
switch {
case l[i].lowerLimit != l[j].lowerLimit:
return l[i].lowerLimit < l[j].lowerLimit
case l[i].upperLimit != l[j].upperLimit:
return l[i].upperLimit < l[j].upperLimit
default:
return false
}
}

// Swap swaps the segments with indexes i and j
func (l segmentList) Swap(i, j int) {
l[i], l[j] = l[j], l[i]
}

// getSegmentLimits returns the lower and upper limit of the TrafficSegment
// predicate.
//
// Returns an error if it fails to parse.
func getSegmentParams(predicates ...string) (float64, float64, error) {
for _, p := range predicates {
segmentParams := segmentRe.FindStringSubmatch(p)
if len(segmentParams) != 3 {
continue
}

lowerLimit, err := strconv.ParseFloat(segmentParams[1], 64)
if err != nil || lowerLimit < 0.0 {
return -1.0, -1.0, fmt.Errorf(
"error parsing TrafficSegment %q",
p,
)
}

upperLimit, err := strconv.ParseFloat(segmentParams[2], 64)
if err != nil || upperLimit < lowerLimit {
return -1.0, -1.0, fmt.Errorf(
"error parsing TrafficSegment %q",
p,
)
}

return lowerLimit, upperLimit, nil
}

return -1.0, -1.0, fmt.Errorf("TrafficSegment not found")
}

// allZero returns true if all weights defined in the map are 0.
Expand Down
Loading

0 comments on commit d647def

Please sign in to comment.