Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix for BufferedTokenizer to completely consume lines in case of lines bigger then sizeLimit #16482

Conversation

andsel
Copy link
Contributor

@andsel andsel commented Oct 1, 2024

Release notes

[rn:skip]

What does this PR do?

Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter.
Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array.
Port the tests present at

describe FileWatch::BufferedTokenizer do
in Java.

Why is it important/What is the impact to the user?

Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met.

Checklist

  • My code follows the style guidelines of this project
  • I have commented my code, particularly in hard-to-understand areas
  • [ ] I have made corresponding changes to the documentation
  • [ ] I have made corresponding change to the default configuration files (and/or docker env variables)
  • I have added tests that prove my fix is effective or that my feature works

Author's Checklist

How to test this PR locally

Follow the instructions in #16483

Related issues

Use cases

Screenshots

Logs

andsel added 2 commits October 1, 2024 10:53
…er instead of a JRuby array, added more tests to cover some 'buffer full' conditions
@andsel andsel self-assigned this Oct 1, 2024
@andsel andsel changed the title Fix/buffered tokenizer clean state in case of line too big Bugfix for BufferedTokenizer to completely consume lines in case of lines bigger then sizeLimit Oct 1, 2024
@andsel andsel added the bug label Oct 1, 2024
@andsel andsel marked this pull request as ready for review October 1, 2024 11:02
Copy link
Member

@jsvd jsvd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A few questions:

  1. there's quite a bit going on here and this is feature that is in the critical path, any clue on the performance impact of this PR?
  2. Since the size checks are done between the buffered data and the first message, it's possible for the codec to output messages larger than the setting if the incoming data never touches the buffer:
jruby-9.4.8.0 :019 > buffer = FileWatch::BufferedTokenizer.new("\n", 5)
 => #<FileWatch::BufferedTokenizer:0xff2f2ae> 
jruby-9.4.8.0 :020 > buffer.extract("1234").each { |line| puts line; puts line.length }
 => [] 
jruby-9.4.8.0 :021 > buffer.extract("5\n0123456789\n").each { |line| puts line; puts line.length }
12345
5
0123456789
10
 => ["12345", "0123456789"]

This is OK if codecs and plugins clearly describe that the setting exposed to the user controls only the "leftover buffer size" and not maximum message size. This is hard for the user to understand, which will likely expect it to be message size, but they could set the limit to 5MB but still get a 50MB message.

An alternative would be to perform the size check much earlier in extract , maybe right after data.convertToString (this is half-baked thought), what do you think?

