Skip to content

Commit

Permalink
DSET-4080 fix recovery from error state (#36)
Browse files Browse the repository at this point in the history
* DSET-4080 refactor tests

* DSET-4080 docs add logging

* DSET-4080 refactor - guard clause

* DSET-4080 refactor tests

* DSET-4080 fix recovery from error state

Problem
- when DataSet returns Retryable error we try to retry with timeout. Once this time is over we stay in error mode since last error is never overriden, and new event handling is rejected.

Solution
- keep timestamp of last error and consider it while evaluate last error timestamp in order to recover from error mode.
  • Loading branch information
zdaratom-s1 authored Jun 21, 2023
1 parent 1295b6f commit dab2a50
Show file tree
Hide file tree
Showing 2 changed files with 171 additions and 145 deletions.
279 changes: 144 additions & 135 deletions pkg/client/add_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (

const RetryBase = time.Second

var attempt = atomic.Int32{}

func extract(req *http.Request) (add_events.AddEventsRequest, error) {
data, _ := io.ReadAll(req.Body)
b := bytes.NewBuffer(data)
Expand All @@ -55,7 +57,6 @@ func extract(req *http.Request) (add_events.AddEventsRequest, error) {
}

func TestAddEventsRetry(t *testing.T) {
attempt := atomic.Int32{}
attempt.Store(0)
wasSuccessful := atomic.Bool{}
wasSuccessful.Store(false)
Expand Down Expand Up @@ -87,19 +88,11 @@ func TestAddEventsRetry(t *testing.T) {
}))
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: 10 * RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
buffer_config.WithRetryInitialInterval(RetryBase),
buffer_config.WithRetryMaxInterval(RetryBase),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand All @@ -119,7 +112,6 @@ func TestAddEventsRetry(t *testing.T) {
}

func TestAddEventsRetryAfterSec(t *testing.T) {
attempt := atomic.Int32{}
attempt.Store(0)
wasSuccessful := atomic.Bool{}
wasSuccessful.Store(false)
Expand Down Expand Up @@ -169,19 +161,11 @@ func TestAddEventsRetryAfterSec(t *testing.T) {
}))
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: 10 * RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
buffer_config.WithRetryInitialInterval(RetryBase),
buffer_config.WithRetryMaxInterval(RetryBase),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand Down Expand Up @@ -226,7 +210,6 @@ func TestAddEventsRetryAfterSec(t *testing.T) {
}

func TestAddEventsRetryAfterTime(t *testing.T) {
attempt := atomic.Int32{}
attempt.Store(0)
wasSuccessful := atomic.Bool{}
wasSuccessful.Store(false)
Expand Down Expand Up @@ -264,19 +247,11 @@ func TestAddEventsRetryAfterTime(t *testing.T) {
}))
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
buffer_config.WithRetryInitialInterval(RetryBase),
buffer_config.WithRetryMaxInterval(RetryBase),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand All @@ -302,7 +277,6 @@ func TestAddEventsLargeEvent(t *testing.T) {
originalAttrs[fmt.Sprintf("%d", i)] = strings.Repeat(fmt.Sprintf("%d", i), 1000000+v)
}

attempt := atomic.Int32{}
attempt.Store(0)
wasSuccessful := atomic.Bool{}
wasSuccessful.Store(false)
Expand Down Expand Up @@ -362,19 +336,11 @@ func TestAddEventsLargeEvent(t *testing.T) {
}))
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
buffer_config.WithRetryInitialInterval(RetryBase),
buffer_config.WithRetryMaxInterval(RetryBase),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand All @@ -400,7 +366,6 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) {
originalAttrs[fmt.Sprintf("%d", i)] = strings.Repeat("\"", 1000000+v)
}

attempt := atomic.Int32{}
attempt.Store(0)
wasSuccessful := atomic.Bool{}
wasSuccessful.Store(false)
Expand Down Expand Up @@ -453,19 +418,11 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) {
}))
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
buffer_config.WithRetryInitialInterval(RetryBase),
buffer_config.WithRetryMaxInterval(RetryBase),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand All @@ -485,19 +442,11 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) {
}

func TestAddEventsRejectAfterFinish(t *testing.T) {
config := &config.DataSetConfig{
Endpoint: "https://example.com",
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
},
}
config := newDataSetConfig("https://example.com", *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
buffer_config.WithRetryInitialInterval(RetryBase),
buffer_config.WithRetryMaxInterval(RetryBase),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)
err = sc.Shutdown()
Expand All @@ -511,7 +460,6 @@ func TestAddEventsRejectAfterFinish(t *testing.T) {
}

func TestAddEventsWithBufferSweeper(t *testing.T) {
attempt := atomic.Int32{}
attempt.Store(0)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
attempt.Add(1)
Expand Down Expand Up @@ -572,36 +520,13 @@ func TestAddEventsWithBufferSweeper(t *testing.T) {
}

func TestAddEventsDoNotRetryForever(t *testing.T) {
attempt := atomic.Int32{}
attempt.Store(0)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
attempt.Add(1)

w.WriteHeader(503)
payload, err := json.Marshal(map[string]interface{}{
"status": "success",
"bytesCharged": 42,
})
assert.NoError(t, err)
l, err := w.Write(payload)
assert.Greater(t, l, 1)
assert.NoError(t, err)
}))
server := mockServerDefaultPayload(t, 503)
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryInitialInterval: time.Second,
RetryMaxInterval: time.Second,
RetryMaxElapsedTime: 5 * time.Second,
RetryMultiplier: 1.0,
RetryRandomizationFactor: 1.0,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(time.Duration(5) * time.Second),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand All @@ -619,32 +544,12 @@ func TestAddEventsDoNotRetryForever(t *testing.T) {
}

