diff --git a/.chloggen/awscloudwatchlogs-raw-log.yaml b/.chloggen/awscloudwatchlogs-raw-log.yaml new file mode 100644 index 000000000000..8cc507f5d2a7 --- /dev/null +++ b/.chloggen/awscloudwatchlogs-raw-log.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awscloudwatchlogsexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add the ability to export raw log to cloud watch + +# One or more tracking issues related to the change +issues: [18758] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: Add emf and raw log support for aws cloudwatch exporter. diff --git a/exporter/awscloudwatchlogsexporter/README.md b/exporter/awscloudwatchlogsexporter/README.md index 7c36bd7e62fd..248e4df76349 100644 --- a/exporter/awscloudwatchlogsexporter/README.md +++ b/exporter/awscloudwatchlogsexporter/README.md @@ -24,6 +24,7 @@ The following settings can be optionally configured: - `region`: The AWS region where the log stream is in. - `endpoint`: The CloudWatch Logs service endpoint which the requests are forwarded to. [See the CloudWatch Logs endpoints](https://docs.aws.amazon.com/general/latest/gr/cwl_region.html) for a list. - `log_retention`: LogRetention is the option to set the log retention policy for only newly created CloudWatch Log Groups. Defaults to Never Expire if not specified or set to 0. Possible values for retention in days are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653. +- `raw_log`: Boolean default false. If you want to export only the log message to cw logs. This is required for emf logs. ### Examples diff --git a/exporter/awscloudwatchlogsexporter/config.go b/exporter/awscloudwatchlogsexporter/config.go index e8a5738ced80..df1947aa59ba 100644 --- a/exporter/awscloudwatchlogsexporter/config.go +++ b/exporter/awscloudwatchlogsexporter/config.go @@ -53,6 +53,10 @@ type Config struct { logger *zap.Logger awsutil.AWSSessionSettings `mapstructure:",squash"` + + // Export raw log string instead of log wrapper + // Required for emf logs + RawLog bool `mapstructure:"raw_log,omitempty"` } type QueueSettings struct { diff --git a/exporter/awscloudwatchlogsexporter/exporter.go b/exporter/awscloudwatchlogsexporter/exporter.go index c84470c2f7fa..9c814eb3f26b 100644 --- a/exporter/awscloudwatchlogsexporter/exporter.go +++ b/exporter/awscloudwatchlogsexporter/exporter.go @@ -19,6 +19,7 @@ import ( "encoding/hex" "encoding/json" "errors" + "sync" "time" "github.com/aws/aws-sdk-go/aws" @@ -42,7 +43,19 @@ type exporter struct { retryCount int collectorID string svcStructuredLog *cwlogs.Client - pusher cwlogs.Pusher + pusherMap map[cwlogs.PusherKey]cwlogs.Pusher + pusherMapLock sync.RWMutex +} + +type awsMetadata struct { + LogGroupName string `json:"logGroupName,omitempty"` + LogStreamName string `json:"logStreamName,omitempty"` +} + +type emfMetadata struct { + AWSMetadata *awsMetadata `json:"_aws,omitempty"` + LogGroupName string `json:"log_group_name,omitempty"` + LogStreamName string `json:"log_stream_name,omitempty"` } func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*exporter, error) { @@ -66,7 +79,16 @@ func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*exporter, e return nil, err } - pusher := cwlogs.NewPusher(aws.String(expConfig.LogGroupName), aws.String(expConfig.LogStreamName), *awsConfig.MaxRetries, *svcStructuredLog, params.Logger) + pusherKey := cwlogs.PusherKey{ + LogGroupName: expConfig.LogGroupName, + LogStreamName: expConfig.LogStreamName, + } + + pusher := cwlogs.NewPusher(pusherKey, *awsConfig.MaxRetries, *svcStructuredLog, params.Logger) + + pusherMap := make(map[cwlogs.PusherKey]cwlogs.Pusher) + + pusherMap[pusherKey] = pusher logsExporter := &exporter{ svcStructuredLog: svcStructuredLog, @@ -74,7 +96,7 @@ func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*exporter, e logger: params.Logger, retryCount: *awsConfig.MaxRetries, collectorID: collectorIdentifier.String(), - pusher: pusher, + pusherMap: pusherMap, } return logsExporter, nil } @@ -98,47 +120,74 @@ func newCwLogsExporter(config component.Config, params exp.CreateSettings) (exp. } func (e *exporter) consumeLogs(_ context.Context, ld plog.Logs) error { - cwLogsPusher := e.pusher - logEvents, _ := logsToCWLogs(e.logger, ld) + logEvents, _ := logsToCWLogs(e.logger, ld, e.Config) if len(logEvents) == 0 { return nil } + logPushersUsed := make(map[cwlogs.PusherKey]cwlogs.Pusher) for _, logEvent := range logEvents { - logEvent := &cwlogs.Event{ - InputLogEvent: logEvent, - GeneratedTime: time.Now(), + pusherKey := cwlogs.PusherKey{ + LogGroupName: logEvent.LogGroupName, + LogStreamName: logEvent.LogStreamName, } + cwLogsPusher := e.getLogPusher(logEvent) e.logger.Debug("Adding log event", zap.Any("event", logEvent)) err := cwLogsPusher.AddLogEntry(logEvent) if err != nil { e.logger.Error("Failed ", zap.Int("num_of_events", len(logEvents))) } + logPushersUsed[pusherKey] = cwLogsPusher } - e.logger.Debug("Log events are successfully put") - flushErr := cwLogsPusher.ForceFlush() - if flushErr != nil { - e.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(flushErr)) - return flushErr + var flushErrArray []error + for _, pusher := range logPushersUsed { + flushErr := pusher.ForceFlush() + if flushErr != nil { + e.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(flushErr)) + flushErrArray = append(flushErrArray, flushErr) + } + } + if len(flushErrArray) != 0 { + errorString := "" + for _, err := range flushErrArray { + errorString += err.Error() + } + return errors.New(errorString) } return nil } +func (e *exporter) getLogPusher(logEvent *cwlogs.Event) cwlogs.Pusher { + e.pusherMapLock.Lock() + defer e.pusherMapLock.Unlock() + pusherKey := cwlogs.PusherKey{ + LogGroupName: logEvent.LogGroupName, + LogStreamName: logEvent.LogStreamName, + } + if e.pusherMap[pusherKey] == nil { + pusher := cwlogs.NewPusher(pusherKey, e.retryCount, *e.svcStructuredLog, e.logger) + e.pusherMap[pusherKey] = pusher + } + return e.pusherMap[pusherKey] +} + func (e *exporter) shutdown(_ context.Context) error { - if e.pusher != nil { - e.pusher.ForceFlush() + if e.pusherMap != nil { + for _, pusher := range e.pusherMap { + pusher.ForceFlush() + } } return nil } -func logsToCWLogs(logger *zap.Logger, ld plog.Logs) ([]*cloudwatchlogs.InputLogEvent, int) { +func logsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config) ([]*cwlogs.Event, int) { n := ld.ResourceLogs().Len() if n == 0 { - return []*cloudwatchlogs.InputLogEvent{}, 0 + return []*cwlogs.Event{}, 0 } var dropped int - var out []*cloudwatchlogs.InputLogEvent + var out []*cwlogs.Event rls := ld.ResourceLogs() for i := 0; i < rls.Len(); i++ { @@ -151,7 +200,7 @@ func logsToCWLogs(logger *zap.Logger, ld plog.Logs) ([]*cloudwatchlogs.InputLogE logs := sl.LogRecords() for k := 0; k < logs.Len(); k++ { log := logs.At(k) - event, err := logToCWLog(resourceAttrs, log) + event, err := logToCWLog(resourceAttrs, log, config) if err != nil { logger.Debug("Failed to convert to CloudWatch Log", zap.Error(err)) dropped++ @@ -176,32 +225,63 @@ type cwLogBody struct { Resource map[string]interface{} `json:"resource,omitempty"` } -func logToCWLog(resourceAttrs map[string]interface{}, log plog.LogRecord) (*cloudwatchlogs.InputLogEvent, error) { +func logToCWLog(resourceAttrs map[string]interface{}, log plog.LogRecord, config *Config) (*cwlogs.Event, error) { // TODO(jbd): Benchmark and improve the allocations. // Evaluate go.elastic.co/fastjson as a replacement for encoding/json. - body := cwLogBody{ - Body: log.Body().AsRaw(), - SeverityNumber: int32(log.SeverityNumber()), - SeverityText: log.SeverityText(), - DroppedAttributesCount: log.DroppedAttributesCount(), - Flags: uint32(log.Flags()), - } - if traceID := log.TraceID(); !traceID.IsEmpty() { - body.TraceID = hex.EncodeToString(traceID[:]) - } - if spanID := log.SpanID(); !spanID.IsEmpty() { - body.SpanID = hex.EncodeToString(spanID[:]) - } - body.Attributes = attrsValue(log.Attributes()) - body.Resource = resourceAttrs + logGroupName := config.LogGroupName + logStreamName := config.LogStreamName - bodyJSON, err := json.Marshal(body) - if err != nil { - return nil, err + var bodyJSON []byte + var err error + if config.RawLog { + // Check if this is an emf log + var metadata emfMetadata + bodyString := log.Body().AsString() + err = json.Unmarshal([]byte(bodyString), &metadata) + // v1 emf json + if err == nil && metadata.AWSMetadata != nil && metadata.AWSMetadata.LogGroupName != "" { + logGroupName = metadata.AWSMetadata.LogGroupName + if metadata.AWSMetadata.LogStreamName != "" { + logStreamName = metadata.AWSMetadata.LogStreamName + } + } else /* v0 emf json */ if err == nil && metadata.LogGroupName != "" { + logGroupName = metadata.LogGroupName + if metadata.LogStreamName != "" { + logStreamName = metadata.LogStreamName + } + } + bodyJSON = []byte(bodyString) + } else { + body := cwLogBody{ + Body: log.Body().AsRaw(), + SeverityNumber: int32(log.SeverityNumber()), + SeverityText: log.SeverityText(), + DroppedAttributesCount: log.DroppedAttributesCount(), + Flags: uint32(log.Flags()), + } + if traceID := log.TraceID(); !traceID.IsEmpty() { + body.TraceID = hex.EncodeToString(traceID[:]) + } + if spanID := log.SpanID(); !spanID.IsEmpty() { + body.SpanID = hex.EncodeToString(spanID[:]) + } + body.Attributes = attrsValue(log.Attributes()) + body.Resource = resourceAttrs + + bodyJSON, err = json.Marshal(body) + if err != nil { + return &cwlogs.Event{}, err + } } - return &cloudwatchlogs.InputLogEvent{ - Timestamp: aws.Int64(int64(log.Timestamp()) / int64(time.Millisecond)), // in milliseconds - Message: aws.String(string(bodyJSON)), + + return &cwlogs.Event{ + InputLogEvent: &cloudwatchlogs.InputLogEvent{ + Timestamp: aws.Int64(int64(log.Timestamp()) / int64(time.Millisecond)), // in milliseconds + Message: aws.String(string(bodyJSON)), + }, + LogGroupName: logGroupName, + LogStreamName: logStreamName, + GeneratedTime: time.Now(), }, nil } diff --git a/exporter/awscloudwatchlogsexporter/exporter_test.go b/exporter/awscloudwatchlogsexporter/exporter_test.go index bdd590527a3b..70313ef59fea 100644 --- a/exporter/awscloudwatchlogsexporter/exporter_test.go +++ b/exporter/awscloudwatchlogsexporter/exporter_test.go @@ -17,6 +17,7 @@ package awscloudwatchlogsexporter import ( "context" "testing" + "time" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" @@ -58,34 +59,151 @@ func TestLogToCWLog(t *testing.T) { name string resource pcommon.Resource log plog.LogRecord - want *cloudwatchlogs.InputLogEvent + config Config + want cwlogs.Event wantErr bool }{ { name: "basic", resource: testResource(), log: testLogRecord(), - want: &cloudwatchlogs.InputLogEvent{ - Timestamp: aws.Int64(1609719139), - Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"flags":1,"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","attributes":{"key1":1,"key2":"attr2"},"resource":{"host":"abc123","node":5}}`), + config: Config{}, + want: cwlogs.Event{ + GeneratedTime: time.Now(), + InputLogEvent: &cloudwatchlogs.InputLogEvent{ + Timestamp: aws.Int64(1609719139), + Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"flags":1,"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","attributes":{"key1":1,"key2":"attr2"},"resource":{"host":"abc123","node":5}}`), + }, + LogGroupName: "", + LogStreamName: "", }, }, { name: "no resource", resource: pcommon.NewResource(), log: testLogRecord(), - want: &cloudwatchlogs.InputLogEvent{ - Timestamp: aws.Int64(1609719139), - Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"flags":1,"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","attributes":{"key1":1,"key2":"attr2"}}`), + config: Config{}, + want: cwlogs.Event{ + GeneratedTime: time.Now(), + InputLogEvent: &cloudwatchlogs.InputLogEvent{ + Timestamp: aws.Int64(1609719139), + Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"flags":1,"trace_id":"0102030405060708090a0b0c0d0e0f10","span_id":"0102030405060708","attributes":{"key1":1,"key2":"attr2"}}`), + }, + LogGroupName: "", + LogStreamName: "", }, }, { name: "no trace", resource: testResource(), log: testLogRecordWithoutTrace(), - want: &cloudwatchlogs.InputLogEvent{ - Timestamp: aws.Int64(1609719139), - Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"attributes":{"key1":1,"key2":"attr2"},"resource":{"host":"abc123","node":5}}`), + config: Config{ + LogGroupName: "tLogGroup", + LogStreamName: "tStreamName", + }, + want: cwlogs.Event{ + GeneratedTime: time.Now(), + InputLogEvent: &cloudwatchlogs.InputLogEvent{ + Timestamp: aws.Int64(1609719139), + Message: aws.String(`{"body":"hello world","severity_number":5,"severity_text":"debug","dropped_attributes_count":4,"attributes":{"key1":1,"key2":"attr2"},"resource":{"host":"abc123","node":5}}`), + }, + LogGroupName: "tLogGroup", + LogStreamName: "tStreamName", + }, + }, + { + name: "raw", + resource: testResource(), + log: testLogRecordWithoutTrace(), + config: Config{ + LogGroupName: "tLogGroup", + LogStreamName: "tStreamName", + RawLog: true, + }, + want: cwlogs.Event{ + GeneratedTime: time.Now(), + InputLogEvent: &cloudwatchlogs.InputLogEvent{ + Timestamp: aws.Int64(1609719139), + Message: aws.String(`hello world`), + }, + LogGroupName: "tLogGroup", + LogStreamName: "tStreamName", + }, + }, + { + name: "raw emf v1", + resource: testResource(), + log: createPLog(`{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}`), + config: Config{ + LogGroupName: "tLogGroup", + LogStreamName: "tStreamName", + RawLog: true, + }, + want: cwlogs.Event{ + GeneratedTime: time.Now(), + InputLogEvent: &cloudwatchlogs.InputLogEvent{ + Timestamp: aws.Int64(1609719139), + Message: aws.String(`{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}`), + }, + LogGroupName: "Foo", + LogStreamName: "tStreamName", + }, + }, + { + name: "raw emf v1 with log stream", + resource: testResource(), + log: createPLog(`{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo","LogStreamName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}`), + config: Config{ + LogGroupName: "tLogGroup", + LogStreamName: "tStreamName", + RawLog: true, + }, + want: cwlogs.Event{ + GeneratedTime: time.Now(), + InputLogEvent: &cloudwatchlogs.InputLogEvent{ + Timestamp: aws.Int64(1609719139), + Message: aws.String(`{"_aws":{"Timestamp":1574109732004,"LogGroupName":"Foo","LogStreamName":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}]},"Operation":"Aggregator","ProcessingLatency":100}`), + }, + LogGroupName: "Foo", + LogStreamName: "Foo", + }, + }, + { + name: "raw emf v0", + resource: testResource(), + log: createPLog(`{"Timestamp":1574109732004,"log_group_name":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}],"Operation":"Aggregator","ProcessingLatency":100}`), + config: Config{ + LogGroupName: "tLogGroup", + LogStreamName: "tStreamName", + RawLog: true, + }, + want: cwlogs.Event{ + GeneratedTime: time.Now(), + InputLogEvent: &cloudwatchlogs.InputLogEvent{ + Timestamp: aws.Int64(1609719139), + Message: aws.String(`{"Timestamp":1574109732004,"log_group_name":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}],"Operation":"Aggregator","ProcessingLatency":100}`), + }, + LogGroupName: "Foo", + LogStreamName: "tStreamName", + }, + }, + { + name: "raw emf v0 with log stream", + resource: testResource(), + log: createPLog(`{"Timestamp":1574109732004,"log_group_name":"Foo","log_stream_name":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}],"Operation":"Aggregator","ProcessingLatency":100}`), + config: Config{ + LogGroupName: "tLogGroup", + LogStreamName: "tStreamName", + RawLog: true, + }, + want: cwlogs.Event{ + GeneratedTime: time.Now(), + InputLogEvent: &cloudwatchlogs.InputLogEvent{ + Timestamp: aws.Int64(1609719139), + Message: aws.String(`{"Timestamp":1574109732004,"log_group_name":"Foo","log_stream_name":"Foo","CloudWatchMetrics":[{"Namespace":"MyApp","Dimensions":[["Operation"]],"Metrics":[{"Name":"ProcessingLatency","Unit":"Milliseconds","StorageResolution":60}]}],"Operation":"Aggregator","ProcessingLatency":100}`), + }, + LogGroupName: "Foo", + LogStreamName: "Foo", }, }, } @@ -93,12 +211,15 @@ func TestLogToCWLog(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { resourceAttrs := attrsValue(tt.resource.Attributes()) - got, err := logToCWLog(resourceAttrs, tt.log) + got, err := logToCWLog(resourceAttrs, tt.log, &tt.config) if (err != nil) != tt.wantErr { t.Errorf("logToCWLog() error = %v, wantErr %v", err, tt.wantErr) return } - assert.Equal(t, tt.want, got) + // Do not test generated time since it is time.Now() + assert.Equal(t, tt.want.InputLogEvent, got.InputLogEvent) + assert.Equal(t, tt.want.LogStreamName, got.LogStreamName) + assert.Equal(t, tt.want.LogGroupName, got.LogGroupName) }) } } @@ -109,7 +230,7 @@ func BenchmarkLogToCWLog(b *testing.B) { resource := testResource() log := testLogRecord() for i := 0; i < b.N; i++ { - _, err := logToCWLog(attrsValue(resource.Attributes()), log) + _, err := logToCWLog(attrsValue(resource.Attributes()), log, &Config{}) if err != nil { b.Errorf("logToCWLog() failed %v", err) return @@ -151,6 +272,13 @@ func testLogRecordWithoutTrace() plog.LogRecord { return record } +func createPLog(log string) plog.LogRecord { + pLog := plog.NewLogRecord() + pLog.Body().SetStr(log) + pLog.SetTimestamp(1609719139000000) + return pLog +} + func TestConsumeLogs(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -174,7 +302,10 @@ func TestConsumeLogs(t *testing.T) { logPusher := new(mockPusher) logPusher.On("AddLogEntry", nil).Return("").Once() logPusher.On("ForceFlush", nil).Return("").Twice() - exp.pusher = logPusher + exp.pusherMap[cwlogs.PusherKey{ + LogGroupName: expCfg.LogGroupName, + LogStreamName: expCfg.LogStreamName, + }] = logPusher require.NoError(t, exp.consumeLogs(ctx, ld)) require.NoError(t, exp.shutdown(ctx)) } diff --git a/exporter/awsemfexporter/emf_exporter.go b/exporter/awsemfexporter/emf_exporter.go index 693db61acac5..aeaa03ad32bb 100644 --- a/exporter/awsemfexporter/emf_exporter.go +++ b/exporter/awsemfexporter/emf_exporter.go @@ -21,7 +21,6 @@ import ( "strings" "sync" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/google/uuid" "go.opentelemetry.io/collector/component" @@ -45,11 +44,10 @@ const ( ) type emfExporter struct { - // Each (log group, log stream) keeps a separate pusher because of each (log group, log stream) requires separate stream token. - groupStreamToPusherMap map[string]map[string]cwlogs.Pusher - svcStructuredLog *cwlogs.Client - config component.Config - logger *zap.Logger + pusherMap map[cwlogs.PusherKey]cwlogs.Pusher + svcStructuredLog *cwlogs.Client + config component.Config + logger *zap.Logger metricTranslator metricTranslator @@ -89,7 +87,7 @@ func newEmfPusher( logger: logger, collectorID: collectorIdentifier.String(), } - emfExporter.groupStreamToPusherMap = map[string]map[string]cwlogs.Pusher{} + emfExporter.pusherMap = map[cwlogs.PusherKey]cwlogs.Pusher{} return emfExporter, nil } @@ -158,7 +156,10 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e logStream = defaultLogStream } - emfPusher := emf.getPusher(logGroup, logStream) + emfPusher := emf.getPusher(cwlogs.PusherKey{ + LogGroupName: logGroup, + LogStreamName: logStream, + }) if emfPusher != nil { returnError := emfPusher.AddLogEntry(putLogEvent) if returnError != nil { @@ -187,23 +188,13 @@ func (emf *emfExporter) pushMetricsData(_ context.Context, md pmetric.Metrics) e return nil } -func (emf *emfExporter) getPusher(logGroup, logStream string) cwlogs.Pusher { - emf.pusherMapLock.Lock() - defer emf.pusherMapLock.Unlock() +func (emf *emfExporter) getPusher(key cwlogs.PusherKey) cwlogs.Pusher { var ok bool - var streamToPusherMap map[string]cwlogs.Pusher - if streamToPusherMap, ok = emf.groupStreamToPusherMap[logGroup]; !ok { - streamToPusherMap = map[string]cwlogs.Pusher{} - emf.groupStreamToPusherMap[logGroup] = streamToPusherMap + if _, ok = emf.pusherMap[key]; !ok { + emf.pusherMap[key] = cwlogs.NewPusher(key, emf.retryCnt, *emf.svcStructuredLog, emf.logger) } - - var emfPusher cwlogs.Pusher - if emfPusher, ok = streamToPusherMap[logStream]; !ok { - emfPusher = cwlogs.NewPusher(aws.String(logGroup), aws.String(logStream), emf.retryCnt, *emf.svcStructuredLog, emf.logger) - streamToPusherMap[logStream] = emfPusher - } - return emfPusher + return emf.pusherMap[key] } func (emf *emfExporter) listPushers() []cwlogs.Pusher { @@ -211,10 +202,8 @@ func (emf *emfExporter) listPushers() []cwlogs.Pusher { defer emf.pusherMapLock.Unlock() var pushers []cwlogs.Pusher - for _, pusherMap := range emf.groupStreamToPusherMap { - for _, pusher := range pusherMap { - pushers = append(pushers, pusher) - } + for _, pusher := range emf.pusherMap { + pushers = append(pushers, pusher) } return pushers } diff --git a/exporter/awsemfexporter/emf_exporter_test.go b/exporter/awsemfexporter/emf_exporter_test.go index de1f173a186c..2674ffade881 100644 --- a/exporter/awsemfexporter/emf_exporter_test.go +++ b/exporter/awsemfexporter/emf_exporter_test.go @@ -251,11 +251,12 @@ func TestConsumeMetricsWithLogGroupStreamConfig(t *testing.T) { md := internaldata.OCToMetrics(mdata.Node, mdata.Resource, mdata.Metrics) require.Error(t, exp.pushMetricsData(ctx, md)) require.NoError(t, exp.shutdown(ctx)) - streamToPusherMap, ok := exp.groupStreamToPusherMap["test-logGroupName"] + pusherMap, ok := exp.pusherMap[cwlogs.PusherKey{ + LogGroupName: expCfg.LogGroupName, + LogStreamName: expCfg.LogStreamName, + }] assert.True(t, ok) - emfPusher, ok := streamToPusherMap["test-logStreamName"] - assert.True(t, ok) - assert.NotNil(t, emfPusher) + assert.NotNil(t, pusherMap) } func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) { @@ -320,11 +321,12 @@ func TestConsumeMetricsWithLogGroupStreamValidPlaceholder(t *testing.T) { md := internaldata.OCToMetrics(mdata.Node, mdata.Resource, mdata.Metrics) require.Error(t, exp.pushMetricsData(ctx, md)) require.NoError(t, exp.shutdown(ctx)) - streamToPusherMap, ok := exp.groupStreamToPusherMap["/aws/ecs/containerinsights/test-cluster-name/performance"] - assert.True(t, ok) - emfPusher, ok := streamToPusherMap["test-task-id"] + pusherMap, ok := exp.pusherMap[cwlogs.PusherKey{ + LogGroupName: "/aws/ecs/containerinsights/test-cluster-name/performance", + LogStreamName: "test-task-id", + }] assert.True(t, ok) - assert.NotNil(t, emfPusher) + assert.NotNil(t, pusherMap) } func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) { @@ -389,11 +391,12 @@ func TestConsumeMetricsWithOnlyLogStreamPlaceholder(t *testing.T) { md := internaldata.OCToMetrics(mdata.Node, mdata.Resource, mdata.Metrics) require.Error(t, exp.pushMetricsData(ctx, md)) require.NoError(t, exp.shutdown(ctx)) - streamToPusherMap, ok := exp.groupStreamToPusherMap["test-logGroupName"] + pusherMap, ok := exp.pusherMap[cwlogs.PusherKey{ + LogGroupName: expCfg.LogGroupName, + LogStreamName: "test-task-id", + }] assert.True(t, ok) - emfPusher, ok := streamToPusherMap["test-task-id"] - assert.True(t, ok) - assert.NotNil(t, emfPusher) + assert.NotNil(t, pusherMap) } func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) { @@ -458,11 +461,12 @@ func TestConsumeMetricsWithWrongPlaceholder(t *testing.T) { md := internaldata.OCToMetrics(mdata.Node, mdata.Resource, mdata.Metrics) require.Error(t, exp.pushMetricsData(ctx, md)) require.NoError(t, exp.shutdown(ctx)) - streamToPusherMap, ok := exp.groupStreamToPusherMap["test-logGroupName"] - assert.True(t, ok) - emfPusher, ok := streamToPusherMap["{WrongKey}"] + pusherMap, ok := exp.pusherMap[cwlogs.PusherKey{ + LogGroupName: expCfg.LogGroupName, + LogStreamName: expCfg.LogStreamName, + }] assert.True(t, ok) - assert.NotNil(t, emfPusher) + assert.NotNil(t, pusherMap) } func TestPushMetricsDataWithErr(t *testing.T) { @@ -484,9 +488,11 @@ func TestPushMetricsDataWithErr(t *testing.T) { logPusher.On("ForceFlush", nil).Return("some error").Once() logPusher.On("ForceFlush", nil).Return("").Once() logPusher.On("ForceFlush", nil).Return("some error").Once() - streamToPusherMap := map[string]cwlogs.Pusher{"test-logStreamName": logPusher} - exp.groupStreamToPusherMap = map[string]map[string]cwlogs.Pusher{} - exp.groupStreamToPusherMap["test-logGroupName"] = streamToPusherMap + exp.pusherMap = map[cwlogs.PusherKey]cwlogs.Pusher{} + exp.pusherMap[cwlogs.PusherKey{ + LogGroupName: "test-logGroupName", + LogStreamName: "test-logStreamName", + }] = logPusher mdata := agentmetricspb.ExportMetricsServiceRequest{ Node: &commonpb.Node{ diff --git a/internal/aws/cwlogs/pusher.go b/internal/aws/cwlogs/pusher.go index ee1277724df0..752b8720c2ab 100644 --- a/internal/aws/cwlogs/pusher.go +++ b/internal/aws/cwlogs/pusher.go @@ -51,6 +51,8 @@ type Event struct { InputLogEvent *cloudwatchlogs.InputLogEvent // The time which log generated. GeneratedTime time.Time + LogGroupName string + LogStreamName string } // NewEvent creates a new log event @@ -64,6 +66,11 @@ func NewEvent(timestampMs int64, message string) *Event { return event } +type PusherKey struct { + LogGroupName string + LogStreamName string +} + func (logEvent *Event) Validate(logger *zap.Logger) error { if logEvent.eventPayloadBytes() > maxEventPayloadBytes { logger.Warn("logpusher: the single log event size is larger than the max event payload allowed. Truncate the log event.", @@ -115,11 +122,11 @@ type eventBatch struct { } // Create a new log event batch if needed. -func newEventBatch(logGroupName, logStreamName *string) *eventBatch { +func newEventBatch(key PusherKey) *eventBatch { return &eventBatch{ putLogEventsInput: &cloudwatchlogs.PutLogEventsInput{ - LogGroupName: logGroupName, - LogStreamName: logStreamName, + LogGroupName: aws.String(key.LogGroupName), + LogStreamName: aws.String(key.LogStreamName), LogEvents: make([]*cloudwatchlogs.InputLogEvent, 0, maxRequestEventCount)}, } } @@ -201,10 +208,10 @@ type logPusher struct { } // NewPusher creates a logPusher instance -func NewPusher(logGroupName, logStreamName *string, retryCnt int, +func NewPusher(pusherKey PusherKey, retryCnt int, svcStructuredLog Client, logger *zap.Logger) Pusher { - pusher := newLogPusher(logGroupName, logStreamName, svcStructuredLog, logger) + pusher := newLogPusher(pusherKey, svcStructuredLog, logger) pusher.retryCnt = defaultRetryCount if retryCnt > 0 { @@ -215,15 +222,15 @@ func NewPusher(logGroupName, logStreamName *string, retryCnt int, } // Only create a logPusher, but not start the instance. -func newLogPusher(logGroupName, logStreamName *string, +func newLogPusher(pusherKey PusherKey, svcStructuredLog Client, logger *zap.Logger) *logPusher { pusher := &logPusher{ - logGroupName: logGroupName, - logStreamName: logStreamName, + logGroupName: aws.String(pusherKey.LogGroupName), + logStreamName: aws.String(pusherKey.LogStreamName), svcStructuredLog: svcStructuredLog, logger: logger, } - pusher.logEventBatch = newEventBatch(logGroupName, logStreamName) + pusher.logEventBatch = newEventBatch(pusherKey) return pusher } @@ -322,7 +329,10 @@ func (p *logPusher) addLogEvent(logEvent *Event) *eventBatch { currentBatch := p.logEventBatch if currentBatch.exceedsLimit(logEvent.eventPayloadBytes()) || !currentBatch.isActive(logEvent.InputLogEvent.Timestamp) { prevBatch = currentBatch - currentBatch = newEventBatch(p.logGroupName, p.logStreamName) + currentBatch = newEventBatch(PusherKey{ + LogGroupName: *p.logGroupName, + LogStreamName: *p.logStreamName, + }) } currentBatch.append(logEvent) p.logEventBatch = currentBatch @@ -337,7 +347,10 @@ func (p *logPusher) renewEventBatch() *eventBatch { var prevBatch *eventBatch if len(p.logEventBatch.putLogEventsInput.LogEvents) > 0 { prevBatch = p.logEventBatch - p.logEventBatch = newEventBatch(p.logGroupName, p.logStreamName) + p.logEventBatch = newEventBatch(PusherKey{ + LogGroupName: *p.logGroupName, + LogStreamName: *p.logStreamName, + }) } return prevBatch diff --git a/internal/aws/cwlogs/pusher_test.go b/internal/aws/cwlogs/pusher_test.go index cfd44dd2147a..2e272158a712 100644 --- a/internal/aws/cwlogs/pusher_test.go +++ b/internal/aws/cwlogs/pusher_test.go @@ -77,7 +77,10 @@ func newMockPusherWithEventCheck(check func(msg string)) Pusher { check(eventMsg) } }) - p := newLogPusher(&logGroup, &logStreamName, *svc, zap.NewNop()) + p := newLogPusher(PusherKey{ + LogGroupName: logGroup, + LogStreamName: logStreamName, + }, *svc, zap.NewNop()) return p } @@ -174,7 +177,10 @@ func TestLogEventBatch_sortLogEvents(t *testing.T) { // Need to remove the tmp state folder after testing. func newMockPusher() *logPusher { svc := newAlwaysPassMockLogClient(func(args mock.Arguments) {}) - return newLogPusher(&logGroup, &logStreamName, *svc, zap.NewNop()) + return newLogPusher(PusherKey{ + LogGroupName: logGroup, + LogStreamName: logStreamName, + }, *svc, zap.NewNop()) } // @@ -187,7 +193,10 @@ var msg = "test log message" func TestPusher_newLogEventBatch(t *testing.T) { p := newMockPusher() - logEventBatch := newEventBatch(p.logGroupName, p.logStreamName) + logEventBatch := newEventBatch(PusherKey{ + LogGroupName: logGroup, + LogStreamName: logStreamName, + }) assert.Equal(t, int64(0), logEventBatch.maxTimestampMs) assert.Equal(t, int64(0), logEventBatch.minTimestampMs) assert.Equal(t, 0, logEventBatch.byteTotal)