Skip to content

Commit

Permalink
Added user ID to ingester push soft/hard errors (#1960)
Browse files Browse the repository at this point in the history
Signed-off-by: Marco Pracucci <marco@pracucci.com>
  • Loading branch information
pracucci authored and gouthamve committed Jan 7, 2020
1 parent 47df95a commit f01fd63
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 12 deletions.
10 changes: 10 additions & 0 deletions pkg/ingester/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net/http"

"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc"
)
Expand Down Expand Up @@ -49,6 +50,11 @@ func makeMetricLimitError(errorType string, labels labels.Labels, err error) err
}
}

func (e *validationError) WrapWithUser(userID string) *validationError {
e.err = wrapWithUser(e.err, userID)
return e
}

func (e *validationError) Error() string {
if e.err == nil {
return e.errorType
Expand All @@ -66,3 +72,7 @@ func (e *validationError) WrappedError() error {
Body: []byte(e.Error()),
})
}

func wrapWithUser(err error, userID string) error {
return errors.Wrapf(err, "user=%s", userID)
}
4 changes: 2 additions & 2 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,13 @@ func (i *Ingester) Push(ctx old_ctx.Context, req *client.WriteRequest) (*client.
continue
}

return nil, err
return nil, wrapWithUser(err, userID)
}
}
client.ReuseSlice(req.Timeseries)

if lastPartialErr != nil {
return &client.WriteResponse{}, lastPartialErr.WrappedError()
return &client.WriteResponse{}, lastPartialErr.WrapWithUser(userID).WrappedError()
}
return &client.WriteResponse{}, nil
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/ingester/ingester_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien

db, err := i.getOrCreateTSDB(userID, false)
if err != nil {
return nil, err
return nil, wrapWithUser(err, userID)
}

// Ensure the ingester shutdown procedure hasn't started
Expand Down Expand Up @@ -141,14 +141,14 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien

// The error looks an issue on our side, so we should rollback
if rollbackErr := app.Rollback(); rollbackErr != nil {
level.Warn(util.Logger).Log("msg", "failed to rollback on error", "userID", userID, "err", rollbackErr)
level.Warn(util.Logger).Log("msg", "failed to rollback on error", "user", userID, "err", rollbackErr)
}

return nil, err
return nil, wrapWithUser(err, userID)
}
}
if err := app.Commit(); err != nil {
return nil, err
return nil, wrapWithUser(err, userID)
}

// Increment metrics only if the samples have been successfully committed.
Expand All @@ -160,7 +160,7 @@ func (i *Ingester) v2Push(ctx old_ctx.Context, req *client.WriteRequest) (*clien
client.ReuseSlice(req.Timeseries)

if lastPartialErr != nil {
return &client.WriteResponse{}, httpgrpc.Errorf(http.StatusBadRequest, lastPartialErr.Error())
return &client.WriteResponse{}, httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(lastPartialErr, userID).Error())
}
return &client.WriteResponse{}, nil
}
Expand Down
11 changes: 6 additions & 5 deletions pkg/ingester/ingester_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
func TestIngester_v2Push(t *testing.T) {
metricLabelAdapters := []client.LabelAdapter{{Name: labels.MetricName, Value: "test"}}
metricLabels := client.FromLabelAdaptersToLabels(metricLabelAdapters)
userID := "test"

tests := map[string]struct {
reqs []*client.WriteRequest
Expand Down Expand Up @@ -71,7 +72,7 @@ func TestIngester_v2Push(t *testing.T) {
[]client.Sample{{Value: 1, TimestampMs: 9}},
client.API),
},
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tsdb.ErrOutOfOrderSample.Error()),
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(tsdb.ErrOutOfOrderSample, userID).Error()),
expectedIngested: []client.TimeSeries{
{Labels: metricLabelAdapters, Samples: []client.Sample{{Value: 2, TimestampMs: 10}}},
},
Expand All @@ -95,7 +96,7 @@ func TestIngester_v2Push(t *testing.T) {
[]client.Sample{{Value: 1, TimestampMs: 1575043969 - (86400 * 1000)}},
client.API),
},
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tsdb.ErrOutOfBounds.Error()),
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(tsdb.ErrOutOfBounds, userID).Error()),
expectedIngested: []client.TimeSeries{
{Labels: metricLabelAdapters, Samples: []client.Sample{{Value: 2, TimestampMs: 1575043969}}},
},
Expand All @@ -119,7 +120,7 @@ func TestIngester_v2Push(t *testing.T) {
[]client.Sample{{Value: 1, TimestampMs: 1575043969}},
client.API),
},
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, tsdb.ErrAmendSample.Error()),
expectedErr: httpgrpc.Errorf(http.StatusBadRequest, wrapWithUser(tsdb.ErrAmendSample, userID).Error()),
expectedIngested: []client.TimeSeries{
{Labels: metricLabelAdapters, Samples: []client.Sample{{Value: 2, TimestampMs: 1575043969}}},
},
Expand Down Expand Up @@ -147,7 +148,7 @@ func TestIngester_v2Push(t *testing.T) {
defer i.Shutdown()
defer cleanup()

ctx := user.InjectOrgID(context.Background(), "test")
ctx := user.InjectOrgID(context.Background(), userID)

// Wait until the ingester is ACTIVE
test.Poll(t, 100*time.Millisecond, ring.ACTIVE, func() interface{} {
Expand Down Expand Up @@ -470,7 +471,7 @@ func TestIngester_v2Push_ShouldNotCreateTSDBIfNotInActiveState(t *testing.T) {
req := &client.WriteRequest{}

res, err := i.v2Push(ctx, req)
assert.Equal(t, fmt.Errorf(errTSDBCreateIncompatibleState, "PENDING"), err)
assert.Equal(t, wrapWithUser(fmt.Errorf(errTSDBCreateIncompatibleState, "PENDING"), userID).Error(), err.Error())
assert.Nil(t, res)

// Check if the TSDB has been created
Expand Down

0 comments on commit f01fd63

Please sign in to comment.