throw new IllegalStateException("input buffer full");
}
this.inputSize = inputSize + entitiesSize;
}
input.append(entities.shift(context));
if (entities.isEmpty()) {
headToken.append(input.shift(context)); // remove head
Copy link
Member

@jsvd jsvd Oct 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can avoid some overhead when we know there's no leftover content at the end of the incoming data:

        if (input.getLength() < 2) {
            headToken.append(input.shift(context)); // remove head
            return RubyUtil.RUBY.newArray();
        } else {
            if (headToken.length() > 0) {
                headToken.append(input.shift(context)); // append buffer to first element and
                input.unshift(RubyUtil.toRubyObject(headToken.toString())); // reinsert it into the array
                headToken = new StringBuilder();
            }
            headToken.append(input.pop(context)); // put the leftovers in headToken for later
            inputSize = headToken.length();
            return input;
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that but complicates even more the flow. Before applying that maybe a measure of effective performance gain should be done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About question 1, I haven't done any performance check on this. However, we should define the use cases to check. The buffer full condition in the existing implementation at certain point didn't provide correct results, so if we use the original implementation as baseline, we aren't testing apples-to-apples. The only thing we can test is aggregation process, while any token size doesn't fail the size limit.
I could think to:

  • tokens that are smaller then a full segment
  • tokens that span multiple segments
  • tokens that are always spread between the end of a preceding segment and the subsequent one.

On point number 2, this is a bug. The expectation for the client of this tokenizer is that lines (or tokens) bigger than the predefined limit generates an exception and that the normal flow is able to resume on the next extract call immediately after the token delimiter that marks the end of the offending token. So if the token that should trigger the error is not the first of a segment, is must still raise the error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andsel given my proposal improves performance should we add it to this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I missed to add it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added with 99292ca

@andsel
Copy link
Contributor Author

andsel commented Oct 16, 2024

Benchmark

Summary

As suggested by the flamegraphs that @jsvd measured, this implementation has a performance loss, and the big suspect is the copying of splitted data into the input accumulator array: https://github.com/elastic/logstash/pull/16482/files#diff-5c7f8990e98f54782395d29b4b1b5b68cf6f782b34af5eb8f1b5a77331e0172eR84.

With this PR the total consumption of BufferedTokenizer.extract is 2.06% vs 0.82% without this PR.

A variation < 5% could be considered negligible, in particular with the usage of the tokenizer inside an input codec, where the latencies of the network or the locks to access the in memory queue, could make a bigger impact in the overall performance.

So we could assume that this PR doesn't significantly influence the overall performance.

Details

Result with performance metering of the extract method using the benchmark https://github.com/elastic/logstash/pull/16564/files

test name baseline (ops/ns) bugfix (ops/ns) delta bugfix + opt patch (ops/ns) delta
onlyOneTokenPerFragment 13702 7982 -41% 9584 -30%
multipleTokenPerFragment 4276 3553 -16% 4075 -4%
multipleTokensCrossingMultipleFragments 2180 1420 -34% 1561 -28%
Raw baseline results

andrea:logstash_andsel (main) % ./gradlew jmh -Pinclude="org.logstash.benchmark.BufferedTokenizerExtBenchmark.*"
To honour the JVM settings for this build a single-use Daemon process will be forked. For more on this, please refer to https://docs.gradle.org/8.7/userguide/gradle_daemon.html#sec:disabling_the_daemon in the Gradle documentation.
Daemon will be stopped at the end of the build

> Task :downloadJRuby UP-TO-DATE
Download https://repo1.maven.org/maven2/org/jruby/jruby-dist/9.4.8.0/jruby-dist-9.4.8.0-bin.tar.gz

> Task :logstash-core:compileJava
Note: Processing Log4j annotations
Note: Annotations processed
Note: Processing Log4j annotations
Note: No elements to process

> Task :logstash-core-benchmarks:jmh
# JMH version: 1.37
# VM version: JDK 21.0.4, OpenJDK 64-Bit Server VM, 21.0.4+7-LTS
# VM invoker: /Users/andrea/.sdkman/candidates/java/21.0.4-tem/bin/java
# VM options: -Dfile.encoding=UTF-8 -Duser.country=IT -Duser.language=en -Duser.variant -Djava.io.tmpdir=/Users/andrea/workspace/logstash_andsel/logstash-core/benchmarks/build -XX:+HeapDumpOnOutOfMemoryError -Xms2g -Xmx2g
# Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 3 iterations, 100 ms each
# Measurement: 10 iterations, 100 ms each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.logstash.benchmark.BufferedTokenizerExtBenchmark.multipleTokenPerFragment

# Run progress: 0.00% complete, ETA 00:00:03
# Fork: 1 of 1
# Warmup Iteration   1: 56.913 ops/ns
# Warmup Iteration   2: 2636.928 ops/ns
# Warmup Iteration   3: 3882.360 ops/ns
Iteration   1: 4327.280 ops/ns
Iteration   2: 4267.082 ops/ns
Iteration   3: 4371.703 ops/ns
Iteration   4: 4252.072 ops/ns
Iteration   5: 4244.560 ops/ns
Iteration   6: 4245.370 ops/ns[10s]
Iteration   7: 4222.432 ops/ns[11s]
Iteration   8: 4197.453 ops/ns[11s]
Iteration   9: 4315.799 ops/ns[11s]
Iteration  10: 4316.202 ops/ns[11s]


Result "org.logstash.benchmark.BufferedTokenizerExtBenchmark.multipleTokenPerFragment":
  4275.995 ±(99.9%) 82.202 ops/ns [Average]
  (min, avg, max) = (4197.453, 4275.995, 4371.703), stdev = 54.372
  CI (99.9%): [4193.793, 4358.198] (assumes normal distribution)


# JMH version: 1.37
# VM version: JDK 21.0.4, OpenJDK 64-Bit Server VM, 21.0.4+7-LTS
# VM invoker: /Users/andrea/.sdkman/candidates/java/21.0.4-tem/bin/java
# VM options: -Dfile.encoding=UTF-8 -Duser.country=IT -Duser.language=en -Duser.variant -Djava.io.tmpdir=/Users/andrea/workspace/logstash_andsel/logstash-core/benchmarks/build -XX:+HeapDumpOnOutOfMemoryError -Xms2g -Xmx2g
# Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 3 iterations, 100 ms each
# Measurement: 10 iterations, 100 ms each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.logstash.benchmark.BufferedTokenizerExtBenchmark.multipleTokensCrossingMultipleFragments

# Run progress: 33.33% complete, ETA 00:00:04
# Fork: 1 of 1
# Warmup Iteration   1: 49.464 ops/ns
# Warmup Iteration   2: 1222.981 ops/ns
# Warmup Iteration   3: 2031.913 ops/ns
Iteration   1: 2184.758 ops/ns[12s]
Iteration   2: 2231.615 ops/ns[12s]
Iteration   3: 2211.407 ops/ns[12s]
Iteration   4: 2264.482 ops/ns[12s]
Iteration   5: 2172.669 ops/ns[13s]
Iteration   6: 2110.849 ops/ns[13s]
Iteration   7: 2161.588 ops/ns[13s]
Iteration   8: 2151.354 ops/ns[13s]
Iteration   9: 2088.678 ops/ns[13s]
Iteration  10: 2221.383 ops/ns[13s]


Result "org.logstash.benchmark.BufferedTokenizerExtBenchmark.multipleTokensCrossingMultipleFragments":
  2179.878 ±(99.9%) 82.590 ops/ns [Average]
  (min, avg, max) = (2088.678, 2179.878, 2264.482), stdev = 54.628
  CI (99.9%): [2097.289, 2262.468] (assumes normal distribution)


# JMH version: 1.37
# VM version: JDK 21.0.4, OpenJDK 64-Bit Server VM, 21.0.4+7-LTS
# VM invoker: /Users/andrea/.sdkman/candidates/java/21.0.4-tem/bin/java
# VM options: -Dfile.encoding=UTF-8 -Duser.country=IT -Duser.language=en -Duser.variant -Djava.io.tmpdir=/Users/andrea/workspace/logstash_andsel/logstash-core/benchmarks/build -XX:+HeapDumpOnOutOfMemoryError -Xms2g -Xmx2g
# Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 3 iterations, 100 ms each
# Measurement: 10 iterations, 100 ms each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.logstash.benchmark.BufferedTokenizerExtBenchmark.onlyOneTokenPerFragment

# Run progress: 66.67% complete, ETA 00:00:02
# Fork: 1 of 1
# Warmup Iteration   1: 82.220 ops/ns
# Warmup Iteration   2: 3971.738 ops/ns
# Warmup Iteration   3: 9871.581 ops/ns
Iteration   1: 13820.481 ops/ns14s]
Iteration   2: 14055.263 ops/ns15s]
Iteration   3: 14028.545 ops/ns15s]
Iteration   4: 12603.806 ops/ns15s]
Iteration   5: 13157.192 ops/ns15s]
Iteration   6: 13159.603 ops/ns15s]
Iteration   7: 13454.914 ops/ns15s]
Iteration   8: 14170.723 ops/ns15s]
Iteration   9: 14245.842 ops/ns
Iteration  10: 14325.432 ops/ns


