Skip to content

Commit

Permalink
Orchestrator implementation
Browse files Browse the repository at this point in the history
Signed-off-by: Jess Yuen <jyuen@lyft.com>
  • Loading branch information
jessicayuen committed Apr 8, 2020
1 parent 124d83e commit e2b47b1
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 13 deletions.
188 changes: 179 additions & 9 deletions internal/app/orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package orchestrator
import (
"context"
"fmt"
"sync"
"time"

"github.com/envoyproxy/xds-relay/internal/app/cache"
Expand All @@ -26,6 +27,10 @@ const (
// TODO (https://github.com/envoyproxy/xds-relay/issues/41) load from configured defaults.
cacheMaxEntries = 1000
cacheTTL = 60 * time.Minute

// unaggregatedPrefix is the prefix used to label discovery requests that
// could not be successfully mapped to an aggregation rule.
unaggregatedPrefix = "unaggregated_"
)

// Orchestrator has the following responsibilities:
Expand Down Expand Up @@ -57,6 +62,14 @@ type orchestrator struct {
cache cache.Cache
upstreamClient upstream.Client

// Map of downstream xDS client requests to response channels.
downstreamResponseChannels map[*gcp.Request]chan gcp.Response
downstreamResponseChannelsMu sync.Mutex
// Map of aggregate key to the receive-only upstream origin server response
// channels.
upstreamResponseChannels map[string]<-chan *upstream.Response
upstreamResponseChannelseMu sync.Mutex

logger log.Logger
}

Expand All @@ -65,9 +78,11 @@ type orchestrator struct {
// orchestrator.
func New(ctx context.Context, l log.Logger, mapper mapper.Mapper, upstreamClient upstream.Client) Orchestrator {
orchestrator := &orchestrator{
logger: l.Named(component),
mapper: mapper,
upstreamClient: upstreamClient,
logger: l.Named(component),
mapper: mapper,
upstreamClient: upstreamClient,
downstreamResponseChannels: make(map[*gcp.Request]chan gcp.Response),
upstreamResponseChannels: make(map[string]<-chan *upstream.Response),
}

cache, err := cache.NewCache(cacheMaxEntries, orchestrator.onCacheEvicted, cacheTTL)
Expand All @@ -90,16 +105,171 @@ func New(ctx context.Context, l log.Logger, mapper mapper.Mapper, upstreamClient
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
func (c *orchestrator) CreateWatch(req gcp.Request) (chan gcp.Response, func()) {
// TODO implement.
return nil, nil
func (o *orchestrator) CreateWatch(req gcp.Request) (chan gcp.Response, func()) {
ctx := context.Background()

o.downstreamResponseChannelsMu.Lock()
if o.downstreamResponseChannels[&req] == nil {
// If this is the first time we're seeing the request from the
// downstream client, initialize a channel to feed future responses.
o.downstreamResponseChannels[&req] = make(chan gcp.Response)
}
o.downstreamResponseChannelsMu.Unlock()

aggregatedKey, err := o.mapper.GetKey(req)
if err != nil {
// Can't map the request to an aggregated key. Log and continue to
// propagate the response upstream without aggregation.
o.logger.With("err", err).With("req", req).Error(ctx, "failed to get aggregated key")
// Mimic the aggregated key.
// TODO (insert issue) This key needs to be made more granular to
// uniquely identify a request. User-specified defaults?
aggregatedKey = fmt.Sprintf("%s%s_%s", unaggregatedPrefix, req.GetNode().GetId(), req.GetTypeUrl())
}

// Register the watch for future responses.
err = o.cache.AddRequest(aggregatedKey, req)
if err != nil {
// If we fail to register the watch, we need to kill this stream by
// closing the response channel.
o.logger.With("err", err).With("key", aggregatedKey).With(
"req", req).Error(ctx, "failed to add watch")
close(o.downstreamResponseChannels[&req])
// Clean up.
o.downstreamResponseChannelsMu.Lock()
delete(o.downstreamResponseChannels, &req)
o.downstreamResponseChannelsMu.Unlock()
}

// Check if we have a cached response first.
cached, err := o.cache.Fetch(aggregatedKey)
if err != nil {
// Log, and continue to propagate the response upstream.
o.logger.With("err", err).With("key", aggregatedKey).Error(ctx, "failed to fetch aggregated key")
}

if cached != nil && cached.Resp != nil {
// If we have a cached response, immediately return the result.
o.downstreamResponseChannels[&req] <- gcp.Response{
Request: req,
Version: cached.Resp.Raw.GetVersionInfo(),
ResourceMarshaled: true,
MarshaledResources: cached.Resp.MarshaledResources,
}
} else {
// Otherwise, check if we have a upstream stream open for this
// aggregated key. If not, open a stream with the representative
// request.
//
// Locking is necessary here so that a simultaneous downstream request
// that maps to the same aggregated key doesn't result in two upstream
// streams.
o.upstreamResponseChannelseMu.Lock()
if o.upstreamResponseChannels[aggregatedKey] == nil {
upstreamResponseChan := o.upstreamClient.OpenStream(ctx, &req)
// Spin up a go routine to watch for upstream responses.
// One routine is opened per aggregate key.
go o.watchUpstream(ctx, aggregatedKey, upstreamResponseChan)
o.upstreamResponseChannels[aggregatedKey] = upstreamResponseChan
}
o.upstreamResponseChannelseMu.Unlock()
}

return o.downstreamResponseChannels[&req], nil
}

// Fetch implements the polling method of the config cache using a non-empty request.
func (c *orchestrator) Fetch(context.Context, discovery.DiscoveryRequest) (*gcp.Response, error) {
func (o *orchestrator) Fetch(context.Context, discovery.DiscoveryRequest) (*gcp.Response, error) {
return nil, fmt.Errorf("Not implemented")
}

func (c *orchestrator) onCacheEvicted(key string, resource cache.Resource) {
// TODO implement.
// watchResponse is intended to be called in a goroutine, to receive incoming
// responses and fan out to downstream clients.
func (o *orchestrator) watchUpstream(
ctx context.Context,
aggregatedKey string,
responseChannel <-chan *upstream.Response,
) {
for {
select {
case x := <-responseChannel:
if x.Err != nil {
// A problem occurred fetching the response upstream, log and
// return the most recent cached response, so that the
// downstream will reissue the discovery request.
o.logger.With("err", x.Err).With("key", aggregatedKey).Error(ctx, "upstream error")
} else {
// Cache the response.
_, err := o.cache.SetResponse(aggregatedKey, x.Response)
if err != nil {
// If we fail to cache the new response, log and return the old one.
o.logger.With("err", err).With("key", aggregatedKey).
With("resp", x.Response).Error(ctx, "Failed to cache the response")
}
}

// Get downstream watches and fan out.
// We retrieve from cache rather than directly fanning out the
// newly received response because the cache does additional
// resource serialization.
cached, err := o.cache.Fetch(aggregatedKey)
if err != nil {
o.logger.With("err", err).With("key", aggregatedKey).Error(ctx, "cache fetch failed")
// Can't do anything because we don't know who the watches
// are. Drop the response.
} else {
if cached == nil || cached.Resp == nil {
// If cache is empty, there is nothing to fan out.
if x.Err != nil {
// Warn. Benefit of the doubt that this is the first request.
o.logger.With("key", aggregatedKey).
Warn(ctx, "attempted to fan out with no cached response")
} else {
// Error. Sanity check. Shouldn't ever reach this.
o.logger.With("key", aggregatedKey).
Error(ctx, "attempted to fan out with no cached response")
}
} else {
// Goldenpath.
o.fanout(cached.Resp, cached.Requests)
}
}
default:
// Save some processing power.
time.Sleep(1 * time.Second)
}
}
}

// fanout pushes the response to the response channels of all open downstream
// watches.
func (o *orchestrator) fanout(resp *cache.Response, watchers []*gcp.Request) {
for _, watch := range watchers {
channel := o.downstreamResponseChannels[watch]
if channel != nil {
// Construct the go-control-plane response from the cached response.
gcpResponse := gcp.Response{
Request: *watch,
Version: resp.Raw.GetVersionInfo(),
ResourceMarshaled: true,
MarshaledResources: resp.MarshaledResources,
}
channel <- gcpResponse
}
}
}

// onCacheEvicted is called when the cache evicts a response due to TTL or
// other reasons. When this happens, we need to clean up open streams.
func (o *orchestrator) onCacheEvicted(key string, resource cache.Resource) {
o.downstreamResponseChannelsMu.Lock()
defer o.downstreamResponseChannelsMu.Unlock()
for _, watch := range resource.Requests {
if o.downstreamResponseChannels[watch] != nil {
close(o.downstreamResponseChannels[watch])
delete(o.downstreamResponseChannels, watch)
}
}
// TODO close the upstream receiver channel? Need a way to notify the
// upstream client to do so since these are receive only channels.
}
8 changes: 4 additions & 4 deletions internal/app/upstream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type Client interface {
// If the timeouts are exhausted, receive fails or a irrecoverable error occurs, the error is sent back to the caller.
// It is the caller's responsibility to send a new request from the last known DiscoveryRequest.
// Cancellation of the context cleans up all outstanding streams and releases all resources.
OpenStream(context.Context, *v2.DiscoveryRequest, string) chan *Response
OpenStream(context.Context, *v2.DiscoveryRequest) <-chan *Response
}

type client struct {
Expand All @@ -44,9 +44,9 @@ type client struct {
// Only one of the fields is valid at any time. If the error is set, the response will be ignored.
type Response struct {
//nolint
response v2.DiscoveryResponse
Response v2.DiscoveryResponse
//nolint
err error
Err error
}

// NewClient creates a grpc connection with an upstream origin server.
Expand All @@ -60,6 +60,6 @@ func NewClient(ctx context.Context, url string) (Client, error) {
return &client{}, nil
}

func (m *client) OpenStream(ctx context.Context, request *v2.DiscoveryRequest, typeURL string) chan *Response {
func (m *client) OpenStream(ctx context.Context, request *v2.DiscoveryRequest) <-chan *Response {
return nil
}

0 comments on commit e2b47b1

Please sign in to comment.