Skip to content

Commit

Permalink
Add EMF And Raw Log Support For Cloud Watch Logs (#18730)
Browse files Browse the repository at this point in the history
  • Loading branch information
sethAmazon authored Feb 24, 2023
1 parent c35fe0a commit 7206d6d
Show file tree
Hide file tree
Showing 9 changed files with 363 additions and 114 deletions.
16 changes: 16 additions & 0 deletions .chloggen/awscloudwatchlogs-raw-log.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awscloudwatchlogsexporter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add the ability to export raw log to cloud watch

# One or more tracking issues related to the change
issues: [18758]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: Add emf and raw log support for aws cloudwatch exporter.
1 change: 1 addition & 0 deletions exporter/awscloudwatchlogsexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ The following settings can be optionally configured:
- `region`: The AWS region where the log stream is in.
- `endpoint`: The CloudWatch Logs service endpoint which the requests are forwarded to. [See the CloudWatch Logs endpoints](https://docs.aws.amazon.com/general/latest/gr/cwl_region.html) for a list.
- `log_retention`: LogRetention is the option to set the log retention policy for only newly created CloudWatch Log Groups. Defaults to Never Expire if not specified or set to 0. Possible values for retention in days are 1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 2192, 2557, 2922, 3288, or 3653.
- `raw_log`: Boolean default false. If you want to export only the log message to cw logs. This is required for emf logs.

### Examples

Expand Down
4 changes: 4 additions & 0 deletions exporter/awscloudwatchlogsexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type Config struct {
logger *zap.Logger

awsutil.AWSSessionSettings `mapstructure:",squash"`

// Export raw log string instead of log wrapper
// Required for emf logs
RawLog bool `mapstructure:"raw_log,omitempty"`
}

type QueueSettings struct {
Expand Down
162 changes: 121 additions & 41 deletions exporter/awscloudwatchlogsexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
Expand All @@ -42,7 +43,19 @@ type exporter struct {
retryCount int
collectorID string
svcStructuredLog *cwlogs.Client
pusher cwlogs.Pusher
pusherMap map[cwlogs.PusherKey]cwlogs.Pusher
pusherMapLock sync.RWMutex
}

type awsMetadata struct {
LogGroupName string `json:"logGroupName,omitempty"`
LogStreamName string `json:"logStreamName,omitempty"`
}

type emfMetadata struct {
AWSMetadata *awsMetadata `json:"_aws,omitempty"`
LogGroupName string `json:"log_group_name,omitempty"`
LogStreamName string `json:"log_stream_name,omitempty"`
}

func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*exporter, error) {
Expand All @@ -66,15 +79,24 @@ func newCwLogsPusher(expConfig *Config, params exp.CreateSettings) (*exporter, e
return nil, err
}

pusher := cwlogs.NewPusher(aws.String(expConfig.LogGroupName), aws.String(expConfig.LogStreamName), *awsConfig.MaxRetries, *svcStructuredLog, params.Logger)
pusherKey := cwlogs.PusherKey{
LogGroupName: expConfig.LogGroupName,
LogStreamName: expConfig.LogStreamName,
}

pusher := cwlogs.NewPusher(pusherKey, *awsConfig.MaxRetries, *svcStructuredLog, params.Logger)

pusherMap := make(map[cwlogs.PusherKey]cwlogs.Pusher)

pusherMap[pusherKey] = pusher

logsExporter := &exporter{
svcStructuredLog: svcStructuredLog,
Config: expConfig,
logger: params.Logger,
retryCount: *awsConfig.MaxRetries,
collectorID: collectorIdentifier.String(),
pusher: pusher,
pusherMap: pusherMap,
}
return logsExporter, nil
}
Expand All @@ -98,47 +120,74 @@ func newCwLogsExporter(config component.Config, params exp.CreateSettings) (exp.
}

func (e *exporter) consumeLogs(_ context.Context, ld plog.Logs) error {
cwLogsPusher := e.pusher
logEvents, _ := logsToCWLogs(e.logger, ld)
logEvents, _ := logsToCWLogs(e.logger, ld, e.Config)
if len(logEvents) == 0 {
return nil
}

logPushersUsed := make(map[cwlogs.PusherKey]cwlogs.Pusher)
for _, logEvent := range logEvents {
logEvent := &cwlogs.Event{
InputLogEvent: logEvent,
GeneratedTime: time.Now(),
pusherKey := cwlogs.PusherKey{
LogGroupName: logEvent.LogGroupName,
LogStreamName: logEvent.LogStreamName,
}
cwLogsPusher := e.getLogPusher(logEvent)
e.logger.Debug("Adding log event", zap.Any("event", logEvent))
err := cwLogsPusher.AddLogEntry(logEvent)
if err != nil {
e.logger.Error("Failed ", zap.Int("num_of_events", len(logEvents)))
}
logPushersUsed[pusherKey] = cwLogsPusher
}
e.logger.Debug("Log events are successfully put")
flushErr := cwLogsPusher.ForceFlush()
if flushErr != nil {
e.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(flushErr))
return flushErr
var flushErrArray []error
for _, pusher := range logPushersUsed {
flushErr := pusher.ForceFlush()
if flushErr != nil {
e.logger.Error("Error force flushing logs. Skipping to next logPusher.", zap.Error(flushErr))
flushErrArray = append(flushErrArray, flushErr)
}
}
if len(flushErrArray) != 0 {
errorString := ""
for _, err := range flushErrArray {
errorString += err.Error()
}
return errors.New(errorString)
}
return nil
}

func (e *exporter) getLogPusher(logEvent *cwlogs.Event) cwlogs.Pusher {
e.pusherMapLock.Lock()
defer e.pusherMapLock.Unlock()
pusherKey := cwlogs.PusherKey{
LogGroupName: logEvent.LogGroupName,
LogStreamName: logEvent.LogStreamName,
}
if e.pusherMap[pusherKey] == nil {
pusher := cwlogs.NewPusher(pusherKey, e.retryCount, *e.svcStructuredLog, e.logger)
e.pusherMap[pusherKey] = pusher
}
return e.pusherMap[pusherKey]
}

func (e *exporter) shutdown(_ context.Context) error {
if e.pusher != nil {
e.pusher.ForceFlush()
if e.pusherMap != nil {
for _, pusher := range e.pusherMap {
pusher.ForceFlush()
}
}
return nil
}

func logsToCWLogs(logger *zap.Logger, ld plog.Logs) ([]*cloudwatchlogs.InputLogEvent, int) {
func logsToCWLogs(logger *zap.Logger, ld plog.Logs, config *Config) ([]*cwlogs.Event, int) {
n := ld.ResourceLogs().Len()
if n == 0 {
return []*cloudwatchlogs.InputLogEvent{}, 0
return []*cwlogs.Event{}, 0
}

var dropped int
var out []*cloudwatchlogs.InputLogEvent
var out []*cwlogs.Event

rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
Expand All @@ -151,7 +200,7 @@ func logsToCWLogs(logger *zap.Logger, ld plog.Logs) ([]*cloudwatchlogs.InputLogE
logs := sl.LogRecords()
for k := 0; k < logs.Len(); k++ {
log := logs.At(k)
event, err := logToCWLog(resourceAttrs, log)
event, err := logToCWLog(resourceAttrs, log, config)
if err != nil {
logger.Debug("Failed to convert to CloudWatch Log", zap.Error(err))
dropped++
Expand All @@ -176,32 +225,63 @@ type cwLogBody struct {
Resource map[string]interface{} `json:"resource,omitempty"`
}

func logToCWLog(resourceAttrs map[string]interface{}, log plog.LogRecord) (*cloudwatchlogs.InputLogEvent, error) {
func logToCWLog(resourceAttrs map[string]interface{}, log plog.LogRecord, config *Config) (*cwlogs.Event, error) {
// TODO(jbd): Benchmark and improve the allocations.
// Evaluate go.elastic.co/fastjson as a replacement for encoding/json.
body := cwLogBody{
Body: log.Body().AsRaw(),
SeverityNumber: int32(log.SeverityNumber()),
SeverityText: log.SeverityText(),
DroppedAttributesCount: log.DroppedAttributesCount(),
Flags: uint32(log.Flags()),
}
if traceID := log.TraceID(); !traceID.IsEmpty() {
body.TraceID = hex.EncodeToString(traceID[:])
}
if spanID := log.SpanID(); !spanID.IsEmpty() {
body.SpanID = hex.EncodeToString(spanID[:])
}
body.Attributes = attrsValue(log.Attributes())
body.Resource = resourceAttrs
logGroupName := config.LogGroupName
logStreamName := config.LogStreamName

bodyJSON, err := json.Marshal(body)
if err != nil {
return nil, err
var bodyJSON []byte
var err error
if config.RawLog {
// Check if this is an emf log
var metadata emfMetadata
bodyString := log.Body().AsString()
err = json.Unmarshal([]byte(bodyString), &metadata)
// v1 emf json
if err == nil && metadata.AWSMetadata != nil && metadata.AWSMetadata.LogGroupName != "" {
logGroupName = metadata.AWSMetadata.LogGroupName
if metadata.AWSMetadata.LogStreamName != "" {
logStreamName = metadata.AWSMetadata.LogStreamName
}
} else /* v0 emf json */ if err == nil && metadata.LogGroupName != "" {
logGroupName = metadata.LogGroupName
if metadata.LogStreamName != "" {
logStreamName = metadata.LogStreamName
}
}
bodyJSON = []byte(bodyString)
} else {
body := cwLogBody{
Body: log.Body().AsRaw(),
SeverityNumber: int32(log.SeverityNumber()),
SeverityText: log.SeverityText(),
DroppedAttributesCount: log.DroppedAttributesCount(),
Flags: uint32(log.Flags()),
}
if traceID := log.TraceID(); !traceID.IsEmpty() {
body.TraceID = hex.EncodeToString(traceID[:])
}
if spanID := log.SpanID(); !spanID.IsEmpty() {
body.SpanID = hex.EncodeToString(spanID[:])
}
body.Attributes = attrsValue(log.Attributes())
body.Resource = resourceAttrs

bodyJSON, err = json.Marshal(body)
if err != nil {
return &cwlogs.Event{}, err
}
}
return &cloudwatchlogs.InputLogEvent{
Timestamp: aws.Int64(int64(log.Timestamp()) / int64(time.Millisecond)), // in milliseconds
Message: aws.String(string(bodyJSON)),

return &cwlogs.Event{
InputLogEvent: &cloudwatchlogs.InputLogEvent{
Timestamp: aws.Int64(int64(log.Timestamp()) / int64(time.Millisecond)), // in milliseconds
Message: aws.String(string(bodyJSON)),
},
LogGroupName: logGroupName,
LogStreamName: logStreamName,
GeneratedTime: time.Now(),
}, nil
}

Expand Down
Loading

0 comments on commit 7206d6d

Please sign in to comment.