From 8a624fa470c8451572e08e4549abe2954cd96b4f Mon Sep 17 00:00:00 2001 From: Hiroki Shirokura Date: Tue, 1 Nov 2022 03:07:31 +0000 Subject: [PATCH] Implement LogOutputHook feature Signed-off-by: Hiroki Shirokura --- config.yaml | 17 +++++- examples/agent_output_hook/config.yaml | 23 ++++++++ examples/agent_output_hook/flowlog.json | 46 ++++++++++++++++ misc/hook_command_example_dummy.sh | 18 ++++++ misc/hook_command_example_hostname.sh | 18 ++++++ misc/hook_command_example_ifname.sh | 24 ++++++++ pkg/ebpfmap/types.go | 44 ++++++++++----- pkg/flowctl/ipfix.go | 14 +++-- pkg/hook/command.go | 56 +++++++++++++++++++ pkg/hook/interface.go | 22 ++++++++ pkg/hook/shell.go | 66 ++++++++++++++++++++++ pkg/ipfix/config.go | 73 +++++++++++++++++++++++++ 12 files changed, 400 insertions(+), 21 deletions(-) create mode 100644 examples/agent_output_hook/config.yaml create mode 100644 examples/agent_output_hook/flowlog.json create mode 100755 misc/hook_command_example_dummy.sh create mode 100755 misc/hook_command_example_hostname.sh create mode 100755 misc/hook_command_example_ifname.sh create mode 100644 pkg/hook/command.go create mode 100644 pkg/hook/interface.go create mode 100644 pkg/hook/shell.go diff --git a/config.yaml b/config.yaml index 72b5f23..3e53e1f 100644 --- a/config.yaml +++ b/config.yaml @@ -25,7 +25,22 @@ outputs: # nfdump -r /tmp/netflow/nfcapd.202207101030 -o extended - log: file: /tmp/flow.log - + # hooks: + # - name: hostname addition + # command: /usr/bin/hook_command_example_hostname.sh + # - name: shell to resolve hostname + # shell: | + # #!/bin/sh + # echo `cat` | jq --arg hostname $(hostname) '. + {hostname: $hostname}' + # - name: shell to resolve ifname from ifindex + # shell: | + # #!/bin/sh + # IN=$(cat) + # I_IDX=$(echo $IN | jq .ingressIfindex -r) + # E_IDX=$(echo $IN | jq .egressIfindex -r ) + # I_NAME=$(ip -n ns0 -j link | jq --argjson idx $I_IDX '.[] | select(.ifindex == $idx) | .ifname' -r) + # E_NAME=$(ip -n ns0 -j link | jq --argjson idx $E_IDX '.[] | select(.ifindex == $idx) | .ifname' -r) + # echo $IN | jq --arg i_name $I_NAME --arg e_name $E_NAME '. + {ingressIfname: $i_name, egressIfname: $e_name}' templates: - id: 1001 template: diff --git a/examples/agent_output_hook/config.yaml b/examples/agent_output_hook/config.yaml new file mode 100644 index 0000000..e0bca76 --- /dev/null +++ b/examples/agent_output_hook/config.yaml @@ -0,0 +1,23 @@ +maxIpfixMessageLen: 100 +timerFinishedDrainSeconds: 1 +timerForceDrainSeconds: 30 +timerTemplateFlushSeconds: 60 +outputs: +- log: + file: /tmp/flowlog.json + hooks: + - name: hostname addition + command: ./misc/hook_command_example_dummy.sh + - name: shell to resolve hostname + shell: | + #!/bin/sh + echo `cat` | jq --arg hostname $(hostname) '. + {hostname: $hostname}' + - name: shell to resolve ifname from ifindex + shell: | + #!/bin/sh + IN=$(cat) + I_IDX=$(echo $IN | jq .ingressIfindex -r) + E_IDX=$(echo $IN | jq .egressIfindex -r ) + I_NAME=$(ip -n ns0 -j link | jq --argjson idx $I_IDX '.[] | select(.ifindex == $idx) | .ifname' -r) + E_NAME=$(ip -n ns0 -j link | jq --argjson idx $E_IDX '.[] | select(.ifindex == $idx) | .ifname' -r) + echo $IN | jq --arg i_name $I_NAME --arg e_name $E_NAME '. + {ingressIfname: $i_name, egressIfname: $e_name}' diff --git a/examples/agent_output_hook/flowlog.json b/examples/agent_output_hook/flowlog.json new file mode 100644 index 0000000..ed78d4e --- /dev/null +++ b/examples/agent_output_hook/flowlog.json @@ -0,0 +1,46 @@ +{ + "level": "info", + "ts": 1667266821.7434478, + "caller": "flowctl/ipfix.go:430", + "msg": "flowlog", + "bytes": 594, + "finished": 1, + "foo": "bar", + "pkts": 6, + "proto": 6, + "start": 12168400776634952, + "action": 0, + "ingressIfindex": 3, + "dport": 32816, + "egressIfindex": 2, + "hostname": "slankdev", + "src": "10.2.0.2", + "ingressIfname": "eth2", + "egressIfname": "eth1", + "dst": "10.1.0.2", + "sport": 8080, + "end": 12168400778056844 +} +{ + "level": "info", + "ts": 1667266821.9629853, + "caller": "flowctl/ipfix.go:430", + "msg": "flowlog", + "dport": 8080, + "dst": "10.2.0.2", + "proto": 6, + "src": "10.1.0.2", + "finished": 1, + "sport": 32816, + "start": 12168400776608754, + "ingressIfname": "eth1", + "egressIfindex": 3, + "foo": "bar", + "egressIfname": "eth2", + "pkts": 6, + "action": 0, + "bytes": 481, + "end": 12168400778037424, + "hostname": "slankdev", + "ingressIfindex": 2 +} diff --git a/misc/hook_command_example_dummy.sh b/misc/hook_command_example_dummy.sh new file mode 100755 index 0000000..7e8b3bc --- /dev/null +++ b/misc/hook_command_example_dummy.sh @@ -0,0 +1,18 @@ +#!/bin/sh +# IN: +# { +# "src": "10.1.0.1", +# "dst": "10.2.0.1", +# "pkts": 10, +# "bytes": 1000 +# } +# +# OUT: +# { +# "src": "10.1.0.1", +# "dst": "10.2.0.1", +# "pkts": 10, +# "bytes": 1000, +# "foo": "bar" +# } +echo `cat` | jq '. + {foo: "bar"}' diff --git a/misc/hook_command_example_hostname.sh b/misc/hook_command_example_hostname.sh new file mode 100755 index 0000000..cddd70a --- /dev/null +++ b/misc/hook_command_example_hostname.sh @@ -0,0 +1,18 @@ +#!/bin/sh +# IN: +# { +# "src": "10.1.0.1", +# "dst": "10.2.0.1", +# "pkts": 10, +# "bytes": 1000 +# } +# +# OUT: +# { +# "src": "10.1.0.1", +# "dst": "10.2.0.1", +# "pkts": 10, +# "bytes": 1000, +# "hostname": "machine1" +# } +echo `cat` | jq --arg hostname $(hostname) '. + {hostname: $hostname}' diff --git a/misc/hook_command_example_ifname.sh b/misc/hook_command_example_ifname.sh new file mode 100755 index 0000000..989bd1f --- /dev/null +++ b/misc/hook_command_example_ifname.sh @@ -0,0 +1,24 @@ +#!/bin/sh +# IN: +# { +# "ingressIfindex": 1, +# "egressIfindex": 2, +# "pkts": 10, +# "bytes": 1000 +# } +# +# OUT: +# { +# "ingressIfindex": 1, +# "egressIfindex": 2, +# "ingressIfname": 1, +# "egressIfname": 2, +# "pkts": 10, +# "bytes": 1000 +# } +IN=$(cat) +I_IDX=$(echo $IN | jq .ingressIfindex -r) +E_IDX=$(echo $IN | jq .egressIfindex -r ) +I_NAME=$(ip -n ns0 -j link | jq --argjson idx $I_IDX '.[] | select(.ifindex == $idx) | .ifname' -r) +E_NAME=$(ip -n ns0 -j link | jq --argjson idx $E_IDX '.[] | select(.ifindex == $idx) | .ifname' -r) +echo $IN | jq --arg i_name $I_NAME --arg e_name $E_NAME '. + {ingressIfname: $i_name, egressIfname: $e_name}' diff --git a/pkg/ebpfmap/types.go b/pkg/ebpfmap/types.go index 8030e14..8e0978e 100644 --- a/pkg/ebpfmap/types.go +++ b/pkg/ebpfmap/types.go @@ -308,20 +308,34 @@ func ToIpfixFlowFile(ebflows []Flow) (*ipfix.FlowFile, error) { return flowFile, nil } -func (f Flow) ToZap() []interface{} { - return []interface{}{ - "src", util.ConvertUint32ToIP(f.Key.Saddr).String(), - "dst", util.ConvertUint32ToIP(f.Key.Daddr).String(), - "proto", f.Key.Proto, - "sport", f.Key.Sport, - "dport", f.Key.Dport, - "ingressIfindex", f.Key.IngressIfindex, - "egressIfindex", f.Key.EgressIfindex, - "pkts", f.Val.FlowPkts, - "bytes", f.Val.FlowBytes, - "action", f.Key.Mark, - "start", f.Val.FlowStartMilliSecond, - "end", f.Val.FlowEndMilliSecond, - "finished", f.Val.Finished, +func (f Flow) ToZap(o ipfix.OutputLog) ([]interface{}, error) { + m := map[string]interface{}{ + "src": util.ConvertUint32ToIP(f.Key.Saddr).String(), + "dst": util.ConvertUint32ToIP(f.Key.Daddr).String(), + "proto": f.Key.Proto, + "sport": f.Key.Sport, + "dport": f.Key.Dport, + "ingressIfindex": f.Key.IngressIfindex, + "egressIfindex": f.Key.EgressIfindex, + "pkts": f.Val.FlowPkts, + "bytes": f.Val.FlowBytes, + "action": f.Key.Mark, + "start": f.Val.FlowStartMilliSecond, + "end": f.Val.FlowEndMilliSecond, + "finished": f.Val.Finished, } + + for _, h := range o.Hooks { + var err error + m, err = h.Execute(m) + if err != nil { + return nil, err + } + } + + ret := []interface{}{} + for key, val := range m { + ret = append(ret, key, val) + } + return ret, nil } diff --git a/pkg/flowctl/ipfix.go b/pkg/flowctl/ipfix.go index 6ee4292..d69b3aa 100644 --- a/pkg/flowctl/ipfix.go +++ b/pkg/flowctl/ipfix.go @@ -194,7 +194,7 @@ func fnIpfixDump(cmd *cobra.Command, args []string) error { return fmt.Errorf("invalid config") } if o.Log != nil { - if err := FlowOutputLog(ebpfFlows, o.Log.File); err != nil { + if err := FlowOutputLog(ebpfFlows, o.Log.File, *o.Log); err != nil { return err } } @@ -345,7 +345,7 @@ func flushCachesFinished(config ipfix.Config) error { return fmt.Errorf("invalid config") } if o.Log != nil { - if err := FlowOutputLog(ebpfFlows, o.Log.File); err != nil { + if err := FlowOutputLog(ebpfFlows, o.Log.File, *o.Log); err != nil { return err } } @@ -385,7 +385,7 @@ func flushCaches(config ipfix.Config) error { return fmt.Errorf("invalid config") } if o.Log != nil { - if err := FlowOutputLog(ebpfFlows, o.Log.File); err != nil { + if err := FlowOutputLog(ebpfFlows, o.Log.File, *o.Log); err != nil { return err } } @@ -415,7 +415,7 @@ func flushCaches(config ipfix.Config) error { return nil } -func FlowOutputLog(flows []ebpfmap.Flow, out string) error { +func FlowOutputLog(flows []ebpfmap.Flow, out string, o ipfix.OutputLog) error { cfg := zap.NewProductionConfig() cfg.OutputPaths = []string{ out, @@ -427,7 +427,11 @@ func FlowOutputLog(flows []ebpfmap.Flow, out string) error { log := zapr.NewLogger(zapLog) for _, flow := range flows { - log.Info("flowlog", flow.ToZap()...) + args, err := flow.ToZap(o) + if err != nil { + return err + } + log.Info("flowlog", args...) } return nil } diff --git a/pkg/hook/command.go b/pkg/hook/command.go new file mode 100644 index 0000000..0bab052 --- /dev/null +++ b/pkg/hook/command.go @@ -0,0 +1,56 @@ +/* +Copyright 2022 Hiroki Shirokura. +Copyright 2022 LINE Corporation. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hook + +import ( + "bytes" + "encoding/json" + "fmt" + "os/exec" +) + +type Command string + +var _ Hook = (*Command)(nil) + +func (c Command) Execute(in map[string]interface{}) (map[string]interface{}, error) { + // Prepare input/output + stdoutbuf := bytes.Buffer{} + stderrbuf := bytes.Buffer{} + stdinbytes, err := json.Marshal(in) + if err != nil { + return nil, err + } + + // Execute child process + cmd := exec.Command(string(c)) + cmd.Stdout = &stdoutbuf + cmd.Stderr = &stderrbuf + cmd.Stdin = bytes.NewBuffer(stdinbytes) + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("child process is failed: err=%v stderr=%s", + err, stderrbuf.String()) + } + + // Convert back to map data from json-bytes + out := map[string]interface{}{} + if err := json.Unmarshal(stdoutbuf.Bytes(), &out); err != nil { + return nil, err + } + return out, nil +} diff --git a/pkg/hook/interface.go b/pkg/hook/interface.go new file mode 100644 index 0000000..c721c66 --- /dev/null +++ b/pkg/hook/interface.go @@ -0,0 +1,22 @@ +/* +Copyright 2022 Hiroki Shirokura. +Copyright 2022 LINE Corporation. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hook + +type Hook interface { + Execute(in map[string]interface{}) (map[string]interface{}, error) +} diff --git a/pkg/hook/shell.go b/pkg/hook/shell.go new file mode 100644 index 0000000..779d71b --- /dev/null +++ b/pkg/hook/shell.go @@ -0,0 +1,66 @@ +/* +Copyright 2022 Hiroki Shirokura. +Copyright 2022 LINE Corporation. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hook + +import ( + "bytes" + "crypto/sha1" + "encoding/json" + "fmt" + "os" + "os/exec" +) + +type Shell string + +var _ Hook = (*Shell)(nil) + +func (s Shell) Execute(in map[string]interface{}) (map[string]interface{}, error) { + // Create temp file from hook shell script + hash := sha1.New() + hash.Write([]byte(s)) + filename := fmt.Sprintf("/tmp/%x.sh", hash.Sum(nil)) + if err := os.WriteFile(filename, []byte(s), 0755); err != nil { + return nil, err + } + + // Prepare input/output + stdoutbuf := bytes.Buffer{} + stderrbuf := bytes.Buffer{} + stdinbytes, err := json.Marshal(in) + if err != nil { + return nil, err + } + + // Execute child process + cmd := exec.Command(filename) + cmd.Stdout = &stdoutbuf + cmd.Stderr = &stderrbuf + cmd.Stdin = bytes.NewBuffer(stdinbytes) + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("child process is failed: err=%v stderr=%s", + err, stderrbuf.String()) + } + + // Convert back to map data from json-bytes + out := map[string]interface{}{} + if err := json.Unmarshal(stdoutbuf.Bytes(), &out); err != nil { + return nil, err + } + return out, nil +} diff --git a/pkg/ipfix/config.go b/pkg/ipfix/config.go index a611168..573a499 100644 --- a/pkg/ipfix/config.go +++ b/pkg/ipfix/config.go @@ -20,6 +20,8 @@ package ipfix import ( "fmt" + + "github.com/wide-vsix/linux-flow-exporter/pkg/hook" ) type OutputCollector struct { @@ -27,8 +29,79 @@ type OutputCollector struct { LocalAddress string `yaml:"localAddress"` } +// Hook can speficy external mechianism to make log-data updated Only one of the +// other Hook backends will be enabled. +// +// DEVELOPER_NOTE(slankdev): +// To add a new hook backend, follow hook.Command, hook.Shell in /pkg/hook. If +// you add hook.Foo to type Hook struct, please edit Hook.Valid() function and +// Hook.Execute() function at the same time. +// +// TODO(slankdev): +// performance Currently, an external program is executed for a single log data, +// but this is inefficient for a large number of log data, so batch processing +// should be used in the future. +type Hook struct { + // Name make operator to know which hook is executed or failed. + Name string `yaml:"name"` + // Command is a hook to argument data using an external program like CNI. + // It sends log data via standard input to the command it executes. Receive + // modified log data on stdout. If the command fails, the log data is lost. + // It respects ansible.builtin.command, but may be changed in the future. + // + // ## Example + // ``` + // hooks: + // - command: + // cmd: /usr/bin/cmd1 + // ``` + Command *hook.Command `yaml:"command"` + // Shell is the backend that executes the external program. It is similar to + // the Command hook, but it allows you to write shell scripts directly in the + // config file, so you should use this one for simple operations. For example, + // you can use jq to add property, resolve ifname from ifindex, add hostname, + // and so on. + // + // ## Example ``` hooks: - name: test hook1 + // shell: + // shell: | + // #!/bin/sh + // echo `cat` | jq --arg hostname $(hostname) '. + {hostname: $hostname}' + // ``` + Shell *hook.Shell `yaml:"shell"` +} + +func (h Hook) Valid() bool { + cnt := 0 + if h.Command != nil { + cnt++ + } + if h.Shell != nil { + cnt++ + } + return cnt == 1 +} + +func (h Hook) Execute(m map[string]interface{}) (map[string]interface{}, error) { + if !h.Valid() { + return nil, fmt.Errorf("invalid hook") + } + if h.Shell != nil { + return h.Shell.Execute(m) + } + if h.Command != nil { + return h.Command.Execute(m) + } + return nil, fmt.Errorf("(no reach code)") +} + type OutputLog struct { File string `yaml:"file"` + // Hooks are the extention for special argmentation. Multiple Hooks can be + // set. You can change the data structure as you like by using an external + // mechanism called flowctl agent. Hooks are arrays and are executed in order. + // If a Hook fails, the log data will be lost. + Hooks []Hook `yaml:"hooks"` } type Output struct {