diff --git a/internal/app/orchestrator/orchestrator.go b/internal/app/orchestrator/orchestrator.go index feca848b..32fed540 100644 --- a/internal/app/orchestrator/orchestrator.go +++ b/internal/app/orchestrator/orchestrator.go @@ -9,6 +9,7 @@ package orchestrator import ( "context" "fmt" + "sync" "time" "github.com/envoyproxy/xds-relay/internal/app/cache" @@ -26,6 +27,10 @@ const ( // TODO (https://github.com/envoyproxy/xds-relay/issues/41) load from configured defaults. cacheMaxEntries = 1000 cacheTTL = 60 * time.Minute + + // unaggregatedPrefix is the prefix used to label discovery requests that + // could not be successfully mapped to an aggregation rule. + unaggregatedPrefix = "unaggregated_" ) // Orchestrator has the following responsibilities: @@ -57,6 +62,14 @@ type orchestrator struct { cache cache.Cache upstreamClient upstream.Client + // Map of downstream xDS client requests to response channels. + downstreamResponseChannels map[*gcp.Request]chan gcp.Response + downstreamResponseChannelsMu sync.Mutex + // Map of aggregate key to the receive-only upstream origin server response + // channels. + upstreamResponseChannels map[string]<-chan *upstream.Response + upstreamResponseChannelseMu sync.Mutex + logger log.Logger } @@ -65,9 +78,11 @@ type orchestrator struct { // orchestrator. func New(ctx context.Context, l log.Logger, mapper mapper.Mapper, upstreamClient upstream.Client) Orchestrator { orchestrator := &orchestrator{ - logger: l.Named(component), - mapper: mapper, - upstreamClient: upstreamClient, + logger: l.Named(component), + mapper: mapper, + upstreamClient: upstreamClient, + downstreamResponseChannels: make(map[*gcp.Request]chan gcp.Response), + upstreamResponseChannels: make(map[string]<-chan *upstream.Response), } cache, err := cache.NewCache(cacheMaxEntries, orchestrator.onCacheEvicted, cacheTTL) @@ -90,16 +105,171 @@ func New(ctx context.Context, l log.Logger, mapper mapper.Mapper, upstreamClient // // Cancel is an optional function to release resources in the producer. If // provided, the consumer may call this function multiple times. -func (c *orchestrator) CreateWatch(req gcp.Request) (chan gcp.Response, func()) { - // TODO implement. - return nil, nil +func (o *orchestrator) CreateWatch(req gcp.Request) (chan gcp.Response, func()) { + ctx := context.Background() + + o.downstreamResponseChannelsMu.Lock() + if o.downstreamResponseChannels[&req] == nil { + // If this is the first time we're seeing the request from the + // downstream client, initialize a channel to feed future responses. + o.downstreamResponseChannels[&req] = make(chan gcp.Response) + } + o.downstreamResponseChannelsMu.Unlock() + + aggregatedKey, err := o.mapper.GetKey(req) + if err != nil { + // Can't map the request to an aggregated key. Log and continue to + // propagate the response upstream without aggregation. + o.logger.With("err", err).With("req", req).Error(ctx, "failed to get aggregated key") + // Mimic the aggregated key. + // TODO (insert issue) This key needs to be made more granular to + // uniquely identify a request. User-specified defaults? + aggregatedKey = fmt.Sprintf("%s%s_%s", unaggregatedPrefix, req.GetNode().GetId(), req.GetTypeUrl()) + } + + // Register the watch for future responses. + err = o.cache.AddRequest(aggregatedKey, req) + if err != nil { + // If we fail to register the watch, we need to kill this stream by + // closing the response channel. + o.logger.With("err", err).With("key", aggregatedKey).With( + "req", req).Error(ctx, "failed to add watch") + close(o.downstreamResponseChannels[&req]) + // Clean up. + o.downstreamResponseChannelsMu.Lock() + delete(o.downstreamResponseChannels, &req) + o.downstreamResponseChannelsMu.Unlock() + } + + // Check if we have a cached response first. + cached, err := o.cache.Fetch(aggregatedKey) + if err != nil { + // Log, and continue to propagate the response upstream. + o.logger.With("err", err).With("key", aggregatedKey).Error(ctx, "failed to fetch aggregated key") + } + + if cached != nil && cached.Resp != nil { + // If we have a cached response, immediately return the result. + o.downstreamResponseChannels[&req] <- gcp.Response{ + Request: req, + Version: cached.Resp.Raw.GetVersionInfo(), + ResourceMarshaled: true, + MarshaledResources: cached.Resp.MarshaledResources, + } + } else { + // Otherwise, check if we have a upstream stream open for this + // aggregated key. If not, open a stream with the representative + // request. + // + // Locking is necessary here so that a simultaneous downstream request + // that maps to the same aggregated key doesn't result in two upstream + // streams. + o.upstreamResponseChannelseMu.Lock() + if o.upstreamResponseChannels[aggregatedKey] == nil { + upstreamResponseChan := o.upstreamClient.OpenStream(ctx, &req) + // Spin up a go routine to watch for upstream responses. + // One routine is opened per aggregate key. + go o.watchUpstream(ctx, aggregatedKey, upstreamResponseChan) + o.upstreamResponseChannels[aggregatedKey] = upstreamResponseChan + } + o.upstreamResponseChannelseMu.Unlock() + } + + return o.downstreamResponseChannels[&req], nil } // Fetch implements the polling method of the config cache using a non-empty request. -func (c *orchestrator) Fetch(context.Context, discovery.DiscoveryRequest) (*gcp.Response, error) { +func (o *orchestrator) Fetch(context.Context, discovery.DiscoveryRequest) (*gcp.Response, error) { return nil, fmt.Errorf("Not implemented") } -func (c *orchestrator) onCacheEvicted(key string, resource cache.Resource) { - // TODO implement. +// watchResponse is intended to be called in a goroutine, to receive incoming +// responses and fan out to downstream clients. +func (o *orchestrator) watchUpstream( + ctx context.Context, + aggregatedKey string, + responseChannel <-chan *upstream.Response, +) { + for { + select { + case x := <-responseChannel: + if x.Err != nil { + // A problem occurred fetching the response upstream, log and + // return the most recent cached response, so that the + // downstream will reissue the discovery request. + o.logger.With("err", x.Err).With("key", aggregatedKey).Error(ctx, "upstream error") + } else { + // Cache the response. + _, err := o.cache.SetResponse(aggregatedKey, x.Response) + if err != nil { + // If we fail to cache the new response, log and return the old one. + o.logger.With("err", err).With("key", aggregatedKey). + With("resp", x.Response).Error(ctx, "Failed to cache the response") + } + } + + // Get downstream watches and fan out. + // We retrieve from cache rather than directly fanning out the + // newly received response because the cache does additional + // resource serialization. + cached, err := o.cache.Fetch(aggregatedKey) + if err != nil { + o.logger.With("err", err).With("key", aggregatedKey).Error(ctx, "cache fetch failed") + // Can't do anything because we don't know who the watches + // are. Drop the response. + } else { + if cached == nil || cached.Resp == nil { + // If cache is empty, there is nothing to fan out. + if x.Err != nil { + // Warn. Benefit of the doubt that this is the first request. + o.logger.With("key", aggregatedKey). + Warn(ctx, "attempted to fan out with no cached response") + } else { + // Error. Sanity check. Shouldn't ever reach this. + o.logger.With("key", aggregatedKey). + Error(ctx, "attempted to fan out with no cached response") + } + } else { + // Goldenpath. + o.fanout(cached.Resp, cached.Requests) + } + } + default: + // Save some processing power. + time.Sleep(1 * time.Second) + } + } +} + +// fanout pushes the response to the response channels of all open downstream +// watches. +func (o *orchestrator) fanout(resp *cache.Response, watchers []*gcp.Request) { + for _, watch := range watchers { + channel := o.downstreamResponseChannels[watch] + if channel != nil { + // Construct the go-control-plane response from the cached response. + gcpResponse := gcp.Response{ + Request: *watch, + Version: resp.Raw.GetVersionInfo(), + ResourceMarshaled: true, + MarshaledResources: resp.MarshaledResources, + } + channel <- gcpResponse + } + } +} + +// onCacheEvicted is called when the cache evicts a response due to TTL or +// other reasons. When this happens, we need to clean up open streams. +func (o *orchestrator) onCacheEvicted(key string, resource cache.Resource) { + o.downstreamResponseChannelsMu.Lock() + defer o.downstreamResponseChannelsMu.Unlock() + for _, watch := range resource.Requests { + if o.downstreamResponseChannels[watch] != nil { + close(o.downstreamResponseChannels[watch]) + delete(o.downstreamResponseChannels, watch) + } + } + // TODO close the upstream receiver channel? Need a way to notify the + // upstream client to do so since these are receive only channels. } diff --git a/internal/app/upstream/client.go b/internal/app/upstream/client.go index ea4533bf..a15e9fec 100644 --- a/internal/app/upstream/client.go +++ b/internal/app/upstream/client.go @@ -25,7 +25,7 @@ type Client interface { // If the timeouts are exhausted, receive fails or a irrecoverable error occurs, the error is sent back to the caller. // It is the caller's responsibility to send a new request from the last known DiscoveryRequest. // Cancellation of the context cleans up all outstanding streams and releases all resources. - OpenStream(context.Context, *v2.DiscoveryRequest, string) chan *Response + OpenStream(context.Context, *v2.DiscoveryRequest) <-chan *Response } type client struct { @@ -44,9 +44,9 @@ type client struct { // Only one of the fields is valid at any time. If the error is set, the response will be ignored. type Response struct { //nolint - response v2.DiscoveryResponse + Response v2.DiscoveryResponse //nolint - err error + Err error } // NewClient creates a grpc connection with an upstream origin server. @@ -60,6 +60,6 @@ func NewClient(ctx context.Context, url string) (Client, error) { return &client{}, nil } -func (m *client) OpenStream(ctx context.Context, request *v2.DiscoveryRequest, typeURL string) chan *Response { +func (m *client) OpenStream(ctx context.Context, request *v2.DiscoveryRequest) <-chan *Response { return nil }