Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes a couple of minor weaknesses #491

Merged
merged 1 commit into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion cmd/flowlogs-pipeline/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,10 @@ func initLogger() {
}

func dumpConfig(opts config.Options) {
configAsJSON, _ := json.MarshalIndent(opts, "", " ")
configAsJSON, err := json.MarshalIndent(opts, "", " ")
if err != nil {
panic(fmt.Sprintf("error dumping config: %v", err))
}
fmt.Printf("Using configuration:\n%s\n", configAsJSON)
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/pipeline/encode/encode_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,16 @@ type encodeKafka struct {
// Encode writes entries to kafka topic
func (r *encodeKafka) Encode(entry config.GenericMap) {
var entryByteArray []byte
entryByteArray, _ = json.Marshal(entry)
var err error
entryByteArray, err = json.Marshal(entry)
if err != nil {
log.Errorf("encodeKafka error: %v", err)
return
}
msg := kafkago.Message{
Value: entryByteArray,
}
err := r.kafkaWriter.WriteMessages(context.Background(), msg)
err = r.kafkaWriter.WriteMessages(context.Background(), msg)
if err != nil {
log.Errorf("encodeKafka error: %v", err)
} else {
Expand Down
36 changes: 20 additions & 16 deletions pkg/pipeline/extract/aggregate/aggregate.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,22 +153,26 @@ func (aggregate *Aggregate) UpdateByEntry(entry config.GenericMap, normalizedVal
value, ok := entry[operationKey]
if ok {
valueString := fmt.Sprintf("%v", value)
valueFloat64, _ := strconv.ParseFloat(valueString, 64)
switch operation {
case OperationSum:
groupState.totalValue += valueFloat64
groupState.recentOpValue += valueFloat64
case OperationMax:
groupState.totalValue = math.Max(groupState.totalValue, valueFloat64)
groupState.recentOpValue = math.Max(groupState.recentOpValue, valueFloat64)
case OperationMin:
groupState.totalValue = math.Min(groupState.totalValue, valueFloat64)
groupState.recentOpValue = math.Min(groupState.recentOpValue, valueFloat64)
case OperationAvg:
groupState.totalValue = (groupState.totalValue*float64(groupState.totalCount) + valueFloat64) / float64(groupState.totalCount+1)
groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1)
case OperationRawValues:
groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64)
if valueFloat64, err := strconv.ParseFloat(valueString, 64); err != nil {
// Log as debug to avoid performance impact
log.Debugf("UpdateByEntry error when parsing float '%s': %v", valueString, err)
} else {
switch operation {
case OperationSum:
groupState.totalValue += valueFloat64
groupState.recentOpValue += valueFloat64
case OperationMax:
groupState.totalValue = math.Max(groupState.totalValue, valueFloat64)
groupState.recentOpValue = math.Max(groupState.recentOpValue, valueFloat64)
case OperationMin:
groupState.totalValue = math.Min(groupState.totalValue, valueFloat64)
groupState.recentOpValue = math.Min(groupState.recentOpValue, valueFloat64)
case OperationAvg:
groupState.totalValue = (groupState.totalValue*float64(groupState.totalCount) + valueFloat64) / float64(groupState.totalCount+1)
groupState.recentOpValue = (groupState.recentOpValue*float64(groupState.recentCount) + valueFloat64) / float64(groupState.recentCount+1)
case OperationRawValues:
groupState.recentRawValues = append(groupState.recentRawValues, valueFloat64)
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/aggregate/aggregate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,5 +145,5 @@ func Test_GetMetrics(t *testing.T) {
require.Equal(t, len(metrics), 1)
require.Equal(t, metrics[0]["name"], aggregate.Definition.Name)
valueFloat64 := metrics[0]["total_value"].(float64)
require.Equal(t, valueFloat64, float64(7))
require.Equal(t, float64(7), valueFloat64)
}
22 changes: 13 additions & 9 deletions pkg/pipeline/extract/timebased/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,19 @@ func (fs *FilterStruct) CalculateValue(l *list.List, oldestValidTime time.Time)
// entry is out of time range; ignore it
continue
}
valueFloat64, _ := utils.ConvertToFloat64(cEntry.entry[fs.Rule.OperationKey])
nItems++
switch fs.Rule.OperationType {
case api.FilterOperationName("FilterOperationSum"), api.FilterOperationName("FilterOperationAvg"):
currentValue += valueFloat64
case api.FilterOperationName("FilterOperationMax"):
currentValue = math.Max(currentValue, valueFloat64)
case api.FilterOperationName("FilterOperationMin"):
currentValue = math.Min(currentValue, valueFloat64)
if valueFloat64, err := utils.ConvertToFloat64(cEntry.entry[fs.Rule.OperationKey]); err != nil {
// Log as debug to avoid performance impact
log.Debugf("CalculateValue error with OperationKey %s: %v", fs.Rule.OperationKey, err)
} else {
nItems++
switch fs.Rule.OperationType {
case api.FilterOperationName("FilterOperationSum"), api.FilterOperationName("FilterOperationAvg"):
currentValue += valueFloat64
case api.FilterOperationName("FilterOperationMax"):
currentValue = math.Max(currentValue, valueFloat64)
case api.FilterOperationName("FilterOperationMin"):
currentValue = math.Min(currentValue, valueFloat64)
}
}
}
if fs.Rule.OperationType == api.FilterOperationName("FilterOperationAvg") && nItems > 0 {
Expand Down
10 changes: 7 additions & 3 deletions pkg/pipeline/ingest/ingest_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,12 @@ func (k *ingestKafka) reportStats() {
func NewIngestKafka(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) {
klog.Debugf("entering NewIngestKafka")
jsonIngestKafka := api.IngestKafka{}
if params.Ingest != nil && params.Ingest.Kafka != nil {
jsonIngestKafka = *params.Ingest.Kafka
var ingestType string
if params.Ingest != nil {
ingestType = params.Ingest.Type
if params.Ingest.Kafka != nil {
jsonIngestKafka = *params.Ingest.Kafka
}
}

// connect to the kafka server
Expand Down Expand Up @@ -278,7 +282,7 @@ func NewIngestKafka(opMetrics *operational.Metrics, params config.StageParam) (I
}

in := make(chan []byte, 2*bml)
metrics := newMetrics(opMetrics, params.Name, params.Ingest.Type, func() int { return len(in) })
metrics := newMetrics(opMetrics, params.Name, ingestType, func() int { return len(in) })

return &ingestKafka{
kafkaReader: kafkaReader,
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/ingest/ingest_synthetic.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (ingestS *IngestSynthetic) Ingest(out chan<- config.GenericMap) {
func NewIngestSynthetic(opMetrics *operational.Metrics, params config.StageParam) (Ingester, error) {
log.Debugf("entering NewIngestSynthetic")
confIngestSynthetic := api.IngestSynthetic{}
if params.Ingest != nil || params.Ingest.Synthetic != nil {
if params.Ingest != nil && params.Ingest.Synthetic != nil {
confIngestSynthetic = *params.Ingest.Synthetic
}
if confIngestSynthetic.Connections == 0 {
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/write/write_ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,7 +537,7 @@ func NewWriteIpfix(params config.StageParam) (Writer, error) {
// Initialize IPFIX registry and send templates
registry.LoadRegistry()
var err error
if params.Write.Ipfix.EnterpriseID != 0 {
if params.Write != nil && params.Write.Ipfix != nil && params.Write.Ipfix.EnterpriseID != 0 {
err = loadCustomRegistry(writeIpfix.enrichEnterpriseID)
if err != nil {
ilog.Fatalf("Failed to load Custom(%d) Registry", writeIpfix.enrichEnterpriseID)
Expand Down