diff --git a/pkg/client/add_events_test.go b/pkg/client/add_events_test.go
index deb6f8b..c5605e6 100644
--- a/pkg/client/add_events_test.go
+++ b/pkg/client/add_events_test.go
@@ -41,6 +41,8 @@ import (
 
 const RetryBase = time.Second
 
+var attempt = atomic.Int32{}
+
 func extract(req *http.Request) (add_events.AddEventsRequest, error) {
 	data, _ := io.ReadAll(req.Body)
 	b := bytes.NewBuffer(data)
@@ -55,7 +57,6 @@ func extract(req *http.Request) (add_events.AddEventsRequest, error) {
 }
 
 func TestAddEventsRetry(t *testing.T) {
-	attempt := atomic.Int32{}
 	attempt.Store(0)
 	wasSuccessful := atomic.Bool{}
 	wasSuccessful.Store(false)
@@ -87,19 +88,11 @@ func TestAddEventsRetry(t *testing.T) {
 	}))
 	defer server.Close()
 
-	config := &config.DataSetConfig{
-		Endpoint: server.URL,
-		Tokens:   config.DataSetTokens{WriteLog: "AAAA"},
-		BufferSettings: buffer_config.DataSetBufferSettings{
-			MaxSize:                  20,
-			MaxLifetime:              0,
-			RetryRandomizationFactor: 1.0,
-			RetryMultiplier:          1.0,
-			RetryInitialInterval:     RetryBase,
-			RetryMaxInterval:         10 * RetryBase,
-			RetryMaxElapsedTime:      10 * RetryBase,
-		},
-	}
+	config := newDataSetConfig(server.URL, *newBufferSettings(
+		buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
+		buffer_config.WithRetryInitialInterval(RetryBase),
+		buffer_config.WithRetryMaxInterval(RetryBase),
+	))
 	sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
 	require.Nil(t, err)
 
@@ -119,7 +112,6 @@ func TestAddEventsRetry(t *testing.T) {
 }
 
 func TestAddEventsRetryAfterSec(t *testing.T) {
-	attempt := atomic.Int32{}
 	attempt.Store(0)
 	wasSuccessful := atomic.Bool{}
 	wasSuccessful.Store(false)
@@ -169,19 +161,11 @@ func TestAddEventsRetryAfterSec(t *testing.T) {
 	}))
 	defer server.Close()
 
-	config := &config.DataSetConfig{
-		Endpoint: server.URL,
-		Tokens:   config.DataSetTokens{WriteLog: "AAAA"},
-		BufferSettings: buffer_config.DataSetBufferSettings{
-			MaxSize:                  20,
-			MaxLifetime:              0,
-			RetryRandomizationFactor: 1.0,
-			RetryMultiplier:          1.0,
-			RetryInitialInterval:     RetryBase,
-			RetryMaxInterval:         10 * RetryBase,
-			RetryMaxElapsedTime:      10 * RetryBase,
-		},
-	}
+	config := newDataSetConfig(server.URL, *newBufferSettings(
+		buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
+		buffer_config.WithRetryInitialInterval(RetryBase),
+		buffer_config.WithRetryMaxInterval(RetryBase),
+	))
 	sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
 	require.Nil(t, err)
 
@@ -226,7 +210,6 @@ func TestAddEventsRetryAfterSec(t *testing.T) {
 }
 
 func TestAddEventsRetryAfterTime(t *testing.T) {
-	attempt := atomic.Int32{}
 	attempt.Store(0)
 	wasSuccessful := atomic.Bool{}
 	wasSuccessful.Store(false)
@@ -264,19 +247,11 @@ func TestAddEventsRetryAfterTime(t *testing.T) {
 	}))
 	defer server.Close()
 
-	config := &config.DataSetConfig{
-		Endpoint: server.URL,
-		Tokens:   config.DataSetTokens{WriteLog: "AAAA"},
-		BufferSettings: buffer_config.DataSetBufferSettings{
-			MaxSize:                  20,
-			MaxLifetime:              0,
-			RetryRandomizationFactor: 1.0,
-			RetryMultiplier:          1.0,
-			RetryInitialInterval:     RetryBase,
-			RetryMaxInterval:         RetryBase,
-			RetryMaxElapsedTime:      10 * RetryBase,
-		},
-	}
+	config := newDataSetConfig(server.URL, *newBufferSettings(
+		buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
+		buffer_config.WithRetryInitialInterval(RetryBase),
+		buffer_config.WithRetryMaxInterval(RetryBase),
+	))
 	sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
 	require.Nil(t, err)
 
