Skip to content

Commit

Permalink
Make Visualize responsible for explicitly defining how Courier should…
Browse files Browse the repository at this point in the history
… execute its search.

- Remove fetchSoon, fetchNow, callResponseHandlers, continueIncomplete, isRequest, mergeDuplicateRequests, searchRequestQueue, and courier notifier instance.
  • Loading branch information
cjcenizal committed Jul 4, 2018
1 parent e024d3d commit da8dd57
Show file tree
Hide file tree
Showing 21 changed files with 58 additions and 831 deletions.
4 changes: 2 additions & 2 deletions src/core_plugins/kibana/public/dashboard/dashboard_app.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ app.directive('dashboardViewportProvider', function (reactDirective) {
});

app.directive('dashboardApp', function ($injector) {
const courier = $injector.get('courier');
const AppState = $injector.get('AppState');
const kbnUrl = $injector.get('kbnUrl');
const confirmModal = $injector.get('confirmModal');
Expand Down Expand Up @@ -162,7 +161,8 @@ app.directive('dashboardApp', function ($injector) {

$scope.refresh = () => {
$rootScope.$broadcast('fetch');
courier.fetch();
// TODO: Tell embeddables to do their own fetch.
// courier.fetch();
};
dashboardStateManager.handleTimeChange(timefilter.getTime());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,8 @@ function discoverController(
.then(setupVisualization)
.then(function () {
$state.save();
return courier.fetch();
// TODO: Call searchSource.fetch() directly.
// return courier.fetch();
})
.catch(notify.error);
};
Expand Down
27 changes: 8 additions & 19 deletions src/ui/public/courier/courier.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,9 @@ import { uiModules } from '../modules';
import { addFatalErrorCallback } from '../notify';
import '../promises';

import { searchRequestQueue } from './search_request_queue';
import { FetchSoonProvider } from './fetch';
import { SearchPollProvider } from './search_poll';

uiModules.get('kibana/courier').service('courier', ($rootScope, Private) => {
const fetchSoon = Private(FetchSoonProvider);

// This manages the doc fetch interval.
const searchPoll = Private(SearchPollProvider);

Expand All @@ -59,26 +55,19 @@ uiModules.get('kibana/courier').service('courier', ($rootScope, Private) => {
// clearTimer because if the search results come back after the fatal error then we'll
// resume polling.
searchPoll.pause();

// And abort all pending requests.
searchRequestQueue.abortAll();

if (searchRequestQueue.getCount()) {
throw new Error('Aborting all pending requests failed.');
}
});

addFatalErrorCallback(closeOnFatal);
}

/**
* Fetch the pending requests.
*/
fetch = () => {
fetchSoon.fetchQueued().then(() => {
// Reset the timer using the time that we get this response as the starting point.
searchPoll.resetTimer();
});
setSearchCallback = searchCallback => {
// TODO: Intercept the callback and reset the timer using the time that we get this response as the starting point.
// searchPoll.resetTimer();
searchPoll.setSearchCallback(searchCallback);
};

clearSearchCallback = () => {
searchPoll.clearSearchCallback();
};
}

Expand Down
122 changes: 0 additions & 122 deletions src/ui/public/courier/fetch/__tests__/fetch_now.js

This file was deleted.

72 changes: 8 additions & 64 deletions src/ui/public/courier/fetch/call_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,16 @@
import _ from 'lodash';

import { ErrorAllowExplicitIndexProvider } from '../../error_allow_explicit_index';
import { IsRequestProvider } from './is_request';
import { MergeDuplicatesRequestProvider } from './merge_duplicate_requests';
import { RequestStatus } from './req_status';
import { SerializeFetchParamsProvider } from './request/serialize_fetch_params';

export function CallClientProvider(Private, Promise, es) {
const errorAllowExplicitIndex = Private(ErrorAllowExplicitIndexProvider);
const isRequest = Private(IsRequestProvider);
const mergeDuplicateRequests = Private(MergeDuplicatesRequestProvider);
const serializeFetchParams = Private(SerializeFetchParamsProvider);

const ABORTED = RequestStatus.ABORTED;
const DUPLICATE = RequestStatus.DUPLICATE;

function callClient(requests) {
// merging docs can change status to DUPLICATE, capture new statuses
const statuses = mergeDuplicateRequests(requests);

// get the actual list of requests that we will be fetching
let requestsToFetch = statuses.filter(isRequest);
let execCount = requestsToFetch.length;

if (!execCount) return Promise.resolve([]);

function callClient(request) {
// resolved by respond()
let esPromise = undefined;
let isRequestAborted = false;
Expand All @@ -52,39 +38,11 @@ export function CallClientProvider(Private, Promise, es) {
// for each respond with either the response or ABORTED
const respond = function (responses) {
responses = responses || [];
return Promise.map(requests, function (request, i) {
switch (statuses[i]) {
case ABORTED:
return ABORTED;
case DUPLICATE:
return request._uniq.resp;
default:
const index = _.findIndex(requestsToFetch, request);
if (index < 0) {
// This means the request failed.
return ABORTED;
}
return responses[index];
}
})
.then(
(res) => defer.resolve(res),
(err) => defer.reject(err)
);
defer.resolve(responses[0]);
};

// handle a request being aborted while being fetched
const requestWasAborted = Promise.method(function (req, i) {
if (statuses[i] === ABORTED) {
defer.reject(new Error('Request was aborted twice?'));
}

execCount -= 1;
if (execCount > 0) {
// the multi-request still contains other requests
return;
}

const requestWasAborted = Promise.method(function () {
if (esPromise && _.isFunction(esPromise.abort)) {
esPromise.abort();
}
Expand All @@ -95,29 +53,19 @@ export function CallClientProvider(Private, Promise, es) {
return respond();
});

// attach abort handlers, close over request index
statuses.forEach(function (req, i) {
if (!isRequest(req)) return;
req.whenAborted(function () {
requestWasAborted(req, i).catch(defer.reject);
});
request.whenAborted(function () {
requestWasAborted(request).catch(defer.reject);
});

// We're going to create a new async context here, so that the logic within it can execute
// asynchronously after we've returned a reference to defer.promise.
Promise.resolve().then(async () => {
// Flatten the searchSource within each searchRequest to get the fetch params,
// e.g. body, filters, index pattern, query.
const allFetchParams = await getAllFetchParams(requestsToFetch);
const allFetchParams = await getAllFetchParams([request]);

// Serialize the fetch params into a format suitable for the body of an ES query.
const serializedFetchParams = await serializeAllFetchParams(allFetchParams, requestsToFetch);

// The index of the request inside requestsToFetch determines which response is mapped to it.
// If a request won't generate a response, since it already failed, we need to remove the
// request from the requestsToFetch array so the indexes will continue to match up to the
// responses correctly.
requestsToFetch = requestsToFetch.filter(request => request !== undefined);
const serializedFetchParams = await serializeAllFetchParams(allFetchParams, [request]);

try {
// The request was aborted while we were doing the above logic.
Expand Down Expand Up @@ -145,11 +93,7 @@ export function CallClientProvider(Private, Promise, es) {
// send them to the requests
return defer.promise
.catch((err) => {
requests.forEach((req, index) => {
if (statuses[index] !== ABORTED) {
req.handleFailure(err);
}
});
request.handleFailure(err);
});
}

Expand Down
67 changes: 0 additions & 67 deletions src/ui/public/courier/fetch/call_response_handlers.js

This file was deleted.

Loading

0 comments on commit da8dd57

Please sign in to comment.