-
Notifications
You must be signed in to change notification settings - Fork 24.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[INGEST] Interrupt the current thread if evaluation grok expressions take too long #31024
[INGEST] Interrupt the current thread if evaluation grok expressions take too long #31024
Conversation
…take too long This adds a thread interrupter that allows us to encapsulate calls to org.joni.Matcher#search() This method can hang forever if the regex expression is too complex. The thread interrupter in the background checks every 3 seconds whether there are threads execution the org.joni.Matcher#search() method for longer than 5 seconds and if so interrupts these threads. Joni has checks that that for every 30k iterations it checks if the current thread is interrupted and if so returns org.joni.Matcher#INTERRUPTED Closes elastic#28731
Pinging @elastic/es-core-infra |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left some comments, looks great overall!
if (result == Matcher.INTERRUPTED) { | ||
throw new IllegalArgumentException("grok pattern matching is too complex and takes too long to execute"); | ||
} else if (result == Matcher.FAILED) { | ||
// I think we should throw an error here? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and change current behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Possibly but I think it should be a follow-up so let's go with a // TODO:
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agreed, this should be done in a follow up.
I don't think is different behaviour, because in GrokProcessor
we throw an error when null
is returned.
/** | ||
* Provides a thread pool | ||
*/ | ||
// TODO: do we really want to expose ThreadPool here? Or a BiFunction<Long, Runnable, ScheduledFuture<?>> to just handle scheduling? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this something you want to discuss?
I see no issue with leaving the Threadpool available to processors
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a shame that we have to expose anything at all so let us at least go with the minimum necessary.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So I had doubts about directly exposing TP to all processors, because this will send to wrong message to processor implementors, there should be no need to fork a thread. I will expose the minimum necessary.
@@ -45,6 +49,10 @@ | |||
public class IngestCommonPlugin extends Plugin implements ActionPlugin, IngestPlugin { | |||
|
|||
static final Map<String, String> GROK_PATTERNS = Grok.getBuiltinPatterns(); | |||
static final Setting<TimeValue> GUARD_INTERVAL = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should add documentation for these settings
static final Setting<TimeValue> GUARD_INTERVAL = | ||
Setting.timeSetting("ingest.grok.guard.interval", TimeValue.timeValueSeconds(3), Setting.Property.NodeScope); | ||
static final Setting<TimeValue> GUARD_MAX_EXECUTION_TIME = | ||
Setting.timeSetting("ingest.grok.guard.max_execution_time", TimeValue.timeValueSeconds(5), Setting.Property.NodeScope); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is discussing these timeout values worth doing?
I took a look at some other defaults of ours, like in the low-level rest client. It sets its timeout to 1sec by default. Should we be more aggressive here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I change the defaults to 1 second. I think that should be good enough for now?
Also now I think about if maybe timeout
is better than max_execution_time
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The overall implementation looks good. I left some comment about some details.
* @return A noop implementation that does not interrupt threads and is useful for testing and pre-defined grok expressions. | ||
*/ | ||
static ThreadInterrupter noop() { | ||
return new Noop(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can reference a singleton instance?
/** | ||
* De-registers the current thread and prevents it from being interrupted. | ||
*/ | ||
void deregister(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be unregister
.
* | ||
* This is needed for Joni's {@link org.joni.Matcher#search(int, int, int)} method, because | ||
* it can end up spinning endlessly if the regular expression is too complex. Joni has checks | ||
* that that for every 30k iterations it checks if the current thread is interrupted and if so |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a double that
here.
try { | ||
final long currentRelativeTime = relativeTimeSupplier.getAsLong(); | ||
for (Map.Entry<Thread, Long> entry : registry.entrySet()) { | ||
long threadTime = entry.getValue(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This local looks unnecessary?
return new Noop(); | ||
} | ||
|
||
class Noop implements ThreadInterrupter { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find it odd that the interface is named ThreadInterrupted
and then we have a Noop
implementation that does nothing; that is, it implements an interface that indicates that it interrupts threads yet it doesn't interrupt threads. I think a name that avoids this confusion is ThreadWatchdog
. This is not an uncommon name for this idea.
} | ||
} | ||
} finally { | ||
scheduler.apply(interval, this::interruptLongRunningExecutions); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is potentially risky. What if the try
block throws an OutOfMemoryError
and the scheduler#apply
throws a RejectedExecutionException
? We lose the OutOfMemoryError
and miss dying with dignity. Now, the scheduler#apply
should really on throw a RejectedExecutionException
if the scheduler is shutdown which should only happen when we are shutting down, but let's not take any chances here that we lose a fatal error?
BiFunction<Long, Runnable, ScheduledFuture<?>> scheduler = (delay, command) -> { | ||
try { | ||
Thread.sleep(delay); | ||
} catch (InterruptedException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should not happen so we can rethrow this as an AssertionError
?
ThreadInterrupter guard = ThreadInterrupter.newInstance(10, 100, System::currentTimeMillis, (delay, command) -> { | ||
try { | ||
Thread.sleep(delay); | ||
} catch (InterruptedException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same comment here: rethrow as an AssertionError
.
// need to call #register() method on a different thread, assertBusy() fails if current thread gets interrupted | ||
Thread thread = new Thread(() -> { | ||
guard.register(); | ||
while (run.get()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe have this thread check it's interrupt status and break the loop if it detect it is interrupted? That would be a more realistic implementation of cooperative interruption?
/** | ||
* Provides a thread pool | ||
*/ | ||
// TODO: do we really want to expose ThreadPool here? Or a BiFunction<Long, Runnable, ScheduledFuture<?>> to just handle scheduling? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a shame that we have to expose anything at all so let us at least go with the minimum necessary.
@talevy @jasontedor Thanks for reviewing. I've updated this PR. |
retest this please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left a few more comments.
[[grok-watchdog]] | ||
==== Grok watchdog | ||
|
||
Grok expression that take too long to execute are interrupted and |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
expression
-> expressions
==== Grok watchdog | ||
|
||
Grok expression that take too long to execute are interrupted and | ||
the the grok processor then fails with an exception. The grok |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the the
-> the
Grok expression that take too long to execute are interrupted and | ||
the the grok processor then fails with an exception. The grok | ||
processor has a watchdog thread that determines when evaluation | ||
a grok expression takes too long and is controlled by the following |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
evaluation a
-> evaluation of a
* @return The maximum allowed time for a thread to invoke {@link #unregister()} after {@link #register()} | ||
* has been invoked before this ThreadWatchDog starts to interrupting that thread. | ||
*/ | ||
long maxExecutionTime(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be a TimeValue
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TimeValue
is not accessible in the grok module. I don't think we want to make grok module depend on elasticsearch-core module?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, that's a shame. I agree. Can we make the name of this method reflect that the time is represented in milliseconds then?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes: c21a408
} | ||
if (result == Matcher.INTERRUPTED) { | ||
throw new IllegalArgumentException("grok pattern matching was interrupted after [" + | ||
threadWatchdog.maxExecutionTime() + "] ms"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that if we keep ThreadWatchdog#maxExecutionTime
as TimeValue
then we can let TimeValue
do the formatting for us?
threadWatchdog.unregister(); | ||
} | ||
if (result == Matcher.INTERRUPTED) { | ||
throw new IllegalArgumentException("grok pattern matching was interrupted after [" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IllegalArgumentException
feels wrong to me? I think a generic RuntimeException
is okay?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that now too. We're just not able to digest a grok expression here and interrupting a thread was our last resort, so it is not the user's fault.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…take too long (#31024) This adds a thread interrupter that allows us to encapsulate calls to org.joni.Matcher#search() This method can hang forever if the regex expression is too complex. The thread interrupter in the background checks every 3 seconds whether there are threads execution the org.joni.Matcher#search() method for longer than 5 seconds and if so interrupts these threads. Joni has checks that that for every 30k iterations it checks if the current thread is interrupted and if so returns org.joni.Matcher#INTERRUPTED Closes #28731
* master: Remove RestGetAllAliasesAction (#31308) Temporary fix for broken build Reenable Checkstyle's unused import rule (#31270) Remove remaining unused imports before merging #31270 Fix non-REST doc snippet [DOC] Extend SQL docs Immediately flush channel after writing to buffer (#31301) [DOCS] Shortens ML API intros Use quotes in the call invocation (#31249) move security ingest processors to a sub ingest directory (#31306) Add 5.6.11 version constant. Fix version detection. SQL: Whitelist SQL utility class for better scripting (#30681) [Docs] All Rollup docs experimental, agg limitations, clarify DeleteJob (#31299) CCS: don't proxy requests for already connected node (#31273) Mute ScriptedMetricAggregatorTests testSelfReferencingAggStateAfterMap [test] opensuse packaging turn up debug logging Add unreleased version 6.3.1 Removes experimental tag from scripted_metric aggregation (#31298) [Rollup] Metric config parser must use builder so validation runs (#31159) [ML] Check licence when datafeeds use cross cluster search (#31247) Add notion of internal index settings (#31286) Test: Remove broken yml test feature (#31255) REST hl client: cluster health to default to cluster level (#31268) [ML] Update test thresholds to account for changes to memory control (#31289) Log warnings when cluster state publication failed to some nodes (#31233) Fix AntFixture waiting condition (#31272) Ignore numeric shard count if waiting for ALL (#31265) [ML] Implement new rules design (#31110) index_prefixes back-compat should test 6.3 (#30951) Core: Remove plain execute method on TransportAction (#30998) Update checkstyle to 8.10.1 (#31269) Set analyzer version in PreBuiltAnalyzerProviderFactory (#31202) Modify pipelining handlers to require full requests (#31280) Revert upgrade to Netty 4.1.25.Final (#31282) Use armored input stream for reading public key (#31229) Fix Netty 4 Server Transport tests. Again. REST hl client: adjust wait_for_active_shards param in cluster health (#31266) REST high-level Client: remove deprecated API methods (#31200) [DOCS] Mark SQL feature as experimental [DOCS] Updates machine learning custom URL screenshots (#31222) Fix naming conventions check for XPackTestCase Fix security Netty 4 transport tests Fix race in clear scroll (#31259) [DOCS] Clarify audit index settings when remote indexing (#30923) Delete typos in SAML docs (#31199) REST high-level client: add Cluster Health API (#29331) [ML][TEST] Mute tests using rules (#31204) Support RequestedAuthnContext (#31238) SyncedFlushResponse to implement ToXContentObject (#31155) Add Get Aliases API to the high-level REST client (#28799) Remove some line length supressions (#31209) Validate xContentType in PutWatchRequest. (#31088) [INGEST] Interrupt the current thread if evaluation grok expressions take too long (#31024) Suppress extras FS on caching directory tests Revert "[DOCS] Added 6.3 info & updated the upgrade table. (#30940)" Revert "Fix snippets in upgrade docs" Fix snippets in upgrade docs [DOCS] Added 6.3 info & updated the upgrade table. (#30940) LLClient: Support host selection (#30523) Upgrade to Netty 4.1.25.Final (#31232) Enable custom credentials for core REST tests (#31235) Move ESIndexLevelReplicationTestCase to test framework (#31243) Encapsulate Translog in Engine (#31220) HLRest: Add get index templates API (#31161) Remove all unused imports and fix CRLF (#31207) [Tests] Fix self-referencing tests [TEST] Fix testRecoveryAfterPrimaryPromotion [Docs] Remove mention pattern files in Grok processor (#31170) Use stronger write-once semantics for Azure repository (#30437) Don't swallow exceptions on replication (#31179) Limit the number of concurrent requests per node (#31206) Call ensureNoSelfReferences() on _agg state variable after scripted metric agg script executions (#31044) Move java version checker back to its own jar (#30708) [test] add fix for rare virtualbox error (#31212)
* 6.x: SQL: Fix build on Java 10 [Tests] Mutualize fixtures code in BaseHttpFixture (#31210) [TEST] Fix RemoteClusterClientTests#testEnsureWeReconnect [ML] Update test thresholds to account for changes to memory control (#31289) Reenable Checkstyle's unused import rule (#31270) [ML] Check licence when datafeeds use cross cluster search (#31247) Fix non-REST doc snippet [DOC] Extend SQL docs [DOCS] Shortens ML API intros Use quotes in the call invocation (#31249) move security ingest processors to a sub ingest directory (#31306) SQL: Whitelist SQL utility class for better scripting (#30681) Add 5.6.11 version constant. Fix version detection. [Docs] All Rollup docs experimental, agg limitations, clarify DeleteJob (#31299) Add missing release notes. Security: fix token bwc with pre 6.0.0-beta2 (#31254) Fix compilation error in UpdateSettingsIT (#31304) Test: Remove broken yml test feature (#31255) Add unreleased version 6.3.1 [Rollup] Metric config parser must use builder so validation runs (#31159) Removes experimental tag from scripted_metric aggregation (#31298) [DOCS] Removes coming tag from 6.3.0 release notes 6.3 release notes. Add notion of internal index settings (#31286) REST high-level client: add Cluster Health API (#29331) Remove leftover usage of deprecated client API SyncedFlushResponse to implement ToXContentObject (#31155) Add Get Aliases API to the high-level REST client (#28799) HLRest: Add get index templates API (#31161) Log warnings when cluster state publication failed to some nodes (#31233) Fix AntFixture waiting condition (#31272) [TEST] Mute RecoveryIT.testHistoryUUIDIsGenerated Ignore numeric shard count if waiting for ALL (#31265) Update checkstyle to 8.10.1 (#31269) Set analyzer version in PreBuiltAnalyzerProviderFactory (#31202) Revert upgrade to Netty 4.1.25.Final (#31282) Use armored input stream for reading public key (#31229) [DOCS] Added 'fail_on_unsupported_field' param to MLT. Closes #28008 (#31160) Fix Netty 4 Server Transport tests. Again. [DOCS] Fixed typo. [DOCS] Added release highlights for 6.3 (#31256) [DOCS] Mark SQL feature as experimental [DOCS] Updates machine learning custom URL screenshots (#31222) Fix naming conventions check for XPackTestCase Fix security Netty 4 transport tests Fix race in clear scroll (#31259) [DOCS] Clarify audit index settings when remote indexing (#30923) [ML][TEST] Mute tests using rules (#31204) Support RequestedAuthnContext (#31238) Validate xContentType in PutWatchRequest. (#31088) [INGEST] Interrupt the current thread if evaluation grok expressions take too long (#31024) Upgrade to Netty 4.1.25.Final (#31232) Suppress extras FS on caching directory tests Revert "[DOCS] Added 6.3 info & updated the upgrade table. (#30940)" Revert "Fix snippets in upgrade docs" Fix snippets in upgrade docs [DOCS] Added 6.3 info & updated the upgrade table. (#30940) Enable custom credentials for core REST tests (#31235) Move ESIndexLevelReplicationTestCase to test framework (#31243) Encapsulate Translog in Engine (#31220) [DOCS] Adds machine learning 6.3.0 release notes (#31217) Remove all unused imports and fix CRLF (#31207) [TEST] Fix testRecoveryAfterPrimaryPromotion [Docs] Remove mention pattern files in Grok processor (#31170) Use stronger write-once semantics for Azure repository (#30437) Don't swallow exceptions on replication (#31179) Compliant SAML Response destination check (#31175) Move java version checker back to its own jar (#30708) TEST: Retry synced-flush if ongoing ops on primary (#30978) [test] add fix for rare virtualbox error (#31212)
This adds a thread interrupter that allows us to encapsulate calls to org.joni.Matcher#search()
This method can hang forever if the regex expression is too complex.
The thread interrupter in the background checks every 3 seconds whether there are threads
execution the
org.joni.Matcher#search(...)
method for longer than 5 seconds andif so interrupts these threads.
Joni has checks that that for every 30k iterations it checks if the current thread is interrupted and
if so returns
org.joni.Matcher#INTERRUPTED
PR for #28731