Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DSET-4080 fix recovery from error state #36

Merged
merged 5 commits into from
Jun 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 144 additions & 135 deletions pkg/client/add_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (

const RetryBase = time.Second

var attempt = atomic.Int32{}
zdaratom-s1 marked this conversation as resolved.
Show resolved Hide resolved

func extract(req *http.Request) (add_events.AddEventsRequest, error) {
data, _ := io.ReadAll(req.Body)
b := bytes.NewBuffer(data)
Expand All @@ -55,7 +57,6 @@ func extract(req *http.Request) (add_events.AddEventsRequest, error) {
}

func TestAddEventsRetry(t *testing.T) {
attempt := atomic.Int32{}
attempt.Store(0)
wasSuccessful := atomic.Bool{}
wasSuccessful.Store(false)
Expand Down Expand Up @@ -87,19 +88,11 @@ func TestAddEventsRetry(t *testing.T) {
}))
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: 10 * RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
buffer_config.WithRetryInitialInterval(RetryBase),
buffer_config.WithRetryMaxInterval(RetryBase),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand All @@ -119,7 +112,6 @@ func TestAddEventsRetry(t *testing.T) {
}

func TestAddEventsRetryAfterSec(t *testing.T) {
attempt := atomic.Int32{}
attempt.Store(0)
wasSuccessful := atomic.Bool{}
wasSuccessful.Store(false)
Expand Down Expand Up @@ -169,19 +161,11 @@ func TestAddEventsRetryAfterSec(t *testing.T) {
}))
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: 10 * RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
buffer_config.WithRetryInitialInterval(RetryBase),
buffer_config.WithRetryMaxInterval(RetryBase),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand Down Expand Up @@ -226,7 +210,6 @@ func TestAddEventsRetryAfterSec(t *testing.T) {
}

func TestAddEventsRetryAfterTime(t *testing.T) {
attempt := atomic.Int32{}
attempt.Store(0)
wasSuccessful := atomic.Bool{}
wasSuccessful.Store(false)
Expand Down Expand Up @@ -264,19 +247,11 @@ func TestAddEventsRetryAfterTime(t *testing.T) {
}))
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
buffer_config.WithRetryInitialInterval(RetryBase),
buffer_config.WithRetryMaxInterval(RetryBase),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand All @@ -302,7 +277,6 @@ func TestAddEventsLargeEvent(t *testing.T) {
originalAttrs[fmt.Sprintf("%d", i)] = strings.Repeat(fmt.Sprintf("%d", i), 1000000+v)
}

attempt := atomic.Int32{}
attempt.Store(0)
wasSuccessful := atomic.Bool{}
wasSuccessful.Store(false)
Expand Down Expand Up @@ -362,19 +336,11 @@ func TestAddEventsLargeEvent(t *testing.T) {
}))
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
buffer_config.WithRetryInitialInterval(RetryBase),
buffer_config.WithRetryMaxInterval(RetryBase),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand All @@ -400,7 +366,6 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) {
originalAttrs[fmt.Sprintf("%d", i)] = strings.Repeat("\"", 1000000+v)
}

attempt := atomic.Int32{}
attempt.Store(0)
wasSuccessful := atomic.Bool{}
wasSuccessful.Store(false)
Expand Down Expand Up @@ -453,19 +418,11 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) {
}))
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
buffer_config.WithRetryInitialInterval(RetryBase),
buffer_config.WithRetryMaxInterval(RetryBase),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand All @@ -485,19 +442,11 @@ func TestAddEventsLargeEventThatNeedEscaping(t *testing.T) {
}

func TestAddEventsRejectAfterFinish(t *testing.T) {
config := &config.DataSetConfig{
Endpoint: "https://example.com",
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryRandomizationFactor: 1.0,
RetryMultiplier: 1.0,
RetryInitialInterval: RetryBase,
RetryMaxInterval: RetryBase,
RetryMaxElapsedTime: 10 * RetryBase,
},
}
config := newDataSetConfig("https://example.com", *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(10*RetryBase),
buffer_config.WithRetryInitialInterval(RetryBase),
buffer_config.WithRetryMaxInterval(RetryBase),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)
err = sc.Shutdown()
Expand All @@ -511,7 +460,6 @@ func TestAddEventsRejectAfterFinish(t *testing.T) {
}

func TestAddEventsWithBufferSweeper(t *testing.T) {
attempt := atomic.Int32{}
attempt.Store(0)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
attempt.Add(1)
Expand Down Expand Up @@ -572,36 +520,13 @@ func TestAddEventsWithBufferSweeper(t *testing.T) {
}

func TestAddEventsDoNotRetryForever(t *testing.T) {
attempt := atomic.Int32{}
attempt.Store(0)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
attempt.Add(1)

w.WriteHeader(503)
payload, err := json.Marshal(map[string]interface{}{
"status": "success",
"bytesCharged": 42,
})
assert.NoError(t, err)
l, err := w.Write(payload)
assert.Greater(t, l, 1)
assert.NoError(t, err)
}))
server := mockServerDefaultPayload(t, 503)
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryInitialInterval: time.Second,
RetryMaxInterval: time.Second,
RetryMaxElapsedTime: 5 * time.Second,
RetryMultiplier: 1.0,
RetryRandomizationFactor: 1.0,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(time.Duration(5) * time.Second),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand All @@ -619,32 +544,12 @@ func TestAddEventsDoNotRetryForever(t *testing.T) {
}