Result "org.logstash.benchmark.BufferedTokenizerExtBenchmark.onlyOneTokenPerFragment":
  13702.180 ±(99.9%) 873.989 ops/ns [Average]
  (min, avg, max) = (12603.806, 13702.180, 14325.432), stdev = 578.090
  CI (99.9%): [12828.191, 14576.169] (assumes normal distribution)


# Run complete. Total time: 00:00:06

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

NOTE: Current JVM experimentally supports Compiler Blackholes, and they are in use. Please exercise
extra caution when trusting the results, look into the generated code to check the benchmark still
works, and factor in a small probability of new VM bugs. Additionally, while comparisons between
different JVMs are already problematic, the performance difference caused by different Blackhole
modes can be very significant. Please make sure you use the consistent Blackhole mode for comparisons.

Benchmark                                                               Mode  Cnt      Score     Error   Units
BufferedTokenizerExtBenchmark.multipleTokenPerFragment                 thrpt   10   4275.995 ±  82.202  ops/ns
BufferedTokenizerExtBenchmark.multipleTokensCrossingMultipleFragments  thrpt   10   2179.878 ±  82.590  ops/ns
BufferedTokenizerExtBenchmark.onlyOneTokenPerFragment                  thrpt   10  13702.180 ± 873.989  ops/ns

