Skip to content

Commit

Permalink
Increase the payload size to SNAPLEN when events can be merged (#410)
Browse files Browse the repository at this point in the history
Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>
  • Loading branch information
dxsup authored Dec 20, 2022
1 parent 3a185ad commit 3cbe703
Show file tree
Hide file tree
Showing 21 changed files with 234 additions and 183 deletions.
66 changes: 39 additions & 27 deletions collector/pkg/component/analyzer/network/message_pair.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,6 @@ import (
"github.com/Kindling-project/kindling/collector/pkg/model"
)

const (
LOWER32 = 0x00000000FFFFFFFF
LOWER16 = 0x000000000000FFFF
)

type mergableEvent struct {
events []*model.KindlingEvent // Keep No more than 10 events.

Expand All @@ -24,13 +19,15 @@ type mergableEvent struct {
}

type events struct {
event *model.KindlingEvent
mergable *mergableEvent
event *model.KindlingEvent
mergable *mergableEvent
maxPayloadLength int
}

func newEvents(evt *model.KindlingEvent) *events {
func newEvents(evt *model.KindlingEvent, maxPayloadSize int) *events {
return &events{
event: evt,
event: evt,
maxPayloadLength: maxPayloadSize,
}
}

Expand Down Expand Up @@ -78,15 +75,29 @@ func (evts *events) mergeEvent(evt *model.KindlingEvent) {
evts.mergable.resVal += evt.GetResVal()
evts.mergable.ts = evt.Timestamp

if len(evts.mergable.data) < 80 {
var appendLength int
newData := evt.GetData()
if 80-len(evts.mergable.data) > len(newData) {
appendLength = len(newData)
} else {
appendLength = 80 - len(evts.mergable.data)
}
evts.mergable.data = append(evts.mergable.data, newData[0:appendLength]...)
// We have a constraint on the payload size. The merged data can accommodate a maximum payload size
// equal to SNAPLEN, which is also the maximum size of the syscall data.
// If the previous data size is smaller than maxPayloadLength, the later one fills that gap.
appendLength := evts.getAppendLength(len(evt.GetData()))
if appendLength == 0 {
return
}
evts.mergable.data = append(evts.mergable.data, evt.GetData()[0:appendLength]...)
}

// getAppendLength returns how many bytes of a new event's data can still be
// appended to the merged payload without exceeding maxPayloadLength.
// It returns 0 when the merged data is already full.
func (evts *events) getAppendLength(newEventLength int) int {
	remainingSize := evts.maxPayloadLength - len(evts.mergable.data)
	// The merged data has already reached the payload limit; nothing fits.
	if remainingSize <= 0 {
		return 0
	}
	// Append the whole new event if it fits in the remaining space;
	// otherwise append only the part that fills the gap.
	if newEventLength < remainingSize {
		return newEventLength
	}
	return remainingSize
}

Expand Down Expand Up @@ -139,12 +150,13 @@ func (evts *events) getDuration() uint64 {
}

type messagePairs struct {
connects *events
requests *events
responses *events
natTuple *conntracker.IPTranslation
isSend int32
mutex sync.RWMutex // only for update latency and resval now
connects *events
requests *events
responses *events
natTuple *conntracker.IPTranslation
isSend int32
mutex sync.RWMutex // only for update latency and resval now
maxPayloadLength int
}

func (mps *messagePairs) checkSend() bool {
Expand All @@ -166,7 +178,7 @@ func (mps *messagePairs) getKey() messagePairKey {
func (mps *messagePairs) mergeConnect(evt *model.KindlingEvent) {
mps.mutex.Lock()
if mps.requests == nil {
mps.connects = newEvents(evt)
mps.connects = newEvents(evt, mps.maxPayloadLength)
} else {
mps.connects.mergeEvent(evt)
}
Expand All @@ -186,7 +198,7 @@ func (mps *messagePairs) putRequestBack(evts *events) {
func (mps *messagePairs) mergeRequest(evt *model.KindlingEvent) {
mps.mutex.Lock()
if mps.requests == nil {
mps.requests = newEvents(evt)
mps.requests = newEvents(evt, mps.maxPayloadLength)
} else {
mps.requests.mergeEvent(evt)
}
Expand All @@ -196,7 +208,7 @@ func (mps *messagePairs) mergeRequest(evt *model.KindlingEvent) {
func (mps *messagePairs) mergeResponse(evt *model.KindlingEvent) {
mps.mutex.Lock()
if mps.responses == nil {
mps.responses = newEvents(evt)
mps.responses = newEvents(evt, mps.maxPayloadLength)
} else {
mps.responses.mergeEvent(evt)
}
Expand Down
68 changes: 42 additions & 26 deletions collector/pkg/component/analyzer/network/network_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package network

import (
"context"
"log"
"math/rand"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -48,6 +49,10 @@ type NetworkAnalyzer struct {
tcpMessagePairSize int64
udpMessagePairSize int64
telemetry *component.TelemetryTools

// snaplen is the maximum size, in bytes, of the data an event can accommodate.
// It is set by setting the environment variable SNAPLEN. See https://github.com/KindlingProject/kindling/pull/387.
snaplen int
}

func NewNetworkAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consumers []consumer.Consumer) analyzer.Analyzer {
Expand All @@ -71,9 +76,20 @@ func NewNetworkAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, co
}

na.parserFactory = factory.NewParserFactory(factory.WithUrlClusteringMethod(na.cfg.UrlClusteringMethod))
na.snaplen = getSnaplenEnv()
return na
}

// getSnaplenEnv reads the maximum payload size in bytes from the SNAPLEN
// environment variable. It falls back to 1000 bytes when the variable is
// unset, not a valid integer, or not positive, so callers always receive a
// usable payload limit.
func getSnaplenEnv() int {
	const defaultSnaplen = 1000
	snaplenInt, err := strconv.Atoi(os.Getenv("SNAPLEN"))
	if err != nil || snaplenInt <= 0 {
		// A missing, malformed, or non-positive value would break the
		// payload-merging logic, so use the default instead.
		return defaultSnaplen
	}
	return snaplenInt
}

func (na *NetworkAnalyzer) ConsumableEvents() []string {
return []string{
constnames.ReadEvent,
Expand Down Expand Up @@ -110,13 +126,13 @@ func (na *NetworkAnalyzer) Start() error {

na.protocolMap = map[string]*protocol.ProtocolParser{}
parsers := make([]*protocol.ProtocolParser, 0)
for _, protocol := range na.cfg.ProtocolParser {
protocolparser := na.parserFactory.GetParser(protocol)
if protocolparser != nil {
na.protocolMap[protocol] = protocolparser
disableDisern, ok := disableDisernProtocols[protocol]
if !ok || !disableDisern {
parsers = append(parsers, protocolparser)
for _, protocolName := range na.cfg.ProtocolParser {
protocolParser := na.parserFactory.GetParser(protocolName)
if protocolParser != nil {
na.protocolMap[protocolName] = protocolParser
disableDiscern, ok := disableDisernProtocols[protocolName]
if !ok || !disableDiscern {
parsers = append(parsers, protocolParser)
}
}
}
Expand Down Expand Up @@ -177,9 +193,6 @@ func (na *NetworkAnalyzer) ConsumeEvent(evt *model.KindlingEvent) error {
return err
}
if isRequest {
if evt.GetPid() == 13759 {
log.Printf("latency = %d", evt.GetLatency())
}
return na.analyseRequest(evt)
} else {
return na.analyseResponse(evt)
Expand All @@ -195,13 +208,13 @@ func (na *NetworkAnalyzer) consumerFdNoReusingTrace() {
mps := v.(*messagePairs)
var timeoutTs = mps.getTimeoutTs()
if timeoutTs != 0 {
var duration = (time.Now().UnixNano()/1000000000 - int64(timeoutTs)/1000000000)
var duration = time.Now().UnixNano()/1000000000 - int64(timeoutTs)/1000000000
if mps.responses != nil && duration >= int64(na.cfg.GetFdReuseTimeout()) {
// No FdReuse Request
na.distributeTraceMetric(mps, nil)
_ = na.distributeTraceMetric(mps, nil)
} else if duration >= int64(na.cfg.getNoResponseThreshold()) {
// No Response Request
na.distributeTraceMetric(mps, nil)
_ = na.distributeTraceMetric(mps, nil)
}
}
return true
Expand All @@ -212,25 +225,26 @@ func (na *NetworkAnalyzer) consumerFdNoReusingTrace() {

func (na *NetworkAnalyzer) analyseConnect(evt *model.KindlingEvent) error {
mps := &messagePairs{
connects: newEvents(evt),
requests: nil,
responses: nil,
mutex: sync.RWMutex{},
connects: newEvents(evt, na.snaplen),
requests: nil,
responses: nil,
mutex: sync.RWMutex{},
maxPayloadLength: na.snaplen,
}
if pairInterface, exist := na.requestMonitor.LoadOrStore(mps.getKey(), mps); exist {
// There is an old message pair
var oldPairs = pairInterface.(*messagePairs)
// TODO: is there any need to check old connect event?
if oldPairs.requests == nil && oldPairs.connects != nil {
if oldPairs.connects.IsTimeout(evt, na.cfg.GetConnectTimeout()) {
na.distributeTraceMetric(oldPairs, mps)
_ = na.distributeTraceMetric(oldPairs, mps)
} else {
oldPairs.mergeConnect(evt)
}
return nil
}

na.distributeTraceMetric(oldPairs, mps)
_ = na.distributeTraceMetric(oldPairs, mps)
} else {
na.recordMessagePairSize(evt, 1)
}
Expand All @@ -247,10 +261,12 @@ func (na *NetworkAnalyzer) recordMessagePairSize(evt *model.KindlingEvent, count

func (na *NetworkAnalyzer) analyseRequest(evt *model.KindlingEvent) error {
mps := &messagePairs{
connects: nil,
requests: newEvents(evt),
responses: nil,
mutex: sync.RWMutex{}}
connects: nil,
requests: newEvents(evt, na.snaplen),
responses: nil,
mutex: sync.RWMutex{},
maxPayloadLength: na.snaplen,
}
if pairInterface, exist := na.requestMonitor.LoadOrStore(mps.getKey(), mps); exist {
// There is an old message pair
var oldPairs = pairInterface.(*messagePairs)
Expand All @@ -268,7 +284,7 @@ func (na *NetworkAnalyzer) analyseRequest(evt *model.KindlingEvent) error {
}

if oldPairs.responses != nil || oldPairs.requests.IsSportChanged(evt) {
na.distributeTraceMetric(oldPairs, mps)
_ = na.distributeTraceMetric(oldPairs, mps)
} else {
oldPairs.mergeRequest(evt)
}
Expand Down Expand Up @@ -340,7 +356,7 @@ func (na *NetworkAnalyzer) distributeTraceMetric(oldPairs *messagePairs, newPair
}
netanalyzerParsedRequestTotal.Add(context.Background(), 1, attribute.String("protocol", record.Labels.GetStringValue(constlabels.Protocol)))
for _, nexConsumer := range na.nextConsumers {
nexConsumer.Consume(record)
_ = nexConsumer.Consume(record)
}
na.dataGroupPool.Free(record)
}
Expand Down
Loading

0 comments on commit 3cbe703

Please sign in to comment.