Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Orchestrator implementation #53

Merged
merged 32 commits into from
May 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
124d83e
Return Resource on cache Fetch
jessicayuen Apr 8, 2020
8c07603
Orchestrator implementation
jessicayuen Apr 8, 2020
ccf3a75
Merge branch 'master' into orchestrator
jessicayuen Apr 8, 2020
8fbde42
Pass by reference for cache watch, export MarshalResources
jessicayuen Apr 13, 2020
ee54b65
Add downstream, upstream response map wrappers
jessicayuen Apr 13, 2020
3b505db
Changes and fixes to orchestrator workflow
jessicayuen Apr 13, 2020
071e8e5
Add unit tests
jessicayuen Apr 13, 2020
0e853a8
Add missing testdata
jessicayuen Apr 13, 2020
6234b8a
Link TODO with issues
jessicayuen Apr 13, 2020
fad0c13
Fanout response to watches in parallel
jessicayuen Apr 13, 2020
7723d2c
Send response only if req/resp versions differ
jessicayuen Apr 14, 2020
78ecf04
Remove sleep
jessicayuen Apr 15, 2020
2f15c34
Merge master
jessicayuen Apr 15, 2020
cc6284e
Merge master (#2)
jyotimahapatra Apr 21, 2020
24d87a6
DeleteRequest API (#59)
Apr 23, 2020
aefa63b
grpc graceful shutdown (#64)
jyotimahapatra Apr 24, 2020
3672040
Fix all the things
jessicayuen Apr 24, 2020
4c05a5a
Merge branch 'master' into orchestrator
jessicayuen Apr 24, 2020
f302e8c
lint
jessicayuen Apr 24, 2020
60f7a22
Fix cache test
jessicayuen Apr 24, 2020
97c91e7
Add TODOs, feedback
jessicayuen Apr 25, 2020
aeef7ca
Use sync.Map
jessicayuen Apr 29, 2020
5315967
s/shutdown/shutdownUpstream
jessicayuen Apr 30, 2020
d3d1e3b
add timeout
jessicayuen Apr 30, 2020
6bcaf46
add wg
jessicayuen Apr 30, 2020
30246e7
Reduce timeout, don't close downstream channels
jessicayuen May 1, 2020
ecb0a28
fanout with default, buffered downstream chans
jessicayuen May 4, 2020
144d156
Remove timeout
jessicayuen May 4, 2020
a36fc60
Document upstream/downstream files
jessicayuen May 5, 2020
d08ddcc
Heavily document watchUpstreams, s/watches/watchers
jessicayuen May 5, 2020
4e4e0cc
assert sync map keys
jessicayuen May 5, 2020
57ff78e
comment on default
jessicayuen May 5, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions internal/app/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (c *cache) Fetch(key string) (*Resource, error) {
func (c *cache) SetResponse(key string, resp v2.DiscoveryResponse) (map[*v2.DiscoveryRequest]bool, error) {
c.cacheMu.Lock()
defer c.cacheMu.Unlock()
marshaledResources, err := marshalResources(resp.Resources)
marshaledResources, err := MarshalResources(resp.Resources)
jessicayuen marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("failed to marshal resources for key: %s, err %v", key, err)
}
Expand All @@ -126,6 +126,7 @@ func (c *cache) SetResponse(key string, resp v2.DiscoveryResponse) (map[*v2.Disc
resource := Resource{
Resp: response,
ExpirationTime: c.getExpirationTime(time.Now()),
Requests: make(map[*v2.DiscoveryRequest]bool),
jessicayuen marked this conversation as resolved.
Show resolved Hide resolved
}
c.cache.Add(key, resource)
return nil, nil
Expand Down Expand Up @@ -193,7 +194,9 @@ func (c *cache) getExpirationTime(currentTime time.Time) time.Time {
return time.Time{}
}

func marshalResources(resources []*any.Any) ([]gcp_types.MarshaledResource, error) {
// MarshalResource converts the raw xDS discovery resources into a serialized
// form accepted by go-control-plane.
func MarshalResources(resources []*any.Any) ([]gcp_types.MarshaledResource, error) {
jessicayuen marked this conversation as resolved.
Show resolved Hide resolved
var marshaledResources []gcp_types.MarshaledResource
for _, resource := range resources {
marshaledResource, err := gcp.MarshalResource(resource)
Expand Down
3 changes: 2 additions & 1 deletion internal/app/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ var testResponse = Response{
}

var testResource = Resource{
Resp: &testResponse,
Resp: &testResponse,
Requests: make(map[*v2.DiscoveryRequest]bool),
}

func TestAddRequestAndFetch(t *testing.T) {
Expand Down
80 changes: 80 additions & 0 deletions internal/app/orchestrator/downstream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Package orchestrator is responsible for instrumenting inbound xDS client
// requests to the correct aggregated key, forwarding a representative request
// to the upstream origin server, and managing the lifecycle of downstream and
// upstream connections and associates streams. It implements
// go-control-plane's Cache interface in order to receive xDS-based requests,
// send responses, and handle gRPC streams.
//
// This file manages the bookkeeping of downstream clients by tracking inbound
// requests to their corresponding response channels. The contents of this file
// are intended to only be used within the orchestrator module and should not
// be exported.
package orchestrator
jessicayuen marked this conversation as resolved.
Show resolved Hide resolved

import (
"sync"

gcp "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
)

// downstreamResponseMap is a map of downstream xDS client requests to response
// channels.
type downstreamResponseMap struct {
mu sync.RWMutex
responseChannels map[*gcp.Request]chan gcp.Response
}

func newDownstreamResponseMap() downstreamResponseMap {
return downstreamResponseMap{
responseChannels: make(map[*gcp.Request]chan gcp.Response),
}
}

// createChannel initializes a new channel for a request if it doesn't already
// exist.
func (d *downstreamResponseMap) createChannel(req *gcp.Request) chan gcp.Response {
d.mu.Lock()
defer d.mu.Unlock()
if _, ok := d.responseChannels[req]; !ok {
d.responseChannels[req] = make(chan gcp.Response, 1)
}
return d.responseChannels[req]
}

// get retrieves the channel where responses are set for the specified request.
func (d *downstreamResponseMap) get(req *gcp.Request) (chan gcp.Response, bool) {
d.mu.RLock()
defer d.mu.RUnlock()
channel, ok := d.responseChannels[req]
return channel, ok
}

// delete removes the response channel and request entry from the map.
// Note: We don't close the response channel prior to deletion because there
// can be separate go routines that are still attempting to write to the
// channel. We rely on garbage collection to clean up and close outstanding
// response channels once the go routines finish writing to them.
func (d *downstreamResponseMap) delete(req *gcp.Request) chan gcp.Response {
d.mu.Lock()
defer d.mu.Unlock()
if channel, ok := d.responseChannels[req]; ok {
delete(d.responseChannels, req)
return channel
}
return nil
}

// deleteAll removes all response channels and request entries from the map.
// Note: We don't close the response channel prior to deletion because there
// can be separate go routines that are still attempting to write to the
// channel. We rely on garbage collection to clean up and close outstanding
// response channels once the go routines finish writing to them.
func (d *downstreamResponseMap) deleteAll(watchers map[*gcp.Request]bool) {
d.mu.Lock()
defer d.mu.Unlock()
for watch := range watchers {
if d.responseChannels[watch] != nil {
delete(d.responseChannels, watch)
}
}
}
Loading