diff --git a/pkg/promtail/client/batch.go b/pkg/promtail/client/batch.go index 16bb23ebbe925..16c5d684a36cd 100644 --- a/pkg/promtail/client/batch.go +++ b/pkg/promtail/client/batch.go @@ -1,10 +1,14 @@ package client import ( + "fmt" + "sort" + "strings" "time" "github.com/gogo/protobuf/proto" "github.com/golang/snappy" + "github.com/prometheus/common/model" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/promtail/api" @@ -40,7 +44,7 @@ func (b *batch) add(entry api.Entry) { b.bytes += len(entry.Line) // Append the entry to an already existing stream (if any) - labels := entry.Labels.String() + labels := labelsMapToString(entry.Labels, ReservedLabelTenantID) if stream, ok := b.streams[labels]; ok { stream.Entries = append(stream.Entries, entry.Entry) return @@ -53,6 +57,22 @@ func (b *batch) add(entry api.Entry) { } } +func labelsMapToString(ls model.LabelSet, without ...model.LabelName) string { + lstrs := make([]string, 0, len(ls)) +Outer: + for l, v := range ls { + for _, w := range without { + if l == w { + continue Outer + } + } + lstrs = append(lstrs, fmt.Sprintf("%s=%q", l, v)) + } + + sort.Strings(lstrs) + return fmt.Sprintf("{%s}", strings.Join(lstrs, ", ")) +} + // sizeBytes returns the current batch size in bytes func (b *batch) sizeBytes() int { return b.bytes diff --git a/pkg/promtail/client/client.go b/pkg/promtail/client/client.go index 2dbe134f1d8c2..a57cf8251fddd 100644 --- a/pkg/promtail/client/client.go +++ b/pkg/promtail/client/client.go @@ -405,7 +405,6 @@ func (c *client) processEntry(e api.Entry) (api.Entry, string) { e.Labels = c.externalLabels.Merge(e.Labels) } tenantID := c.getTenantID(e.Labels) - delete(e.Labels, ReservedLabelTenantID) return e, tenantID } diff --git a/pkg/promtail/client/multi_test.go b/pkg/promtail/client/multi_test.go index 9d4e0a82d0a07..0b35c9ac74dff 100644 --- a/pkg/promtail/client/multi_test.go +++ b/pkg/promtail/client/multi_test.go @@ -6,10 +6,13 @@ import ( "testing" "time" + "github.com/cortexproject/cortex/pkg/util" "github.com/cortexproject/cortex/pkg/util/flagext" util_log "github.com/cortexproject/cortex/pkg/util/log" + "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/promtail/api" @@ -115,3 +118,22 @@ func TestMultiClient_Handle(t *testing.T) { t.Fatal("missing handle call") } } + +func TestMultiClient_Handle_Race(t *testing.T) { + u := flagext.URLValue{} + require.NoError(t, u.Set("http://localhost")) + c1, err := New(nil, Config{URL: u, BackoffConfig: util.BackoffConfig{MaxRetries: 1}, Timeout: time.Microsecond}, log.NewNopLogger()) + require.NoError(t, err) + c2, err := New(nil, Config{URL: u, BackoffConfig: util.BackoffConfig{MaxRetries: 1}, Timeout: time.Microsecond}, log.NewNopLogger()) + require.NoError(t, err) + clients := []Client{c1, c2} + m := &MultiClient{ + clients: clients, + entries: make(chan api.Entry), + } + m.start() + + m.Chan() <- api.Entry{Labels: model.LabelSet{"foo": "bar", ReservedLabelTenantID: "1"}, Entry: logproto.Entry{Line: "foo"}} + + m.Stop() +}