Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process platform report metrics when extn is lagging #358

Merged
merged 4 commits into from
Jan 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ https://github.com/elastic/apm-aws-lambda/compare/v1.2.0...main[View commits]
===== Features
- experimental:[] Create proxy transaction with error results if not reported by agent {lambda-pull}315[315]
- Wait for the final platform report metrics on shutdown {lambda-pull}347[347]
- Process platform report metrics when extension is lagging {lambda-pull}358[358]

[float]
[[lambda-1.2.0]]
Expand Down
33 changes: 30 additions & 3 deletions accumulator/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func NewBatch(maxSize int, maxAge time.Duration) *Batch {
}
}

// Size returns the number of invocations cached in the batch.
func (b *Batch) Size() int {
b.mu.RLock()
defer b.mu.RUnlock()
return len(b.invocations)
}

// RegisterInvocation registers a new function invocation against its request
// ID. It also updates the caches for currently executing request ID.
func (b *Batch) RegisterInvocation(
Expand Down Expand Up @@ -185,6 +192,21 @@ func (b *Batch) OnLambdaLogRuntimeDone(reqID, status string, time time.Time) err
return b.finalizeInvocation(reqID, status, time)
}

// OnPlatformReport should be the last event for a request ID. On receiving the
// platform.report event the batch will cleanup any datastructure for the request
// ID. It will return some of the function metadata to allow the caller to enrich
// the report metrics.
func (b *Batch) OnPlatformReport(reqID string) (string, int64, time.Time, error) {
b.mu.Lock()
defer b.mu.Unlock()
inc, ok := b.invocations[reqID]
if !ok {
return "", 0, time.Time{}, fmt.Errorf("invocation for requestID %s does not exist", reqID)
}
delete(b.invocations, reqID)
return inc.FunctionARN, inc.DeadlineMs, inc.Timestamp, nil
}

// OnShutdown flushes the data for shipping to APM Server by finalizing all
// the invocation in the batch. If we haven't received a platform.runtimeDone
// event for an invocation so far we won't be able to recieve it in time thus
Expand All @@ -201,6 +223,7 @@ func (b *Batch) OnShutdown(status string) error {
if err := b.finalizeInvocation(inc.RequestID, status, time); err != nil {
return err
}
delete(b.invocations, inc.RequestID)
}
return nil
}
Expand Down Expand Up @@ -257,12 +280,16 @@ func (b *Batch) finalizeInvocation(reqID, status string, time time.Time) error {
if !ok {
return fmt.Errorf("invocation for requestID %s does not exist", reqID)
}
defer delete(b.invocations, reqID)
proxyTxn, err := inc.Finalize(status, time)
proxyTxn, err := inc.CreateProxyTxn(status, time)
if err != nil {
return err
}
return b.addData(proxyTxn)
err = b.addData(proxyTxn)
if err != nil {
return err
}
inc.Finalized = true
return nil
}

