Skip to content

Commit

Permalink
Merge pull request #7 from maitreya-source/master
Browse files Browse the repository at this point in the history
Add: Added support for ebpf tcptop - Issue#6
  • Loading branch information
williamchanrico authored Dec 10, 2021
2 parents 7675544 + 8afc0ed commit 8835946
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 0 deletions.
11 changes: 11 additions & 0 deletions cmd/planet-exporter/internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"planet-exporter/collector"
taskdarkstat "planet-exporter/collector/task/darkstat"
taskebpf "planet-exporter/collector/task/ebpf"
taskinventory "planet-exporter/collector/task/inventory"
tasksocketstat "planet-exporter/collector/task/socketstat"
"planet-exporter/server"
Expand Down Expand Up @@ -55,6 +56,9 @@ type Config struct {
TaskInventoryEnabled bool
TaskInventoryAddr string // InventoryAddr url for inventory hostgroup mapping table data

TaskEbpfEnabled bool
TaskEbpfAddr string // TaskEbpfAddr url for scraping the ebpf data

TaskSocketstatEnabled bool
}

Expand Down Expand Up @@ -155,6 +159,9 @@ func (s Service) collect(ctx context.Context, interval time.Duration) {
log.Infof("Task Darkstat: %v", s.Config.TaskDarkstatEnabled)
taskdarkstat.InitTask(ctx, s.Config.TaskDarkstatEnabled, s.Config.TaskDarkstatAddr)

log.Infof("Task ebpf: %v", s.Config.TaskEbpfEnabled)
taskebpf.InitTask(ctx, s.Config.TaskEbpfEnabled, s.Config.TaskEbpfAddr)

log.Infof("Task Inventory: %v", s.Config.TaskInventoryEnabled)
taskinventory.InitTask(ctx, s.Config.TaskInventoryEnabled, s.Config.TaskInventoryAddr)

Expand All @@ -172,6 +179,10 @@ func (s Service) collect(ctx context.Context, interval time.Duration) {
if err != nil {
log.Errorf("Darkstat collect failed: %v", err)
}
err = taskebpf.Collect(ctx)
if err != nil {
log.Errorf("EBPF collect failed: %v", err)
}
err = tasksocketstat.Collect(ctx)
if err != nil {
log.Errorf("Socketstat collect failed: %v", err)
Expand Down
3 changes: 3 additions & 0 deletions cmd/planet-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func main() {
flag.BoolVar(&config.TaskDarkstatEnabled, "task-darkstat-enabled", false, "Enable darkstat collector task")
flag.StringVar(&config.TaskDarkstatAddr, "task-darkstat-addr", "", "Darkstat target address")

flag.BoolVar(&config.TaskEbpfEnabled, "task-ebpf-enabled", false, "Enable Ebpf collector task")
flag.StringVar(&config.TaskEbpfAddr, "task-ebpf-addr", "http://localhost:9435/metrics", "Ebpf target address")

flag.BoolVar(&config.TaskInventoryEnabled, "task-inventory-enabled", false, "Enable inventory collector task")
flag.StringVar(&config.TaskInventoryAddr, "task-inventory-addr", "", "Darkstat target address")

Expand Down
12 changes: 12 additions & 0 deletions collector/collector_network_dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package collector

import (
"planet-exporter/collector/task/darkstat"
"planet-exporter/collector/task/ebpf"
"planet-exporter/collector/task/inventory"
"planet-exporter/collector/task/socketstat"

Expand All @@ -28,6 +29,7 @@ type networkDependencyCollector struct {
upstream *prometheus.Desc
downstream *prometheus.Desc
traffic *prometheus.Desc
ebpfTraffic *prometheus.Desc
}

func init() {
Expand All @@ -48,6 +50,11 @@ func NewNetworkDependencyCollector() (Collector, error) {
"Total network traffic with peers",
[]string{"local_hostgroup", "direction", "remote_hostgroup", "remote_ip", "local_domain", "remote_domain"}, nil,
),
ebpfTraffic: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "ebpf_traffic_bytes_total"),
"Total network traffic with peers from ebpf_exporter",
[]string{"local_hostgroup", "direction", "remote_hostgroup", "remote_ip", "local_domain", "remote_domain"}, nil,
),
upstream: prometheus.NewDesc(
prometheus.BuildFQName(namespace, "", "upstream"),
"Upstream dependency of this machine",
Expand All @@ -64,13 +71,18 @@ func NewNetworkDependencyCollector() (Collector, error) {
// Update implements the Collector interface
func (c networkDependencyCollector) Update(ch chan<- prometheus.Metric) error {
traffic := darkstat.Get()
ebpf := ebpf.Get()
serverProcesses, upstreams, downstreams := socketstat.Get()
localInventory := inventory.GetLocalInventory()

for _, m := range traffic {
ch <- prometheus.MustNewConstMetric(c.traffic, prometheus.GaugeValue, m.Bandwidth,
m.LocalHostgroup, m.Direction, m.RemoteHostgroup, m.RemoteIPAddr, m.LocalDomain, m.RemoteDomain)
}
for _, m := range ebpf {
ch <- prometheus.MustNewConstMetric(c.ebpfTraffic, prometheus.GaugeValue, m.Bandwidth,
m.LocalHostgroup, m.Direction, m.RemoteHostgroup, m.RemoteIPAddr, m.LocalDomain, m.RemoteDomain)
}
for _, m := range upstreams {
ch <- prometheus.MustNewConstMetric(c.upstream, prometheus.GaugeValue, 1,
m.LocalHostgroup, m.RemoteHostgroup, m.LocalAddress, m.RemoteAddress, m.Port, m.Protocol, m.ProcessName)
Expand Down
193 changes: 193 additions & 0 deletions collector/task/ebpf/tcptop.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/**
* Copyright 2021
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ebpf

import (
"context"
"fmt"
"net"
"planet-exporter/collector/task/inventory"
"planet-exporter/pkg/network"
"planet-exporter/pkg/prometheus"
"strconv"
"sync"
"time"

"github.com/prometheus/prom2json"
log "github.com/sirupsen/logrus"
)

// task that queries ebpf metrics and aggregates them into usable planet metrics.
type task struct {
enabled bool
ebpfAddr string

hosts []Metric
mu sync.Mutex
}

var (
once sync.Once
singleton task
)

const (
send_bytes = "ebpf_exporter_ipv4_send_bytes"
recv_bytes = "ebpf_exporter_ipv4_recv_bytes"
ingress = "ingress"
egress = "egress"
)

func init() {
singleton = task{
enabled: false,
hosts: []Metric{},
mu: sync.Mutex{},
}
}

// InitTask initial states.
func InitTask(ctx context.Context, enabled bool, ebpfAddr string) {
once.Do(func() {
singleton.enabled = enabled
singleton.ebpfAddr = ebpfAddr
})
}

// Metric contains values needed for planet metrics.
type Metric struct {
Direction string // ingress or egress
LocalHostgroup string // e.g. hostgroup
RemoteHostgroup string
RemoteIPAddr string
LocalDomain string // e.g. consul domain
RemoteDomain string
Bandwidth float64
}

// Get returns latest metrics from singleton.
func Get() []Metric {
singleton.mu.Lock()
hosts := singleton.hosts
singleton.mu.Unlock()

return hosts
}

// Collect will process ebpf metrics locally and fill singleton with latest data.
func Collect(ctx context.Context) error {
if !singleton.enabled {
return nil
}

if singleton.ebpfAddr == "" {
return fmt.Errorf("eBPF address is empty")
}

startTime := time.Now()

// Scrape ebpf prometheus endpoint for send_bytes_metric and recv_bytes_metric.
ebpfScrape, err := prometheus.Scrape(singleton.ebpfAddr)
if err != nil {
return err
}
var send_bytes_metric *prom2json.Family
var recv_bytes_metric *prom2json.Family
for _, v := range ebpfScrape {
if v.Name == send_bytes {
send_bytes_metric = v
}
if v.Name == recv_bytes {
recv_bytes_metric = v
}
if send_bytes_metric != nil && recv_bytes_metric != nil {
break
}
}
if send_bytes_metric == nil {
return fmt.Errorf("Metric %v doesn't exist", send_bytes)
}
if recv_bytes_metric == nil {
return fmt.Errorf("Metric %v doesn't exist", recv_bytes)
}

sendHostBytes, err := toHostMetrics(send_bytes_metric, egress)
if err != nil {
log.Errorf("Conversion to host metric failed for %v, err: %v", send_bytes, err)
}
recvHostBytes, err := toHostMetrics(recv_bytes_metric, ingress)
if err != nil {
log.Errorf("Conversion to host metric failed for %v, err: %v", recv_bytes, err)
}

singleton.mu.Lock()
singleton.hosts = append(sendHostBytes, recvHostBytes...)
singleton.mu.Unlock()

log.Debugf("taskebpf.Collect retrieved %v metrics", len(sendHostBytes)+len(recvHostBytes))
log.Debugf("taskebpf.Collect process took %v", time.Since(startTime))
return nil
}

// toHostMetrics converts ebpf metrics into planet explorer prometheus metrics.
func toHostMetrics(bytes_metric *prom2json.Family, direction string) ([]Metric, error) {
var hosts []Metric
inventoryHosts := inventory.Get()

localAddr, err := network.DefaultLocalAddr()
if err != nil {
return nil, err
}

// To label source traffic that we need to build dependency graph.
localHostgroup := localAddr.String()
localDomain := localAddr.String()
localInventory, ok := inventoryHosts[localAddr.String()]
if ok {
localHostgroup = localInventory.Hostgroup
localDomain = localInventory.Domain
}
log.Debugf("Local address doesn't exist in the inventory: %v", localAddr.String())

for _, m := range bytes_metric.Metrics {
metric := m.(prom2json.Metric)
destIp := net.ParseIP(metric.Labels["daddr"])

if destIp.Equal(localAddr) || destIp.Equal(nil) {
continue
}

inventoryHostInfo := inventoryHosts[metric.Labels["daddr"]]

bandwidth, err := strconv.ParseFloat(metric.Value, 64)
if err != nil {
log.Errorf("Failed to parse 'bytes_metric' value: %v", err)
continue
}

hosts = append(hosts, Metric{
LocalHostgroup: localHostgroup,
RemoteHostgroup: inventoryHostInfo.Hostgroup,
RemoteIPAddr: metric.Labels["daddr"],
LocalDomain: localDomain,
RemoteDomain: inventoryHostInfo.Domain,
Direction: direction,
Bandwidth: bandwidth,
})
}
return hosts, nil
}

0 comments on commit 8835946

Please sign in to comment.