Skip to content

Commit

Permalink
ordering-struct-for-watches (#474)
Browse files Browse the repository at this point in the history
* adding in comments for review

* go1.16 update with Priority List

* added in CacheSettings and ClearSnapshot() functionality

* add changelog

* fix

* updated with current utils

* moved types

* other files with types

* updated with comments and uing64 as index

* Changelog is updated to minor

* Update pkg/api/v1/control-plane/cache/status.go

Co-authored-by: Nathan Fudenberg <nathan.fudenberg@solo.io>

* Update pkg/api/v1/control-plane/cache/prioritylist.go

Co-authored-by: Nathan Fudenberg <nathan.fudenberg@solo.io>

* Update pkg/api/v1/control-plane/cache/prioritylist.go

Co-authored-by: Nathan Fudenberg <nathan.fudenberg@solo.io>

* Update pkg/api/v1/control-plane/cache/prioritylist.go

Co-authored-by: Nathan Fudenberg <nathan.fudenberg@solo.io>

* Update pkg/api/v1/control-plane/cache/simple.go

Co-authored-by: Nathan Fudenberg <nathan.fudenberg@solo.io>

* spelling

* breaking change

Co-authored-by: Nathan Fudenberg <nathan.fudenberg@solo.io>
  • Loading branch information
jackstine and nfuden authored Jun 14, 2022
1 parent 97bd7c2 commit 45e2af3
Show file tree
Hide file tree
Showing 12 changed files with 394 additions and 145 deletions.
9 changes: 9 additions & 0 deletions changelog/v0.28.0/ordering-responses-to-envoy.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
changelog:
- type: BREAKING_CHANGE
issueLink: https://github.com/solo-io/gloo/issues/6337
resolvesIssue: false
description: |
Add ordering to Response Watches so that the order sent to Envoy is in sync.
This would cause an issue in Envoy where the ClusterAssignmentLoad would update before
Clusters would. This caused the Clusters to stay in a warming state until Envoy timed
out, deleting the Endpoints on the cluster.
9 changes: 5 additions & 4 deletions pkg/api/v1/control-plane/cache/cache_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/golang/protobuf/ptypes"
"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/cache"
"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/resource"
"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/types"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -152,13 +153,13 @@ func (s TestSnapshot) MakeConsistent() {

func (s TestSnapshot) GetResources(typ string) cache.Resources {
switch typ {
case resource.EndpointTypeV3:
case types.EndpointTypeV3:
return s.Endpoints
case resource.ClusterTypeV3:
case types.ClusterTypeV3:
return s.Clusters
case resource.RouteTypeV3:
case types.RouteTypeV3:
return s.Routes
case resource.ListenerTypeV3:
case types.ListenerTypeV3:
return s.Listeners
}
return cache.Resources{}
Expand Down
48 changes: 28 additions & 20 deletions pkg/api/v1/control-plane/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
. "github.com/onsi/gomega"
"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/cache"
"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/resource"
"github.com/solo-io/solo-kit/pkg/api/v1/control-plane/types"
)

// TestIDHash uses ID field as the node hash.
Expand All @@ -30,7 +31,7 @@ var _ = Describe("Control Plane Cache", func() {

It("returns sane values for NewStatusInfo", func() {
node := &envoy_config_core_v3.Node{Id: "test"}
info := cache.NewStatusInfo(node)
info := cache.NewStatusInfo(node, cache.DefaultPrioritySet)

Expect(info.GetNode()).To(Equal(node))

Expand All @@ -40,7 +41,11 @@ var _ = Describe("Control Plane Cache", func() {
})

It("returns sane values for GetStatusKeys", func() {
c := cache.NewSnapshotCache(false, TestIDHash{}, nil)
settings := cache.CacheSettings{
Ads: false,
Hash: TestIDHash{},
}
c := cache.NewSnapshotCache(settings)

keys := c.GetStatusKeys()
Expect(len(keys)).To(Equal(0))
Expand All @@ -58,20 +63,23 @@ var _ = Describe("Control Plane Cache", func() {

It("Setting snapshot correctly updates the version", func() {
names := map[string][]string{
resource.EndpointTypeV3: {clusterName},
resource.ClusterTypeV3: nil,
resource.RouteTypeV3: {routeName},
resource.ListenerTypeV3: nil,
types.EndpointTypeV3: {clusterName},
types.ClusterTypeV3: nil,
types.RouteTypeV3: {routeName},
types.ListenerTypeV3: nil,
}

testTypes := []string{
resource.EndpointTypeV3,
resource.ClusterTypeV3,
resource.RouteTypeV3,
resource.ListenerTypeV3,
types.EndpointTypeV3,
types.ClusterTypeV3,
types.RouteTypeV3,
types.ListenerTypeV3,
}

c := cache.NewSnapshotCache(true, TestIDHash{}, nil)
settings := cache.CacheSettings{
Ads: false,
Hash: TestIDHash{},
}
c := cache.NewSnapshotCache(settings)
key := "test"

_, err := c.GetSnapshot(key)
Expand Down Expand Up @@ -108,11 +116,11 @@ var _ = Describe("Control Plane Cache", func() {
snap, err := c.GetSnapshot(key)
Expect(err).ToNot(HaveOccurred())
// check versions for resources
Expect(snap.GetResources(resource.ListenerTypeV3).Version).To(Equal(version))
Expect(snap.GetResources(resource.ClusterTypeV3).Version).To(Equal(version))
Expect(snap.GetResources(resource.RouteTypeV3).Version).To(Equal(version))
Expect(snap.GetResources(types.ListenerTypeV3).Version).To(Equal(version))
Expect(snap.GetResources(types.ClusterTypeV3).Version).To(Equal(version))
Expect(snap.GetResources(types.RouteTypeV3).Version).To(Equal(version))
// endpoint resource was not set in snapshot
Expect(snap.GetResources(resource.EndpointTypeV3).Version).To(Equal(""))
Expect(snap.GetResources(types.EndpointTypeV3).Version).To(Equal(""))

newName := "test2"
snapshot2 := &TestSnapshot{
Expand All @@ -131,11 +139,11 @@ var _ = Describe("Control Plane Cache", func() {
snap2, err := c.GetSnapshot(key)
Expect(err).ToNot(HaveOccurred())
// update to version y
Expect(snap2.GetResources(resource.EndpointTypeV3).Version).To(Equal(version2))
Expect(snap2.GetResources(resource.ClusterTypeV3).Version).To(Equal(version2))
Expect(snap2.GetResources(types.EndpointTypeV3).Version).To(Equal(version2))
Expect(snap2.GetResources(types.ClusterTypeV3).Version).To(Equal(version2))
// the cache will reset to empty version for missing resources
Expect(snap2.GetResources(resource.ListenerTypeV3).Version).To(Equal(""))
Expect(snap2.GetResources(resource.RouteTypeV3).Version).To(Equal(""))
Expect(snap2.GetResources(types.ListenerTypeV3).Version).To(Equal(""))
Expect(snap2.GetResources(types.RouteTypeV3).Version).To(Equal(""))
})

})
178 changes: 178 additions & 0 deletions pkg/api/v1/control-plane/cache/prioritylist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package cache

import (
"fmt"
"sort"
)

// The default Priority is the last index. The last index is not required in the priority list, thus any elements added
// to the struct will be added to the default priority (last).
const defaultPriority = -1

type PriorityValue interface {
GetPriority() string
}

// PrioritySortedStruct inserts elements into a map indexed in a list by prioirity. Priority lists are unordered.
// This allows O(1) inserts, gets, and deletes of these elements as long as the client has the index of the element.
// Indexes are returned on Add() function, when elements are added to the collection.
//
// Elements interface the PriorityValue interface, this allows the collection to call the element to get it's priority.
// If a priority does not exist for an added element, it is added to the lowest priority.
//
// The below 1, 2, 4 go first, 10 second, and 17 third. Anything else is processed last
// {
// 0: {item1, item2, item4}
// 1: {item10}
// 2: {item17}
// }
type PrioritySortedStruct struct {
// priorityMap maps the value to the priority.
priorityMap map[string]int
// elements are the map of elements structured by their priority.
//
// This is a list of maps with the key being an int. Because we do not care about order, and want to maintain
// O(1) delete, get, and insert we maintain the elements in maps with an index that gets incremented on every insert.
// This way clients can get and insert the data with ids generated by the collection itself.
elements []map[uint64]ResponseWatch
// nextUniqueElementIndex is the next element index to be inserted.
//
// We have a list of maps we need to ensure that when adding to the struct that the
// elements do not replace an element previously inserted. To maintain unique indexes within
// the map we have to keep a running index of all the number of elements inserted into the
// collection.
nextUniqueElementIndex uint64
}

// PriorityIndex is the priority and index used to locate items.
type PriorityIndex struct {
Priority int
Index uint64
}

// NewPrioritySortedStruct creates a new Priority Sorted Struct.
// prioritySets is the set lists for priorities, where P is the type used for priority.
func NewPrioritySortedStruct(prioritySets map[int][]string) *PrioritySortedStruct {
// need to ensure that the prioriries are in order and there are no missing or skipped Priorities
priorities := make([]int, 0)
for priority := range prioritySets {
priorities = append(priorities, priority)
}
sort.Ints(priorities)
currentP := 0
for _, p := range priorities {
if currentP == p {
currentP++
} else {
panic(fmt.Sprintf("Priorities are not set correct, you are missing priority %d", currentP))
}
}
// +1 for last priority list
elements := make([]map[uint64]ResponseWatch, len(prioritySets)+1)
if len(prioritySets) > 0 {
for priorityIndex := range prioritySets {
elements[priorityIndex] = make(map[uint64]ResponseWatch)
}
elements[len(prioritySets)] = make(map[uint64]ResponseWatch)
} else {
// there is only one map of elements
elements[0] = make(map[uint64]ResponseWatch)
}
priorityMap := make(map[string]int)
for index, pl := range prioritySets {
for _, v := range pl {
priorityMap[v] = index
}
}
p := PrioritySortedStruct{
elements: elements,
priorityMap: priorityMap,
}
return &p
}

// Get returns the element at the index, and if it exists.
func (p *PrioritySortedStruct) Get(pi PriorityIndex) (ResponseWatch, bool) {
v, ok := p.elements[pi.Priority][pi.Index]
return v, ok
}

// Process will call the procesFunc over all the elements by priority.
func (p *PrioritySortedStruct) Process(processFunc func(el ResponseWatch, pi PriorityIndex)) {
for i := 0; i < len(p.elements); i++ {
m := p.elements[i]
for index, v := range m {
processFunc(v, PriorityIndex{Priority: i, Index: index})
}
}
}

// GetPriorityList returns an ordered list of the elements by priority.
func (p *PrioritySortedStruct) GetPriorityList() []ResponseWatch {
elements := make([]ResponseWatch, 0, p.Len())
for priority := 0; priority < len(p.elements); priority++ {
mapOfElements := p.elements[priority]
for _, el := range mapOfElements {
elements = append(elements, el)
}
}
return elements
}

// Add will add the element to the Priority Collection, returns the priority, and element number.
func (p *PrioritySortedStruct) Add(element ResponseWatch) PriorityIndex {
priority := p.getPriorityOfElement(element)
if priority == defaultPriority {
// add to the last index of the watches
priority = len(p.elements) - 1
}
p.elements[priority][p.nextUniqueElementIndex] = element
pi := PriorityIndex{Priority: priority, Index: p.nextUniqueElementIndex}
p.nextUniqueElementIndex++
return pi
}

// Delete will delete the element, returns true if it deleted.
func (p *PrioritySortedStruct) Delete(pi PriorityIndex) bool {
if p.Len() == 0 {
return false
}
if _, ok := p.Get(pi); ok {
delete(p.elements[pi.Priority], pi.Index)
return ok
} else {
return false
}
}

// Len will return the number of elements
func (p *PrioritySortedStruct) Len() int {
count := 0
for _, el := range p.elements {
count += len(el)
}
return count
}

// GetPriorityIndexes returns a list of all the indexes for all elements by priority.
func (p *PrioritySortedStruct) GetPriorityIndexes() []PriorityIndex {
pi := make([]PriorityIndex, 0, p.Len())
for i := 0; i < len(p.elements); i++ {
m := p.elements[i]
for index := range m {
pi = append(pi, PriorityIndex{Priority: i, Index: index})
}
}
return pi
}

// getPriorityOfElement returns the priority of element ResponseWatch.
func (p *PrioritySortedStruct) getPriorityOfElement(element ResponseWatch) int {
pv := element.GetPriority()
if p, exists := p.priorityMap[pv]; exists {
return p
} else {
// default priority is -1
return defaultPriority
}
}
Loading

0 comments on commit 45e2af3

Please sign in to comment.