diff --git a/src/query/server/query_test.go b/src/query/server/query_test.go index ab57f29b50..5a4eb6de97 100644 --- a/src/query/server/query_test.go +++ b/src/query/server/query_test.go @@ -237,18 +237,37 @@ tagOptions: promReq := test.GeneratePromWriteRequest() promReqBody := test.GeneratePromWriteRequestBody(t, promReq) - req, err := http.NewRequestWithContext( - context.TODO(), - http.MethodPost, - fmt.Sprintf("http://%s%s", addr, remote.PromWriteURL), - promReqBody, - ) - require.NoError(t, err) + requestURL := fmt.Sprintf("http://%s%s", addr, remote.PromWriteURL) + newRequest := func() *http.Request { + req, err := http.NewRequestWithContext( + context.TODO(), + http.MethodPost, + requestURL, + promReqBody, + ) + require.NoError(t, err) + return req + } - resp, err := http.DefaultClient.Do(req) - require.NoError(t, err) - require.NoError(t, resp.Body.Close()) - assert.NotNil(t, externalFakePromServer.GetLastWriteRequest()) + t.Run("write request", func(t *testing.T) { + defer externalFakePromServer.Reset() + resp, err := http.DefaultClient.Do(newRequest()) + require.NoError(t, err) + + assert.NotNil(t, externalFakePromServer.GetLastWriteRequest()) + require.NoError(t, resp.Body.Close()) + }) + + t.Run("bad request propagates", func(t *testing.T) { + defer externalFakePromServer.Reset() + externalFakePromServer.SetError("badRequest", http.StatusBadRequest) + + resp, err := http.DefaultClient.Do(newRequest()) + require.NoError(t, err) + + assert.Equal(t, 400, resp.StatusCode) + require.NoError(t, resp.Body.Close()) + }) } func TestGRPCBackend(t *testing.T) { diff --git a/src/query/storage/promremote/promremotetest/test_server.go b/src/query/storage/promremote/promremotetest/test_server.go index 913d24de36..a3570f873e 100644 --- a/src/query/storage/promremote/promremotetest/test_server.go +++ b/src/query/storage/promremote/promremotetest/test_server.go @@ -37,11 +37,16 @@ import ( type TestPromServer struct { mu sync.Mutex lastWriteRequest *prompb.WriteRequest - respErr error + respErr *respErr t *testing.T svr *httptest.Server } +type respErr struct { + error string + status int +} + // NewServer creates new instance of a fake server. func NewServer(t *testing.T) *TestPromServer { testPromServer := &TestPromServer{t: t} @@ -67,7 +72,7 @@ func (s *TestPromServer) handleWrite(w http.ResponseWriter, r *http.Request) { } s.lastWriteRequest = req if s.respErr != nil { - http.Error(w, s.respErr.Error(), http.StatusInternalServerError) + http.Error(w, s.respErr.error, s.respErr.status) return } } @@ -85,10 +90,10 @@ func (s *TestPromServer) WriteAddr() string { } // SetError sets error that will be returned for all incoming requests. -func (s *TestPromServer) SetError(err error) { +func (s *TestPromServer) SetError(body string, status int) { s.mu.Lock() defer s.mu.Unlock() - s.respErr = err + s.respErr = &respErr{error: body, status: status} } // Reset resets state to default. diff --git a/src/query/storage/promremote/storage.go b/src/query/storage/promremote/storage.go index b514596821..442ee34eea 100644 --- a/src/query/storage/promremote/storage.go +++ b/src/query/storage/promremote/storage.go @@ -164,8 +164,14 @@ func (p *promStorage) writeSingle( p.logger.Error("error reading body", zap.Error(err)) response = errorReadingBody } - return fmt.Errorf("expected status code 2XX: actual=%v, address=%v, resp=%s", - resp.StatusCode, address, response) + genericError := fmt.Errorf( + "expected status code 2XX: actual=%v, address=%v, resp=%s", + resp.StatusCode, address, response, + ) + if resp.StatusCode < 500 && resp.StatusCode != http.StatusTooManyRequests { + return xerrors.NewInvalidParamsError(genericError) + } + return genericError } metrics.ReportSuccess(methodDuration) return nil diff --git a/src/query/storage/promremote/storage_test.go b/src/query/storage/promremote/storage_test.go index 4fa312c760..2e6a71e4b6 100644 --- a/src/query/storage/promremote/storage_test.go +++ b/src/query/storage/promremote/storage_test.go @@ -22,9 +22,9 @@ package promremote import ( "context" - "errors" "io" "math/rand" + "net/http" "testing" "time" @@ -39,17 +39,20 @@ import ( "github.com/m3db/m3/src/query/storage/m3/storagemetadata" "github.com/m3db/m3/src/query/storage/promremote/promremotetest" "github.com/m3db/m3/src/query/ts" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/tallytest" xtime "github.com/m3db/m3/src/x/time" ) -var logger, _ = zap.NewDevelopment() +var ( + logger, _ = zap.NewDevelopment() + scope = tally.NewTestScope("test_scope", map[string]string{}) +) func TestWrite(t *testing.T) { fakeProm := promremotetest.NewServer(t) defer fakeProm.Close() - scope := tally.NewTestScope("test_scope", map[string]string{}) promStorage, err := NewStorage(Options{ endpoints: []EndpointOptions{{name: "testEndpoint", address: fakeProm.WriteAddr()}}, scope: scope, @@ -119,40 +122,37 @@ func TestWriteBasedOnRetention(t *testing.T) { promLongRetention2.Reset() } - scope := tally.NewTestScope("test_scope", map[string]string{}) + mediumRetentionAttr := storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Retention: 720 * time.Hour, + Resolution: 5 * time.Minute, + } + shortRetentionAttr := storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Retention: 120 * time.Hour, + Resolution: 15 * time.Second, + } + longRetentionAttr := storagemetadata.Attributes{ + Resolution: 10 * time.Minute, + Retention: 8760 * time.Hour, + } promStorage, err := NewStorage(Options{ endpoints: []EndpointOptions{ { - address: promShortRetention.WriteAddr(), - attributes: storagemetadata.Attributes{ - MetricsType: storagemetadata.AggregatedMetricsType, - Retention: 120 * time.Hour, - Resolution: 15 * time.Second, - }, + address: promShortRetention.WriteAddr(), + attributes: shortRetentionAttr, }, { - address: promMediumRetention.WriteAddr(), - attributes: storagemetadata.Attributes{ - MetricsType: storagemetadata.AggregatedMetricsType, - Retention: 720 * time.Hour, - Resolution: 5 * time.Minute, - }, + address: promMediumRetention.WriteAddr(), + attributes: mediumRetentionAttr, }, { - address: promLongRetention.WriteAddr(), - attributes: storagemetadata.Attributes{ - MetricsType: storagemetadata.AggregatedMetricsType, - Retention: 8760 * time.Hour, - Resolution: 10 * time.Minute, - }, + address: promLongRetention.WriteAddr(), + attributes: longRetentionAttr, }, { - address: promLongRetention2.WriteAddr(), - attributes: storagemetadata.Attributes{ - MetricsType: storagemetadata.AggregatedMetricsType, - Retention: 8760 * time.Hour, - Resolution: 10 * time.Minute, - }, + address: promLongRetention2.WriteAddr(), + attributes: longRetentionAttr, }, }, scope: scope, @@ -161,31 +161,9 @@ func TestWriteBasedOnRetention(t *testing.T) { require.NoError(t, err) defer closeWithCheck(t, promStorage) - sendWrite := func(attr storagemetadata.Attributes) error { - //nolint: gosec - datapoint := ts.Datapoint{Value: rand.Float64(), Timestamp: xtime.Now()} - wq, err := storage.NewWriteQuery(storage.WriteQueryOptions{ - Tags: models.Tags{ - Opts: models.NewTagOptions(), - Tags: []models.Tag{{ - Name: []byte("test_tag_name"), - Value: []byte("test_tag_value"), - }}, - }, - Datapoints: ts.Datapoints{datapoint}, - Unit: xtime.Millisecond, - Attributes: attr, - }) - require.NoError(t, err) - return promStorage.Write(context.TODO(), wq) - } - t.Run("send short retention write", func(t *testing.T) { reset() - err := sendWrite(storagemetadata.Attributes{ - Retention: 120 * time.Hour, - Resolution: 15 * time.Second, - }) + err := writeTestMetric(t, promStorage, shortRetentionAttr) require.NoError(t, err) assert.NotNil(t, promShortRetention.GetLastWriteRequest()) assert.Nil(t, promMediumRetention.GetLastWriteRequest()) @@ -194,10 +172,7 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("send medium retention write", func(t *testing.T) { reset() - err := sendWrite(storagemetadata.Attributes{ - Resolution: 5 * time.Minute, - Retention: 720 * time.Hour, - }) + err := writeTestMetric(t, promStorage, mediumRetentionAttr) require.NoError(t, err) assert.Nil(t, promShortRetention.GetLastWriteRequest()) assert.NotNil(t, promMediumRetention.GetLastWriteRequest()) @@ -206,10 +181,7 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("send write to multiple instances configured with same retention", func(t *testing.T) { reset() - err := sendWrite(storagemetadata.Attributes{ - Resolution: 10 * time.Minute, - Retention: 8760 * time.Hour, - }) + err := writeTestMetric(t, promStorage, longRetentionAttr) require.NoError(t, err) assert.Nil(t, promShortRetention.GetLastWriteRequest()) assert.Nil(t, promMediumRetention.GetLastWriteRequest()) @@ -219,14 +191,14 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("send unconfigured retention write", func(t *testing.T) { reset() - err := sendWrite(storagemetadata.Attributes{ - Resolution: 5*time.Minute + 1, - Retention: 720 * time.Hour, + err := writeTestMetric(t, promStorage, storagemetadata.Attributes{ + Resolution: mediumRetentionAttr.Resolution + 1, + Retention: mediumRetentionAttr.Retention, }) require.Error(t, err) - err = sendWrite(storagemetadata.Attributes{ - Resolution: 5 * time.Minute, - Retention: 720*time.Hour + 1, + err = writeTestMetric(t, promStorage, storagemetadata.Attributes{ + Resolution: mediumRetentionAttr.Resolution, + Retention: mediumRetentionAttr.Retention + 1, }) require.Error(t, err) assert.Contains(t, err.Error(), "write did not match any of known endpoints") @@ -239,17 +211,67 @@ func TestWriteBasedOnRetention(t *testing.T) { t.Run("error should not prevent sending to other instances", func(t *testing.T) { reset() - promLongRetention.SetError(errors.New("test err")) - err := sendWrite(storagemetadata.Attributes{ - Resolution: 10 * time.Minute, - Retention: 8760 * time.Hour, - }) + promLongRetention.SetError("test err", http.StatusInternalServerError) + err := writeTestMetric(t, promStorage, longRetentionAttr) require.Error(t, err) assert.Contains(t, err.Error(), "test err") assert.NotNil(t, promLongRetention2.GetLastWriteRequest()) }) } +func TestErrorHandling(t *testing.T) { + svr := promremotetest.NewServer(t) + defer svr.Close() + + attr := storagemetadata.Attributes{ + MetricsType: storagemetadata.AggregatedMetricsType, + Retention: 720 * time.Hour, + Resolution: 5 * time.Minute, + } + promStorage, err := NewStorage(Options{ + endpoints: []EndpointOptions{{address: svr.WriteAddr(), attributes: attr}}, + scope: scope, + logger: logger, + }) + require.NoError(t, err) + defer closeWithCheck(t, promStorage) + + t.Run("wrap non 5xx errors as invalid params error", func(t *testing.T) { + svr.Reset() + svr.SetError("test err", http.StatusForbidden) + err := writeTestMetric(t, promStorage, attr) + require.Error(t, err) + assert.True(t, xerrors.IsInvalidParams(err)) + }) + + t.Run("429 should not be wrapped as invalid params", func(t *testing.T) { + svr.Reset() + svr.SetError("test err", http.StatusTooManyRequests) + err := writeTestMetric(t, promStorage, attr) + require.Error(t, err) + assert.False(t, xerrors.IsInvalidParams(err)) + }) +} + func closeWithCheck(t *testing.T, c io.Closer) { require.NoError(t, c.Close()) } + +func writeTestMetric(t *testing.T, s storage.Storage, attr storagemetadata.Attributes) error { + //nolint: gosec + datapoint := ts.Datapoint{Value: rand.Float64(), Timestamp: xtime.Now()} + wq, err := storage.NewWriteQuery(storage.WriteQueryOptions{ + Tags: models.Tags{ + Opts: models.NewTagOptions(), + Tags: []models.Tag{{ + Name: []byte("test_tag_name"), + Value: []byte("test_tag_value"), + }}, + }, + Datapoints: ts.Datapoints{datapoint}, + Unit: xtime.Millisecond, + Attributes: attr, + }) + require.NoError(t, err) + return s.Write(context.TODO(), wq) +}