@@ -302,7 +277,6 @@ func TestAddEventsLargeEvent(t *testing.T) {
 		originalAttrs[fmt.Sprintf("%d", i)] = strings.Repeat(fmt.Sprintf("%d", i), 1000000+v)
 	}
 
-	attempt := atomic.Int32{}
 	attempt.Store(0)
 	wasSuccessful := atomic.Bool{}
 	wasSuccessful.Store(false)
@@ -362,19 +336,11 @@ func TestAddEventsLargeEvent(t *testing.T) {
 	}))
 	defer server.Close()
 
-	config := &config.DataSetConfig{
-		Endpoint: server.URL,
-		Tokens:   config.DataSetTokens{WriteLog: "AAAA"},
-		BufferSettings: buffer_config.DataSetBufferSettings{
-			MaxSize:                  20,
-			MaxLifetime:              0,
-			RetryRandomizationFactor: 1.0,
-			RetryMultiplier:          1.0,
-			RetryInitialInterval:     RetryBase,
-			RetryMaxInterval:         RetryBase,
-			RetryMaxElapsedTime:      10 * RetryBase,
-		},
-	}
+	config := newDataSetConfig(server.URL, *newBufferSettings(
+		buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
+		buffer_config.WithRetryInitialInterval(RetryBase),
+		buffer_config.WithRetryMaxInterval(RetryBase),
+	))
 	sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
 	require.Nil(t, err)
 
@@ -400,7 +366,6 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) {
 		originalAttrs[fmt.Sprintf("%d", i)] = strings.Repeat("\"", 1000000+v)
 	}
 
-	attempt := atomic.Int32{}
 	attempt.Store(0)
 	wasSuccessful := atomic.Bool{}
 	wasSuccessful.Store(false)
@@ -453,19 +418,11 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) {
 	}))
 	defer server.Close()
 
-	config := &config.DataSetConfig{
-		Endpoint: server.URL,
-		Tokens:   config.DataSetTokens{WriteLog: "AAAA"},
-		BufferSettings: buffer_config.DataSetBufferSettings{
-			MaxSize:                  20,
-			MaxLifetime:              0,
-			RetryRandomizationFactor: 1.0,
-			RetryMultiplier:          1.0,
-			RetryInitialInterval:     RetryBase,
-			RetryMaxInterval:         RetryBase,
-			RetryMaxElapsedTime:      10 * RetryBase,
-		},
-	}
+	config := newDataSetConfig(server.URL, *newBufferSettings(
+		buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
+		buffer_config.WithRetryInitialInterval(RetryBase),
+		buffer_config.WithRetryMaxInterval(RetryBase),
+	))
 	sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
 	require.Nil(t, err)
 
@@ -485,19 +442,11 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) {
 }
 
 func TestAddEventsRejectAfterFinish(t *testing.T) {
-	config := &config.DataSetConfig{
-		Endpoint: "https://example.com",
-		Tokens:   config.DataSetTokens{WriteLog: "AAAA"},
-		BufferSettings: buffer_config.DataSetBufferSettings{
-			MaxSize:                  20,
-			MaxLifetime:              0,
-			RetryRandomizationFactor: 1.0,
-			RetryMultiplier:          1.0,
-			RetryInitialInterval:     RetryBase,
-			RetryMaxInterval:         RetryBase,
-			RetryMaxElapsedTime:      10 * RetryBase,
-		},
-	}
+	config := newDataSetConfig("https://example.com", *newBufferSettings(
+		buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
+		buffer_config.WithRetryInitialInterval(RetryBase),
+		buffer_config.WithRetryMaxInterval(RetryBase),
+	))
 	sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
 	require.Nil(t, err)
 	err = sc.Shutdown()
@@ -511,7 +460,6 @@ func TestAddEventsRejectAfterFinish(t *testing.T) {
 }
 
 func TestAddEventsWithBufferSweeper(t *testing.T) {
-	attempt := atomic.Int32{}
 	attempt.Store(0)
 	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
 		attempt.Add(1)
@@ -572,36 +520,13 @@ func TestAddEventsWithBufferSweeper(t *testing.T) {
 }
 
 func TestAddEventsDoNotRetryForever(t *testing.T) {
-	attempt := atomic.Int32{}
 	attempt.Store(0)
-	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
-		attempt.Add(1)
-
-		w.WriteHeader(503)
-		payload, err := json.Marshal(map[string]interface{}{
-			"status":       "success",
-			"bytesCharged": 42,
-		})
-		assert.NoError(t, err)
-		l, err := w.Write(payload)
-		assert.Greater(t, l, 1)
-		assert.NoError(t, err)
-	}))
+	server := mockServerDefaultPayload(t, 503)
 	defer server.Close()
 
