Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xds: add HTTP connection manager max_stream_duration support #4122

Merged
merged 2 commits into from
Jan 8, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions xds/internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,10 @@ type ListenerUpdate struct {
RouteConfigName string
// SecurityCfg contains security configuration sent by the control plane.
SecurityCfg *SecurityConfig
// MaxStreamDuration contains the HTTP connection manager's
// common_http_protocol_options.max_stream_duration field, or zero if
// unset.
MaxStreamDuration time.Duration
}

func (lu *ListenerUpdate) String() string {
Expand Down Expand Up @@ -181,8 +185,11 @@ type Route struct {
Fraction *uint32

// If the matchers above indicate a match, the below configuration is used.
Action map[string]uint32 // action is weighted clusters.
MaxStreamDuration time.Duration
Action map[string]uint32 // action is weighted clusters.
// If MaxStreamDuration is nil, it indicates neither of the route action's
// max_stream_duration fields (grpc_timeout_header_max nor
// max_stream_duration) were set.
MaxStreamDuration *time.Duration
menghanl marked this conversation as resolved.
Show resolved Hide resolved
}

// HeaderMatcher represents header matchers.
Expand Down
9 changes: 7 additions & 2 deletions xds/internal/client/client_lds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package client
import (
"strings"
"testing"
"time"

v2xdspb "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
Expand All @@ -36,6 +37,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/xds/internal/version"
"google.golang.org/protobuf/types/known/durationpb"
)

func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
Expand Down Expand Up @@ -86,6 +88,9 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
RouteConfigName: v3RouteConfigName,
},
},
CommonHttpProtocolOptions: &v3corepb.HttpProtocolOptions{
MaxStreamDuration: durationpb.New(time.Second),
},
}
mcm, _ := proto.Marshal(cm)
lis := &v3listenerpb.Listener{
Expand Down Expand Up @@ -280,15 +285,15 @@ func (s) TestUnmarshalListener_ClientSide(t *testing.T) {
name: "v3 listener resource",
resources: []*anypb.Any{v3Lis},
wantUpdate: map[string]ListenerUpdate{
v3LDSTarget: {RouteConfigName: v3RouteConfigName},
v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second},
},
},
{
name: "multiple listener resources",
resources: []*anypb.Any{v2Lis, v3Lis},
wantUpdate: map[string]ListenerUpdate{
v2LDSTarget: {RouteConfigName: v2RouteConfigName},
v3LDSTarget: {RouteConfigName: v3RouteConfigName},
v3LDSTarget: {RouteConfigName: v3RouteConfigName, MaxStreamDuration: time.Second},
},
},
}
Expand Down
10 changes: 7 additions & 3 deletions xds/internal/client/client_rds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(time.Second)}},
},
},
},
Expand Down Expand Up @@ -347,7 +347,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: time.Second}},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(time.Second)}},
},
},
},
Expand Down Expand Up @@ -377,7 +377,7 @@ func (s) TestRDSGenerateRDSUpdateFromRouteConfiguration(t *testing.T) {
VirtualHosts: []*VirtualHost{
{
Domains: []string{ldsTarget},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: 0}},
Routes: []*Route{{Prefix: newStringP("/"), Action: map[string]uint32{clusterName: 1}, MaxStreamDuration: newDurationP(0)}},
},
},
},
Expand Down Expand Up @@ -739,3 +739,7 @@ func newUInt32P(i uint32) *uint32 {
func newBoolP(b bool) *bool {
return &b
}

func newDurationP(d time.Duration) *time.Duration {
return &d
}
20 changes: 15 additions & 5 deletions xds/internal/client/client_xds.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ func processListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
// processClientSideListener checks if the provided Listener proto meets
// the expected criteria. If so, it returns a non-empty routeConfigName.
func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
update := &ListenerUpdate{}

apiLisAny := lis.GetApiListener().GetApiListener()
if !IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
return nil, fmt.Errorf("xds: unexpected resource type: %q in LDS response", apiLisAny.GetTypeUrl())
Expand All @@ -98,7 +100,7 @@ func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err
if name == "" {
return nil, fmt.Errorf("xds: empty route_config_name in LDS response: %+v", lis)
}
return &ListenerUpdate{RouteConfigName: name}, nil
update.RouteConfigName = name
case *v3httppb.HttpConnectionManager_RouteConfig:
// TODO: Add support for specifying the RouteConfiguration inline
// in the LDS response.
Expand All @@ -108,6 +110,10 @@ func processClientSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, err
default:
return nil, fmt.Errorf("xds: unsupported type %T for RouteSpecifier in received LDS response", apiLis.RouteSpecifier)
}