Bugfix results

andrea:logstash_andsel (fix/buffered_tokenizer_clean_state_in_case_of_line_too_big) % ./gradlew jmh -Pinclude="org.logstash.benchmark.BufferedTokenizerExtBenchmark.*"
To honour the JVM settings for this build a single-use Daemon process will be forked. For more on this, please refer to https://docs.gradle.org/8.7/userguide/gradle_daemon.html#sec:disabling_the_daemon in the Gradle documentation.
Daemon will be stopped at the end of the build

> Task :downloadJRuby UP-TO-DATE
Download https://repo1.maven.org/maven2/org/jruby/jruby-dist/9.4.8.0/jruby-dist-9.4.8.0-bin.tar.gz

> Task :logstash-core-benchmarks:jmh
# JMH version: 1.37
# VM version: JDK 21.0.4, OpenJDK 64-Bit Server VM, 21.0.4+7-LTS
# VM invoker: /Users/andrea/.sdkman/candidates/java/21.0.4-tem/bin/java
# VM options: -Dfile.encoding=UTF-8 -Duser.country=IT -Duser.language=en -Duser.variant -Djava.io.tmpdir=/Users/andrea/workspace/logstash_andsel/logstash-core/benchmarks/build -XX:+HeapDumpOnOutOfMemoryError -Xms2g -Xmx2g
# Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 3 iterations, 100 ms each
# Measurement: 10 iterations, 100 ms each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.logstash.benchmark.BufferedTokenizerExtBenchmark.multipleTokenPerFragment

# Run progress: 0.00% complete, ETA 00:00:03
# Fork: 1 of 1
# Warmup Iteration   1: 38.059 ops/ns
# Warmup Iteration   2: 1590.637 ops/ns
# Warmup Iteration   3: 2992.422 ops/ns
Iteration   1: 3715.554 ops/ns
Iteration   2: 3963.687 ops/ns
Iteration   3: 3780.469 ops/ns
Iteration   4: 3339.686 ops/ns
Iteration   5: 3329.894 ops/ns
Iteration   6: 3281.113 ops/ns
Iteration   7: 3476.390 ops/ns[9s]
Iteration   8: 3753.612 ops/ns[9s]
Iteration   9: 3488.085 ops/ns[9s]
Iteration  10: 3407.034 ops/ns[9s]


Result "org.logstash.benchmark.BufferedTokenizerExtBenchmark.multipleTokenPerFragment":
  3553.552 ±(99.9%) 351.924 ops/ns [Average]
  (min, avg, max) = (3281.113, 3553.552, 3963.687), stdev = 232.776
  CI (99.9%): [3201.628, 3905.477] (assumes normal distribution)


# JMH version: 1.37
# VM version: JDK 21.0.4, OpenJDK 64-Bit Server VM, 21.0.4+7-LTS
# VM invoker: /Users/andrea/.sdkman/candidates/java/21.0.4-tem/bin/java
# VM options: -Dfile.encoding=UTF-8 -Duser.country=IT -Duser.language=en -Duser.variant -Djava.io.tmpdir=/Users/andrea/workspace/logstash_andsel/logstash-core/benchmarks/build -XX:+HeapDumpOnOutOfMemoryError -Xms2g -Xmx2g
# Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 3 iterations, 100 ms each
# Measurement: 10 iterations, 100 ms each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.logstash.benchmark.BufferedTokenizerExtBenchmark.multipleTokensCrossingMultipleFragments