-	config := &config.DataSetConfig{
-		Endpoint: server.URL,
-		Tokens:   config.DataSetTokens{WriteLog: "AAAA"},
-		BufferSettings: buffer_config.DataSetBufferSettings{
-			MaxSize:                  20,
-			MaxLifetime:              0,
-			RetryInitialInterval:     time.Second,
-			RetryMaxInterval:         time.Second,
-			RetryMaxElapsedTime:      5 * time.Second,
-			RetryMultiplier:          1.0,
-			RetryRandomizationFactor: 1.0,
-		},
-	}
+	config := newDataSetConfig(server.URL, *newBufferSettings(
+		buffer_config.WithRetryMaxElapsedTime(time.Duration(5) * time.Second),
+	))
 	sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
 	require.Nil(t, err)
 
@@ -619,32 +544,12 @@ func TestAddEventsDoNotRetryForever(t *testing.T) {
 }
 
 func TestAddEventsLogResponseBodyOnInvalidJson(t *testing.T) {
-	attempt := atomic.Int32{}
 	attempt.Store(0)
-	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
-		attempt.Add(1)
-
-		w.WriteHeader(503)
-		payload := []byte("not valid json")
-		l, err := w.Write(payload)
-		assert.Greater(t, l, 1)
-		assert.NoError(t, err)
-	}))
+	server := mockServer(t, 503, []byte("not valid json"))
 	defer server.Close()
-
-	config := &config.DataSetConfig{
-		Endpoint: server.URL,
-		Tokens:   config.DataSetTokens{WriteLog: "AAAA"},
-		BufferSettings: buffer_config.DataSetBufferSettings{
-			MaxSize:                  20,
-			MaxLifetime:              0,
-			RetryInitialInterval:     time.Second,
-			RetryMaxInterval:         time.Second,
-			RetryMaxElapsedTime:      3 * time.Second,
-			RetryMultiplier:          1.0,
-			RetryRandomizationFactor: 1.0,
-		},
-	}
+	config := newDataSetConfig(server.URL, *newBufferSettings(
+		buffer_config.WithRetryMaxElapsedTime(time.Duration(3) * time.Second),
+	))
 	sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
 	require.Nil(t, err)
 
@@ -665,3 +570,123 @@ func TestAddEventsLogResponseBodyOnInvalidJson(t *testing.T) {
 	assert.Errorf(t, err, "some buffers were dropped during finishing - 1")
 	assert.GreaterOrEqual(t, attempt.Load(), int32(0))
 }
