
[Filebeat] Auto-scale number of netflow decoder workers #37761

Closed
andrewkroh opened this issue Jan 26, 2024 · 20 comments · Fixed by elastic/integrations#11025
Labels: enhancement, Filebeat, Team:Security-Deployment and Devices

Comments

@andrewkroh
Member

Describe the enhancement:

The Filebeat netflow input uses a single decoder goroutine. Users have reported instances where drops occurred (indicated by filebeat.input.netflow.packets.dropped) while there was no back-pressure from the internal memory queue. This evidence points to the decoder goroutine within the netflow input as the bottleneck. This is the data flow:

flowchart TD
    U([UDP Receiver])
    Q[(Buffered Channel)]
    N[Netflow Decoder]
    IQ[(Internal Memory Queue)]
    O(Output)
    
    U -->|push| C{Channel Full?}
    C -->|no| Q
    C -->|yes| D([Drop])
    Q -->|read| N
    N --> IQ
    IQ -->|N workers| O

The enhancement would be to scale up the number of goroutines that perform decoding. If we add multiple goroutines, then we need to take into account any state information that must be shared between them. Netflow receivers hold the exporters' templates as part of session state. I think the data is mostly static (e.g. you get a new template every few minutes or at the start of a session).
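For illustration, here is a rough sketch of fanning the decode step out over N goroutines that share template state behind an RWMutex. This is not the actual netflow input code; packet, event, sessionState, decode, and the channel sizes are placeholders.

```go
// Rough sketch only: packet, event, sessionState, and decode are
// placeholders, not the real netflow input types.
package main

import (
	"fmt"
	"sync"
)

type packet struct{ raw []byte }
type event struct{ fields string }

// sessionState stands in for the per-exporter template cache that the
// decoder goroutines would have to share.
type sessionState struct {
	mu        sync.RWMutex
	templates map[uint16]string
}

func (s *sessionState) lookup(id uint16) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	t, ok := s.templates[id]
	return t, ok
}

func decode(p packet, state *sessionState) event {
	// Real decoding would parse the NetFlow/IPFIX payload here.
	if t, ok := state.lookup(256); ok {
		return event{fields: t}
	}
	return event{fields: fmt.Sprintf("%d raw bytes", len(p.raw))}
}

func main() {
	packets := make(chan packet, 1024) // buffered channel fed by the UDP receiver
	events := make(chan event, 1024)   // feeds the internal memory queue
	state := &sessionState{templates: map[uint16]string{256: "template-256"}}

	const workers = 4 // auto-scaling could default this to runtime.NumCPU()
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for p := range packets {
				events <- decode(p, state)
			}
		}()
	}

	packets <- packet{raw: make([]byte, 64)}
	close(packets)
	wg.Wait()
	close(events)
	for e := range events {
		fmt.Println(e.fields)
	}
}
```

Because templates are written rarely and read on every packet, an RWMutex (or a per-worker copy refreshed on template updates) keeps the shared state from becoming the new bottleneck.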

Describe a specific use case for the enhancement or feature:

This would allow users to receive more events per second with the Netflow input.

@botelastic bot added the needs_team label on Jan 26, 2024
@andrewkroh added the Team:Security-Deployment and Devices label on Jan 26, 2024
@botelastic bot removed the needs_team label on Jan 26, 2024
@pkoutsovasilis
Contributor

pkoutsovasilis commented Feb 22, 2024

After a quick investigation, I think flow timestamp sequencing is something that needs to be maintained:

  • we need to create sessions (for Netflow 9 and IPFIX) in the appropriate sequence, so reading the packet header needs to remain sequential?!
  • we need to publish events while maintaining the flow timestamp sequence; I think this also indirectly affects TSDS usage, based on this guidance:

    Only use a TSDS if you typically add metrics data to Elasticsearch in near real-time and @timestamp order.

Thus the code point where scaling could be beneficial while still meeting the above guarantees isn't that straightforward. @andrewkroh please feel free to chime in; maybe I missed something.

@andrewkroh
Member Author

andrewkroh commented Feb 22, 2024

we need to create sessions (for Netflow 9 and IPFIX) in the appropriate sequence, so reading the packet header needs to remain sequential?!

Good catch. This is very important, and I didn't consider it in my basic diagram. My initial thought is to handle this similarly to load balancers or LAG ports on switches: use a hash function on the source address and modulo (%) by the number of workers so that each exporter's data is processed by a consistent worker. But that introduces constraints that would limit the effectiveness (like no gains in single netflow exporter throughput). This needs more consideration 🤔 .
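A minimal sketch of that hashing idea (workerFor is a hypothetical helper, not existing Filebeat code); as noted, it caps the throughput of any single exporter at one worker:

```go
// Rough sketch: hash the exporter's source address and take it modulo the
// worker count, so every packet from a given exporter is decoded by the
// same goroutine (preserving per-exporter ordering).
package main

import (
	"fmt"
	"hash/fnv"
)

func workerFor(srcAddr string, numWorkers int) int {
	h := fnv.New32a()
	h.Write([]byte(srcAddr))
	return int(h.Sum32() % uint32(numWorkers))
}

func main() {
	for _, addr := range []string{"10.0.0.1:2055", "10.0.0.2:2055", "10.0.0.1:2055"} {
		fmt.Printf("%s -> worker %d\n", addr, workerFor(addr, 4))
	}
}
```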

we need to publish events maintaining the flow timestamp sequence; I think this also indirectly affects [TSDS]

I don't think this small out-of-order processing will affect TSDS. Today there are no ordering guarantees on batches of events that get published by the Beat. For example, if a set of events is split into two batches that are sent by separate concurrent elasticsearch output workers, then the order in which data is written to Elasticsearch is indeterminate.

My understanding is that as long as the data arrives within the look-back window it's not an issue. The @timestamp will still be used for sorting purposes.

@pkoutsovasilis
Contributor

Good catch. This is very important, and I didn't consider it in my basic diagram. My initial thought is to handle this similarly to load balancers or LAG ports on switches: use a hash function on the source address and modulo (%) by the number of workers so that each exporter's data is processed by a consistent worker. But that introduces constraints that would limit the effectiveness (like no gains in single netflow exporter throughput). This needs more consideration 🤔 .

I would propose that the way to go here is synthetic benchmarking: extract the CPU hot path and, based on that, decide the next optimisation steps. How does that sound to you?

I don't think this small out-of-order processing will affect TSDS. Today there are no ordering guarantees on batches of events that get published by the Beat. For example, if a set of events is split into two batches that are sent by separate concurrent elasticsearch output workers, then the order in which data is written to Elasticsearch is indeterminate.

Oh I see! I assumed from the name that the publisher queue was enforcing ordering, but you are 100% right: different output workers can introduce slight timestamp reordering.

@andrewkroh
Member Author

... is through synthetic benchmarking

That sounds good. Scenario-wise, we should probably consider benchmarking both single-client and multi-client in case they have different hot paths.

@pkoutsovasilis
Contributor

pkoutsovasilis commented Jul 8, 2024

Intro

For full context, please first read the section Performance results of this PR.

The short version is that we do not see any performance gains for Netflow input when we scale to more than 8 output workers.

After reviewing the performance results of scaling netflow and examining my ES cluster, which wasn't particularly stressed with more than 8 output workers, I reached out to @alexsapran. He led a performance evaluation initiative, and we brainstormed the issue. We both agreed that the bottleneck didn't seem to be Elasticsearch but something else. At this point, @alexsapran introduced me to the TSA (Thread State Analysis) method and told me he planned to incorporate this performance evaluation approach, but had yet to figure out how to apply it to Go, since goroutines are not mapped 1:1 to threads. I took on the task of applying a conceptually similar approach to my findings.

The built-in Go sampling CPU profiler only shows On-CPU time, making it unsuitable for this evaluation. After exploring various Go profilers, two emerged as good candidates (a minimal enablement sketch follows the list):

  1. Go's std block profile: lets you analyze how much time your program spends waiting on the blocking operations listed below:

    • select
    • chan send
    • chan receive
    • semacquire (Mutex.Lock, RWMutex.RLock, RWMutex.Lock, WaitGroup.Wait)
    • notifyListWait (Cond.Wait)
  2. fgprof: a sampling Go profiler that lets you analyze On-CPU as well as Off-CPU (e.g. I/O) time together.
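A minimal enablement sketch for both profiles, assuming the standard net/http/pprof handlers and the third-party github.com/felixge/fgprof module:

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof/*, including the block profile
	"runtime"

	"github.com/felixge/fgprof"
)

func main() {
	// A rate of 1 records every blocking event; a coarser rate reduces overhead.
	runtime.SetBlockProfileRate(1)

	// fgprof samples On-CPU and Off-CPU time together.
	http.Handle("/debug/fgprof", fgprof.Handler())

	log.Fatal(http.ListenAndServe("localhost:6060", nil))
}
```

The profiles can then be pulled with go tool pprof, e.g. go tool pprof http://localhost:6060/debug/pprof/block or go tool pprof http://localhost:6060/debug/fgprof?seconds=10.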

Delving: Uncovering the Bottleneck

  1. Block profile - 100 input workers, 1 output worker [screenshot: input_100_output_1]
  2. Block profile - 100 input workers, 4 output workers [screenshot: input_100_output_4]
  3. Block profile - 100 input workers, 8 output workers [screenshot: input_100_output_8]
  4. Block profile - 100 input workers, 16 output workers [screenshot: input_100_output_16]
  5. Block profile - 100 input workers, 32 output workers [screenshot: input_100_output_32]

As we can see across all the above configurations, runtime.selectgo, which is called from netClientWorker.run(), increases in blocking time as we add more output workers. This pattern indicates that the goroutines (output workers) are spending more time waiting for data to read in their select statements, leading to starvation.

  6. fgprof - 100 input workers, 32 output workers [screenshot: fgprof]

In the fgprof profile, we can see that the functions associated with publishing events (input workers) and writing to Elasticsearch (output workers) spend most of their time just waiting to read from or write to channels. This means that the intermediate layer connecting the two is saturated, leading to their starvation.

A closer look at the Intermediate layer

Diagram:

sequenceDiagram
    pipelineClient->>OpenState: "Publish"
    OpenState-->>MemoryQueue: "run all processors and chan push beat.Event"
    MemoryQueue-->>OpenState: "close request chan"
    OpenState-->>pipelineClient: "client publish unblocks"
    MemoryQueue-->>QueueReader: "if criteria are met, chan push Batch for pending GetRequest"
    QueueReader-->>EventConsumer: "chan push ttlBatch to consumer"
    EventConsumer-->>Output Worker: "chan push ttlBatch to worker"
    Output Worker-->>ElasticSearch: "Push to ES"
    ElasticSearch-->>Output Worker: "If success ACK() the ttlBatch"
    Output Worker-->>ACKLoop: "ttlBatch acked"
    ACKLoop-->>MemoryQueue: "delete number of events request"
    box rgb(100,100,0) Input Workers
        participant pipelineClient
        participant OpenState
    end
    box rgb(0,100,100) Output Workers
        participant Output Worker
        participant ElasticSearch
    end
    box rgb(100,0,0) Intermediate Layer
        participant MemoryQueue
        participant QueueReader
        participant EventConsumer
    end
    box rgb(100,0,0) Intermediate Layer
        participant ACKLoop
    end

Some info about the components of the above sequence diagram (a toy illustration of the select-loop pattern follows the list):

  • MemoryQueue [Intermediate Layer] is of "single goroutine - multiple channels in select" pattern [code]

  • QueueReader [Intermediate Layer] is of "single goroutine - multiple channels in select" pattern [code]

  • EventConsumer [Intermediate Layer] is of "single goroutine - multiple channels in select" pattern [code]

  • OutputWorker [Output Workers] is of "many goroutines - single channel" pattern [code]

  • ACKLoop [Intermediate Layer] is of "single goroutine - multiple channels in select" pattern [code]
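For readers unfamiliar with the pattern, here is a toy illustration of "single goroutine - multiple channels in select". The broker type and its channels are made up for the example (the real memqueue code is linked above); the point is that one goroutine owns all the state, so every push, get, and ACK request is serialized through its select loop.

```go
// Toy "single goroutine - multiple channels in select" broker.
package main

import "fmt"

type broker struct {
	pushChan chan string        // producers (input workers) push events here
	getChan  chan chan []string // consumers (output workers) request batches here
	done     chan struct{}
	buf      []string
}

// run is the single goroutine that owns buf; everything funnels through it.
func (b *broker) run() {
	for {
		select {
		case e := <-b.pushChan:
			b.buf = append(b.buf, e)
		case reply := <-b.getChan:
			reply <- b.buf
			b.buf = nil
		case <-b.done:
			return
		}
	}
}

func main() {
	b := &broker{
		pushChan: make(chan string),
		getChan:  make(chan chan []string),
		done:     make(chan struct{}),
	}
	go b.run()

	b.pushChan <- "event-1"
	b.pushChan <- "event-2"

	reply := make(chan []string)
	b.getChan <- reply
	fmt.Println(<-reply)

	close(b.done)
}
```

This single serialization point is what the analysis above identifies as the saturation risk: adding more producers or consumers eventually just queues them all up on the same select loop.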

Conclusion

From the above code findings, the single goroutine-backed design of the intermediate layer is getting saturated by the amount of input data published and the output data sent to ACK. This saturation leads to the performance plateau we observe in the performance results of this PR for more than 8 output workers. I believe there are improvements we could make, but I will leave those out of this comment.

PS: If this code had used CGO, this analysis would have been impossible. Being able to perform such an analysis is a strong argument against using CGO unless absolutely necessary.

(cc'ing folks who I think would like to know about the above analysis)
cc @andrewkroh @qcorporation @pierrehilbert @cmacknz @alexsapran

@cmacknz
Member

cmacknz commented Jul 8, 2024

From the above code findings, the single goroutine-backed design of the intermediate layer is getting saturated by the amount of input data published and the output data sent to ACK

CC @faec

What was the configuration of the queue and output during these tests? Also, what was the state of the queue; was it full? The queue and bulk_max_size configuration needs to be calculated in proportion to the number of output workers; the default throughput preset was tuned for 4 workers. https://www.elastic.co/guide/en/fleet/current/es-output-settings.html

To make a great simplification: the input workers are going to block in the Publish call when the queue is full, so once you are reliably keeping the queue full, adding more input workers shouldn't help throughput at all; they will just read one unit of data (a buffer, a log line, whatever it is) and hold it while trying to write it to the queue.

The focus needs to be on keeping the output workers as busy as possible, which I would define as being able to grab another batch to send as quickly as possible once their network call to the _bulk API unblocks. The queue is going to be parked waiting for this to happen; it responds to requests from output workers.

This comes back to the question about the queue and output configuration: what was it when you attempted to go past 8 workers? Was it able to hold enough data to keep more than 8 workers "fed" at once if they all made a concurrent request for more data?

@pkoutsovasilis
Contributor

Hi @cmacknz, the configuration I used was the throughput template found here, varying only the number of workers. More than happy to repeat any experiment with any configuration you give me. That said, from the above pprofs you can see that going from 4 to 8 workers, the latter are not utilised as much as they could be.

@cmacknz
Member

cmacknz commented Jul 8, 2024

Looking at the table again, the throughput preset was tested for 4 workers, not 8; not sure why I remembered 8. In 8.15.0 the important parameters are:

bulk_max_size: 1600
worker: 4
queue.mem.events: 12800

Where 2 * workers * bulk_max_size = queue.mem.events or 2 * 4 * 1600 = 12800. Following that formula should hopefully let you see improvement beyond 4 and 8 workers.

I didn't come up with that formula, the earliest reference to it I know about is in https://www.elastic.co/blog/how-to-tune-elastic-beats-performance-a-practical-example-with-batch-size-worker-count-and-more.
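For reference, a quick arithmetic check of that rule for the worker counts discussed in this thread (plain Go, not a Beats API):

```go
package main

import "fmt"

// queueEvents applies the sizing rule: queue.mem.events = 2 * worker * bulk_max_size.
func queueEvents(workers, bulkMaxSize int) int {
	return 2 * workers * bulkMaxSize
}

func main() {
	for _, w := range []int{4, 8, 16} {
		fmt.Printf("worker=%d bulk_max_size=1600 -> queue.mem.events=%d\n", w, queueEvents(w, 1600))
	}
	// worker=4  -> 12800 (the 8.15.0 throughput preset)
	// worker=8  -> 25600
	// worker=16 -> 51200 (the value suggested later in this thread)
}
```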

@pkoutsovasilis
Contributor

@cmacknz I just reran another experiment:

"preset":                     "custom",
"bulk_max_size":              2 * 1600,
"worker":                     16, 
"queue.mem.events":           2 * 12800,
"queue.mem.flush.min_events": 1600,
"queue.mem.flush.timeout":    5,
"compression_level":          1,
"connection_idle_timeout":    15,

still, the workers do not seem fully utilised

[screenshot]

Now don't get me wrong, I see your point, and if I make the queue big enough to fit my input in it, then yes, I do believe the workers will be occupied 🙂

@cmacknz
Member

cmacknz commented Jul 8, 2024

The workers all want to grab bulk_max_size events at once; in your example above this is 2*1600=3200. The queue size is 2*12800=25600. This can keep 25600/3200=8 workers busy at once, assuming the queue were full and all 8 workers tried to pull data from the queue at the same instant, in a perfect world where requesting data from the queue has no latency or bottlenecks of its own.

@pkoutsovasilis
Contributor

@cmacknz give the config for 16 workers please 🙂

@pkoutsovasilis
Contributor

it would be great if this turned out to be only a tuning problem 🎉

@cmacknz
Member

cmacknz commented Jul 8, 2024

2 * workers * bulk_max_size = queue.mem.events means 2 * 16 * 1600 = 51200 should be the value for queue.mem.events assuming you keep bulk_max_size at 1600.

@pkoutsovasilis
Contributor

pkoutsovasilis commented Jul 8, 2024

@cmacknz utilisation got better, but there is still some starvation (the block profile for 16 output workers now looks like the one for 8 workers).

"preset":                     "custom",
"bulk_max_size":              1600,
"worker":                     16, 
"queue.mem.events":           2 * 16 * 1600,
[screenshot]

However, what we effectively did here is increase the queue buffer enough to deal with some of the starvation on the output workers' side. Do you have any tuning tips for dealing with the starvation of the input workers waiting to be ACKed, since acking still remains sequential through the ACK loop? I am asking because even with this tuning the performance results remain the same: 575165 flows pushed in 16 secs.

@pkoutsovasilis
Contributor

pkoutsovasilis commented Jul 8, 2024

@cmacknz interestingly enough, fgprof still reports the output workers as parked a lot waiting for data, even when following the 2 * workers * bulk_max_size recipe

4 output workers: [screenshot]

8 output workers: [screenshot]

16 output workers: [screenshot]

I know for sure that fgprof accounts for a goroutine's sleeping time in more cases than the block profile does, but at the moment I am not entirely sure what explains the difference here. Just keeping the above as a side note.

@cmacknz
Member

cmacknz commented Jul 8, 2024

Do you have any tuning tips for dealing with the starvation of the input workers waiting to be ACKed, since acking still remains sequential through the ACK loop?

I don't think there's any tuning here; this will require a code or design change. Conceptually, the netflow input doesn't need ACKs for anything besides data delivery guarantees, where space in the queue isn't freed until we know a batch was written successfully.

@cmacknz
Member

cmacknz commented Jul 8, 2024

@cmacknz interestingly enough, fgprof still reports the output workers as parked a lot waiting for data, even when following the 2 * workers * bulk_max_size recipe

Interesting, we'd need to determine why it is blocking and if it's something we can optimize away.

I'd be curious whether every channel read is effectively a goroutine context switch that leaves us in gopark for at least one scheduler time slice, but generally I have no idea what this is telling me.

The question we want answered is "is this goroutine in runtime.gopark unnecessarily when it could be doing useful work?".

@cmacknz
Member

cmacknz commented Jul 9, 2024

@pkoutsovasilis is adjusting the queue parameters enough to get you the target performance you need, or is there still more work to do here?

I'm trying to gauge whether we are efficient enough; there are definitely things to improve based on the profiles, I'm just not sure whether those improvements are blocking your work.

@andrewkroh
Member Author

andrewkroh commented Jul 9, 2024

I think a good outcome would be if Filebeat were on par with, or better than, comparable flow-processing software in efficiency, and if it scaled linearly with the number of cores. I've seen advertised rates of ~5000 flows per second per core.

In one example, we have a user needing to process 300k flows per second. It would be ideal if we had loose guidance on the number of cores that this will require (at ~5000 flows/sec/core, that is on the order of 60 cores).

@pkoutsovasilis
Contributor

To try and provide some realistic quantification of the results, I did the following:

Experimental setup:

  • 1 GCP VM with 4 CPUs and 16 GB RAM
  • single instance single node Elasticsearch cluster with 64 GB RAM

Results:

The first curve from left to right is the run with go-docappender, the second with memqueue. As you can see, I have adjusted the bulk size of go-docappender (configured in bytes) to match that of memqueue (configured in number of events).

  • go-docappender ~14% higher EPS vs memqueue
  • go-docappender can sustain 5500 packets/sec with 4 input workers, whereas memqueue can sustain 5000 packets/sec with 4 input workers. At 5500 packets/sec, memqueue dropped ~18000 packets
  • running netflow/filebeat through agent (i.e. with the default processors enabled) increases each event's size by 80%. Thus a good performance optimisation, when wire transfer speed to Elasticsearch is limited, is to disable some of the processors?!
  • there were some runs where the memory footprint of Filebeat with memqueue skyrocketed to ~2.5 GB, but this did not happen consistently, so nothing to report
[chart: netflow_with_processors]

so @andrewkroh I think you could say that with 4 input workers, if the wire speed toward Elasticsearch can sustain ~60 MB/sec, netflow can do 5000 packets/sec.

PS: having interacted with benchbuilder and the corresponding Kibana dashboards, I think it would be extra useful to add support there for streaming-based inputs; this will be the task of my OnWeek (ty @alexsapran)
