Skip to content

Commit

Permalink
Revert "Backport PR elastic#16482 to 8.x: Bugfix for BufferedTokenize…
Browse files Browse the repository at this point in the history
…r to completely consume lines in case of lines bigger then sizeLimit (elastic#16569)"

This reverts commit 27bd2a0.
  • Loading branch information
donoghuc committed Nov 21, 2024
1 parent aa43020 commit 25f2ad8
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

import org.jruby.Ruby;
import org.jruby.RubyArray;
import org.jruby.RubyBoolean;
import org.jruby.RubyClass;
import org.jruby.RubyObject;
import org.jruby.RubyString;
Expand All @@ -41,12 +40,10 @@ public class BufferedTokenizerExt extends RubyObject {
freeze(RubyUtil.RUBY.getCurrentContext());

private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray();
private StringBuilder headToken = new StringBuilder();
private RubyString delimiter = NEW_LINE;
private int sizeLimit;
private boolean hasSizeLimit;
private int inputSize;
private boolean bufferFullErrorNotified = false;

public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) {
super(runtime, metaClass);
Expand All @@ -69,6 +66,7 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
* Extract takes an arbitrary string of input data and returns an array of
* tokenized entities, provided there were any available to extract. This
* makes for easy processing of datagrams using a pattern like:
*
* {@code tokenizer.extract(data).map { |entity| Decode(entity) }.each do}
*
* @param context ThreadContext
Expand All @@ -79,63 +77,22 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) {
@SuppressWarnings("rawtypes")
public RubyArray extract(final ThreadContext context, IRubyObject data) {
final RubyArray entities = data.convertToString().split(delimiter, -1);
if (!bufferFullErrorNotified) {
input.clear();
input.addAll(entities);
} else {
// after a full buffer signal
if (input.isEmpty()) {
// after a buffer full error, the remaining part of the line, till next delimiter,
// has to be consumed, unless the input buffer doesn't still contain fragments of
// subsequent tokens.
entities.shift(context);
input.addAll(entities);
} else {
// merge last of the input with first of incoming data segment
if (!entities.isEmpty()) {
RubyString last = ((RubyString) input.pop(context));
RubyString nextFirst = ((RubyString) entities.shift(context));
entities.unshift(last.concat(nextFirst));
input.addAll(entities);
}
}
}

if (hasSizeLimit) {
if (bufferFullErrorNotified) {
bufferFullErrorNotified = false;
if (input.isEmpty()) {
return RubyUtil.RUBY.newArray();
}
}
final int entitiesSize = ((RubyString) input.first()).size();
final int entitiesSize = ((RubyString) entities.first()).size();
if (inputSize + entitiesSize > sizeLimit) {
bufferFullErrorNotified = true;
headToken = new StringBuilder();
inputSize = 0;
input.shift(context); // consume the token fragment that generates the buffer full
throw new IllegalStateException("input buffer full");
}
this.inputSize = inputSize + entitiesSize;
}

if (input.getLength() < 2) {
// this is a specialization case which avoid adding and removing from input accumulator
// when it contains just one element
headToken.append(input.shift(context)); // remove head
input.append(entities.shift(context));
if (entities.isEmpty()) {
return RubyUtil.RUBY.newArray();
}

if (headToken.length() > 0) {
// if there is a pending token part, merge it with the first token segment present
// in the accumulator, and clean the pending token part.
headToken.append(input.shift(context)); // append buffer to first element and
input.unshift(RubyUtil.toRubyObject(headToken.toString())); // reinsert it into the array
headToken = new StringBuilder();
}
headToken.append(input.pop(context)); // put the leftovers in headToken for later
inputSize = headToken.length();
return input;
entities.unshift(input.join(context));
input.clear();
input.append(entities.pop(context));
inputSize = ((RubyString) input.first()).size();
return entities;
}

/**
Expand All @@ -147,14 +104,14 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) {
*/
@JRubyMethod
public IRubyObject flush(final ThreadContext context) {
final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString());
headToken = new StringBuilder();
final IRubyObject buffer = input.join(context);
input.clear();
return buffer;
}

@JRubyMethod(name = "empty?")
public IRubyObject isEmpty(final ThreadContext context) {
return RubyBoolean.newBoolean(context.runtime, headToken.toString().isEmpty());
return input.empty_p();
}

}

This file was deleted.

This file was deleted.

This file was deleted.

0 comments on commit 25f2ad8

Please sign in to comment.