Skip to content

Commit

Permalink
feat: add support for external labels
Browse files Browse the repository at this point in the history
  • Loading branch information
fstr committed Jun 21, 2024
1 parent 15dc41f commit ecad247
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 9 deletions.
5 changes: 5 additions & 0 deletions charts/egressd/templates/exporter/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ spec:
- "-{{ $key }}"
{{- end }}
{{- end }}
{{- range $key, $value := .Values.exporter.extraLabels }}
{{- if and $value $key }}
- "-extra-label={{ $key }}={{ $value }}"
{{- end }}
{{- end }}
env:
- name: POD_NAMESPACE
valueFrom:
Expand Down
4 changes: 4 additions & 0 deletions charts/egressd/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -198,5 +198,9 @@ exporter:
log-level: info
# metric-buffer-size: 10000

# Extra labels to pass along when using the Prometheus Remote-Write sink
extraLabels: {}
# mylabel: myvalue

prometheusScrape:
enabled: true
4 changes: 3 additions & 1 deletion cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var (
logLevel = flag.String("log-level", logrus.InfoLevel.String(), "Log level")
httpListenPort = flag.Int("http-listen-port", 6060, "HTTP server listen port")
configPath = flag.String("config-path", "/etc/egressd/config/config.yaml", "Path to exporter config path")
extraLabels = make(sinks.ExtraLabels)
)

// These should be set via `go build` during a release.
Expand All @@ -44,6 +45,7 @@ var (
)

func main() {
flag.Var(&extraLabels, "extra-label", "key=value pairs to set labels")
flag.Parse()

log := logrus.New()
Expand Down Expand Up @@ -130,7 +132,7 @@ func run(log logrus.FieldLogger) error {
))
} else if s.PromRemoteWriteConfig != nil {
sinksList = append(sinksList, sinks.NewPromRemoteWriteSink(
log, name, *s.PromRemoteWriteConfig,
log, name, *s.PromRemoteWriteConfig, extraLabels,
))
}
}
Expand Down
48 changes: 40 additions & 8 deletions exporter/sinks/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,24 @@ import (
"github.com/castai/egressd/exporter/config"
"github.com/castai/egressd/pb"
"github.com/castai/promwrite"
"fmt"
"strings"
)

type promWriter interface {
Write(ctx context.Context, req *promwrite.WriteRequest, options ...promwrite.WriteOption) (*promwrite.WriteResponse, error)
}

