Skip to content

Commit

Permalink
Added target API tests (+goleak). (thanos-io#5260)
Browse files Browse the repository at this point in the history
Attempted to repro thanos-io#5257, but no good luck.

Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka authored Apr 24, 2022
1 parent 24706d6 commit 955ea6d
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 49 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ require (
github.com/golang/snappy v0.0.4
github.com/googleapis/gax-go v2.0.2+incompatible
github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware/providers/kit/v2 v2.0.0-20201002093600-73cf2ae9d891
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.0.0-rc.2.0.20201207153454-9f6bf00c00a7
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
Expand Down Expand Up @@ -160,6 +159,7 @@ require (
github.com/google/uuid v1.2.0 // indirect
github.com/googleapis/gax-go/v2 v2.1.1 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand Down
16 changes: 5 additions & 11 deletions pkg/targets/prometheus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,9 @@ scrape_configs:
requestedState targetspb.TargetsRequest_State
expectedErr error
}{
{
requestedState: targetspb.TargetsRequest_ANY,
},
{
requestedState: targetspb.TargetsRequest_ACTIVE,
},
{
requestedState: targetspb.TargetsRequest_DROPPED,
},
{requestedState: targetspb.TargetsRequest_ANY},
{requestedState: targetspb.TargetsRequest_ACTIVE},
{requestedState: targetspb.TargetsRequest_DROPPED},
} {
t.Run(tcase.requestedState.String(), func(t *testing.T) {
targets, w, err := grpcClient.Targets(context.Background(), &targetspb.TargetsRequest{
Expand All @@ -144,9 +138,9 @@ scrape_configs:

switch tcase.requestedState {
case targetspb.TargetsRequest_ACTIVE:
expectedTargets.DroppedTargets = expectedTargets.DroppedTargets[:0]
expectedTargets.DroppedTargets = nil
case targetspb.TargetsRequest_DROPPED:
expectedTargets.ActiveTargets = expectedTargets.ActiveTargets[:0]
expectedTargets.ActiveTargets = nil
}

for i := range targets.ActiveTargets {
Expand Down
138 changes: 105 additions & 33 deletions pkg/targets/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,67 +7,139 @@ import (
"context"
"io"
"os"
"sort"
"testing"

"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/testutil"
"google.golang.org/grpc"

"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/thanos-io/thanos/pkg/targets/targetspb"
)

func TestMain(m *testing.M) {
testutil.TolerantVerifyLeakMain(m)
}

type testTargetRecv struct {
// A simulation of underlying grpc Recv behavior as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125.
err error
resp *targetspb.TargetDiscovery
}

type testTargetsClient struct {
grpc.ClientStream
targetsErr, recvErr error
response *targetspb.TargetsResponse
sentResponse bool
targetsErr error
recvs []testTargetRecv
i int
}

func (t *testTargetsClient) String() string {
return "test"
}

func (t *testTargetsClient) Recv() (*targetspb.TargetsResponse, error) {
// A simulation of underlying grpc Recv behavior as per https://github.com/grpc/grpc-go/blob/7f2581f910fc21497091c4109b56d310276fc943/stream.go#L117-L125.
if t.recvErr != nil {
return nil, t.recvErr
}

if t.sentResponse {
if t.i+1 >= len(t.recvs) {
return nil, io.EOF
}
t.sentResponse = true

return t.response, nil
t.i++
return targetspb.NewTargetsResponse(t.recvs[t.i].resp), t.recvs[t.i].err
}

func (t *testTargetsClient) Targets(ctx context.Context, in *targetspb.TargetsRequest, opts ...grpc.CallOption) (targetspb.Targets_TargetsClient, error) {
func (t *testTargetsClient) Targets(context.Context, *targetspb.TargetsRequest, ...grpc.CallOption) (targetspb.Targets_TargetsClient, error) {
t.i = -1
return t, t.targetsErr
}

var _ targetspb.TargetsClient = &testTargetsClient{}

// TestProxyDataRace find the concurrent data race bug ( go test -race -run TestProxyDataRace -v ).
func TestProxyDataRace(t *testing.T) {
logger := log.NewLogfmtLogger(os.Stderr)
p := NewProxy(logger, func() []targetspb.TargetsClient {
es := &testTargetsClient{
recvErr: errors.New("err"),
}
size := 100
endpoints := make([]targetspb.TargetsClient, 0, size)
for i := 0; i < size; i++ {
endpoints = append(endpoints, es)
func TestProxy(t *testing.T) {
activeTarget1 := &targetspb.ActiveTarget{GlobalUrl: "test1"}
activeTarget2 := &targetspb.ActiveTarget{GlobalUrl: "test2"}
activeTarget3 := &targetspb.ActiveTarget{GlobalUrl: "test3"}

for _, tcase := range []struct {
clients []targetspb.TargetsClient
expectedWarnNum int
expectedDiscovery *targetspb.TargetDiscovery
}{
{
clients: []targetspb.TargetsClient{
&testTargetsClient{targetsErr: errors.New("err1")},
&testTargetsClient{targetsErr: errors.New("err2")},
},
expectedWarnNum: 2,
},
{
clients: []targetspb.TargetsClient{
&testTargetsClient{targetsErr: errors.New("err1")},
&testTargetsClient{recvs: []testTargetRecv{{err: errors.New("err")}}},
},
expectedWarnNum: 2,
},
{
clients: []targetspb.TargetsClient{
&testTargetsClient{recvs: []testTargetRecv{{err: errors.New("err")}, {resp: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget1}}}}},
},
expectedWarnNum: 1,
},
{
clients: []targetspb.TargetsClient{
&testTargetsClient{recvs: []testTargetRecv{
{resp: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget1}}},
{resp: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget2}}},
}},
&testTargetsClient{recvs: []testTargetRecv{
{resp: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget3}}},
}},
&testTargetsClient{recvs: []testTargetRecv{
{err: errors.New("err")},
}},
},
expectedWarnNum: 1,
expectedDiscovery: &targetspb.TargetDiscovery{ActiveTargets: []*targetspb.ActiveTarget{activeTarget1, activeTarget2, activeTarget3}},
},
{
// Reproduced the concurrent data race bug we had ( go test -race -run TestProxyDataRace -v ).
clients: func() []targetspb.TargetsClient {
size := 100
endpoints := make([]targetspb.TargetsClient, 0, size)
for i := 0; i < size; i++ {
endpoints = append(endpoints, &testTargetsClient{recvs: []testTargetRecv{{err: errors.New("err")}}})
}
return endpoints
}(),
expectedWarnNum: 100,
},
} {
if ok := t.Run("", func(t *testing.T) {
if tcase.expectedDiscovery == nil {
tcase.expectedDiscovery = &targetspb.TargetDiscovery{}
}

logger := log.NewLogfmtLogger(os.Stderr)
p := NewProxy(logger, func() []targetspb.TargetsClient { return tcase.clients })
req := &targetspb.TargetsRequest{
State: targetspb.TargetsRequest_ANY,
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
}

// Test idempotency.
for i := 0; i < 3; i++ {
s := &targetsServer{ctx: context.Background(), targets: &targetspb.TargetDiscovery{}}
testutil.Ok(t, p.Targets(req, s))
testutil.Equals(t, tcase.expectedWarnNum, len(s.warnings))

sort.Slice(s.targets.ActiveTargets, func(i, j int) bool {
return s.targets.ActiveTargets[i].GlobalUrl < s.targets.ActiveTargets[j].GlobalUrl
})
testutil.Equals(t, tcase.expectedDiscovery, s.targets)
}
}); !ok {
return
}
return endpoints
})
req := &targetspb.TargetsRequest{
State: targetspb.TargetsRequest_ANY,
PartialResponseStrategy: storepb.PartialResponseStrategy_WARN,
}
s := &targetsServer{
ctx: context.Background(),
}
_ = p.Targets(req, s)

}
5 changes: 1 addition & 4 deletions pkg/targets/targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ func NewGRPCClientWithDedup(ts targetspb.TargetsServer, replicaLabels []string)
}

func (rr *GRPCClient) Targets(ctx context.Context, req *targetspb.TargetsRequest) (*targetspb.TargetDiscovery, storage.Warnings, error) {
resp := &targetsServer{ctx: ctx, targets: &targetspb.TargetDiscovery{
ActiveTargets: make([]*targetspb.ActiveTarget, 0),
DroppedTargets: make([]*targetspb.DroppedTarget, 0),
}}
resp := &targetsServer{ctx: ctx, targets: &targetspb.TargetDiscovery{}}

if err := rr.proxy.Targets(req, resp); err != nil {
return nil, nil, errors.Wrap(err, "proxy Targets")
Expand Down

0 comments on commit 955ea6d

Please sign in to comment.