diff --git a/pkg/ingester/errors.go b/pkg/ingester/errors.go index 6cc00b978e..8d822240ba 100644 --- a/pkg/ingester/errors.go +++ b/pkg/ingester/errors.go @@ -4,6 +4,7 @@ import ( "fmt" "net/http" + "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/httpgrpc" ) @@ -49,6 +50,11 @@ func makeMetricLimitError(errorType string, labels labels.Labels, err error) err } } +func (e *validationError) WrapWithUser(userID string) *validationError { + e.err = wrapWithUser(e.err, userID) + return e +} + func (e *validationError) Error() string { if e.err == nil { return e.errorType @@ -66,3 +72,7 @@ func (e *validationError) WrappedError() error { Body: []byte(e.Error()), }) } + +func wrapWithUser(err error, userID string) error { + return errors.Wrapf(err, "user=%s", userID) +} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 848738b5dc..ffa80ced26 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -308,13 +308,13 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client. continue } - return nil, err + return nil, wrapWithUser(err, userID) } } client.ReuseSlice(req.Timeseries) if lastPartialErr != nil { - return &client.WriteResponse{}, lastPartialErr.WrappedError() + return &client.WriteResponse{}, lastPartialErr.WrapWithUser(userID).WrappedError() } return &client.WriteResponse{}, nil } diff --git a/pkg/ingester/ingester_v2.go b/pkg/ingester/ingester_v2.go index edcf15ba8b..5c27461ad2 100644 --- a/pkg/ingester/ingester_v2.go +++ b/pkg/ingester/ingester_v2.go @@ -89,7 +89,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien db, err := i.getOrCreateTSDB(userID, false) if err != nil { - return nil, err + return nil, wrapWithUser(err, userID) } // Ensure the ingester shutdown procedure hasn't started @@ -141,14 +141,14 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien // The error looks an issue on our side, so we should rollback if rollbackErr := app.Rollback(); rollbackErr != nil { - level.Warn(util.Logger).Log("msg", "failed to rollback on error", "userID", userID, "err", rollbackErr) + level.Warn(util.Logger).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr) } - return nil, err + return nil, wrapWithUser(err, userID) } } if err := app.Commit(); err != nil { - return nil, err + return nil, wrapWithUser(err, userID) } // Increment metrics only if the samples have been successfully committed. @@ -160,7 +160,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien client.ReuseSlice(req.Timeseries) if lastPartialErr != nil { - return &client.WriteResponse{}, httpgrpc.Errorf(http.StatusBadRequest, lastPartialErr.Error()) + return &client.WriteResponse{}, httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(lastPartialErr, userID).Error()) } return &client.WriteResponse{}, nil } diff --git a/pkg/ingester/ingester_v2_test.go b/pkg/ingester/ingester_v2_test.go index 1c04a5811f..9191442eb7 100644 --- a/pkg/ingester/ingester_v2_test.go +++ b/pkg/ingester/ingester_v2_test.go @@ -29,6 +29,7 @@ import ( func TestIngester_v2Push(t *testing.T) { metricLabelAdapters := []client.LabelAdapter{{Name: labels.MetricName, Value: "test"}} metricLabels := client.FromLabelAdaptersToLabels(metricLabelAdapters) + userID := "test" tests := map[string]struct { reqs []*client.WriteRequest @@ -71,7 +72,7 @@ func TestIngester_v2Push(t *testing.T) { []client.Sample{{Value: 1, TimestampMs: 9}}, client.API), }, - expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tsdb.ErrOutOfOrderSample.Error()), + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(tsdb.ErrOutOfOrderSample, userID).Error()), expectedIngested: []client.TimeSeries{ {Labels: metricLabelAdapters, Samples: []client.Sample{{Value: 2, TimestampMs: 10}}}, }, @@ -95,7 +96,7 @@ func TestIngester_v2Push(t *testing.T) { []client.Sample{{Value: 1, TimestampMs: 1575043969 - (86400 * 1000)}}, client.API), }, - expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tsdb.ErrOutOfBounds.Error()), + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(tsdb.ErrOutOfBounds, userID).Error()), expectedIngested: []client.TimeSeries{ {Labels: metricLabelAdapters, Samples: []client.Sample{{Value: 2, TimestampMs: 1575043969}}}, }, @@ -119,7 +120,7 @@ func TestIngester_v2Push(t *testing.T) { []client.Sample{{Value: 1, TimestampMs: 1575043969}}, client.API), }, - expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tsdb.ErrAmendSample.Error()), + expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(tsdb.ErrAmendSample, userID).Error()), expectedIngested: []client.TimeSeries{ {Labels: metricLabelAdapters, Samples: []client.Sample{{Value: 2, TimestampMs: 1575043969}}}, }, @@ -147,7 +148,7 @@ func TestIngester_v2Push(t *testing.T) { defer i.Shutdown() defer cleanup() - ctx := user.InjectOrgID(context.Background(), "test") + ctx := user.InjectOrgID(context.Background(), userID) // Wait until the ingester is ACTIVE test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} { @@ -470,7 +471,7 @@ func TestIngester_v2Push_ShouldNotCreateTSDBIfNotInActiveState(t *testing.T) { req := &client.WriteRequest{} res, err := i.v2Push(ctx, req) - assert.Equal(t, fmt.Errorf(errTSDBCreateIncompatibleState, "PENDING"), err) + assert.Equal(t, wrapWithUser(fmt.Errorf(errTSDBCreateIncompatibleState, "PENDING"), userID).Error(), err.Error()) assert.Nil(t, res) // Check if the TSDB has been created