Skip to content

Commit

Permalink
Added semaphore in order to avoid search thread pools from rejecting
Browse files Browse the repository at this point in the history
search requests originating from the match processor.

This is a temporary workaround.
  • Loading branch information
martijnvg committed Jun 28, 2019
1 parent 494cdbe commit 9256652
Showing 1 changed file with 25 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.function.BiConsumer;

final class ExactMatchProcessor extends AbstractProcessor {
Expand Down Expand Up @@ -145,7 +146,30 @@ List<EnrichSpecification> getSpecifications() {
return specifications;
}

// TODO: This is temporary and will be removed once internal transport action that does an efficient lookup instead of a search.
// This semaphore purpose is to throttle the number of concurrent search requests, if this is not done then search thread pool
// on nodes may get full and search request fail because they get rejected.
// Because this code is going to change, a semaphore seemed like an easy quick fix to address this problem.
private static final Semaphore SEMAPHORE = new Semaphore(100);

private static BiConsumer<SearchRequest, BiConsumer<SearchResponse, Exception>> createSearchRunner(Client client) {
return (req, handler) -> client.search(req, ActionListener.wrap(resp -> handler.accept(resp, null), e -> handler.accept(null, e)));
return (req, handler) -> {
try {
SEMAPHORE.acquire();
} catch (InterruptedException e) {
Thread.interrupted();
handler.accept(null, e);

This comment has been minimized.

Copy link
@jakelandis

jakelandis Jun 28, 2019

Contributor

do we need to release the semaphore here ?

This comment has been minimized.

Copy link
@martijnvg

martijnvg Jun 28, 2019

Author Member

I think not, because the thread was interrupted before a permit was acquired?

return;
}
client.search(req, ActionListener.wrap(
resp -> {
SEMAPHORE.release();
handler.accept(resp, null);
},
e -> {
SEMAPHORE.release();
handler.accept(null, e);
}));
};
}
}

0 comments on commit 9256652

Please sign in to comment.