func TestAddEventsLogResponseBodyOnInvalidJson(t *testing.T) {
attempt := atomic.Int32{}
attempt.Store(0)
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
attempt.Add(1)

w.WriteHeader(503)
payload := []byte("<html>not valid json</html>")
l, err := w.Write(payload)
assert.Greater(t, l, 1)
assert.NoError(t, err)
}))
server := mockServer(t, 503, []byte("<html>not valid json</html>"))
defer server.Close()

config := &config.DataSetConfig{
Endpoint: server.URL,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: buffer_config.DataSetBufferSettings{
MaxSize: 20,
MaxLifetime: 0,
RetryInitialInterval: time.Second,
RetryMaxInterval: time.Second,
RetryMaxElapsedTime: 3 * time.Second,
RetryMultiplier: 1.0,
RetryRandomizationFactor: 1.0,
},
}
config := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithRetryMaxElapsedTime(time.Duration(3) * time.Second),
))
sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

Expand All @@ -665,3 +570,107 @@ func TestAddEventsLogResponseBodyOnInvalidJson(t *testing.T) {
assert.Errorf(t, err, "some buffers were dropped during finishing - 1")
assert.GreaterOrEqual(t, attempt.Load(), int32(0))
}

func TestAddEventsAreNotRejectedOncePreviousReqRetriesMaxLifetimeExpired(t *testing.T) {
// GIVEN
maxElapsedTime := 10
lastEventRetriesExpiration := maxElapsedTime + 1
attempt.Store(0)
server := mockServerDefaultPayload(t, http.StatusOK)
defer server.Close()
dataSetConfig := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithMaxLifetime(time.Second),
buffer_config.WithRetryMaxElapsedTime(time.Duration(maxElapsedTime)*time.Second),
buffer_config.WithRetryRandomizationFactor(0.000000001),
))
client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"}
client.SessionInfo = sessionInfo
event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}}
eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}}

// GIVEN mock previous event request error
client.setLastErrorTimestamp(time.Now().Add(-time.Duration(lastEventRetriesExpiration) * time.Second))
client.setLastError(fmt.Errorf("failed to handle previous request"))
client.LastHttpStatus.Store(http.StatusTooManyRequests)

// WHEN
err = client.AddEvents([]*add_events.EventBundle{eventBundle1})
// THEN event is not rejected
assert.Nil(t, err)
}

func TestAddEventsAreRejectedOncePreviousReqRetriesMaxLifetimeNotExpired(t *testing.T) {
// GIVEN
maxElapsedTime := 10
lastEventRetriesExpiration := maxElapsedTime - 1
attempt.Store(0)
server := mockServerDefaultPayload(t, http.StatusOK)
defer server.Close()
dataSetConfig := newDataSetConfig(server.URL, *newBufferSettings(
buffer_config.WithMaxLifetime(time.Second),
buffer_config.WithRetryMaxElapsedTime(time.Duration(maxElapsedTime)*time.Second),
buffer_config.WithRetryRandomizationFactor(0.000000001),
))
client, err := NewClient(dataSetConfig, &http.Client{}, zap.Must(zap.NewDevelopment()))
require.Nil(t, err)

sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"}
client.SessionInfo = sessionInfo
event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}}
eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}}

// GIVEN mock previous event request error
client.setLastErrorTimestamp(time.Now().Add(-time.Duration(lastEventRetriesExpiration) * time.Second))
client.setLastError(fmt.Errorf("failed to handle previous request"))
client.LastHttpStatus.Store(http.StatusTooManyRequests)

// WHEN
err = client.AddEvents([]*add_events.EventBundle{eventBundle1})
// THEN event is rejected
assert.NotNil(t, err)
assert.Errorf(t, err, "AddEvents - reject batch: rejecting - Last HTTP request contains an error: failed to handle previous request")
}

func mockServerDefaultPayload(t *testing.T, statusCode int) *httptest.Server {
payload, _ := json.Marshal(map[string]interface{}{
"status": "success",
"bytesCharged": 42,
})
return mockServer(t, statusCode, payload)
}

func mockServer(t *testing.T, statusCode int, payload []byte) *httptest.Server {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
attempt.Add(1)
w.WriteHeader(statusCode)
l, err := w.Write(payload)
assert.Greater(t, l, 1)
assert.NoError(t, err)
}))
return server
}

func newDataSetConfig(url string, settings buffer_config.DataSetBufferSettings) *config.DataSetConfig {
return &config.DataSetConfig{
Endpoint: url,
Tokens: config.DataSetTokens{WriteLog: "AAAA"},
BufferSettings: settings,
}
}

func newBufferSettings(customOpts ...buffer_config.DataSetBufferSettingsOption) *buffer_config.DataSetBufferSettings {
defaultOpts := []buffer_config.DataSetBufferSettingsOption{
buffer_config.WithMaxSize(20),
buffer_config.WithMaxLifetime(0),
buffer_config.WithRetryInitialInterval(time.Second),
buffer_config.WithRetryMaxInterval(time.Second),
buffer_config.WithRetryMaxElapsedTime(time.Duration(1) * time.Second),
buffer_config.WithRetryMultiplier(1.0),
buffer_config.WithRetryRandomizationFactor(1.0),
}
bufferSetting, _ := buffer_config.New(append(defaultOpts, customOpts...)...)
return bufferSetting
}
Loading