diff --git a/src/ui/public/courier/courier.js b/src/ui/public/courier/courier.js index c468182cbb6fc..66d3c40659913 100644 --- a/src/ui/public/courier/courier.js +++ b/src/ui/public/courier/courier.js @@ -30,6 +30,9 @@ import '../promises'; import { searchRequestQueue } from './search_request_queue'; import { FetchSoonProvider } from './fetch'; import { SearchPollProvider } from './search_poll'; +import { addSearchStrategy, defaultSearchStrategy } from './search_strategy'; + +addSearchStrategy(defaultSearchStrategy); uiModules.get('kibana/courier').service('courier', ($rootScope, Private) => { const fetchSoon = Private(FetchSoonProvider); @@ -74,12 +77,12 @@ uiModules.get('kibana/courier').service('courier', ($rootScope, Private) => { /** * Fetch the pending requests. */ - fetch = () => { + fetch() { fetchSoon.fetchQueued().then(() => { // Reset the timer using the time that we get this response as the starting point. searchPoll.resetTimer(); }); - }; + } } return new Courier(); diff --git a/src/ui/public/courier/fetch/call_client.js b/src/ui/public/courier/fetch/call_client.js index 93453c3a3f8ee..40774620c034f 100644 --- a/src/ui/public/courier/fetch/call_client.js +++ b/src/ui/public/courier/fetch/call_client.js @@ -20,6 +20,7 @@ import _ from 'lodash'; import { ErrorAllowExplicitIndexProvider } from '../../error_allow_explicit_index'; +import { assignSearchRequestsToSearchStrategies } from '../search_strategy'; import { IsRequestProvider } from './is_request'; import { MergeDuplicatesRequestProvider } from './merge_duplicate_requests'; import { RequestStatus } from './req_status'; @@ -39,10 +40,14 @@ export function CallClientProvider(Private, Promise, es) { const statuses = mergeDuplicateRequests(requests); // get the actual list of requests that we will be fetching - let requestsToFetch = statuses.filter(isRequest); + const requestsToFetch = statuses.filter(isRequest); let execCount = requestsToFetch.length; - if (!execCount) return Promise.resolve([]); + if (!execCount) { + return Promise.resolve([]); + } + + const searchStrategiesWithRequests = assignSearchRequestsToSearchStrategies(requestsToFetch); // resolved by respond() let esPromise = undefined; @@ -52,14 +57,24 @@ export function CallClientProvider(Private, Promise, es) { // for each respond with either the response or ABORTED const respond = function (responses) { responses = responses || []; - return Promise.map(requests, function (request, i) { + const activeSearchRequests = searchStrategiesWithRequests.reduce((allSearchRequests, { searchRequests }) => { + return allSearchRequests.concat(searchRequests); + }, []) + .filter(request => { + // We'll use the index of the request to map it to its response. If a request has already + // failed then it won't generate a response. In this case we need to remove the request + // to maintain parity between the list of requests and the list of correspoding responses. + return request !== undefined; + }); + + return Promise.map(activeSearchRequests, function (request, i) { switch (statuses[i]) { case ABORTED: return ABORTED; case DUPLICATE: return request._uniq.resp; default: - const index = _.findIndex(requestsToFetch, request); + const index = _.findIndex(activeSearchRequests, request); if (index < 0) { // This means the request failed. return ABORTED; @@ -106,18 +121,19 @@ export function CallClientProvider(Private, Promise, es) { // We're going to create a new async context here, so that the logic within it can execute // asynchronously after we've returned a reference to defer.promise. Promise.resolve().then(async () => { - // Flatten the searchSource within each searchRequest to get the fetch params, - // e.g. body, filters, index pattern, query. - const allFetchParams = await getAllFetchParams(requestsToFetch); + // Execute each request using its search strategy. + const esPromises = searchStrategiesWithRequests.map(async searchStrategyWithSearchRequests => { + const { searchStrategy, searchRequests: searchStrategyRequests } = searchStrategyWithSearchRequests; - // Serialize the fetch params into a format suitable for the body of an ES query. - const serializedFetchParams = await serializeAllFetchParams(allFetchParams, requestsToFetch); + // Flatten the searchSource within each searchRequest to get the fetch params, + // e.g. body, filters, index pattern, query. + const allFetchParams = await getAllFetchParams(searchStrategyRequests); - // The index of the request inside requestsToFetch determines which response is mapped to it. - // If a request won't generate a response, since it already failed, we need to remove the - // request from the requestsToFetch array so the indexes will continue to match up to the - // responses correctly. - requestsToFetch = requestsToFetch.filter(request => request !== undefined); + // Serialize the fetch params into a format suitable for the body of an ES query. + const serializedFetchParams = await serializeAllFetchParams(allFetchParams, searchStrategyRequests); + + return searchStrategy.search({ body: serializedFetchParams, es }); + }); try { // The request was aborted while we were doing the above logic. @@ -125,9 +141,15 @@ export function CallClientProvider(Private, Promise, es) { throw ABORTED; } - esPromise = es.msearch({ body: serializedFetchParams }); - const clientResponse = await esPromise; - await respond(clientResponse.responses); + esPromise = Promise.all(esPromises); + const segregatedResponses = await esPromise; + + // Aggregate the responses returned by all of the search strategies. + const aggregatedResponses = segregatedResponses.reduce((aggregation, responses) => { + return aggregation.concat(responses.responses); + }, []); + + await respond(aggregatedResponses); } catch(error) { if (error === ABORTED) { return await respond(); diff --git a/src/ui/public/courier/search_strategy/default_search_strategy.js b/src/ui/public/courier/search_strategy/default_search_strategy.js new file mode 100644 index 0000000000000..1f3597b1e6756 --- /dev/null +++ b/src/ui/public/courier/search_strategy/default_search_strategy.js @@ -0,0 +1,32 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export const defaultSearchStrategy = { + id: 'default', + + search: ({ body, es }) => { + return es.msearch({ body }); + }, + + isValidForSearchRequest: searchRequest => { + // Basic index patterns don't have `type` defined. + const indexPattern = searchRequest.source.getField('index'); + return indexPattern.type == null; + }, +}; diff --git a/src/ui/public/courier/search_strategy/index.js b/src/ui/public/courier/search_strategy/index.js new file mode 100644 index 0000000000000..533acd17c481e --- /dev/null +++ b/src/ui/public/courier/search_strategy/index.js @@ -0,0 +1,27 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +export { + assignSearchRequestsToSearchStrategies, + addSearchStrategy, +} from './search_strategy_registry'; + +export { + defaultSearchStrategy, +} from './default_search_strategy'; diff --git a/src/ui/public/courier/search_strategy/search_strategy_registry.js b/src/ui/public/courier/search_strategy/search_strategy_registry.js new file mode 100644 index 0000000000000..3de2042666ccc --- /dev/null +++ b/src/ui/public/courier/search_strategy/search_strategy_registry.js @@ -0,0 +1,69 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +const searchStrategies = []; + +const addSearchStrategy = searchStrategy => { + searchStrategies.push(searchStrategy); +}; + +/** + * Build a structure like this: + * + * [{ + * searchStrategy: , + * searchRequests: [], + * }, { + * searchStrategy: , + * searchRequests: [], + * }] + * + * We use an array of objects to preserve the order of the search requests, which use to + * deterministically associate each response with the originating request. + */ +const assignSearchRequestsToSearchStrategies = searchRequests => { + const searchStrategiesWithRequests = []; + const searchStrategyById = {}; + + searchRequests.forEach(searchRequest => { + const matchingSearchStrategy = searchStrategies.find(searchStrategy => searchStrategy.isValidForSearchRequest(searchRequest)); + const { id } = matchingSearchStrategy; + let searchStrategyWithRequest = searchStrategyById[id]; + + // Create the data structure if we don't already have it. + if (!searchStrategyWithRequest) { + searchStrategyWithRequest = { + searchStrategy: matchingSearchStrategy, + searchRequests: [], + }; + + searchStrategyById[id] = searchStrategyWithRequest; + searchStrategiesWithRequests.push(searchStrategyWithRequest); + } + + searchStrategyWithRequest.searchRequests.push(searchRequest); + }); + + return searchStrategiesWithRequests; +}; + +export { + assignSearchRequestsToSearchStrategies, + addSearchStrategy, +}; diff --git a/src/ui/public/courier/search_strategy/search_strategy_registry.test.js b/src/ui/public/courier/search_strategy/search_strategy_registry.test.js new file mode 100644 index 0000000000000..d83625964304c --- /dev/null +++ b/src/ui/public/courier/search_strategy/search_strategy_registry.test.js @@ -0,0 +1,83 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { + assignSearchRequestsToSearchStrategies, + addSearchStrategy, +} from './search_strategy_registry'; + +describe('SearchStrategyRegistry', () => { + describe('assignSearchRequestsToSearchStrategies', () => { + test('associates search requests with valid search strategies', () => { + const searchStrategyA = { + id: 'a', + isValidForSearchRequest: searchRequest => { + return searchRequest.type === 'a'; + }, + }; + + addSearchStrategy(searchStrategyA); + + const searchStrategyB = { + id: 'b', + isValidForSearchRequest: searchRequest => { + return searchRequest.type === 'b'; + }, + }; + + addSearchStrategy(searchStrategyB); + + const searchRequests = [{ + id: 0, + type: 'b', + }, { + id: 1, + type: 'a', + }, { + id: 2, + type: 'a', + }, { + id: 3, + type: 'b', + }]; + + const searchStrategiesWithSearchRequests = assignSearchRequestsToSearchStrategies(searchRequests); + + expect(searchStrategiesWithSearchRequests).toEqual([{ + searchStrategy: searchStrategyB, + searchRequests: [{ + id: 0, + type: 'b', + }, { + id: 3, + type: 'b', + }], + }, { + searchStrategy: searchStrategyA, + searchRequests: [{ + id: 1, + type: 'a', + }, { + id: 2, + type: 'a', + }], + }]); + }); + }); +});