Skip to content

Commit

Permalink
[lokiexporter] Add Resource Attributes as Loki Label (#3418)
Browse files Browse the repository at this point in the history
Logrecord attributes usually only give information at the log level (such a severity and request url, etc). 
Resource attributes usually add the context of what resource sent the log (such as container_name or cluster_name etc).

Currently the loki exporter is able to add logrecord attributes to loki labels. This PR adds Resource attributes also, which helps with filtering of loki data on both log attributes and resource attributes.

**Link to tracking Issue:** 
#3405

**Testing:** 
Added a few tests and a basic run of the exporter. I still have some questions about what happens if we have attributes with the same key on both log records and resources and which we would choose and how we deduplicate

**Documentation:** 
Updated README.md, added code comments for doc creation
  • Loading branch information
Brian Gibbins authored Jun 16, 2021
1 parent 5ccdbe0 commit 198781c
Show file tree
Hide file tree
Showing 8 changed files with 237 additions and 66 deletions.
15 changes: 11 additions & 4 deletions exporter/lokiexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,12 @@ The following settings are required:

- `endpoint` (no default): The target URL to send Loki log streams to (e.g.: http://loki:3100/loki/api/v1/push).

- `labels.attributes` (no default): Map of attributes names to valid Loki label names (must match "^[a-zA-Z_][a-zA-Z0-9_]*$")
allowed to be added as labels to Loki log streams. Logs that do not have at least one of these attributes will be dropped.
- `labels.{attributes/resource}` (no default): Either a map of attributes or resource names to valid Loki label names
(must match "^[a-zA-Z_][a-zA-Z0-9_]*$") allowed to be added as labels to Loki log streams.
Attributes are log record attributes that describe the log message itself. Resource attributes are attributes that
belong to the infrastructure that create the log (container_name, cluster_name, etc.). At least one attribute from
attribute or resource is required
Logs that do not have at least one of these attributes will be dropped.
This is a safety net to help prevent accidentally adding dynamic labels that may significantly increase cardinality,
thus having a performance impact on your Loki instance. See the
[Loki label best practices](https://grafana.com/docs/loki/latest/best-practices/current-best-practices/) page for
Expand Down Expand Up @@ -47,13 +51,16 @@ loki:
endpoint: http://loki:3100/loki/api/v1/push
tenant_id: "example"
labels:
attributes:
resource:
# Allowing 'container.name' attribute and transform it to 'container_name', which is a valid Loki label name.
container.name: "container_name"
# Allowing 'k8s.cluster.name' attribute and transform it to 'k8s_cluster_name', which is a valid Loki label name.
k8s.cluster.name: "k8s_cluster_name"
attributes:
# Allowing 'severity' attribute and not providing a mapping, since the attribute name is a valid Loki label name.
severity: ""
http.status_code: "http_status_code"

headers:
"X-Custom-Header": "loki_rocks"
```
Expand All @@ -66,4 +73,4 @@ configurations [here](./testdata/config.yaml).
Several helper files are leveraged to provide additional capabilities automatically:
- [HTTP settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/config/confighttp/README.md)
- [Queuing and retry settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
- [Queuing and retry settings](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md)
29 changes: 21 additions & 8 deletions exporter/lokiexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,45 @@ func (c *Config) validate() error {

// LabelsConfig defines the labels-related configuration
type LabelsConfig struct {
// Attributes are the attributes that are allowed to be added as labels on a log stream.
// Attributes are the log record attributes that are allowed to be added as labels on a log stream.
Attributes map[string]string `mapstructure:"attributes"`

// ResourceAttributes are the resource attributes that are allowed to be added as labels on a log stream.
ResourceAttributes map[string]string `mapstructure:"resource"`
}

func (c *LabelsConfig) validate() error {
if len(c.Attributes) == 0 {
return fmt.Errorf("\"labels.attributes\" must be configured with at least one attribute")
if len(c.Attributes) == 0 && len(c.ResourceAttributes) == 0 {
return fmt.Errorf("\"labels.attributes\" or \"labels.resource\" must be configured with at least one attribute")
}

labelNameInvalidErr := "the label `%s` in \"labels.attributes\" is not a valid label name. Label names must match " + model.LabelNameRE.String()
logRecordNameInvalidErr := "the label `%s` in \"labels.attributes\" is not a valid label name. Label names must match " + model.LabelNameRE.String()
for l, v := range c.Attributes {
if len(v) > 0 && !model.LabelName(v).IsValid() {
return fmt.Errorf(labelNameInvalidErr, v)
return fmt.Errorf(logRecordNameInvalidErr, v)
} else if len(v) == 0 && !model.LabelName(l).IsValid() {
return fmt.Errorf(labelNameInvalidErr, l)
return fmt.Errorf(logRecordNameInvalidErr, l)
}
}

resourceNameInvalidErr := "the label `%s` in \"labels.resource\" is not a valid label name. Label names must match " + model.LabelNameRE.String()
for l, v := range c.ResourceAttributes {
if len(v) > 0 && !model.LabelName(v).IsValid() {
return fmt.Errorf(resourceNameInvalidErr, v)
} else if len(v) == 0 && !model.LabelName(l).IsValid() {
return fmt.Errorf(resourceNameInvalidErr, l)
}
}

return nil
}

// getAttributes creates a lookup of allowed attributes to valid Loki label names.
func (c *LabelsConfig) getAttributes() map[string]model.LabelName {
func (c *LabelsConfig) getAttributes(labels map[string]string) map[string]model.LabelName {

attributes := map[string]model.LabelName{}

for attrName, lblName := range c.Attributes {
for attrName, lblName := range labels {
if len(lblName) > 0 {
attributes[attrName] = model.LabelName(lblName)
continue
Expand Down
114 changes: 98 additions & 16 deletions exporter/lokiexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ func TestLoadConfig(t *testing.T) {
conventions.AttributeK8sCluster: "k8s_cluster_name",
"severity": "severity",
},
ResourceAttributes: map[string]string{
"resource.name": "resource_name",
"severity": "severity",
},
},
}
require.Equal(t, &expectedCfg, actualCfg)
Expand All @@ -90,9 +94,12 @@ func TestLoadConfig(t *testing.T) {
func TestConfig_validate(t *testing.T) {
const validEndpoint = "https://validendpoint.local"

validLabelsConfig := LabelsConfig{
validAttribLabelsConfig := LabelsConfig{
Attributes: testValidAttributesWithMapping,
}
validResourceLabelsConfig := LabelsConfig{
ResourceAttributes: testValidResourceWithMapping,
}

type fields struct {
Endpoint string
Expand All @@ -111,15 +118,15 @@ func TestConfig_validate(t *testing.T) {
name: "with valid endpoint",
fields: fields{
Endpoint: validEndpoint,
Labels: validLabelsConfig,
Labels: validAttribLabelsConfig,
},
shouldError: false,
},
{
name: "with missing endpoint",
fields: fields{
Endpoint: "",
Labels: validLabelsConfig,
Labels: validAttribLabelsConfig,
},
errorMessage: "\"endpoint\" must be a valid URL",
shouldError: true,
Expand All @@ -128,37 +135,36 @@ func TestConfig_validate(t *testing.T) {
name: "with invalid endpoint",
fields: fields{
Endpoint: "this://is:an:invalid:endpoint.com",
Labels: validLabelsConfig,
Labels: validAttribLabelsConfig,
},
errorMessage: "\"endpoint\" must be a valid URL",
shouldError: true,
},
{
name: "with missing `labels.attributes`",
name: "with missing `labels.attributes` and missing `labels.resource`",
fields: fields{
Endpoint: validEndpoint,
Labels: LabelsConfig{
Attributes: nil,
Attributes: nil,
ResourceAttributes: nil,
},
},
errorMessage: "\"labels.attributes\" must be configured with at least one attribute",
errorMessage: "\"labels.attributes\" or \"labels.resource\" must be configured with at least one attribute",
shouldError: true,
},
{
name: "with `labels.attributes` set",
name: "with missing `labels.attributes`",
fields: fields{
Endpoint: validEndpoint,
Labels: LabelsConfig{
Attributes: testValidAttributesWithMapping,
},
Labels: validResourceLabelsConfig,
},
shouldError: false,
},
{
name: "with valid `labels` config",
name: "with missing `labels.resource`",
fields: fields{
Endpoint: validEndpoint,
Labels: validLabelsConfig,
Labels: validAttribLabelsConfig,
},
shouldError: false,
},
Expand Down Expand Up @@ -198,9 +204,10 @@ func TestLabelsConfig_validate(t *testing.T) {
{
name: "with no attributes",
labels: LabelsConfig{
Attributes: map[string]string{},
Attributes: map[string]string{},
ResourceAttributes: map[string]string{},
},
errorMessage: "\"labels.attributes\" must be configured with at least one attribute",
errorMessage: "\"labels.attributes\" or \"labels.resource\" must be configured with at least one attribute",
shouldError: true,
},
{
Expand All @@ -212,6 +219,15 @@ func TestLabelsConfig_validate(t *testing.T) {
},
shouldError: false,
},
{
name: "with valid resource label map",
labels: LabelsConfig{
ResourceAttributes: map[string]string{
"other.attribute": "other",
},
},
shouldError: false,
},
{
name: "with invalid attribute label map",
labels: LabelsConfig{
Expand All @@ -222,6 +238,16 @@ func TestLabelsConfig_validate(t *testing.T) {
errorMessage: "the label `invalid.label.name` in \"labels.attributes\" is not a valid label name. Label names must match " + model.LabelNameRE.String(),
shouldError: true,
},
{
name: "with invalid resource label map",
labels: LabelsConfig{
ResourceAttributes: map[string]string{
"other.attribute": "invalid.label.name",
},
},
errorMessage: "the label `invalid.label.name` in \"labels.resource\" is not a valid label name. Label names must match " + model.LabelNameRE.String(),
shouldError: true,
},
{
name: "with attribute having an invalid label name and no map configured",
labels: LabelsConfig{
Expand Down Expand Up @@ -301,7 +327,63 @@ func TestLabelsConfig_getAttributes(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mapping := tt.labels.getAttributes()
mapping := tt.labels.getAttributes(tt.labels.Attributes)

assert.Equal(t, tt.expectedMapping, mapping)
})
}
}

func TestResourcesConfig_getAttributes(t *testing.T) {
tests := []struct {
name string
labels LabelsConfig
expectedMapping map[string]model.LabelName
}{
{
name: "with attributes without label mapping",
labels: LabelsConfig{
ResourceAttributes: map[string]string{
"attribute_1": "",
"attribute_2": "",
},
},
expectedMapping: map[string]model.LabelName{
"attribute_1": model.LabelName("attribute_1"),
"attribute_2": model.LabelName("attribute_2"),
},
},
{
name: "with attributes and label mapping",
labels: LabelsConfig{
ResourceAttributes: map[string]string{
"attribute.1": "attribute_1",
"attribute.2": "attribute_2",
},
},
expectedMapping: map[string]model.LabelName{
"attribute.1": model.LabelName("attribute_1"),
"attribute.2": model.LabelName("attribute_2"),
},
},
{
name: "with attributes and without label mapping",
labels: LabelsConfig{
ResourceAttributes: map[string]string{
"attribute.1": "attribute_1",
"attribute2": "",
},
},
expectedMapping: map[string]model.LabelName{
"attribute.1": model.LabelName("attribute_1"),
"attribute2": model.LabelName("attribute2"),
},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mapping := tt.labels.getAttributes(tt.labels.ResourceAttributes)

assert.Equal(t, tt.expectedMapping, mapping)
})
Expand Down
45 changes: 27 additions & 18 deletions exporter/lokiexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ import (
)

type lokiExporter struct {
config *Config
logger *zap.Logger
client *http.Client
attributesToLabels map[string]model.LabelName
wg sync.WaitGroup
config *Config
logger *zap.Logger
client *http.Client
wg sync.WaitGroup
}

func newExporter(config *Config, logger *zap.Logger) *lokiExporter {
Expand Down Expand Up @@ -111,7 +110,6 @@ func (l *lokiExporter) start(_ context.Context, host component.Host) (err error)

l.client = client

l.attributesToLabels = l.config.Labels.getAttributes()
return nil
}

Expand All @@ -125,18 +123,18 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n
rls := ld.ResourceLogs()
for i := 0; i < rls.Len(); i++ {
ills := rls.At(i).InstrumentationLibraryLogs()
resource := rls.At(i).Resource()
for j := 0; j < ills.Len(); j++ {
logs := ills.At(j).Logs()
for k := 0; k < logs.Len(); k++ {
log := logs.At(k)
attribLabels, ok := l.convertAttributesToLabels(log.Attributes())
if !ok {

mergedLabels, dropped := l.convertAttributesAndMerge(log.Attributes(), resource.Attributes())
if dropped {
numDroppedLogs++
continue
}

labels := attribLabels.String()

labels := mergedLabels.String()
entry := convertLogToLokiEntry(log)

if stream, ok := streams[labels]; ok {
Expand Down Expand Up @@ -165,10 +163,25 @@ func (l *lokiExporter) logDataToLoki(ld pdata.Logs) (pr *logproto.PushRequest, n
return pr, numDroppedLogs
}

func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap) (model.LabelSet, bool) {
func (l *lokiExporter) convertAttributesAndMerge(logAttrs pdata.AttributeMap, resourceAttrs pdata.AttributeMap) (mergedAttributes model.LabelSet, dropped bool) {
logRecordAttributes := l.convertAttributesToLabels(logAttrs, l.config.Labels.Attributes)
resourceAttributes := l.convertAttributesToLabels(resourceAttrs, l.config.Labels.ResourceAttributes)

// This prometheus model.labelset Merge function overwrites the logRecordAttributes with resourceAttributes
mergedAttributes = logRecordAttributes.Merge(resourceAttributes)

if len(mergedAttributes) == 0 {
return nil, true
}
return mergedAttributes, false
}

func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap, allowedAttributes map[string]string) model.LabelSet {
ls := model.LabelSet{}

for attr, attrLabelName := range l.attributesToLabels {
allowedLabels := l.config.Labels.getAttributes(allowedAttributes)

for attr, attrLabelName := range allowedLabels {
av, ok := attributes.Get(attr)
if ok {
if av.Type() != pdata.AttributeValueTypeString {
Expand All @@ -179,11 +192,7 @@ func (l *lokiExporter) convertAttributesToLabels(attributes pdata.AttributeMap)
}
}

if len(ls) == 0 {
return nil, false
}

return ls, true
return ls
}

func convertLogToLokiEntry(lr pdata.LogRecord) *logproto.Entry {
Expand Down
Loading

0 comments on commit 198781c

Please sign in to comment.