Skip to content

Commit

Permalink
Add TCP connect metrics (#234)
Browse files Browse the repository at this point in the history
* Add tcp connection metrics

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* Add interval checking before scanning /proc/tcp

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* add logs when tcp_connect has nil fields

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* add log of the error when scanning tcp states

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* add log of the error when scanning tcp states

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* add sendRequestEvent to help transmit the state

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* add container_id to connStats

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* remove the connStats from the map when the state is success or failure

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* remove unnecessary type variables

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* refactor the codes of generating dataGroup

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* fix testcase

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* Only record the connection's duration when it is successfully established

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* use time.Now as a variable

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* add tcp connect metrics description

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* fix metrics description

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* add field src_container_id to metrics description

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>

* add log if there are two tcp_connect events come

Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>
  • Loading branch information
dxsup authored Jun 2, 2022
1 parent ca18a11 commit e174425
Show file tree
Hide file tree
Showing 22 changed files with 1,228 additions and 15 deletions.
241 changes: 241 additions & 0 deletions collector/analyzer/tcpconnectanalyzer/analyzer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
package tcpconnectanalyzer

import (
"time"

"github.com/Kindling-project/kindling/collector/analyzer"
"github.com/Kindling-project/kindling/collector/analyzer/tcpconnectanalyzer/internal"
"github.com/Kindling-project/kindling/collector/component"
"github.com/Kindling-project/kindling/collector/consumer"
conntrackerpackge "github.com/Kindling-project/kindling/collector/metadata/conntracker"
"github.com/Kindling-project/kindling/collector/model"
"github.com/Kindling-project/kindling/collector/model/constlabels"
"github.com/Kindling-project/kindling/collector/model/constnames"
"github.com/hashicorp/go-multierror"
"go.uber.org/zap"
)

const Type analyzer.Type = "tcpconnectanalyzer"

type TcpConnectAnalyzer struct {
config *Config
nextConsumers []consumer.Consumer
conntracker conntrackerpackge.Conntracker

eventChannel chan *model.KindlingEvent
connectMonitor *internal.ConnectMonitor

stopCh chan bool

telemetry *component.TelemetryTools
}

func New(cfg interface{}, telemetry *component.TelemetryTools, consumers []consumer.Consumer) analyzer.Analyzer {
config := cfg.(*Config)
ret := &TcpConnectAnalyzer{
config: config,
nextConsumers: consumers,
telemetry: telemetry,
eventChannel: make(chan *model.KindlingEvent, config.ChannelSize),
stopCh: make(chan bool),

connectMonitor: internal.NewConnectMonitor(telemetry.Logger),
}
conntracker, err := conntrackerpackge.NewConntracker(nil)
if err != nil {
telemetry.Logger.Warn("Conntracker cannot work as expected:", zap.Error(err))
}
ret.conntracker = conntracker
newSelfMetrics(telemetry.MeterProvider, ret.connectMonitor)
return ret
}

func (a *TcpConnectAnalyzer) ConsumableEvents() []string {
return []string{
constnames.ConnectEvent,
constnames.TcpConnectEvent,
constnames.TcpSetStateEvent,
constnames.WriteEvent,
constnames.WritevEvent,
constnames.SendMsgEvent,
constnames.SendToEvent,
}
}

// Start initializes the analyzer
func (a *TcpConnectAnalyzer) Start() error {
go func() {
scanTcpStateTicker := time.NewTicker(time.Duration(a.config.WaitEventSecond/3) * time.Second)
for {
select {
case <-scanTcpStateTicker.C:
a.trimConnectionsWithTcpStat()
case event := <-a.eventChannel:
a.consumeChannelEvent(event)
case <-a.stopCh:
// Only trim the connections expired. For those unfinished, we leave them
// unchanged and just shutdown this goroutine.
a.trimConnectionsWithTcpStat()
return
}
}
}()
return nil
}

// ConsumeEvent gets the event from the previous component
func (a *TcpConnectAnalyzer) ConsumeEvent(event *model.KindlingEvent) error {
a.eventChannel <- event
return nil
}

func (a *TcpConnectAnalyzer) consumeChannelEvent(event *model.KindlingEvent) {
var (
connectStats *internal.ConnectionStats
err error
)

switch event.Name {
case constnames.ConnectEvent:
if !event.IsTcp() {
return
}
connectStats, err = a.connectMonitor.ReadInConnectExitSyscall(event)
case constnames.TcpConnectEvent:
connectStats, err = a.connectMonitor.ReadInTcpConnect(event)
case constnames.TcpSetStateEvent:
connectStats, err = a.connectMonitor.ReadInTcpSetState(event)
case constnames.WriteEvent:
fallthrough
case constnames.WritevEvent:
fallthrough
case constnames.SendToEvent:
fallthrough
case constnames.SendMsgEvent:
if filterRequestEvent(event) {
return
}
connectStats, err = a.connectMonitor.ReadSendRequestSyscall(event)
}

if err != nil {
a.telemetry.Logger.Debug("Cannot update connection stats:", zap.Error(err))
return
}
// Connection is not established yet
if connectStats == nil {
return
}

dataGroup := a.generateDataGroup(connectStats)
a.passThroughConsumers(dataGroup)
}

func filterRequestEvent(event *model.KindlingEvent) bool {
if event.Category != model.Category_CAT_NET {
return true
}

ctx := event.GetCtx()
if ctx == nil || ctx.GetThreadInfo() == nil {
return true
}
fd := ctx.GetFdInfo()
if fd == nil {
return true
}
if fd.GetProtocol() != model.L4Proto_TCP {
return true
}
if fd.GetSip() == nil || fd.GetDip() == nil {
return true
}

return false
}

func (a *TcpConnectAnalyzer) trimConnectionsWithTcpStat() {
connStats := a.connectMonitor.TrimConnectionsWithTcpStat(a.config.WaitEventSecond)
for _, connStat := range connStats {
dataGroup := a.generateDataGroup(connStat)
a.passThroughConsumers(dataGroup)
}
}

func (a *TcpConnectAnalyzer) passThroughConsumers(dataGroup *model.DataGroup) {
var retError error
for _, nextConsumer := range a.nextConsumers {
err := nextConsumer.Consume(dataGroup)
if err != nil {
retError = multierror.Append(retError, err)
}
}
if retError != nil {
a.telemetry.Logger.Warn("Error happened while passing through processors:", zap.Error(retError))
}
}

func (a *TcpConnectAnalyzer) generateDataGroup(connectStats *internal.ConnectionStats) *model.DataGroup {
labels := a.generateLabels(connectStats)
metrics := make([]*model.Metric, 0, 2)
metrics = append(metrics, model.NewIntMetric(constnames.TcpConnectTotalMetric, 1))
// Only record the connection's duration when it is successfully established
if connectStats.StateMachine.GetCurrentState() == internal.Success {
metrics = append(metrics, model.NewIntMetric(constnames.TcpConnectDurationMetric, connectStats.GetConnectDuration()))
}

retDataGroup := model.NewDataGroup(
constnames.TcpConnectMetricGroupName,
labels,
connectStats.EndTimestamp,
metrics...)

return retDataGroup
}

func (a *TcpConnectAnalyzer) generateLabels(connectStats *internal.ConnectionStats) *model.AttributeMap {
labels := model.NewAttributeMap()
// The connect events always come from the client-side
labels.AddBoolValue(constlabels.IsServer, false)
labels.AddStringValue(constlabels.ContainerId, connectStats.ContainerId)
labels.AddIntValue(constlabels.Errno, int64(connectStats.Code))
if connectStats.StateMachine.GetCurrentState() == internal.Success {
labels.AddBoolValue(constlabels.Success, true)
} else {
labels.AddBoolValue(constlabels.Success, false)
}

srcIp := connectStats.ConnKey.SrcIP
dstIp := connectStats.ConnKey.DstIP
srcPort := connectStats.ConnKey.SrcPort
dstPort := connectStats.ConnKey.DstPort
labels.UpdateAddStringValue(constlabels.SrcIp, srcIp)
labels.UpdateAddStringValue(constlabels.DstIp, dstIp)
labels.UpdateAddIntValue(constlabels.SrcPort, int64(srcPort))
labels.UpdateAddIntValue(constlabels.DstPort, int64(dstPort))
dNatIp, dNatPort := a.findDNatTuple(srcIp, uint64(srcPort), dstIp, uint64(dstPort))
labels.AddStringValue(constlabels.DnatIp, dNatIp)
labels.AddIntValue(constlabels.DnatPort, dNatPort)
return labels
}

func (a *TcpConnectAnalyzer) findDNatTuple(sIp string, sPort uint64, dIp string, dPort uint64) (string, int64) {
dNat := a.conntracker.GetDNATTupleWithString(sIp, dIp, uint16(sPort), uint16(dPort), 0)
if dNat == nil {
return "", -1
}
dNatIp := dNat.ReplSrcIP.String()
dNatPort := dNat.ReplSrcPort
return dNatIp, int64(dNatPort)
}

// Shutdown cleans all the resources used by the analyzer
func (a *TcpConnectAnalyzer) Shutdown() error {
a.stopCh <- true
return nil
}

// Type returns the type of the analyzer
func (a *TcpConnectAnalyzer) Type() analyzer.Type {
return Type
}
13 changes: 13 additions & 0 deletions collector/analyzer/tcpconnectanalyzer/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package tcpconnectanalyzer

type Config struct {
ChannelSize int `mapstructure:"channel_size"`
WaitEventSecond int `mapstructure:"wait_event_second"`
}

func NewDefaultConfig() *Config {
return &Config{
ChannelSize: 2000,
WaitEventSecond: 10,
}
}
Loading

0 comments on commit e174425

Please sign in to comment.