# Run progress: 33.33% complete, ETA 00:00:04
# Fork: 1 of 1
# Warmup Iteration   1: 25.704 ops/ns
# Warmup Iteration   2: 789.568 ops/ns
# Warmup Iteration   3: 1413.859 ops/ns
Iteration   1: 1596.075 ops/ns[10s]
Iteration   2: 1490.031 ops/ns[10s]
Iteration   3: 1572.851 ops/ns[10s]
Iteration   4: 1410.213 ops/ns[10s]
Iteration   5: 1418.818 ops/ns[11s]
Iteration   6: 1406.862 ops/ns[11s]
Iteration   7: 1091.753 ops/ns[11s]
Iteration   8: 1529.710 ops/ns[11s]
Iteration   9: 1388.452 ops/ns[11s]
Iteration  10: 1298.760 ops/ns[11s]


Result "org.logstash.benchmark.BufferedTokenizerExtBenchmark.multipleTokensCrossingMultipleFragments":
  1420.352 ±(99.9%) 222.548 ops/ns [Average]
  (min, avg, max) = (1091.753, 1420.352, 1596.075), stdev = 147.202
  CI (99.9%): [1197.804, 1642.901] (assumes normal distribution)


# JMH version: 1.37
# VM version: JDK 21.0.4, OpenJDK 64-Bit Server VM, 21.0.4+7-LTS
# VM invoker: /Users/andrea/.sdkman/candidates/java/21.0.4-tem/bin/java
# VM options: -Dfile.encoding=UTF-8 -Duser.country=IT -Duser.language=en -Duser.variant -Djava.io.tmpdir=/Users/andrea/workspace/logstash_andsel/logstash-core/benchmarks/build -XX:+HeapDumpOnOutOfMemoryError -Xms2g -Xmx2g
# Blackhole mode: compiler (auto-detected, use -Djmh.blackhole.autoDetect=false to disable)
# Warmup: 3 iterations, 100 ms each
# Measurement: 10 iterations, 100 ms each
# Timeout: 10 min per iteration
# Threads: 1 thread, will synchronize iterations
# Benchmark mode: Throughput, ops/time
# Benchmark: org.logstash.benchmark.BufferedTokenizerExtBenchmark.onlyOneTokenPerFragment

# Run progress: 66.67% complete, ETA 00:00:02
# Fork: 1 of 1
# Warmup Iteration   1: 45.601 ops/ns
# Warmup Iteration   2: 2655.867 ops/ns
# Warmup Iteration   3: 5615.446 ops/ns
Iteration   1: 8124.816 ops/ns[12s]
Iteration   2: 8347.746 ops/ns[13s]
Iteration   3: 8412.099 ops/ns[13s]
Iteration   4: 6964.810 ops/ns[13s]
Iteration   5: 7227.234 ops/ns[13s]
Iteration   6: 7313.477 ops/ns[13s]
Iteration   7: 8263.972 ops/ns[13s]
Iteration   8: 8622.241 ops/ns[13s]
Iteration   9: 8165.904 ops/ns[13s]
Iteration  10: 8381.400 ops/ns[13s]


Result "org.logstash.benchmark.BufferedTokenizerExtBenchmark.onlyOneTokenPerFragment":
  7982.370 ±(99.9%) 883.453 ops/ns [Average]
  (min, avg, max) = (6964.810, 7982.370, 8622.241), stdev = 584.349
  CI (99.9%): [7098.917, 8865.823] (assumes normal distribution)


# Run complete. Total time: 00:00:06

REMEMBER: The numbers below are just data. To gain reusable insights, you need to follow up on
why the numbers are the way they are. Use profilers (see -prof, -lprof), design factorial
experiments, perform baseline and negative tests that provide experimental control, make sure
the benchmarking environment is safe on JVM/OS/HW level, ask for reviews from the domain experts.
Do not assume the numbers tell you what you want them to tell.

NOTE: Current JVM experimentally supports Compiler Blackholes, and they are in use. Please exercise
extra caution when trusting the results, look into the generated code to check the benchmark still
works, and factor in a small probability of new VM bugs. Additionally, while comparisons between
different JVMs are already problematic, the performance difference caused by different Blackhole
modes can be very significant. Please make sure you use the consistent Blackhole mode for comparisons.

