Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

e2e: add test for cpnp replication #7

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 0 additions & 29 deletions pkg/receive/capnp_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,35 +103,6 @@ func (c CapNProtoHandler) Write(ctx context.Context, call writecapnp.Writer_writ
result.SetError(writecapnp.WriteError_internal)
}
}
return nil
}

type BufferedListener struct {
ctx context.Context
cancel context.CancelFunc

conns chan net.Conn
}

func (b BufferedListener) Accept() (net.Conn, error) {
select {
case <-b.ctx.Done():
return nil, b.ctx.Err()
case c := <-b.conns:
return c, nil
}
}

func (b BufferedListener) Close() error {
b.cancel()
return nil
}

func (b BufferedListener) Addr() net.Addr {
return addr{}
}

type addr struct{}

func (addr) Network() string { return "bufconn" }
func (addr) String() string { return "bufconn" }
8 changes: 8 additions & 0 deletions pkg/receive/capnproto_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import (
"context"
"strings"

"github.com/go-kit/log"
"github.com/pkg/errors"
Expand Down Expand Up @@ -72,15 +73,22 @@
ref, lset = getRef.GetRef(series.Labels, series.Labels.Hash())
if ref == 0 {
lset = series.Labels.Copy()
// NOTE(GiedriusS): do a deep copy because the labels are reused in the capnp message.
// Creation of new series is much rarer compared to adding extra samples
// to an existing series.
for i := range lset {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to use lset.Range

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll fix this on my branch.

lset[i].Name = strings.Clone(lset[i].Name)
lset[i].Value = strings.Clone(lset[i].Value)
}
}

// Append as many valid samples as possible, but keep track of the errors.
for _, s := range series.Samples {
ref, err = app.Append(ref, lset, s.Timestamp, s.Value)
errorTracker.addSampleError(err, tLogger, lset, s.Timestamp, s.Value)
}

Check failure on line 89 in pkg/receive/capnproto_writer.go

View workflow job for this annotation

GitHub Actions / Go build with -tags=stringlabels

cannot range over lset (variable of type "github.com/prometheus/prometheus/model/labels".Labels)

Check failure on line 90 in pkg/receive/capnproto_writer.go

View workflow job for this annotation

GitHub Actions / Go build with -tags=stringlabels

invalid operation: cannot index lset (variable of type "github.com/prometheus/prometheus/model/labels".Labels)
for _, hp := range series.Histograms {

Check failure on line 91 in pkg/receive/capnproto_writer.go

View workflow job for this annotation

GitHub Actions / Go build with -tags=stringlabels

invalid operation: cannot index lset (variable of type "github.com/prometheus/prometheus/model/labels".Labels)
ref, err = app.AppendHistogram(ref, lset, hp.Timestamp, hp.Histogram, hp.FloatHistogram)
errorTracker.addHistogramError(err, tLogger, lset, hp.Timestamp)
}
Expand Down
12 changes: 11 additions & 1 deletion test/e2e/e2ethanos/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,7 @@ type ReceiveBuilder struct {
f e2e.FutureRunnable

maxExemplars int
capnp bool
ingestion bool
limit int
tenantsLimits receive.TenantsWriteLimitsConfig
Expand All @@ -555,7 +556,7 @@ type ReceiveBuilder struct {

func NewReceiveBuilder(e e2e.Environment, name string) *ReceiveBuilder {
f := e.Runnable(fmt.Sprintf("receive-%v", name)).
WithPorts(map[string]int{"http": 8080, "grpc": 9091, "remote-write": 8081}).
WithPorts(map[string]int{"http": 8080, "grpc": 9091, "remote-write": 8081, "capnp": 19391}).
Future()
return &ReceiveBuilder{
Linkable: f,
Expand Down Expand Up @@ -586,6 +587,11 @@ func (r *ReceiveBuilder) WithLabel(name, value string) *ReceiveBuilder {
return r
}

func (r *ReceiveBuilder) UseCapnpReplication() *ReceiveBuilder {
r.capnp = true
return r
}

func (r *ReceiveBuilder) WithRouting(replication int, hashringConfigs ...receive.HashringConfig) *ReceiveBuilder {
r.hashringConfigs = hashringConfigs
r.replication = replication
Expand Down Expand Up @@ -646,6 +652,10 @@ func (r *ReceiveBuilder) Init() *e2eobs.Observable {
args["--label"] = fmt.Sprintf("%s,%s", args["--label"], strings.Join(r.labels, ","))
}

if r.capnp {
args["--receive.replication-protocol"] = "capnproto"
}

hashring := r.hashringConfigs
if len(hashring) > 0 && r.ingestion {
args["--receive.local-endpoint"] = r.InternalEndpoint("grpc")
Expand Down
63 changes: 63 additions & 0 deletions test/e2e/receive_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/prometheus/common/model"

"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"
"github.com/prometheus/prometheus/storage/remote"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1152,3 +1153,65 @@ func TestReceiveExtractsTenant(t *testing.T) {

})
}

func TestReceiveCpnp(t *testing.T) {
e, err := e2e.NewDockerEnvironment("receive-cpnp")
testutil.Ok(t, err)
t.Cleanup(e2ethanos.CleanScenario(t, e))

i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init()
testutil.Ok(t, e2e.StartAndWaitReady(i))

h := receive.HashringConfig{
TenantMatcherType: "glob",
Tenants: []string{
"default*",
},
Endpoints: []receive.Endpoint{
{Address: i.InternalEndpoint("grpc"), CapNProtoAddress: i.InternalEndpoint("capnp")},
},
}

r := e2ethanos.NewReceiveBuilder(e, "router").UseCapnpReplication().WithRouting(1, h).Init()
testutil.Ok(t, e2e.StartAndWaitReady(r))

ts := time.Now()

require.NoError(t, runutil.RetryWithLog(logkit.NewLogfmtLogger(os.Stdout), 1*time.Second, make(<-chan struct{}), func() error {
return storeWriteRequest(context.Background(), "http://"+r.Endpoint("remote-write")+"/api/v1/receive", &prompb.WriteRequest{
Timeseries: []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: model.MetricNameLabel, Value: "myself"},
},
Samples: []prompb.Sample{
{Value: 1, Timestamp: timestamp.FromTime(ts)},
},
},
},
})
}))

testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "default-tenant")), e2emon.WaitMissingMetrics()))
GiedriusS marked this conversation as resolved.
Show resolved Hide resolved

q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init()
testutil.Ok(t, e2e.StartAndWaitReady(q))

v := instantQuery(t, context.Background(), q.Endpoint("http"), func() string { return "myself" }, func() time.Time { return ts }, promclient.QueryOptions{
Deduplicate: false,
}, 1)

v[0].Timestamp = 0

require.Equal(t, model.Vector{
{
Metric: model.Metric{
model.MetricNameLabel: "myself",
"receive": "receive-ingestor",
"tenant_id": "default-tenant",
},
Value: 1,
},
}, v)

}
Loading