diff --git a/pkg/client/add_events.go b/pkg/client/add_events.go index 1d6b717..bb30105 100644 --- a/pkg/client/add_events.go +++ b/pkg/client/add_events.go @@ -320,7 +320,7 @@ func (client *DataSetClient) SendAddEventsBuffer(buf *buffer.Buffer) (*add_event resp := &add_events.AddEventsResponse{} httpRequest, err := request.NewRequest( - "POST", client.Config.Endpoint+"/api/addEvents", + "POST", client.addEventsEndpointUrl, ).WithWriteLog(client.Config.Tokens).RawRequest(payload).HttpRequest() if err != nil { return nil, len(payload), fmt.Errorf("cannot create request: %w", err) @@ -392,7 +392,7 @@ func (client *DataSetClient) apiCall(req *http.Request, response response.SetRes err = json.Unmarshal(responseBody, &response) if err != nil { - return fmt.Errorf("unable to parse response body: %w", err) + return fmt.Errorf("unable to parse response body: %w, url: %s, response: %s", err, client.addEventsEndpointUrl, truncateText(string(responseBody), 1000)) } response.SetResponseObj(resp) @@ -417,3 +417,12 @@ func (client *DataSetClient) getBuffers() []*buffer.Buffer { } return buffers } + +// Truncate provided text to the provided length +func truncateText(text string, length int) string { + if len(text) > length { + text = string([]byte(text)[:length]) + "..." + } + + return text +} diff --git a/pkg/client/add_events_test.go b/pkg/client/add_events_test.go index 5284203..6d738f0 100644 --- a/pkg/client/add_events_test.go +++ b/pkg/client/add_events_test.go @@ -617,3 +617,51 @@ func TestAddEventsDoNotRetryForever(t *testing.T) { assert.Errorf(t, err, "some buffers were dropped during finishing - 1") assert.GreaterOrEqual(t, attempt.Load(), int32(2)) } + +func TestAddEventsLogResponseBodyOnInvalidJson(t *testing.T) { + attempt := atomic.Int32{} + attempt.Store(0) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + attempt.Add(1) + + w.WriteHeader(503) + payload := []byte("not valid json") + l, err := w.Write(payload) + assert.Greater(t, l, 1) + assert.NoError(t, err) + })) + defer server.Close() + + config := &config.DataSetConfig{ + Endpoint: server.URL, + Tokens: config.DataSetTokens{WriteLog: "AAAA"}, + BufferSettings: buffer_config.DataSetBufferSettings{ + MaxSize: 20, + MaxLifetime: 0, + RetryInitialInterval: time.Second, + RetryMaxInterval: time.Second, + RetryMaxElapsedTime: 3 * time.Second, + RetryMultiplier: 1.0, + RetryRandomizationFactor: 1.0, + }, + } + sc, err := NewClient(config, &http.Client{}, zap.Must(zap.NewDevelopment())) + require.Nil(t, err) + + sessionInfo := &add_events.SessionInfo{ServerId: "a", ServerType: "b"} + sc.SessionInfo = sessionInfo + event1 := &add_events.Event{Thread: "5", Sev: 3, Ts: "0", Attrs: map[string]interface{}{"message": "test - 1"}} + eventBundle1 := &add_events.EventBundle{Event: event1, Thread: &add_events.Thread{Id: "5", Name: "fred"}} + err = sc.AddEvents([]*add_events.EventBundle{eventBundle1}) + assert.Nil(t, err) + err = sc.Finish() + + lastError := sc.LastError() + + assert.NotNil(t, lastError) + assert.Equal(t, fmt.Errorf("unable to parse response body: invalid character '<' looking for beginning of value, url: %s, response: not valid json", sc.addEventsEndpointUrl).Error(), lastError.Error()) + + assert.NotNil(t, err) + assert.Errorf(t, err, "some buffers were dropped during finishing - 1") + assert.GreaterOrEqual(t, attempt.Load(), int32(0)) +} diff --git a/pkg/client/client.go b/pkg/client/client.go index d68f906..bcd5ddc 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -83,6 +83,9 @@ type DataSetClient struct { addEventsChannels map[string]chan interface{} firstReceivedAt atomic.Int64 lastAcceptedAt atomic.Int64 + // Stores sanitized complete URL to the addEvents API endpoint, e.g. + // https://app.scalyr.com/api/addEvents + addEventsEndpointUrl string } func NewClient(cfg *config.DataSetConfig, client *http.Client, logger *zap.Logger) (*DataSetClient, error) { @@ -102,31 +105,40 @@ func NewClient(cfg *config.DataSetConfig, client *http.Client, logger *zap.Logge if err != nil { return nil, fmt.Errorf("it was not possible to generate UUID: %w", err) } + + addEventsEndpointUrl := cfg.Endpoint + if strings.HasSuffix(addEventsEndpointUrl, "/") { + addEventsEndpointUrl += "api/addEvents" + } else { + addEventsEndpointUrl += "/api/addEvents" + } + dataClient := &DataSetClient{ - Id: id, - Config: cfg, - Client: client, - buffer: make(map[string]*buffer.Buffer), - buffersEnqueued: atomic.Uint64{}, - buffersProcessed: atomic.Uint64{}, - buffersDropped: atomic.Uint64{}, - buffersAllMutex: sync.Mutex{}, - BuffersPubSub: pubsub.New(0), - LastHttpStatus: atomic.Uint32{}, - retryAfter: time.Now(), - retryAfterMu: sync.RWMutex{}, - lastErrorMu: sync.RWMutex{}, - Logger: logger, - finished: atomic.Bool{}, - eventsEnqueued: atomic.Uint64{}, - eventsProcessed: atomic.Uint64{}, - bytesAPIAccepted: atomic.Uint64{}, - bytesAPISent: atomic.Uint64{}, - addEventsMutex: sync.Mutex{}, - addEventsPubSub: pubsub.New(0), - addEventsChannels: make(map[string]chan interface{}), - firstReceivedAt: atomic.Int64{}, - lastAcceptedAt: atomic.Int64{}, + Id: id, + Config: cfg, + Client: client, + buffer: make(map[string]*buffer.Buffer), + buffersEnqueued: atomic.Uint64{}, + buffersProcessed: atomic.Uint64{}, + buffersDropped: atomic.Uint64{}, + buffersAllMutex: sync.Mutex{}, + BuffersPubSub: pubsub.New(0), + LastHttpStatus: atomic.Uint32{}, + retryAfter: time.Now(), + retryAfterMu: sync.RWMutex{}, + lastErrorMu: sync.RWMutex{}, + Logger: logger, + finished: atomic.Bool{}, + eventsEnqueued: atomic.Uint64{}, + eventsProcessed: atomic.Uint64{}, + bytesAPIAccepted: atomic.Uint64{}, + bytesAPISent: atomic.Uint64{}, + addEventsMutex: sync.Mutex{}, + addEventsPubSub: pubsub.New(0), + addEventsChannels: make(map[string]chan interface{}), + firstReceivedAt: atomic.Int64{}, + lastAcceptedAt: atomic.Int64{}, + addEventsEndpointUrl: addEventsEndpointUrl, } // run buffer sweeper if requested diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 106db18..de5ea4a 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -188,3 +188,21 @@ func TestHttpStatusCodes(t *testing.T) { }) } } + +func TestAddEventsEndpointUrlWithoutTrailingSlash(t *testing.T) { + t.Setenv("SCALYR_SERVER", "https://app.scalyr.com") + cfg, err := config.New(config.FromEnv()) + assert.Nil(t, err) + sc, err := NewClient(cfg, nil, zap.Must(zap.NewDevelopment())) + require.Nil(t, err) + assert.Equal(t, sc.addEventsEndpointUrl, "https://app.scalyr.com/api/addEvents") +} + +func TestAddEventsEndpointUrlWithTrailingSlash(t *testing.T) { + t.Setenv("SCALYR_SERVER", "https://app.scalyr.com/") + cfg2, err := config.New(config.FromEnv()) + assert.Nil(t, err) + sc2, err := NewClient(cfg2, nil, zap.Must(zap.NewDevelopment())) + require.Nil(t, err) + assert.Equal(t, sc2.addEventsEndpointUrl, "https://app.scalyr.com/api/addEvents") +}