Benchmark                                                               Mode  Cnt     Score     Error   Units
BufferedTokenizerExtBenchmark.multipleTokenPerFragment                 thrpt   10  3553.552 ± 351.924  ops/ns
BufferedTokenizerExtBenchmark.multipleTokensCrossingMultipleFragments  thrpt   10  1420.352 ± 222.548  ops/ns
BufferedTokenizerExtBenchmark.onlyOneTokenPerFragment                  thrpt   10  7982.370 ± 883.453  ops/ns

Flamegraphs

5 feeding pipelines feeding one pipeline with employed the lines codec we extracted the following flamegraphs, before this PR and with this PR:

With Logstash 8.15.1
Screenshot 2024-10-15 at 15 43 55

With this PR
Screenshot 2024-10-15 at 15 44 20

The loader LS can be configured with these in config/pipelines.yaml

- pipeline.id: test_1
  pipeline.workers: 2
  pipeline.batch.size: 200
  config.string: "input { java_generator {} } filter { ruby { code => 'event.set(\"data\", \"*\"*rand(400))' } } output { tcp { port => 3333 host => localhost codec => line }}"
- pipeline.id: test_2
  pipeline.workers: 2
  pipeline.batch.size: 200
  config.string: "input { java_generator {} } filter { ruby { code => 'event.set(\"data\", \"*\"*rand(400))' } } output { tcp { port => 3333 host => localhost codec => line }}"
- pipeline.id: test_3
  pipeline.workers: 2
  pipeline.batch.size: 200
  config.string: "input { java_generator {} } filter { ruby { code => 'event.set(\"data\", \"*\"*rand(400))' } } output { tcp { port => 3333 host => localhost codec => line }}"
- pipeline.id: test_4
  pipeline.workers: 2
  pipeline.batch.size: 200
  config.string: "input { java_generator {} } filter { ruby { code => 'event.set(\"data\", \"*\"*rand(400))' } } output { tcp { port => 3333 host => localhost codec => line }}"
- pipeline.id: test_5
  pipeline.workers: 2
  pipeline.batch.size: 200
  config.string: "input { java_generator {} } filter { ruby { code => 'event.set(\"data\", \"*\"*rand(400))' } } output { tcp { port => 3333 host => localhost codec => line }}"

While the system under test can be executed with

bin/logstash -e "input { tcp { port => 3333 codec => line } } output { null {} }"

Used the asynch porofiler from https://github.com/async-profiler/async-profiler running with

bin/asprof -d 20 -i 10ms -f /tmp/flamegraph.html <LS PID>

@andsel andsel requested a review from jsvd October 16, 2024 13:05
Copy link

@elasticmachine
Copy link
Collaborator

💛 Build succeeded, but was flaky

Failed CI Steps

History

cc @andsel

Copy link
Member

@jsvd jsvd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@andsel andsel merged commit 85493ce into elastic:main Oct 16, 2024
6 checks passed
@andsel
Copy link
Contributor Author

andsel commented Oct 16, 2024

@logstashmachine backport 8.x