+
+// TestAddEventsAreNotRejectedOncePreviousReqRetriesMaxLifetimeExpired verifies that AddEvents
+// accepts a batch when the previous retryable error is older than RetryMaxElapsedTime.
+func TestAddEventsAreNotRejectedOncePreviousReqRetriesMaxLifetimeExpired(t *testing.T) {
+	// GIVEN
+	maxElapsedTime := 10
+	lastEventRetriesExpiration := maxElapsedTime + 1
+	attempt.Store(0)
+	server := mockServerDefaultPayload(t, http.StatusOK)
+	defer server.Close()
+	dataSetConfig := newDataSetConfig(server.URL, *newBufferSettings(
+		buffer_config.WithMaxLifetime(time.Second),
+		buffer_config.WithRetryMaxElapsedTime(time.Duration(maxElapsedTime)*time.Second),
+		buffer_config.WithRetryRandomizationFactor(0.000000001),
+	))
+	client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()))
+	require.Nil(t, err)
+
+	sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"}
+	client.SessionInfo = sessionInfo
+	event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}}
+	eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}}
+
+	// GIVEN mock previous event request error
+	client.setLastErrorTimestamp(time.Now().Add(-time.Duration(lastEventRetriesExpiration) * time.Second))
+	client.setLastError(fmt.Errorf("failed to handle previous request"))
+	client.LastHttpStatus.Store(http.StatusTooManyRequests)
+
+	// WHEN
+	err = client.AddEvents([]*add_events.EventBundle{eventBundle1})
+	// THEN event is not rejected
+	assert.Nil(t, err)
+}
+
+// TestAddEventsAreRejectedOncePreviousReqRetriesMaxLifetimeNotExpired verifies that AddEvents
+// rejects a batch while the previous retryable error is still within RetryMaxElapsedTime.
+func TestAddEventsAreRejectedOncePreviousReqRetriesMaxLifetimeNotExpired(t *testing.T) {
+	// GIVEN
+	maxElapsedTime := 10
+	lastEventRetriesExpiration := maxElapsedTime - 1
+	attempt.Store(0)
+	server := mockServerDefaultPayload(t, http.StatusOK)
+	defer server.Close()
+	dataSetConfig := newDataSetConfig(server.URL, *newBufferSettings(
+		buffer_config.WithMaxLifetime(time.Second),
+		buffer_config.WithRetryMaxElapsedTime(time.Duration(maxElapsedTime)*time.Second),
+		buffer_config.WithRetryRandomizationFactor(0.000000001),
+	))
+	client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()))
+	require.Nil(t, err)
+
+	sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"}
+	client.SessionInfo = sessionInfo
+	event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}}
+	eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}}
+
+	// GIVEN mock previous event request error
+	client.setLastErrorTimestamp(time.Now().Add(-time.Duration(lastEventRetriesExpiration) * time.Second))
+	client.setLastError(fmt.Errorf("failed to handle previous request"))
+	client.LastHttpStatus.Store(http.StatusTooManyRequests)
+
+	// WHEN
+	err = client.AddEvents([]*add_events.EventBundle{eventBundle1})
+	// THEN event is rejected
+	require.Error(t, err)
+	// assert.Errorf only checks err != nil (its string is a failure message), so compare the text explicitly.
+	assert.Contains(t, err.Error(), "rejecting - Last HTTP request contains an error: failed to handle previous request")
+}
+
+// mockServerDefaultPayload starts a test server that answers every request with statusCode
+// and a fixed JSON body ({"status": "success", "bytesCharged": 42}).
+func mockServerDefaultPayload(t *testing.T, statusCode int) *httptest.Server {
+	payload, err := json.Marshal(map[string]interface{}{
+		"status":       "success",
+		"bytesCharged": 42,
+	})
+	require.NoError(t, err)
+	return mockServer(t, statusCode, payload)
+}
+
+// mockServer starts a test server that increments the package-level attempt counter on
+// each request and replies with statusCode and payload. Callers close the returned server.
+func mockServer(t *testing.T, statusCode int, payload []byte) *httptest.Server {
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+		attempt.Add(1)
+		w.WriteHeader(statusCode)
+		l, err := w.Write(payload)
+		assert.Greater(t, l, 1)
+		assert.NoError(t, err)
+	}))
+	return server
+}
+
+// newDataSetConfig builds a client config for url with a fixed write token and the given buffer settings.
+func newDataSetConfig(url string, settings buffer_config.DataSetBufferSettings) *config.DataSetConfig {
+	return &config.DataSetConfig{
+		Endpoint:       url,
+		Tokens:         config.DataSetTokens{WriteLog: "AAAA"},
+		BufferSettings: settings,
+	}
+}
+
+// newBufferSettings builds buffer settings from the test defaults with customOpts appended
+// after them. It panics when buffer_config.New fails, so a bad option is reported at its source.
+func newBufferSettings(customOpts ...buffer_config.DataSetBufferSettingsOption) *buffer_config.DataSetBufferSettings {
+	defaultOpts := []buffer_config.DataSetBufferSettingsOption{
+		buffer_config.WithMaxSize(20),
+		buffer_config.WithMaxLifetime(0),
+		buffer_config.WithRetryInitialInterval(time.Second),
+		buffer_config.WithRetryMaxInterval(time.Second),
+		buffer_config.WithRetryMaxElapsedTime(time.Duration(1) * time.Second),
+		buffer_config.WithRetryMultiplier(1.0),
+		buffer_config.WithRetryRandomizationFactor(1.0),
+	}
+	bufferSetting, err := buffer_config.New(append(defaultOpts, customOpts...)...)
+	if err != nil {
+		panic(fmt.Sprintf("invalid test buffer settings: %v", err))
+	}
+	return bufferSetting
+}
diff --git a/pkg/client/client.go b/pkg/client/client.go
index aa8052e..fa6a01d 100644
--- a/pkg/client/client.go
+++ b/pkg/client/client.go
@@ -72,6 +72,7 @@ type DataSetClient struct {
 	BufferPerSessionTopic *pubsub.PubSub
 	LastHttpStatus        atomic.Uint32
 	lastError             error
+	lastErrorTs           time.Time
 	lastErrorMu           sync.RWMutex
 	retryAfter            time.Time
 	retryAfterMu          sync.RWMutex
@@ -270,15 +271,15 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
 		lastHttpStatus := uint32(0)
 		if err != nil {
 			client.Logger.Error("unable to send addEvents buffers", zap.Error(err))
-			if strings.Contains(err.Error(), "Unable to send request") {
-				lastHttpStatus = HttpErrorCannotConnect
-				client.LastHttpStatus.Store(lastHttpStatus)
-			} else {
+			client.setLastErrorTimestamp(time.Now())
+			if !strings.Contains(err.Error(), "Unable to send request") {
 				lastHttpStatus = HttpErrorHasErrorMessage
 				client.LastHttpStatus.Store(lastHttpStatus)
 				client.onBufferDrop(buf, lastHttpStatus, err)
-				break
+				break // exit loop (failed to send buffer)
 			}
+			lastHttpStatus = HttpErrorCannotConnect
+			client.LastHttpStatus.Store(lastHttpStatus)
 		}
 		zaps := make([]zap.Field, 0)
 		if response.ResponseObj != nil {
@@ -303,7 +304,7 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
 		if isOkStatus(lastHttpStatus) {
 			// everything was fine, there is no need for retries
 			client.bytesAPIAccepted.Add(uint64(payloadLen))
-			break
+			break // exit loop (buffer sent)
 		}
 
 		backoffDelay := expBackoff.NextBackOff()
@@ -312,7 +313,7 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
 			// throw away the batch
 			err = fmt.Errorf("max elapsed time expired %w", err)
 			client.onBufferDrop(buf, lastHttpStatus, err)
-			break
+			break // exit loop (failed to send buffer)
 		}
 
 		if isRetryableStatus(lastHttpStatus) {
@@ -334,7 +335,7 @@ func (client *DataSetClient) sendBufferWithRetryPolicy(buf *buffer.Buffer) {
 			} else {
 				err = fmt.Errorf("non recoverable error %w", err)
 				client.onBufferDrop(buf, lastHttpStatus, err)
-				break
+				break // exit loop (failed to send buffer)
 			}
 			retryNum++
 		}
@@ -495,8 +496,8 @@ func (client *DataSetClient) publishBuffer(buf *buffer.Buffer) {
 
 // Exporter rejects handling of incoming batches if is in error state
 func (client *DataSetClient) isInErrorState() (bool, error) {
-	// In case one of session failed (with retryable status) to send request (batch of event to DataSet), client retries sending this request.
-	if isRetryableStatus(client.LastHttpStatus.Load()) {
+	// In case one of session failed (with retryable status) to send request (batch of event to DataSet), client retries sending this request. Unless retry attempts timed out
+	if isRetryableStatus(client.LastHttpStatus.Load()) && !client.isLastRetryableErrorTimedOut() {
 		err := client.LastError()
 		if err != nil {
 			return true, fmt.Errorf("rejecting - Last HTTP request contains an error: %w", err)
@@ -593,3 +594,26 @@ func (client *DataSetClient) onBufferDrop(buf *buffer.Buffer, status uint32, err
 		)...,
 	)
 }
+
+// setLastErrorTimestamp records the time of the most recent send error.
+// The write to lastErrorTs is guarded by lastErrorMu.
+func (client *DataSetClient) setLastErrorTimestamp(timestamp time.Time) {
+	client.lastErrorMu.Lock()
+	defer client.lastErrorMu.Unlock()
+	client.lastErrorTs = timestamp
+}
+
+// lastErrorTimestamp returns the time stored by setLastErrorTimestamp.
+// It takes lastErrorMu, the lock the writer holds; reading under retryAfterMu
+// would not synchronize with the writer and would be a data race.
+func (client *DataSetClient) lastErrorTimestamp() time.Time {
+	client.lastErrorMu.RLock()
+	defer client.lastErrorMu.RUnlock()
+	return client.lastErrorTs
+}
+
+// isLastRetryableErrorTimedOut reports whether more than RetryMaxElapsedTime has passed
+// since the last recorded error timestamp.
+func (client *DataSetClient) isLastRetryableErrorTimedOut() bool {
+	return client.lastErrorTimestamp().Add(client.Config.BufferSettings.RetryMaxElapsedTime).Before(time.Now())
+}