Skip to content

Commit

Permalink
2022.11.01-15:18
Browse files Browse the repository at this point in the history
2022.11.01-22:36

2022.11.01-22:56

2022.11.01-23:02

2022.11.01-23:06

2022.11.01-23:09

2022.11.01-23:13

2022.11.01-23:14

2022.11.01-23:16

2022.11.01-23:19

2022.11.01-23:22

2022.11.04-00:24

2022.11.04-01:12

2022.11.04-01:12

2022.11.04-01:13

2022.11.04-02:40

2022.11.04-06:58

2022.11.04-07:02

2022.11.08-09:15

2022.11.09-00:20

go mod

2022.11.09-00:21

2022.11.09-00:22

2022.11.09-00:32

2022.11.09-00:35

2022.11.09-00:42

2022.11.09-08:53

2022.11.09-09:01

2022.11.09-10:00

2022.11.11-13:46

2022.11.11-13:50

2022.11.11-14:09

2022.11.10-22:35

2022.11.10-23:32

performance log

DEV

2022.11.11-13:43

2022.11.11-13:58

2022.11.11-14:11

2022.11.11-23:22
  • Loading branch information
slankdev committed Nov 11, 2022
1 parent 53c1edd commit c2eb120
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 85 deletions.
36 changes: 18 additions & 18 deletions examples/agent_output_hook/config.yaml
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
maxIpfixMessageLen: 100
timerFinishedDrainSeconds: 1
timerForceDrainSeconds: 30
timerTemplateFlushSeconds: 60
timerFinishedDrainSeconds: 10000
timerForceDrainSeconds: 10
timerTemplateFlushSeconds: 100000
outputs:
- log:
file: /tmp/flowlog.json
hooks:
- name: hostname addition
command: ./misc/hook_command_example_dummy.sh
- name: shell to resolve hostname
shell: |
#!/bin/sh
echo `cat` | jq --arg hostname $(hostname) '. + {hostname: $hostname}'
- name: shell to resolve ifname from ifindex
shell: |
#!/bin/sh
IN=$(cat)
I_IDX=$(echo $IN | jq .ingressIfindex -r)
E_IDX=$(echo $IN | jq .egressIfindex -r )
I_NAME=$(ip -n ns0 -j link | jq --argjson idx $I_IDX '.[] | select(.ifindex == $idx) | .ifname' -r)
E_NAME=$(ip -n ns0 -j link | jq --argjson idx $E_IDX '.[] | select(.ifindex == $idx) | .ifname' -r)
echo $IN | jq --arg i_name $I_NAME --arg e_name $E_NAME '. + {ingressIfname: $i_name, egressIfname: $e_name}'
# - command: /var/run/flowctl/hookbatch_all1.sh
- command: /var/run/flowctl/hookbatch_all2.sh
# - shell: |
# #!/bin/sh
# jq '[.[] | . + {foo1:"foo1"}]' \
# | jq '[.[] | . + {foo2:"foo2"}]' \
# | jq '[.[] | . + {foo3:"foo3"}]' \
# | jq '[.[] | . + {foo4:"foo4"}]' \
# | jq '[.[] | . + {foo5:"foo5"}]' \
# | jq '[.[] | . + {foo6:"foo6"}]' \
# | jq '[.[] | . + {foo7:"foo7"}]' \
# | jq '[.[] | . + {foo8:"foo8"}]'
# - shell: |
# #!/bin/sh
# jq '[.[] | . + {foo1:"foo1",foo2:"foo2",foo3:"foo3",foo4:"foo4",foo5:"foo5",foo6:"foo6",foo7:"foo7",foo8:"foo8"}]'
Binary file added misc/figures/n_flow_performance.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
35 changes: 35 additions & 0 deletions misc/figures/n_flow_performance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env python3
import numpy as np
import matplotlib.pyplot as plt

labels = ['34', '66', '130', '258']
nobatch_many_jq = [2087720, 4070806, 7964851, 15761758]
nobatch_one_jq = [1075662, 2096756, 4110590, 8167249]
batch_many_jq = [(69832+74551+70318)/3,
(75263+78929+78031)/3,
(99978+96594+95671)/3,
(130711+127055+129459)/3]
batch_one_jq = [(36970+38420+34403)/3,
(40202+39713+39580)/3,
(44911+45686+47030)/3,
(52168+56735+51616)/3]

x = np.arange(len(labels))
width = 0.2
fig, ax = plt.subplots()
rects1 = ax.bar(x - 1.5*width/1, nobatch_many_jq, width, label='nobatch-1shell-8jq')
rects2 = ax.bar(x - width/2, nobatch_one_jq, width, label='nobatch-1shell-1jq')
rects3 = ax.bar(x + width/2, batch_many_jq, width, label='batch-1shell-8jq')
rects4 = ax.bar(x + 1.5*width/1, batch_one_jq, width, label='batch-1shell-1jq')

ax.set_ylabel('latency [usec]')
ax.set_xticks(x, labels)
ax.set_title('#Flows Performance')
ax.legend()