github-actions bot pushed a commit that referenced this pull request Oct 16, 2024
…ines bigger then sizeLimit (#16482)

Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met.

Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter.
Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array.
Furthermore it translates the `buftok_spec` tests into JUnit tests.

(cherry picked from commit 85493ce)
@andsel andsel added v8.17.0 and removed v8.16.0 labels Oct 16, 2024
andsel added a commit that referenced this pull request Oct 16, 2024
… consume lines in case of lines bigger then sizeLimit (#16569)

Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met.

Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter.
Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array.
Furthermore it translates the `buftok_spec` tests into JUnit tests.

(cherry picked from commit 85493ce)

Co-authored-by: Andrea Selva <selva.andre@gmail.com>
@andsel
Copy link
Contributor Author

andsel commented Oct 17, 2024

@logstashmachine backport 7.17

1 similar comment
@andsel
Copy link
Contributor Author

andsel commented Oct 17, 2024

@logstashmachine backport 7.17

@andsel
Copy link
Contributor Author

andsel commented Oct 17, 2024

@logstashmachine backport 8.16

github-actions bot pushed a commit that referenced this pull request Oct 17, 2024
…ines bigger then sizeLimit (#16482)

Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met.

Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter.
Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array.
Furthermore it translates the `buftok_spec` tests into JUnit tests.

(cherry picked from commit 85493ce)
edmocosta pushed a commit that referenced this pull request Oct 18, 2024
…ines bigger then sizeLimit (#16482) (#16579)

Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met.

Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter.
Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array.
Furthermore it translates the `buftok_spec` tests into JUnit tests.

(cherry picked from commit 85493ce)

Co-authored-by: Andrea Selva <selva.andre@gmail.com>
edmocosta pushed a commit that referenced this pull request Oct 22, 2024
…onsume lines in case of lines bigger then sizeLimit (#16577)

* Bugfix for BufferedTokenizer to completely consume lines in case of lines bigger then sizeLimit (#16482)

Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met.

Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter.
Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array.
Furthermore it translates the `buftok_spec` tests into JUnit tests.

* Fixed compilation error due to libraries incompatibilities

- usage of `data.convertToString().split(context, delimiter, MINUS_ONE);` instead of `data.convertToString().split(delimiter, -1);`
- avoid to extend BuffererdTokenir test cases from `org.logstash.RubyTestBase` which was introduced in #13159
- JDK 8 compatibilities:
  - `Arrays.asList` vs `List.of`
  - `assertThrows` method from JUnit5 not available in JUnit4 so reimplemented in the test
@robbavey robbavey added v8.16.1 and removed v8.16.0 labels Nov 12, 2024
robbavey pushed a commit that referenced this pull request Nov 12, 2024
…ines bigger then sizeLimit (#16482) (#16580)

Fixes the behaviour of the tokenizer to be able to work properly when buffer full conditions are met.

Updates BufferedTokenizerExt so that can accumulate token fragments coming from different data segments. When a "buffer full" condition is matched, it record this state in a local field so that on next data segment it can consume all the token fragments till the next token delimiter.
Updated the accumulation variable from RubyArray containing strings to a StringBuilder which contains the head token, plus the remaining token fragments are stored in the input array.
Furthermore it translates the `buftok_spec` tests into JUnit tests.

(cherry picked from commit 85493ce)

Co-authored-by: Andrea Selva <selva.andre@gmail.com>
yaauie added a commit to yaauie/logstash that referenced this pull request Nov 18, 2024
yaauie added a commit to yaauie/logstash that referenced this pull request Nov 18, 2024
donoghuc pushed a commit that referenced this pull request Nov 18, 2024
…ase of lines bigger then sizeLimit (#16482) (#16580)" (#16688)

This reverts commit 2c98211.
donoghuc pushed a commit that referenced this pull request Nov 19, 2024
…ase of lines bigger then sizeLimit (#16482) (#16579)" (#16687)

This reverts commit cef98e4.
donoghuc added a commit to donoghuc/logstash that referenced this pull request Nov 20, 2024
…r to completely consume lines in case of lines bigger then sizeLimit (elastic#16569)"

This reverts commit 27bd2a0.
donoghuc added a commit that referenced this pull request Nov 20, 2024
…mpletely consume lines in case of lines bigger then sizeLimit (#16569)" (#16705)

This reverts commit 27bd2a0.
donoghuc added a commit to donoghuc/logstash that referenced this pull request Nov 21, 2024
…to completely consume lines in case of lines bigger then sizeLimit (elastic#16577)"

This reverts commit b4ca550.
donoghuc added a commit to donoghuc/logstash that referenced this pull request Nov 21, 2024
…r to completely consume lines in case of lines bigger then sizeLimit (elastic#16569)"

This reverts commit 27bd2a0.
donoghuc added a commit to donoghuc/logstash that referenced this pull request Nov 21, 2024
…ase of lines bigger then sizeLimit (elastic#16482)"

This reverts commit 85493ce.
donoghuc added a commit that referenced this pull request Nov 21, 2024
…letely consume lines in case of lines bigger then sizeLimit (#16577)" (#16713)

This reverts commit b4ca550.
donoghuc added a commit that referenced this pull request Nov 21, 2024
…mpletely consume lines in case of lines bigger then sizeLimit (#16569)" (#16714)

This reverts commit 27bd2a0.
donoghuc added a commit that referenced this pull request Nov 21, 2024
…ase of lines bigger then sizeLimit (#16482)" (#16715)

This reverts commit 85493ce.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

BufferedTokenizer doesn't dice correctly the payload when restart processing after buffer full error
4 participants