diff --git a/pkg/receive/capnp_server.go b/pkg/receive/capnp_server.go index f6c0282460..34b406bfa9 100644 --- a/pkg/receive/capnp_server.go +++ b/pkg/receive/capnp_server.go @@ -103,35 +103,6 @@ func (c CapNProtoHandler) Write(ctx context.Context, call writecapnp.Writer_writ result.SetError(writecapnp.WriteError_internal) } } - return nil -} - -type BufferedListener struct { - ctx context.Context - cancel context.CancelFunc - - conns chan net.Conn -} - -func (b BufferedListener) Accept() (net.Conn, error) { - select { - case <-b.ctx.Done(): - return nil, b.ctx.Err() - case c := <-b.conns: - return c, nil - } -} -func (b BufferedListener) Close() error { - b.cancel() return nil } - -func (b BufferedListener) Addr() net.Addr { - return addr{} -} - -type addr struct{} - -func (addr) Network() string { return "bufconn" } -func (addr) String() string { return "bufconn" } diff --git a/pkg/receive/capnproto_writer.go b/pkg/receive/capnproto_writer.go index 00b484f436..115605a6dd 100644 --- a/pkg/receive/capnproto_writer.go +++ b/pkg/receive/capnproto_writer.go @@ -5,6 +5,7 @@ package receive import ( "context" + "strings" "github.com/go-kit/log" "github.com/pkg/errors" @@ -72,6 +73,13 @@ func (r *CapNProtoWriter) Write(ctx context.Context, tenantID string, wreq *writ ref, lset = getRef.GetRef(series.Labels, series.Labels.Hash()) if ref == 0 { lset = series.Labels.Copy() + // NOTE(GiedriusS): do a deep copy because the labels are reused in the capnp message. + // Creation of new series is much rarer compared to adding extra samples + // to an existing series. + for i := range lset { + lset[i].Name = strings.Clone(lset[i].Name) + lset[i].Value = strings.Clone(lset[i].Value) + } } // Append as many valid samples as possible, but keep track of the errors. diff --git a/test/e2e/e2ethanos/services.go b/test/e2e/e2ethanos/services.go index 0ae6dd8d7a..c8a9e7fc62 100644 --- a/test/e2e/e2ethanos/services.go +++ b/test/e2e/e2ethanos/services.go @@ -539,6 +539,7 @@ type ReceiveBuilder struct { f e2e.FutureRunnable maxExemplars int + capnp bool ingestion bool limit int tenantsLimits receive.TenantsWriteLimitsConfig @@ -555,7 +556,7 @@ type ReceiveBuilder struct { func NewReceiveBuilder(e e2e.Environment, name string) *ReceiveBuilder { f := e.Runnable(fmt.Sprintf("receive-%v", name)). - WithPorts(map[string]int{"http": 8080, "grpc": 9091, "remote-write": 8081}). + WithPorts(map[string]int{"http": 8080, "grpc": 9091, "remote-write": 8081, "capnp": 19391}). Future() return &ReceiveBuilder{ Linkable: f, @@ -586,6 +587,11 @@ func (r *ReceiveBuilder) WithLabel(name, value string) *ReceiveBuilder { return r } +func (r *ReceiveBuilder) UseCapnpReplication() *ReceiveBuilder { + r.capnp = true + return r +} + func (r *ReceiveBuilder) WithRouting(replication int, hashringConfigs ...receive.HashringConfig) *ReceiveBuilder { r.hashringConfigs = hashringConfigs r.replication = replication @@ -646,6 +652,10 @@ func (r *ReceiveBuilder) Init() *e2eobs.Observable { args["--label"] = fmt.Sprintf("%s,%s", args["--label"], strings.Join(r.labels, ",")) } + if r.capnp { + args["--receive.replication-protocol"] = "capnproto" + } + hashring := r.hashringConfigs if len(hashring) > 0 && r.ingestion { args["--receive.local-endpoint"] = r.InternalEndpoint("grpc") diff --git a/test/e2e/receive_test.go b/test/e2e/receive_test.go index 4f70479dfc..c938a4f040 100644 --- a/test/e2e/receive_test.go +++ b/test/e2e/receive_test.go @@ -25,6 +25,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" + "github.com/prometheus/prometheus/model/timestamp" "github.com/prometheus/prometheus/prompb" "github.com/prometheus/prometheus/storage/remote" "github.com/stretchr/testify/require" @@ -1152,3 +1153,65 @@ func TestReceiveExtractsTenant(t *testing.T) { }) } + +func TestReceiveCpnp(t *testing.T) { + e, err := e2e.NewDockerEnvironment("receive-cpnp") + testutil.Ok(t, err) + t.Cleanup(e2ethanos.CleanScenario(t, e)) + + i := e2ethanos.NewReceiveBuilder(e, "ingestor").WithIngestionEnabled().Init() + testutil.Ok(t, e2e.StartAndWaitReady(i)) + + h := receive.HashringConfig{ + TenantMatcherType: "glob", + Tenants: []string{ + "default*", + }, + Endpoints: []receive.Endpoint{ + {Address: i.InternalEndpoint("grpc"), CapNProtoAddress: i.InternalEndpoint("capnp")}, + }, + } + + r := e2ethanos.NewReceiveBuilder(e, "router").UseCapnpReplication().WithRouting(1, h).Init() + testutil.Ok(t, e2e.StartAndWaitReady(r)) + + ts := time.Now() + + require.NoError(t, runutil.RetryWithLog(logkit.NewLogfmtLogger(os.Stdout), 1*time.Second, make(<-chan struct{}), func() error { + return storeWriteRequest(context.Background(), "http://"+r.Endpoint("remote-write")+"/api/v1/receive", &prompb.WriteRequest{ + Timeseries: []prompb.TimeSeries{ + { + Labels: []prompb.Label{ + {Name: model.MetricNameLabel, Value: "myself"}, + }, + Samples: []prompb.Sample{ + {Value: 1, Timestamp: timestamp.FromTime(ts)}, + }, + }, + }, + }) + })) + + testutil.Ok(t, i.WaitSumMetricsWithOptions(e2emon.Equals(0), []string{"prometheus_tsdb_blocks_loaded"}, e2emon.WithLabelMatchers(matchers.MustNewMatcher(matchers.MatchEqual, "tenant", "default-tenant")), e2emon.WaitMissingMetrics())) + + q := e2ethanos.NewQuerierBuilder(e, "1", i.InternalEndpoint("grpc")).Init() + testutil.Ok(t, e2e.StartAndWaitReady(q)) + + v := instantQuery(t, context.Background(), q.Endpoint("http"), func() string { return "myself" }, func() time.Time { return ts }, promclient.QueryOptions{ + Deduplicate: false, + }, 1) + + v[0].Timestamp = 0 + + require.Equal(t, model.Vector{ + { + Metric: model.Metric{ + model.MetricNameLabel: "myself", + "receive": "receive-ingestor", + "tenant_id": "default-tenant", + }, + Value: 1, + }, + }, v) + +}