ax.bar_label(rects1, padding=3)
ax.bar_label(rects2, padding=3)
ax.bar_label(rects3, padding=3)
ax.bar_label(rects4, padding=3)
fig.tight_layout()
plt.savefig("n_flow_performance.png")
Binary file added misc/figures/n_hook_performance.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
23 changes: 23 additions & 0 deletions misc/figures/n_hook_performance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/usr/bin/env python3
import numpy as np
import matplotlib.pyplot as plt

labels = ['nothing', '1 hook', '2 hooks', '8 hooks']
shell_means = [5772, 8335522, 16449901, 65299620]
command_means = [5772, 8111420, 16282779, 64994027]

x = np.arange(len(labels))
width = 0.35
fig, ax = plt.subplots()
rects1 = ax.bar(x - width/2, shell_means, width, label='shell')
rects2 = ax.bar(x + width/2, command_means, width, label='command')

ax.set_ylabel('latency [usec]')
ax.set_xticks(x, labels)
ax.set_title('#Hooks Performance')
ax.legend()

ax.bar_label(rects1, padding=3)
ax.bar_label(rects2, padding=3)
fig.tight_layout()
plt.savefig("n_hook_performance.png")
Binary file added misc/figures/optimization.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
21 changes: 21 additions & 0 deletions misc/figures/optimization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#!/usr/bin/env python3
import numpy as np
import matplotlib.pyplot as plt

labels = ['(shell-1jq)x8', '(shell-8jq)x1', '(shell-1jq)x1']
shell_means = [65299620, 15761758, 8167249]

x = np.arange(len(labels))
width = 0.35
fig, ax = plt.subplots()
rects1 = ax.bar(x, shell_means, width, label='shell')

ax.set_ylabel('latency [usec]')
ax.set_xticks(x, labels)
ax.set_title('Optimize Shell Hooks')
ax.legend()

ax.bar_label(rects1, padding=3)
#ax.bar_label(rects2, padding=3)
fig.tight_layout()
plt.savefig("optimization.png")
32 changes: 0 additions & 32 deletions pkg/ebpfmap/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,35 +314,3 @@ func ToIpfixFlowFile(ebflows []Flow) (*ipfix.FlowFile, error) {
}
return flowFile, nil
}

