Skip to content

Commit

Permalink
refactor to remove the TimeProvider listener
Browse files Browse the repository at this point in the history
Signed-off-by: Chenyang Ji <cyji@amazon.com>
  • Loading branch information
ansjcy committed Jan 10, 2024
1 parent a44172b commit 0e731b8
Show file tree
Hide file tree
Showing 10 changed files with 106 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public final void start() {
0,
0,
buildTookInMillis(),
timeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down Expand Up @@ -670,7 +670,7 @@ protected final SearchResponse buildSearchResponse(
successfulOps.get(),
skippedOps.get(),
buildTookInMillis(),
timeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
failures,
clusters,
searchContextId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,25 @@ class SearchRequestContext {
private TotalHits totalHits;
private final EnumMap<ShardStatsFieldNames, Integer> shardStats;

private final boolean phaseTookEnabled;

/**
* This constructor is for testing only
*/
SearchRequestContext() {
this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()));
this(new SearchRequestOperationsListener.CompositeListener(List.of(), LogManager.getLogger()), false);
}

SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) {
SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener, boolean phaseTookEnabled) {
this.searchRequestOperationsListener = searchRequestOperationsListener;
this.absoluteStartNanos = System.nanoTime();
this.phaseTookMap = new HashMap<>();
this.shardStats = new EnumMap<>(ShardStatsFieldNames.class);
this.phaseTookEnabled = phaseTookEnabled;
}

SearchRequestContext(SearchRequestOperationsListener searchRequestOperationsListener) {
this(searchRequestOperationsListener, false);
}

