Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: create a new bytes reader instead of sharing bytes buffer #511

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 38 additions & 15 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,17 @@ func newMockLambdaServer(t *testing.T, logsapiAddr string, eventsChannel chan Mo
select {
case nextEvent := <-eventsChannel:
sendNextEventInfo(w, currID, nextEvent.Timeout, nextEvent.Type == Shutdown, l)
go processMockEvent(mockLogEventQ, currID, nextEvent, os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), &lambdaServerInternals, l)
wg.Add(1)
go processMockEvent(mockLogEventQ, currID, nextEvent, os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), &lambdaServerInternals, l, &wg)
default:
finalShutDown := MockEvent{
Type: Shutdown,
ExecutionDuration: 0,
Timeout: 0,
}
sendNextEventInfo(w, currID, finalShutDown.Timeout, true, l)
go processMockEvent(mockLogEventQ, currID, finalShutDown, os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), &lambdaServerInternals, l)
wg.Add(1)
go processMockEvent(mockLogEventQ, currID, finalShutDown, os.Getenv("ELASTIC_APM_DATA_RECEIVER_SERVER_PORT"), &lambdaServerInternals, l, &wg)
}
// Logs API subscription request
case "/2020-08-15/logs":
Expand Down Expand Up @@ -232,7 +234,8 @@ func newTestStructs(t *testing.T) chan MockEvent {
return eventsChannel
}

func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent, extensionPort string, internals *MockServerInternals, l *zap.SugaredLogger) {
func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent, extensionPort string, internals *MockServerInternals, l *zap.SugaredLogger, wg *sync.WaitGroup) {
defer wg.Done()
queueLogEvent(q, currID, logsapi.PlatformStart, l)
client := http.Client{}

Expand Down Expand Up @@ -261,8 +264,15 @@ func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent,
time.Sleep(time.Duration(event.Timeout * float64(time.Second)))
case InvokeStandard:
time.Sleep(delay)
req, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), buf)
res, _ := client.Do(req)
req, err := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), buf)
if err != nil {
l.Error(err.Error())
}
res, err := client.Do(req)
if err != nil {
l.Error(err.Error())
}
res.Body.Close()
l.Debugf("Response seen by the agent : %d", res.StatusCode)
case InvokeStandardFlush:
time.Sleep(delay)
Expand All @@ -276,34 +286,48 @@ func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent,
internals.WaitGroup.Add(1)
go func() {
<-ch
if _, err := client.Do(reqData); err != nil {
res, err := client.Do(reqData)
if err != nil {
l.Error(err.Error())
}
res.Body.Close()
internals.WaitGroup.Done()
}()
// For this specific scenario, we do not want to see metrics in the APM Server logs (in order to easily check if the logs contain to "TimelyResponse" back to back).
sendMetrics = false
case InvokeWaitgroupsRace:
time.Sleep(delay)
reqData0, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), buf)
reqData1, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), buf)
if _, err := client.Do(reqData0); err != nil {
// we can't share a bytes.Buffer with two http requests
// create two bytes.Reader to avoid a race condition
body := buf.Bytes()
reqData0, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewReader(body))
reqData1, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewReader(body))
res, err := client.Do(reqData0)
if err != nil {
l.Error(err.Error())
}
if _, err := client.Do(reqData1); err != nil {
res.Body.Close()
res, err = client.Do(reqData1)
if err != nil {
l.Error(err.Error())
}
res.Body.Close()
time.Sleep(650 * time.Microsecond)
case InvokeMultipleTransactionsOverload:
// we can't share a bytes.Buffer with two http requests
// create two bytes.Reader to avoid a race condition
body := buf.Bytes()
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
time.Sleep(delay)
reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), buf)
if _, err := client.Do(reqData); err != nil {
reqData, _ := http.NewRequest("POST", fmt.Sprintf("http://localhost:%s/intake/v2/events", extensionPort), bytes.NewReader(body))
res, err := client.Do(reqData)
if err != nil {
l.Error(err.Error())
}
res.Body.Close()
wg.Done()
}()
}
Expand All @@ -317,9 +341,8 @@ func processMockEvent(q chan<- logsapi.LogEvent, currID string, event MockEvent,
break
}
var rawBytes []byte
if res.Body != nil {
rawBytes, _ = io.ReadAll(res.Body)
}
rawBytes, _ = io.ReadAll(res.Body)
res.Body.Close()
internals.Data += string(rawBytes)
l.Debugf("Response seen by the agent : %d", res.StatusCode)
case Shutdown:
Expand Down
Loading