func TestAddEventsLogResponseBodyOnInvalidJson(t *testing.T) {
attempt := atomic.Int32{}
attempt.Store(0)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
attempt.Add(1)

w.WriteHeader(503)
payload := []byte("<html>not valid json</html>")
l, err := w.Write(payload)
assert.Greater(t, l, 1)
assert.NoError(t, err)
}))
server := mockServer(t, 503, []byte("<html>not valid json</html>"))
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryInitialInterval: time.Second,
RetryMaxInterval: time.Second,
RetryMaxElapsedTime: 3 * time.Second,
RetryMultiplier: 1.0,
RetryRandomizationFactor: 1.0,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(time.Duration(3) * time.Second),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand All @@ -665,3 +570,107 @@ func TestAddEventsLogResponseBodyOnInvalidJson(t *testing.T) {
assert.Errorf(t, err, "some buffers were dropped during finishing - 1")
assert.GreaterOrEqual(t, attempt.Load(), int32(0))
}

func TestAddEventsAreNotRejectedOncePreviousReqRetriesMaxLifetimeExpired(t *testing.T) {
// GIVEN
maxElapsedTime := 10
lastEventRetriesExpiration := maxElapsedTime + 1
attempt.Store(0)
server := mockServerDefaultPayload(t, http.StatusOK)
defer server.Close()
dataSetConfig := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithMaxLifetime(time.Second),
buffer_config.WithRetryMaxElapsedTime(time.Duration(maxElapsedTime)*time.Second),
buffer_config.WithRetryRandomizationFactor(0.000000001),
))
client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"}
client.SessionInfo = sessionInfo
event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}}
eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}}

// GIVEN mock previous event request error
client.setLastErrorTimestamp(time.Now().Add(-time.Duration(lastEventRetriesExpiration) * time.Second))
client.setLastError(fmt.Errorf("failed to handle previous request"))
client.LastHttpStatus.Store(http.StatusTooManyRequests)

// WHEN
err = client.AddEvents([]*add_events.EventBundle{eventBundle1})
// THEN event is not rejected
assert.Nil(t, err)
}

func TestAddEventsAreRejectedOncePreviousReqRetriesMaxLifetimeNotExpired(t *testing.T) {
// GIVEN
maxElapsedTime := 10
lastEventRetriesExpiration := maxElapsedTime - 1
attempt.Store(0)
server := mockServerDefaultPayload(t, http.StatusOK)
defer server.Close()
dataSetConfig := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithMaxLifetime(time.Second),
buffer_config.WithRetryMaxElapsedTime(time.Duration(maxElapsedTime)*time.Second),
buffer_config.WithRetryRandomizationFactor(0.000000001),
))
client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"}
client.SessionInfo = sessionInfo
event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}}
eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}}

// GIVEN mock previous event request error
client.setLastErrorTimestamp(time.Now().Add(-time.Duration(lastEventRetriesExpiration) * time.Second))
client.setLastError(fmt.Errorf("failed to handle previous request"))
client.LastHttpStatus.Store(http.StatusTooManyRequests)

// WHEN
err = client.AddEvents([]*add_events.EventBundle{eventBundle1})
// THEN event is rejected
assert.NotNil(t, err)
assert.Errorf(t, err, "AddEvents - reject batch: rejecting - Last HTTP request contains an error: failed to handle previous request")
}

func mockServerDefaultPayload(t *testing.T, statusCode int) *httptest.Server {
payload, _ := json.Marshal(map[string]interface{}{
"status": "success",
"bytesCharged": 42,
})
return mockServer(t, statusCode, payload)
}

func mockServer(t *testing.T, statusCode int, payload []byte) *httptest.Server {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
attempt.Add(1)
w.WriteHeader(statusCode)
l, err := w.Write(payload)
assert.Greater(t, l, 1)
assert.NoError(t, err)
}))
return server
}

func newDataSetConfig(url string, settings buffer_config.DataSetBufferSettings) *config.DataSetConfig {
return &config.DataSetConfig{
Endpoint: url,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: settings,
}
}

func newBufferSettings(customOpts ...buffer_config.DataSetBufferSettingsOption) *buffer_config.DataSetBufferSettings {
defaultOpts := []buffer_config.DataSetBufferSettingsOption{
buffer_config.WithMaxSize(20),
buffer_config.WithMaxLifetime(0),
buffer_config.WithRetryInitialInterval(time.Second),
buffer_config.WithRetryMaxInterval(time.Second),
buffer_config.WithRetryMaxElapsedTime(time.Duration(1) * time.Second),
buffer_config.WithRetryMultiplier(1.0),
buffer_config.WithRetryRandomizationFactor(1.0),
}
bufferSetting, _ := buffer_config.New(append(defaultOpts, customOpts...)...)
return bufferSetting
}
Loading

0 comments on commit dab2a50

Please sign in to comment.