func (f Flow) ToZap(o ipfix.OutputLog) ([]interface{}, error) {
m := map[string]interface{}{
"src": util.ConvertUint32ToIP(f.Key.Saddr).String(),
"dst": util.ConvertUint32ToIP(f.Key.Daddr).String(),
"proto": f.Key.Proto,
"sport": f.Key.Sport,
"dport": f.Key.Dport,
"ingressIfindex": f.Key.IngressIfindex,
"egressIfindex": f.Key.EgressIfindex,
"pkts": f.Val.FlowPkts,
"bytes": f.Val.FlowBytes,
"action": f.Key.Mark,
"start": f.Val.FlowStartMilliSecond,
"end": f.Val.FlowEndMilliSecond,
"finished": f.Val.Finished,
}

for _, h := range o.Hooks {
var err error
m, err = h.Execute(m)
if err != nil {
return nil, err
}
}

ret := []interface{}{}
for key, val := range m {
ret = append(ret, key, val)
}
return ret, nil
}
110 changes: 83 additions & 27 deletions pkg/flowctl/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,44 +261,59 @@ func threadFlowExporter() error {
if len(ebpfFlows) == 0 {
continue
}
if err := flushCaches(config); err != nil {

before := time.Now()
nFlows, err := flushCaches(config)
if err != nil {
return err
}
if err := ebpfmap.DeleteAll(); err != nil {
return err
}
slog.Info("force drain current flows", "usec",
time.Since(before).Microseconds(), "nFlows", nFlows)

case <-tickerFinished.C:
slog.Info("drain finished flow")
ebpfFlows, err := ebpfmap.Dump()
if err != nil {
return err
}
if len(ebpfFlows) == 0 {
continue
}
if err := flushCachesFinished(config); err != nil {

before := time.Now()
nFlows, err := flushCachesFinished(config)
if err != nil {
return err
}
if nFlows > 0 {
slog.Info("drain finished flows", "usec",
time.Since(before).Microseconds(), "nFlows", nFlows)
}
if err := ebpfmap.DeleteFinished(); err != nil {
return err
}

case <-ticketForce.C:
slog.Info("force drain current flows")
ebpfFlows, err := ebpfmap.Dump()
if err != nil {
return err
}
if len(ebpfFlows) == 0 {
continue
}
if err := flushCaches(config); err != nil {

before := time.Now()
nFlows, err := flushCaches(config)
if err != nil {
return err
}
if err := ebpfmap.DeleteAll(); err != nil {
return err
}
slog.Info("force drain current flows", "usec",
time.Since(before).Microseconds(), "nFlows", nFlows)
case <-tickerForTemplateFlush.C:
slog.Info("flush ipfix template")
buf1 := bytes.Buffer{}
Expand Down Expand Up @@ -328,10 +343,10 @@ func threadFlowExporter() error {
}
}

func flushCachesFinished(config ipfix.Config) error {
func flushCachesFinished(config ipfix.Config) (int, error) {
ebpfFlows0, err := ebpfmap.Dump()
if err != nil {
return err
return 0, err
}
ebpfFlows := []ebpfmap.Flow{}
for _, ebpfFlow := range ebpfFlows0 {
Expand All @@ -342,80 +357,81 @@ func flushCachesFinished(config ipfix.Config) error {

for _, o := range config.Outputs {
if !o.Valid() {
return fmt.Errorf("invalid config")
return 0, fmt.Errorf("invalid config")
}
if o.Log != nil {
if err := FlowOutputLog(ebpfFlows, o.Log.File, *o.Log); err != nil {
return err
return 0, err
}
}

if o.Collector != nil {
flow, err := ebpfmap.ToIpfixFlowFile(ebpfFlows)
if err != nil {
return err
return 0, err
}
flowDataMessages, err := flow.ToFlowDataMessages(&config, 0)
if err != nil {
return err
return 0, err
}
for _, flowDataMessage := range flowDataMessages {
flowDataMessage.Header.SysupTime = uint32(util.TimeNow())
buf2 := bytes.Buffer{}
if err := flowDataMessage.Write(&buf2, &config); err != nil {
return err
return 0, err
}
if err := util.UdpTransmit(o.Collector.LocalAddress,
o.Collector.RemoteAddress, &buf2); err != nil {
return err
return 0, err
}
}
}
}
return nil
return len(ebpfFlows), err
}

func flushCaches(config ipfix.Config) error {
func flushCaches(config ipfix.Config) (int, error) {
ebpfFlows, err := ebpfmap.Dump()
nFlows := len(ebpfFlows)
if err != nil {
return err
return 0, err
}
for _, o := range config.Outputs {
if !o.Valid() {
return fmt.Errorf("invalid config")
return 0, fmt.Errorf("invalid config")
}
if o.Log != nil {
if err := FlowOutputLog(ebpfFlows, o.Log.File, *o.Log); err != nil {
return err
return 0, err
}
}

if o.Collector != nil {
flow, err := ebpfmap.ToIpfixFlowFile(ebpfFlows)
if err != nil {
return err
return 0, err
}
flowDataMessages, err := flow.ToFlowDataMessages(&config, 0)
if err != nil {
return err
return 0, err
}
for _, flowDataMessage := range flowDataMessages {
flowDataMessage.Header.SysupTime = uint32(util.TimeNow())
buf2 := bytes.Buffer{}
if err := flowDataMessage.Write(&buf2, &config); err != nil {
return err
return 0, err
}
if err := util.UdpTransmit(o.Collector.LocalAddress,
o.Collector.RemoteAddress, &buf2); err != nil {
return err
return 0, err
}
}
}
}
return nil
return nFlows, nil
}

func FlowOutputLog(flows []ebpfmap.Flow, out string, o ipfix.OutputLog) error {
func getlogger(out string) logr.Logger {
cfg := zap.NewProductionConfig()
cfg.OutputPaths = []string{
out,
Expand All @@ -425,13 +441,53 @@ func FlowOutputLog(flows []ebpfmap.Flow, out string, o ipfix.OutputLog) error {
panic(fmt.Sprintf("who watches the watchmen (%v)?", err))
}
log := zapr.NewLogger(zapLog)
return log

}

for _, flow := range flows {
args, err := flow.ToZap(o)
func ebpflowsToMapArray(flows []ebpfmap.Flow) []map[string]interface{} {
flowTmp := []map[string]interface{}{}
for _, f := range flows {
flowTmp = append(flowTmp, map[string]interface{}{
"src": util.ConvertUint32ToIP(f.Key.Saddr).String(),
"dst": util.ConvertUint32ToIP(f.Key.Daddr).String(),
"proto": f.Key.Proto,
"sport": f.Key.Sport,
"dport": f.Key.Dport,
"ingressIfindex": f.Key.IngressIfindex,
"egressIfindex": f.Key.EgressIfindex,
"pkts": f.Val.FlowPkts,
"bytes": f.Val.FlowBytes,
"action": f.Key.Mark,
"start": f.Val.FlowStartMilliSecond,
"end": f.Val.FlowEndMilliSecond,
"finished": f.Val.Finished,
})
}
return flowTmp
}

func tozap(m map[string]interface{}) []interface{} {
ret := []interface{}{}
for key, val := range m {
ret = append(ret, key, val)
}
return ret
}

func FlowOutputLog(flows []ebpfmap.Flow, out string, o ipfix.OutputLog) error {
flowTmp := ebpflowsToMapArray(flows)
for _, h := range o.Hooks {
var err error
flowTmp, err = h.ExecuteBatch(flowTmp)
if err != nil {
return err
}
log.Info("flowlog", args...)
}

log := getlogger(out)
for _, flow := range flowTmp {
log.Info("flowlog", tozap(flow)...)
}
return nil
}
Loading

0 comments on commit c2eb120

Please sign in to comment.