func NewPromRemoteWriteSink(log logrus.FieldLogger, sinkName string, cfg config.SinkPromRemoteWriteConfig) Sink {
func NewPromRemoteWriteSink(log logrus.FieldLogger, sinkName string, cfg config.SinkPromRemoteWriteConfig, extraLabels ExtraLabels) Sink {
return &PromRemoteWriteSink{
log: log.WithFields(map[string]interface{}{
"sink_type": "prom_remote_write",
"sink_name": sinkName,
}),
client: promwrite.NewClient(cfg.URL),
timeGetter: timeGetter,
cfg: cfg,
client: promwrite.NewClient(cfg.URL),
timeGetter: timeGetter,
cfg: cfg,
extraLabels: extraLabels,
}
}

Expand All @@ -35,10 +38,11 @@ func timeGetter() time.Time {
}

type PromRemoteWriteSink struct {
cfg config.SinkPromRemoteWriteConfig
log logrus.FieldLogger
client promWriter
timeGetter func() time.Time
cfg config.SinkPromRemoteWriteConfig
log logrus.FieldLogger
client promWriter
timeGetter func() time.Time
extraLabels ExtraLabels
}

func (s *PromRemoteWriteSink) Push(ctx context.Context, batch *pb.PodNetworkMetricBatch) error {
Expand Down Expand Up @@ -69,9 +73,16 @@ func (s *PromRemoteWriteSink) Push(ctx context.Context, batch *pb.PodNetworkMetr

{Name: "proto", Value: protoString(uint8(m.Proto))},
}

// add any defined extra labels
for k, v := range s.extraLabels {
labels = append(labels, promwrite.Label{Name: k, Value: v})
}

sort.Slice(labels, func(i, j int) bool {
return labels[i].Name < labels[j].Name
})

ts = append(ts, promwrite.TimeSeries{
Labels: labels,
Sample: promwrite.Sample{
Expand Down Expand Up @@ -117,3 +128,24 @@ func protoString(p uint8) string {
}
return strconv.Itoa(int(p))
}

// ExtraLabels is a custom type that implements the flag.Value interface
type ExtraLabels map[string]string

// String is part of the flag.Value interface
func (l *ExtraLabels) String() string {
return fmt.Sprint(*l)
}

// Set is part of the flag.Value interface
// It parses a key=value pair and adds it to the map
func (l *ExtraLabels) Set(value string) error {
parts := strings.SplitN(value, "=", 2)
if len(parts) != 2 {
return fmt.Errorf("invalid label format: %s", value)
}
key := parts[0]
val := parts[1]
(*l)[key] = val
return nil
}
80 changes: 80 additions & 0 deletions exporter/sinks/prom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,86 @@ func TestPromSink(t *testing.T) {
}, client.req.TimeSeries)
}

func TestPromSinkWithExtraLabels(t *testing.T) {
r := require.New(t)
ctx := context.Background()
log := logrus.New()
log.SetLevel(logrus.DebugLevel)
extraLabels := ExtraLabels{
"extra1": "value1",
"extra2": "value2",
}

cfg := config.SinkPromRemoteWriteConfig{
URL: "prom-remote-write-url",
Headers: map[string]string{
"Custom-Header": "1",
},
}
client := &mockPromWriteClient{}
ts := time.Date(2023, time.April, 13, 13, 41, 48, 926278000, time.UTC)
sink := PromRemoteWriteSink{
cfg: cfg,
log: log,
client: client,
timeGetter: func() time.Time {
return ts
},
extraLabels: extraLabels,
}
batch := &pb.PodNetworkMetricBatch{
Items: []*pb.PodNetworkMetric{
{
SrcIp: "10.14.7.12",
SrcPod: "p1",
SrcNamespace: "team1",
SrcNode: "n1",
SrcZone: "us-east-1a",
DstIp: "10.14.7.5",
DstPod: "",
DstNamespace: "team2",
DstNode: "n1",
DstZone: "us-east-1a",
TxBytes: 35,
TxPackets: 3,
RxBytes: 30,
RxPackets: 1,
Proto: 6,
},
},
}
r.NoError(sink.Push(ctx, batch))
expectedLabels := []promwrite.Label{
{Name: "__name__", Value: "egressd_transmit_bytes_total"},
{Name: "src_pod", Value: "p1"},
{Name: "src_node", Value: "n1"},
{Name: "src_namespace", Value: "team1"},
{Name: "src_zone", Value: "us-east-1a"},
{Name: "src_ip", Value: "10.14.7.12"},
{Name: "dst_pod", Value: "unknown"},
{Name: "dst_node", Value: "n1"},
{Name: "dst_namespace", Value: "team2"},
{Name: "dst_zone", Value: "us-east-1a"},
{Name: "dst_ip", Value: "10.14.7.5"},
{Name: "dst_ip_type", Value: "private"},
{Name: "cross_zone", Value: "false"},
{Name: "proto", Value: "TCP"},
{Name: "extra1", Value: "value1"},
{Name: "extra2", Value: "value2"},
}
sort.Slice(expectedLabels, func(i, j int) bool {
return expectedLabels[i].Name < expectedLabels[j].Name
})

r.Equal([]promwrite.TimeSeries{
{
Labels: expectedLabels,
Sample: promwrite.Sample{Time: ts, Value: 35},
},
},
client.req.TimeSeries)
}

type mockPromWriteClient struct {
req *promwrite.WriteRequest
}
Expand Down

0 comments on commit ecad247

Please sign in to comment.