update.MaxStreamDuration = apiLis.GetCommonHttpProtocolOptions().GetMaxStreamDuration().AsDuration()

return update, nil
}

func processServerSideListener(lis *v3listenerpb.Listener) (*ListenerUpdate, error) {
Expand Down Expand Up @@ -342,12 +348,16 @@ func routesProtoToSlice(routes []*v3routepb.Route, logger *grpclog.PrefixLogger)
}

route.Action = clusters

msd := action.GetMaxStreamDuration()
// Prefer grpc_timeout_header_max, if set.
if dur := msd.GetGrpcTimeoutHeaderMax(); dur != nil {
route.MaxStreamDuration = dur.AsDuration()
} else {
route.MaxStreamDuration = msd.GetMaxStreamDuration().AsDuration()
dur := msd.GetGrpcTimeoutHeaderMax()
if dur == nil {
dur = msd.GetMaxStreamDuration()
}
if dur != nil {
d := dur.AsDuration()
route.MaxStreamDuration = &d
}
routesRet = append(routesRet, &route)
}
Expand Down
10 changes: 7 additions & 3 deletions xds/internal/resolver/serviceconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,11 @@ var newWRR = wrr.NewRandom
func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, error) {
cs := &configSelector{
r: r,
routes: make([]route, len(su.Routes)),
routes: make([]route, len(su.routes)),
clusters: make(map[string]*clusterInfo),
}

for i, rt := range su.Routes {
for i, rt := range su.routes {
clusters := newWRR()
for cluster, weight := range rt.Action {
clusters.Add(cluster, int64(weight))
Expand All @@ -227,7 +227,11 @@ func (r *xdsResolver) newConfigSelector(su serviceUpdate) (*configSelector, erro
if err != nil {
return nil, err
}
cs.routes[i].maxStreamDuration = rt.MaxStreamDuration
if rt.MaxStreamDuration == nil {
cs.routes[i].maxStreamDuration = su.ldsConfig.maxStreamDuration
} else {
cs.routes[i].maxStreamDuration = *rt.MaxStreamDuration
}
}

return cs, nil
Expand Down
39 changes: 33 additions & 6 deletions xds/internal/resolver/watch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,28 @@ import (
"fmt"
"strings"
"sync"
"time"

"google.golang.org/grpc/internal/grpclog"
xdsclient "google.golang.org/grpc/xds/internal/client"
)

// serviceUpdate contains information received from the RDS responses which is
// of interested to the xds resolver. The RDS request is built by first making a
// LDS to get the RouteConfig name.
// serviceUpdate contains information received from the LDS/RDS responses which
// are of interest to the xds resolver. The RDS request is built by first
// making a LDS to get the RouteConfig name.
type serviceUpdate struct {
// Routes contain matchers+actions to route RPCs.
Routes []*xdsclient.Route
// routes contain matchers+actions to route RPCs.
routes []*xdsclient.Route
// ldsConfig contains configuration that applies to all routes.
ldsConfig ldsConfig
menghanl marked this conversation as resolved.
Show resolved Hide resolved
}

// ldsConfig contains information received from the LDS responses which are of
// interest to the xds resolver.
type ldsConfig struct {
// maxStreamDuration is from the HTTP connection manager's
// common_http_protocol_options field.
maxStreamDuration time.Duration
}

// watchService uses LDS and RDS to discover information about the provided
Expand Down Expand Up @@ -61,6 +72,7 @@ type serviceUpdateWatcher struct {
serviceName string
ldsCancel func()
serviceCb func(serviceUpdate, error)
lastUpdate serviceUpdate

mu sync.Mutex
closed bool
Expand All @@ -84,16 +96,30 @@ func (w *serviceUpdateWatcher) handleLDSResp(update xdsclient.ListenerUpdate, er
w.rdsCancel()
w.rdsName = ""
w.rdsCancel = nil
w.lastUpdate = serviceUpdate{}
}
// The other error cases still return early without canceling the
// existing RDS watch.
w.serviceCb(serviceUpdate{}, err)
return
}

sendUpdate := false
ldsUpdate := ldsConfig{maxStreamDuration: update.MaxStreamDuration}
if w.lastUpdate.ldsConfig != ldsUpdate {
w.lastUpdate.ldsConfig = ldsUpdate
sendUpdate = true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also shouldn't send the update if this LDS update contains a new RDS name to watch.
Move this if check inside if w.rdsName == update.RouteConfigName?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored a bit. PTAL

}

if w.rdsName == update.RouteConfigName {
// If the new RouteConfigName is same as the previous, don't cancel and
// restart the RDS watch.
if sendUpdate {
// The route name didn't change but the LDS data did; send it now.
// If the route name did change, then we will wait until the first
// RDS update before reporting this LDS config.
w.serviceCb(w.lastUpdate, nil)
}
return
}
w.rdsName = update.RouteConfigName
Expand Down Expand Up @@ -127,7 +153,8 @@ func (w *serviceUpdateWatcher) handleRDSResp(update xdsclient.RouteConfigUpdate,
return
}

w.serviceCb(serviceUpdate{Routes: matchVh.Routes}, nil)
w.lastUpdate.routes = matchVh.Routes
w.serviceCb(w.lastUpdate, nil)
}

func (w *serviceUpdateWatcher) close() {
Expand Down
65 changes: 59 additions & 6 deletions xds/internal/resolver/watch_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
Expand Down Expand Up @@ -138,7 +139,7 @@ func verifyServiceUpdate(ctx context.Context, updateCh *testutils.Channel, wantU
return fmt.Errorf("timeout when waiting for service update: %v", err)
}
gotUpdate := u.(serviceUpdateErr)
if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()) {
if gotUpdate.err != nil || !cmp.Equal(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty(), cmp.AllowUnexported(serviceUpdate{}, ldsConfig{})) {
return fmt.Errorf("unexpected service update: (%v, %v), want: (%v, nil), diff (-want +got):\n%s", gotUpdate.u, gotUpdate.err, wantUpdate, cmp.Diff(gotUpdate.u, wantUpdate, cmpopts.EquateEmpty()))
}
return nil
Expand All @@ -165,7 +166,7 @@ func (s) TestServiceWatch(t *testing.T) {
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)

wantUpdate := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Expand All @@ -179,7 +180,7 @@ func (s) TestServiceWatch(t *testing.T) {
}

wantUpdate2 := serviceUpdate{
Routes: []*xdsclient.Route{{
routes: []*xdsclient.Route{{
Path: newStringP(""),
Action: map[string]uint32{cluster: 1},
}},
Expand Down Expand Up @@ -219,7 +220,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)

wantUpdate := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Expand All @@ -240,7 +241,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
waitForWatchRouteConfig(ctx, t, xdsC, routeStr+"2")

// RDS update for the new name.
wantUpdate2 := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}}
wantUpdate2 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Expand All @@ -254,6 +255,58 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
}
}

// TestServiceWatchLDSUpdate covers the case that after first LDS and first RDS
// response, the second LDS response includes a new MaxStreamDuration. It also
// verifies this is reported in subsequent RDS updates.
func (s) TestServiceWatchLDSUpdateMaxStreamDuration(t *testing.T) {
serviceUpdateCh := testutils.NewChannel()
xdsC := fakeclient.NewClient()
cancelWatch := watchService(xdsC, targetStr, func(update serviceUpdate, err error) {
serviceUpdateCh.Send(serviceUpdateErr{u: update, err: err})
}, nil)
defer cancelWatch()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
waitForWatchListener(ctx, t, xdsC, targetStr)
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr, MaxStreamDuration: time.Second}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)

wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}, ldsConfig: ldsConfig{maxStreamDuration: time.Second}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}},
},
},
}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate); err != nil {
t.Fatal(err)
}

// Another LDS update with the same RDS_name but different MaxStreamDuration (zero in this case).
wantUpdate2 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
t.Fatal(err)
}

// RDS update.
wantUpdate3 := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster + "2": 1}}},
},
},
}, nil)
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate3); err != nil {
t.Fatal(err)
}
}

// TestServiceNotCancelRDSOnSameLDSUpdate covers the case that if the second LDS
// update contains the same RDS name as the previous, the RDS watch isn't
// canceled and restarted.
Expand All @@ -271,7 +324,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)

wantUpdate := serviceUpdate{Routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
wantUpdate := serviceUpdate{routes: []*xdsclient.Route{{Prefix: newStringP(""), Action: map[string]uint32{cluster: 1}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Expand Down
Loading