func (b *Batch) addData(data []byte) error {
Expand Down
12 changes: 5 additions & 7 deletions accumulator/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,24 @@ type Invocation struct {
// TransactionObserved is true if the root transaction ID for the
// invocation is observed by the extension.
TransactionObserved bool
// Finalized tracks if the invocation has been finalized or not.
Finalized bool
}

// NeedProxyTransaction returns true if a proxy transaction needs to be
// created based on the information available.
func (inc *Invocation) NeedProxyTransaction() bool {
return inc.TransactionID != "" && !inc.TransactionObserved
return !inc.Finalized && inc.TransactionID != "" && !inc.TransactionObserved
}

// Finalize creates a proxy transaction for an invocation if required.
// CreateProxyTxn creates a proxy transaction for an invocation if required.
// A proxy transaction will be required to be created if the agent has
// registered a transaction for the invocation but has not sent the
// corresponding transaction to the extension.
func (inc *Invocation) Finalize(status string, time time.Time) ([]byte, error) {
func (inc *Invocation) CreateProxyTxn(status string, time time.Time) ([]byte, error) {
if !inc.NeedProxyTransaction() {
return nil, nil
}
return inc.createProxyTxn(status, time)
}

func (inc *Invocation) createProxyTxn(status string, time time.Time) ([]byte, error) {
txn, err := sjson.SetBytes(inc.AgentPayload, "transaction.result", status)
if err != nil {
return nil, err
Expand Down
6 changes: 3 additions & 3 deletions accumulator/invocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestFinalize(t *testing.T) {
func TestCreateProxyTransaction(t *testing.T) {
txnDur := time.Second
for _, tc := range []struct {
name string
Expand Down Expand Up @@ -89,7 +89,7 @@ func TestFinalize(t *testing.T) {
AgentPayload: []byte(tc.payload),
TransactionObserved: tc.txnObserved,
}
result, err := inc.Finalize(tc.runtimeDoneStatus, ts.Add(txnDur))
result, err := inc.CreateProxyTxn(tc.runtimeDoneStatus, ts.Add(txnDur))
assert.Nil(t, err)
if len(tc.output) > 0 {
assert.JSONEq(t, tc.output, string(result))
Expand All @@ -114,7 +114,7 @@ func BenchmarkCreateProxyTxn(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := inc.createProxyTxn("success", txnDur)
_, err := inc.CreateProxyTxn("success", txnDur)
if err != nil {
b.Fail()
}
Expand Down
10 changes: 2 additions & 8 deletions app/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ func (app *App) Run(ctx context.Context) error {
}
}

// The previous event id is used to validate the received Lambda metrics
var prevEvent *extension.NextEventResponse

for {
select {
case <-ctx.Done():
Expand All @@ -91,7 +88,7 @@ func (app *App) Run(ctx context.Context) error {
// Use a wait group to ensure the background go routine sending to the APM server
// completes before signaling that the extension is ready for the next invocation.
var backgroundDataSendWg sync.WaitGroup
event, err := app.processEvent(ctx, &backgroundDataSendWg, prevEvent)
event, err := app.processEvent(ctx, &backgroundDataSendWg)
if err != nil {
return err
}
Expand All @@ -110,15 +107,13 @@ func (app *App) Run(ctx context.Context) error {
app.apmClient.FlushAPMData(flushCtx)
cancel()
}
prevEvent = event
}
}
}

func (app *App) processEvent(
ctx context.Context,
backgroundDataSendWg *sync.WaitGroup,
prevEvent *extension.NextEventResponse,
) (*extension.NextEventResponse, error) {
// Reset flush state for future events.
defer app.apmClient.ResetFlush()
Expand Down Expand Up @@ -179,7 +174,7 @@ func (app *App) processEvent(
// also possible that lambda has init a few execution env preemptively,
// for such cases the extension will see only a SHUTDOWN event and
// there is no need to wait for any log event.
if prevEvent == nil {
if app.batch.Size() == 0 {
return event, nil
}
}
Expand All @@ -204,7 +199,6 @@ func (app *App) processEvent(
event.RequestID,
event.InvokedFunctionArn,
app.apmClient.LambdaDataChannel,
prevEvent,
event.EventType == extension.Shutdown,
)
}()
Expand Down
3 changes: 3 additions & 0 deletions logsapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ type ClientOption func(*Client)

type invocationLifecycler interface {
OnLambdaLogRuntimeDone(requestID, status string, time time.Time) error
OnPlatformReport(reqID string) (fnARN string, deadlineMs int64, ts time.Time, err error)
// Size should return the number of invocations waiting on platform.report
Size() int
}

// Client is the client used to subscribe to the Logs API.
Expand Down
32 changes: 15 additions & 17 deletions logsapi/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ package logsapi
import (
"context"
"time"

"github.com/elastic/apm-aws-lambda/extension"
)

// LogEventType represents the log type that is received in the log messages
Expand Down Expand Up @@ -63,7 +61,6 @@ func (lc *Client) ProcessLogs(
requestID string,
invokedFnArn string,
dataChan chan []byte,
prevEvent *extension.NextEventResponse,
isShutdown bool,
) {
// platformStartReqID is to identify the requestID for the function
Expand Down Expand Up @@ -94,10 +91,12 @@ func (lc *Client) ProcessLogs(
return
}
case PlatformReport:
// TODO: @lahsivjar Refactor usage of prevEvent.RequestID (should now query the batch?)
if prevEvent != nil && logEvent.Record.RequestID == prevEvent.RequestID {
fnARN, deadlineMs, ts, err := lc.invocationLifecycler.OnPlatformReport(logEvent.Record.RequestID)
if err != nil {
lc.logger.Warnf("Failed to process platform report: %v", err)
} else {
lc.logger.Debugf("Received platform report for %s", logEvent.Record.RequestID)
processedMetrics, err := ProcessPlatformReport(prevEvent, logEvent)
processedMetrics, err := ProcessPlatformReport(fnARN, deadlineMs, ts, logEvent)
if err != nil {
lc.logger.Errorf("Error processing Lambda platform metrics: %v", err)
} else {
Expand All @@ -106,17 +105,16 @@ func (lc *Client) ProcessLogs(
case <-ctx.Done():
}
}
// For shutdown event the platform report metrics for the previous log event
// would be the last possible log event.
if isShutdown {
lc.logger.Debugf(
"Processed platform report event for reqID %s as the last log event before shutdown",
logEvent.Record.RequestID,
)
return
}
} else {
lc.logger.Warn("Report event request id didn't match the previous event id")
}
// For shutdown event the platform report metrics for the previous log event
// would be the last possible log event. After processing this metric the
// invocation lifecycler's cache should be empty.
if isShutdown && lc.invocationLifecycler.Size() == 0 {
lc.logger.Debugf(
"Processed platform report event for reqID %s as the last log event before shutdown",
logEvent.Record.RequestID,
)
return
}
case PlatformLogsDropped:
lc.logger.Warnf("Logs dropped due to extension falling behind: %v", logEvent.Record)
Expand Down
8 changes: 4 additions & 4 deletions logsapi/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ package logsapi

import (
"math"
"time"

"github.com/elastic/apm-aws-lambda/extension"
"go.elastic.co/apm/v2/model"
"go.elastic.co/fastjson"
)
Expand Down Expand Up @@ -64,7 +64,7 @@ func (mc MetricsContainer) MarshalFastJSON(json *fastjson.Writer) error {
// ProcessPlatformReport processes the `platform.report` log line from lambda logs API and
// returns a byte array containing the JSON body for the extracted platform metrics. A non
// nil error is returned when marshaling of platform metrics into JSON fails.
func ProcessPlatformReport(functionData *extension.NextEventResponse, platformReport LogEvent) ([]byte, error) {
func ProcessPlatformReport(fnARN string, deadlineMs int64, ts time.Time, platformReport LogEvent) ([]byte, error) {
metricsContainer := MetricsContainer{
Metrics: &model.Metrics{},
}
Expand All @@ -78,7 +78,7 @@ func ProcessPlatformReport(functionData *extension.NextEventResponse, platformRe
// FaaS Fields
metricsContainer.Metrics.FAAS = &model.FAAS{
Execution: platformReport.Record.RequestID,
ID: functionData.InvokedFunctionArn,
ID: fnARN,
Coldstart: platformReportMetrics.InitDurationMs > 0,
}

Expand All @@ -95,7 +95,7 @@ func ProcessPlatformReport(functionData *extension.NextEventResponse, platformRe
// - The epoch corresponding to the end of the current invocation (its "deadline")
// - The epoch corresponding to the start of the current invocation
// - The multiplication / division then rounds the value to obtain a number of ms that can be expressed a multiple of 1000 (see initial assumption)
metricsContainer.Add("faas.timeout", math.Ceil(float64(functionData.DeadlineMs-functionData.Timestamp.UnixMilli())/1e3)*1e3) // Unit : Milliseconds
metricsContainer.Add("faas.timeout", math.Ceil(float64(deadlineMs-ts.UnixMilli())/1e3)*1e3) // Unit : Milliseconds

var jsonWriter fastjson.Writer
if err := metricsContainer.MarshalFastJSON(&jsonWriter); err != nil {
Expand Down
11 changes: 8 additions & 3 deletions logsapi/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestProcessPlatformReport_Coldstart(t *testing.T) {

desiredOutputMetrics := fmt.Sprintf(`{"metricset":{"samples":{"faas.coldstart_duration":{"value":422.9700012207031},"faas.timeout":{"value":5000},"system.memory.total":{"value":1.34217728e+08},"system.memory.actual.free":{"value":5.4525952e+07},"faas.duration":{"value":182.42999267578125},"faas.billed_duration":{"value":183}},"timestamp":%d,"faas":{"coldstart":true,"execution":"6f7f0961f83442118a7af6fe80b88d56","id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"}}}`, timestamp.UnixNano()/1e3)

data, err := ProcessPlatformReport(&event, logEvent)
data, err := ProcessPlatformReport(event.InvokedFunctionArn, event.DeadlineMs, event.Timestamp, logEvent)
require.NoError(t, err)

assert.JSONEq(t, desiredOutputMetrics, string(data))
Expand Down Expand Up @@ -110,7 +110,7 @@ func TestProcessPlatformReport_NoColdstart(t *testing.T) {

desiredOutputMetrics := fmt.Sprintf(`{"metricset":{"samples":{"faas.coldstart_duration":{"value":0},"faas.timeout":{"value":5000},"system.memory.total":{"value":1.34217728e+08},"system.memory.actual.free":{"value":5.4525952e+07},"faas.duration":{"value":182.42999267578125},"faas.billed_duration":{"value":183}},"timestamp":%d,"faas":{"coldstart":false,"execution":"6f7f0961f83442118a7af6fe80b88d56","id":"arn:aws:lambda:us-east-2:123456789012:function:custom-runtime"}}}`, timestamp.UnixNano()/1e3)

data, err := ProcessPlatformReport(&event, logEvent)
data, err := ProcessPlatformReport(event.InvokedFunctionArn, event.DeadlineMs, event.Timestamp, logEvent)
require.NoError(t, err)

assert.JSONEq(t, desiredOutputMetrics, string(data))
Expand Down Expand Up @@ -142,7 +142,12 @@ func BenchmarkPlatformReport(b *testing.B) {
}

for n := 0; n < b.N; n++ {
_, err := ProcessPlatformReport(nextEventResp, logEvent)
_, err := ProcessPlatformReport(
nextEventResp.InvokedFunctionArn,
nextEventResp.DeadlineMs,
nextEventResp.Timestamp,
logEvent,
)
require.NoError(b, err)
}
}