Skip to content

Commit

Permalink
promtail: Inject tenant ID when receiving X-Scope-OrgID in heroku tar…
Browse files Browse the repository at this point in the history
…get (#6695)

* inject tenant ID header

* fix import order
  • Loading branch information
thepalbi authored Jul 18, 2022
1 parent 934d56f commit d4cdc37
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 13 deletions.
10 changes: 9 additions & 1 deletion clients/pkg/promtail/targets/heroku/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/weaveworks/common/server"

"github.com/grafana/loki/clients/pkg/promtail/api"
lokiClient "github.com/grafana/loki/clients/pkg/promtail/client"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/clients/pkg/promtail/targets/target"

Expand Down Expand Up @@ -130,12 +131,19 @@ func (h *Target) drain(w http.ResponseWriter, r *http.Request) {
ts = message.Timestamp
}

// If the incoming request carries the tenant id, inject it as the reserved label so it's used by the
// remote write client.
tenantIDHeaderValue := r.Header.Get("X-Scope-OrgID")
if tenantIDHeaderValue != "" {
lb.Set(lokiClient.ReservedLabelTenantID, tenantIDHeaderValue)
}

processed := relabel.Process(lb.Labels(), h.relabelConfigs...)

// Start with the set of labels fixed in the configuration
filtered := h.Labels().Clone()
for _, lbl := range processed {
if strings.HasPrefix(lbl.Name, "__") {
if strings.HasPrefix(lbl.Name, "__") && lbl.Name != lokiClient.ReservedLabelTenantID {
continue
}
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
Expand Down
65 changes: 53 additions & 12 deletions clients/pkg/promtail/targets/heroku/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/server"

lokiClient "github.com/grafana/loki/clients/pkg/promtail/client"
"github.com/grafana/loki/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
)
Expand Down Expand Up @@ -184,12 +185,7 @@ func TestHerokuDrainTarget(t *testing.T) {
require.NoError(t, err)
require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")

// Wait for them to appear in the test handler
countdown := 1000
for len(eh.Received()) != 1 && countdown > 0 {
time.Sleep(1 * time.Millisecond)
countdown--
}
waitForMessages(eh)

// Make sure we didn't timeout
require.Equal(t, len(tc.args.RequestBodies), len(eh.Received()))
Expand Down Expand Up @@ -247,12 +243,7 @@ func TestHerokuDrainTarget_UseIncomingTimestamp(t *testing.T) {
require.NoError(t, err)
require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")

// Wait for them to appear in the test handler
countdown := 1000
for len(eh.Received()) != 1 && countdown > 0 {
time.Sleep(1 * time.Millisecond)
countdown--
}
waitForMessages(eh)

// Make sure we didn't timeout
require.Equal(t, 1, len(eh.Received()))
Expand Down Expand Up @@ -288,6 +279,56 @@ func TestHerokuDrainTarget_ErrorOnNotPrometheusCompatibleJobName(t *testing.T) {
}
}

func TestHerokuDrainTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)

// Create fake promtail client
eh := fake.New(func() {})
defer eh.Stop()

serverConfig, port, err := getServerConfigWithAvailablePort()
require.NoError(t, err, "error generating server config or finding open port")
config := &scrapeconfig.HerokuDrainTargetConfig{
Server: serverConfig,
Labels: nil,
UseIncomingTimestamp: true,
}

prometheus.DefaultRegisterer = prometheus.NewRegistry()
metrics := NewMetrics(prometheus.DefaultRegisterer)
pt, err := NewTarget(metrics, logger, eh, "test_job", config, nil)
require.NoError(t, err)
defer func() {
_ = pt.Stop()
}()

// Clear received lines after test case is ran
defer eh.Clear()

req, err := makeDrainRequest(fmt.Sprintf("http://%s:%d", localhost, port), testLogLine1)
require.NoError(t, err, "expected test drain request to be successfully created")
req.Header.Set("X-Scope-OrgID", "42")
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, http.StatusNoContent, res.StatusCode, "expected no-content status code")

waitForMessages(eh)

// Make sure we didn't timeout
require.Equal(t, 1, len(eh.Received()))

require.Equal(t, model.LabelValue("42"), eh.Received()[0].Labels[lokiClient.ReservedLabelTenantID])
}

func waitForMessages(eh *fake.Client) {
countdown := 1000
for len(eh.Received()) != 1 && countdown > 0 {
time.Sleep(1 * time.Millisecond)
countdown--
}
}

func getServerConfigWithAvailablePort() (cfg server.Config, port int, err error) {
// Get a randomly available port by open and closing a TCP socket
addr, err := net.ResolveTCPAddr("tcp", localhost+":0")
Expand Down

0 comments on commit d4cdc37

Please sign in to comment.