SearchRequestOperationsListener getSearchRequestOperationsListener() {
Expand All @@ -57,6 +64,14 @@ Map<String, Long> phaseTookMap() {
return phaseTookMap;
}

SearchResponse.PhaseTook getPhaseTook() {
if (phaseTookEnabled) {
return new SearchResponse.PhaseTook(phaseTookMap);
} else {
return null;
}
}

/**
* Override absoluteStartNanos set in constructor.
* For testing only
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public abstract class SearchRequestOperationsListener {
private volatile boolean enabled;

protected SearchRequestOperationsListener() {
this.enabled = false;
this.enabled = true;
}

protected SearchRequestOperationsListener(boolean enabled) {
Expand All @@ -41,11 +41,15 @@ void onRequestStart(SearchRequestContext searchRequestContext) {}

void onRequestEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {}

boolean getEnabled() {
boolean isEnabled(SearchRequest searchRequest) {
return isEnabled();
}

boolean isEnabled() {
return enabled;
}

void setEnabled(boolean enabled) {
protected void setEnabled(boolean enabled) {
this.enabled = enabled;
}

Expand All @@ -62,7 +66,6 @@ static final class CompositeListener extends SearchRequestOperationsListener {
CompositeListener(List<SearchRequestOperationsListener> listeners, Logger logger) {
this.listeners = listeners;
this.logger = logger;
this.setEnabled(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,22 @@ public List<SearchRequestOperationsListener> getListeners() {
* Create the {@link SearchRequestOperationsListener.CompositeListener}
* with the all listeners enabled at cluster-level and request-level.
*
* @param searchRequest The SearchRequest object used to decide which request-level listeners to add based on states/flags
* @param logger Logger to be attached to the {@link SearchRequestOperationsListener.CompositeListener}
* @param perRequestListeners the per-request listeners that can be optionally added to the returned CompositeListener list.
* @return SearchRequestOperationsListener.CompositeListener
*/
public SearchRequestOperationsListener.CompositeListener buildCompositeListener(
SearchRequest searchRequest,
Logger logger,
SearchRequestOperationsListener... perRequestListeners
) {
final List<SearchRequestOperationsListener> searchListenersList = Stream.concat(
searchRequestListenersList.stream(),
Arrays.stream(perRequestListeners)
).filter(SearchRequestOperationsListener::getEnabled).collect(Collectors.toList());
)
.filter((searchRequestOperationsListener -> searchRequestOperationsListener.isEnabled(searchRequest)))
.collect(Collectors.toList());

return new SearchRequestOperationsListener.CompositeListener(searchListenersList, logger);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ final class SearchResponseMerger {
/**
* Add a search response to the list of responses to be merged together into one.
* Merges currently happen at once when all responses are available and
* {@link #getMergedResponse(SearchResponse.Clusters)} )} is called.
* {@link #getMergedResponse(SearchResponse.Clusters, SearchRequestContext)} )} is called.
* That may change in the future as it's possible to introduce incremental merges as responses come in if necessary.
*/
void add(SearchResponse searchResponse) {
Expand All @@ -126,7 +126,7 @@ int numResponses() {
* Returns the merged response. To be called once all responses have been added through {@link #add(SearchResponse)}
* so that all responses are merged into a single one.
*/
SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
SearchResponse getMergedResponse(SearchResponse.Clusters clusters, SearchRequestContext searchRequestContext) {
// if the search is only across remote clusters, none of them are available, and all of them have skip_unavailable set to true,
// we end up calling merge without anything to merge, we just return an empty search response
if (searchResponses.size() == 0) {
Expand Down Expand Up @@ -236,7 +236,7 @@ SearchResponse getMergedResponse(SearchResponse.Clusters clusters) {
successfulShards,
skippedShards,
tookInMillis,
searchTimeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
shardFailures,
clusters,
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -269,8 +268,6 @@ private Map<String, Float> resolveIndexBoosts(SearchRequest searchRequest, Clust
}

/**
* Listener to track request-level tookTime and phase tookTimes from the coordinator.
*
* Search operations need two clocks. One clock is to fulfill real clock needs (e.g., resolving
* "now" to an index name). Another clock is needed for measuring how long a search operation
* took. These two uses are at odds with each other. There are many issues with using a real
Expand All @@ -280,8 +277,7 @@ private Map<String, Float> resolveIndexBoosts(SearchRequest searchRequest, Clust
*
* @opensearch.internal
*/
static final class SearchTimeProvider extends SearchRequestOperationsListener {

static final class SearchTimeProvider {
private final long absoluteStartMillis;
private final long relativeStartNanos;
private final LongSupplier relativeCurrentNanosProvider;
Expand Down Expand Up @@ -310,53 +306,6 @@ long getAbsoluteStartMillis() {
long buildTookInMillis() {
return TimeUnit.NANOSECONDS.toMillis(relativeCurrentNanosProvider.getAsLong() - relativeStartNanos);
}

SearchResponse.PhaseTook getPhaseTook() {
if (getEnabled()) {
Map<String, Long> phaseTookMap = new HashMap<>();
// Convert Map<SearchPhaseName, Long> to Map<String, Long> for SearchResponse()
for (SearchPhaseName searchPhaseName : phaseStatsMap.keySet()) {
phaseTookMap.put(searchPhaseName.getName(), phaseStatsMap.get(searchPhaseName));
}
return new SearchResponse.PhaseTook(phaseTookMap);
} else {
return null;
}
}

Map<SearchPhaseName, Long> phaseStatsMap = new EnumMap<>(SearchPhaseName.class);

/**
* Set if this listener is enabled based on the cluster level setting
* and per request enable flags.
*
* @param enabledAtClusterLevel if the SearchTimeProvider listener is enabled at cluster level
* @param searchRequest the original Search Request
* @opensearch.internal
*/

void setEnabled(boolean enabledAtClusterLevel, SearchRequest searchRequest) {
// phase_took is enabled wi th request param and/or cluster setting
super.setEnabled(enabledAtClusterLevel || (searchRequest.isPhaseTook() != null && searchRequest.isPhaseTook()));
}

@Override
void onPhaseStart(SearchPhaseContext context) {}

@Override
void onPhaseEnd(SearchPhaseContext context, SearchRequestContext searchRequestContext) {
phaseStatsMap.put(
context.getCurrentPhase().getSearchPhaseName(),
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - context.getCurrentPhase().getStartTimeInNanos())
);
}

@Override
void onPhaseFailure(SearchPhaseContext context) {}

public Long getPhaseTookTime(SearchPhaseName searchPhaseName) {
return phaseStatsMap.get(searchPhaseName);
}
}

@Override
Expand Down Expand Up @@ -479,10 +428,15 @@ private void executeRequest(
relativeStartNanos,
System::nanoTime
);
timeProvider.setEnabled(clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED), originalSearchRequest);
final boolean phaseTookEnabled;
if (originalSearchRequest.isPhaseTook() == null) {
phaseTookEnabled = clusterService.getClusterSettings().get(SEARCH_PHASE_TOOK_ENABLED);
} else {
phaseTookEnabled = originalSearchRequest.isPhaseTook();
}
SearchRequestOperationsListener.CompositeListener requestOperationsListeners = searchRequestOperationsListeners
.buildCompositeListener(logger, timeProvider);
SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners);
.buildCompositeListener(originalSearchRequest, logger);
SearchRequestContext searchRequestContext = new SearchRequestContext(requestOperationsListeners, phaseTookEnabled);
searchRequestContext.getSearchRequestOperationsListener().onRequestStart(searchRequestContext);

PipelinedRequest searchRequest;
Expand Down Expand Up @@ -587,7 +541,8 @@ private ActionListener<SearchSourceBuilder> buildRewriteListener(
searchContext,
searchAsyncActionProvider,
searchRequestContext
)
),
searchRequestContext
);
} else {
AtomicInteger skippedClusters = new AtomicInteger(0);
Expand Down Expand Up @@ -675,7 +630,8 @@ static void ccsRemoteReduce(
RemoteClusterService remoteClusterService,
ThreadPool threadPool,
ActionListener<SearchResponse> listener,
BiConsumer<SearchRequest, ActionListener<SearchResponse>> localSearchConsumer
BiConsumer<SearchRequest, ActionListener<SearchResponse>> localSearchConsumer,
SearchRequestContext searchRequestContext
) {

if (localIndices == null && remoteIndices.size() == 1) {
Expand Down Expand Up @@ -717,7 +673,7 @@ public void onResponse(SearchResponse searchResponse) {
searchResponse.getSuccessfulShards(),
searchResponse.getSkippedShards(),
timeProvider.buildTookInMillis(),
timeProvider.getPhaseTook(),
searchRequestContext.getPhaseTook(),
searchResponse.getShardFailures(),
new SearchResponse.Clusters(1, 1, 0),
searchResponse.pointInTimeId()
Expand Down Expand Up @@ -763,7 +719,8 @@ public void onFailure(Exception e) {
exceptions,
searchResponseMerger,
totalClusters,
listener
listener,
searchRequestContext
);
Client remoteClusterClient = remoteClusterService.getRemoteClusterClient(threadPool, clusterAlias);
remoteClusterClient.search(ccsSearchRequest, ccsListener);
Expand All @@ -777,7 +734,8 @@ public void onFailure(Exception e) {
exceptions,
searchResponseMerger,
totalClusters,
listener
listener,
searchRequestContext
);
SearchRequest ccsLocalSearchRequest = SearchRequest.subSearchRequest(
searchRequest,
Expand Down Expand Up @@ -872,7 +830,8 @@ private static ActionListener<SearchResponse> createCCSListener(
AtomicReference<Exception> exceptions,
SearchResponseMerger searchResponseMerger,
int totalClusters,
ActionListener<SearchResponse> originalListener
ActionListener<SearchResponse> originalListener,
SearchRequestContext searchRequestContext
) {
return new CCSActionListener<SearchResponse, SearchResponse>(
clusterAlias,
Expand All @@ -894,7 +853,7 @@ SearchResponse createFinalResponse() {
searchResponseMerger.numResponses(),
skippedClusters.get()
);
return searchResponseMerger.getMergedResponse(clusters);
return searchResponseMerger.getMergedResponse(clusters, searchRequestContext);
}
};
}
Expand Down
Loading

0 comments on commit 0e731b8

Please sign in to comment.