Skip to content

Commit

Permalink
internal/xds: Refactor xDS Server into a v2 package
Browse files Browse the repository at this point in the history
Refactors the xDS Server into a v2 package allowing for
a v3 server to be created seperate from the v2 instance.

Updates #1898

Signed-off-by: Steve Sloka <slokas@vmware.com>
  • Loading branch information
stevesloka committed Oct 9, 2020
1 parent 1c77ba9 commit 9121803
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 72 deletions.
7 changes: 4 additions & 3 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/projectcontour/contour/internal/timeout"
"github.com/projectcontour/contour/internal/workgroup"
"github.com/projectcontour/contour/internal/xds"
contour_xds_v2 "github.com/projectcontour/contour/internal/xds/v2"
"github.com/projectcontour/contour/internal/xdscache"
xdscache_v2 "github.com/projectcontour/contour/internal/xdscache/v2"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -623,12 +624,12 @@ func doServe(log logrus.FieldLogger, ctx *serveContext) error {

switch ctx.XDSServerType {
case "contour":
grpcServer = xds.RegisterServer(
xds.NewContourServer(log, xdscache.ResourcesOf(resources)...),
grpcServer = contour_xds_v2.RegisterServer(
contour_xds_v2.NewContourServer(log, xdscache.ResourcesOf(resources)...),
registry,
ctx.grpcOptions(log)...)
case "envoy":
grpcServer = xds.RegisterServer(
grpcServer = contour_xds_v2.RegisterServer(
server.NewServer(context.Background(), snapshotCache, nil),
registry,
ctx.grpcOptions(log)...)
Expand Down
6 changes: 3 additions & 3 deletions internal/featuretests/v2/featuretests.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import (
"github.com/projectcontour/contour/internal/sorter"
"github.com/projectcontour/contour/internal/status"
"github.com/projectcontour/contour/internal/workgroup"
"github.com/projectcontour/contour/internal/xds"
contour_xds_v2 "github.com/projectcontour/contour/internal/xds/v2"
"github.com/projectcontour/contour/internal/xdscache"
xdscache_v2 "github.com/projectcontour/contour/internal/xdscache/v2"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -130,8 +130,8 @@ func setup(t *testing.T, opts ...interface{}) (cache.ResourceEventHandler, *Cont
l, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)

srv := xds.RegisterServer(
xds.NewContourServer(log, xdscache.ResourcesOf(resources)...),
srv := contour_xds_v2.RegisterServer(
contour_xds_v2.NewContourServer(log, xdscache.ResourcesOf(resources)...),
r /* Prometheus registry */)

var g workgroup.Group
Expand Down
32 changes: 32 additions & 0 deletions internal/xds/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright Project Contour Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package xds

import "github.com/golang/protobuf/proto"

// Resource represents a source of proto.Messages that can be registered
// for interest.
type Resource interface {
// Contents returns the contents of this resource.
Contents() []proto.Message

// Query returns an entry for each resource name supplied.
Query(names []string) []proto.Message

// Register registers ch to receive a value when Notify is called.
Register(chan int, int, ...string)

// TypeURL returns the typeURL of messages returned from Values.
TypeURL() string
}
49 changes: 17 additions & 32 deletions internal/xds/contour.go → internal/xds/v2/contour.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,42 +11,27 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package xds
package v2

import (
"context"
"fmt"
"strconv"
"sync/atomic"

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoy_api_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/projectcontour/contour/internal/xds"
"github.com/sirupsen/logrus"
)

// Resource represents a source of proto.Messages that can be registered
// for interest.
type Resource interface {
// Contents returns the contents of this resource.
Contents() []proto.Message

// Query returns an entry for each resource name supplied.
Query(names []string) []proto.Message

// Register registers ch to receive a value when Notify is called.
Register(chan int, int, ...string)

// TypeURL returns the typeURL of messages returned from Values.
TypeURL() string
}

type grpcStream interface {
Context() context.Context
Send(*v2.DiscoveryResponse) error
Recv() (*v2.DiscoveryRequest, error)
Send(*envoy_api_v2.DiscoveryResponse) error
Recv() (*envoy_api_v2.DiscoveryRequest, error)
}

// counter holds an atomically incrementing counter.
Expand All @@ -61,10 +46,10 @@ var connections counter
// NewContourServer creates an internally implemented Server that streams the
// provided set of Resource objects. The returned Server implements the xDS
// State of the World (SotW) variant.
func NewContourServer(log logrus.FieldLogger, resources ...Resource) Server {
func NewContourServer(log logrus.FieldLogger, resources ...xds.Resource) Server {
c := contourServer{
FieldLogger: log,
resources: map[string]Resource{},
resources: map[string]xds.Resource{},
}

for i, r := range resources {
Expand All @@ -80,13 +65,13 @@ type contourServer struct {
// the unimplemented gRPC endpoints.
discovery.UnimplementedAggregatedDiscoveryServiceServer
discovery.UnimplementedSecretDiscoveryServiceServer
v2.UnimplementedRouteDiscoveryServiceServer
v2.UnimplementedEndpointDiscoveryServiceServer
v2.UnimplementedClusterDiscoveryServiceServer
v2.UnimplementedListenerDiscoveryServiceServer
envoy_api_v2.UnimplementedRouteDiscoveryServiceServer
envoy_api_v2.UnimplementedEndpointDiscoveryServiceServer
envoy_api_v2.UnimplementedClusterDiscoveryServiceServer
envoy_api_v2.UnimplementedListenerDiscoveryServiceServer

logrus.FieldLogger
resources map[string]Resource
resources map[string]xds.Resource
}

// stream processes a stream of DiscoveryRequests.
Expand Down Expand Up @@ -174,7 +159,7 @@ func (s *contourServer) stream(st grpcStream) error {
any = append(any, a)
}

resp := &v2.DiscoveryResponse{
resp := &envoy_api_v2.DiscoveryResponse{
VersionInfo: strconv.Itoa(last),
Resources: any,
TypeUrl: r.TypeURL(),
Expand All @@ -191,19 +176,19 @@ func (s *contourServer) stream(st grpcStream) error {
}
}

func (s *contourServer) StreamClusters(srv v2.ClusterDiscoveryService_StreamClustersServer) error {
func (s *contourServer) StreamClusters(srv envoy_api_v2.ClusterDiscoveryService_StreamClustersServer) error {
return s.stream(srv)
}

func (s *contourServer) StreamEndpoints(srv v2.EndpointDiscoveryService_StreamEndpointsServer) error {
func (s *contourServer) StreamEndpoints(srv envoy_api_v2.EndpointDiscoveryService_StreamEndpointsServer) error {
return s.stream(srv)
}

func (s *contourServer) StreamListeners(srv v2.ListenerDiscoveryService_StreamListenersServer) error {
func (s *contourServer) StreamListeners(srv envoy_api_v2.ListenerDiscoveryService_StreamListenersServer) error {
return s.stream(srv)
}

func (s *contourServer) StreamRoutes(srv v2.RouteDiscoveryService_StreamRoutesServer) error {
func (s *contourServer) StreamRoutes(srv envoy_api_v2.RouteDiscoveryService_StreamRoutesServer) error {
return s.stream(srv)
}

Expand Down
45 changes: 23 additions & 22 deletions internal/xds/contour_test.go → internal/xds/v2/contour_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package xds
package v2

import (
"context"
Expand All @@ -20,8 +20,9 @@ import (
"io/ioutil"
"testing"

v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
envoy_api_v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/golang/protobuf/proto"
"github.com/projectcontour/contour/internal/xds"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)
Expand All @@ -38,7 +39,7 @@ func TestXDSHandlerStream(t *testing.T) {
xh: contourServer{FieldLogger: log},
stream: &mockStream{
context: context.Background,
recv: func() (*v2.DiscoveryRequest, error) {
recv: func() (*envoy_api_v2.DiscoveryRequest, error) {
return nil, io.EOF
},
},
Expand All @@ -48,8 +49,8 @@ func TestXDSHandlerStream(t *testing.T) {
xh: contourServer{FieldLogger: log},
stream: &mockStream{
context: context.Background,
recv: func() (*v2.DiscoveryRequest, error) {
return &v2.DiscoveryRequest{
recv: func() (*envoy_api_v2.DiscoveryRequest, error) {
return &envoy_api_v2.DiscoveryRequest{
TypeUrl: "io.projectcontour.potato",
}, nil
},
Expand All @@ -59,7 +60,7 @@ func TestXDSHandlerStream(t *testing.T) {
"failed to convert values to any": {
xh: contourServer{
FieldLogger: log,
resources: map[string]Resource{
resources: map[string]xds.Resource{
"io.projectcontour.potato": &mockResource{
register: func(ch chan int, i int) {
ch <- i + 1
Expand All @@ -73,8 +74,8 @@ func TestXDSHandlerStream(t *testing.T) {
},
stream: &mockStream{
context: context.Background,
recv: func() (*v2.DiscoveryRequest, error) {
return &v2.DiscoveryRequest{
recv: func() (*envoy_api_v2.DiscoveryRequest, error) {
return &envoy_api_v2.DiscoveryRequest{
TypeUrl: "io.projectcontour.potato",
}, nil
},
Expand All @@ -84,26 +85,26 @@ func TestXDSHandlerStream(t *testing.T) {
"failed to send": {
xh: contourServer{
FieldLogger: log,
resources: map[string]Resource{
resources: map[string]xds.Resource{
"io.projectcontour.potato": &mockResource{
register: func(ch chan int, i int) {
ch <- i + 1
},
contents: func() []proto.Message {
return []proto.Message{new(v2.ClusterLoadAssignment)}
return []proto.Message{new(envoy_api_v2.ClusterLoadAssignment)}
},
typeurl: func() string { return "io.projectcontour.potato" },
},
},
},
stream: &mockStream{
context: context.Background,
recv: func() (*v2.DiscoveryRequest, error) {
return &v2.DiscoveryRequest{
recv: func() (*envoy_api_v2.DiscoveryRequest, error) {
return &envoy_api_v2.DiscoveryRequest{
TypeUrl: "io.projectcontour.potato",
}, nil
},
send: func(resp *v2.DiscoveryResponse) error {
send: func(resp *envoy_api_v2.DiscoveryResponse) error {
return io.EOF
},
},
Expand All @@ -112,7 +113,7 @@ func TestXDSHandlerStream(t *testing.T) {
"context canceled": {
xh: contourServer{
FieldLogger: log,
resources: map[string]Resource{
resources: map[string]xds.Resource{
"io.projectcontour.potato": &mockResource{
register: func(ch chan int, i int) {
// do nothing
Expand All @@ -128,12 +129,12 @@ func TestXDSHandlerStream(t *testing.T) {
cancel()
return ctx
},
recv: func() (*v2.DiscoveryRequest, error) {
return &v2.DiscoveryRequest{
recv: func() (*envoy_api_v2.DiscoveryRequest, error) {
return &envoy_api_v2.DiscoveryRequest{
TypeUrl: "io.projectcontour.potato",
}, nil
},
send: func(resp *v2.DiscoveryResponse) error {
send: func(resp *envoy_api_v2.DiscoveryResponse) error {
return io.EOF
},
},
Expand All @@ -151,13 +152,13 @@ func TestXDSHandlerStream(t *testing.T) {

type mockStream struct {
context func() context.Context
send func(*v2.DiscoveryResponse) error
recv func() (*v2.DiscoveryRequest, error)
send func(*envoy_api_v2.DiscoveryResponse) error
recv func() (*envoy_api_v2.DiscoveryRequest, error)
}

func (m *mockStream) Context() context.Context { return m.context() }
func (m *mockStream) Send(resp *v2.DiscoveryResponse) error { return m.send(resp) }
func (m *mockStream) Recv() (*v2.DiscoveryRequest, error) { return m.recv() }
func (m *mockStream) Context() context.Context { return m.context() }
func (m *mockStream) Send(resp *envoy_api_v2.DiscoveryResponse) error { return m.send(resp) }
func (m *mockStream) Recv() (*envoy_api_v2.DiscoveryRequest, error) { return m.recv() }

type mockResource struct {
contents func() []proto.Message
Expand Down
2 changes: 1 addition & 1 deletion internal/xds/server.go → internal/xds/v2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package xds
package v2

import (
api "github.com/envoyproxy/go-control-plane/envoy/api/v2"
Expand Down
Loading

0 comments on commit 9121803

Please sign in to comment.