Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retrofit the Bulk Uploader types combiner [ch2198] #22030

Merged
merged 8 commits into from
Aug 21, 2018
3 changes: 2 additions & 1 deletion src/server/status/collectors/get_ops_stats_collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export function getOpsStatsCollector(server, kbnServer) {
kibana: getKibanaInfoForStats(server, kbnServer),
...kbnServer.metrics // latest metrics captured from the ops event listener in src/server/status/index
};
}
},
internalIgnore: true, // Ignore this one from internal uploader. A different stats collector is used there.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if this can be more verbose to avoid ambiguity - maybe ignoreForInternalUploader?

});
}
26 changes: 24 additions & 2 deletions src/server/usage/classes/collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,48 @@ export class Collector {
* @param {String} options.type - property name as the key for the data
* @param {Function} options.init (optional) - initialization function
* @param {Function} options.fetch - function to query data
* @param {Function} options.formatForBulkUpload - optional
* @param {Function} options.rest - optional other properties
*/
constructor(server, { type, init, fetch } = {}) {
constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) {
if (type === undefined) {
throw new Error('Collector must be instantiated with a options.type string property');
}
if (typeof init !== 'undefined' && typeof init !== 'function') {
throw new Error('If init property is passed, Collector must be instantiated with a options.init as a function property');
}
if (typeof fetch !== 'function') {
throw new Error('Collector must be instantiated with a options.fetch function property');
}

this.log = getCollectorLogger(server);

Object.assign(this, options); // spread in other properties and mutate "this"

this.type = type;
this.init = init;
this.fetch = fetch;

this.log = getCollectorLogger(server);
const defaultFormatterForBulkUpload = result => ({ type, payload: result });
this._formatForBulkUpload = formatForBulkUpload || defaultFormatterForBulkUpload;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm doing some testing, and it looks like I want to override this in the UsageCollector class. Usage collectors should organize their data in the kibana_stats type in the usage namespace. - c940ee1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, that's snazzy!

}

/*
* @param {Function} callCluster - callCluster function
*/
fetchInternal(callCluster) {
if (typeof callCluster !== 'function') {
throw new Error('A `callCluster` function must be passed to the fetch methods of collectors');
}
return this.fetch(callCluster);
}

/*
* A hook for allowing the fetched data payload to be organized into a typed
* data model for internal bulk upload. See defaultFormatterForBulkUpload for
* a generic example.
*/
formatForBulkUpload(result) {
return this._formatForBulkUpload(result);
}
}
32 changes: 22 additions & 10 deletions src/server/usage/classes/collector_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,25 @@ import { UsageCollector } from './usage_collector';
/*
* A collector object has types registered into it with the register(type)
* function. Each type that gets registered defines how to fetch its own data
* and combine it into a unified payload for bulk upload.
* and optionally, how to combine it into a unified payload for bulk upload.
*/
export class CollectorSet {

/*
* @param {Object} server - server object
* @param {Number} options.interval - in milliseconds
* @param {Function} options.combineTypes
* @param {Function} options.onPayload
* @param {Array} collectors to initialize, usually as a result of filtering another CollectorSet instance
*/
constructor(server) {
constructor(server, collectors = []) {
this._log = getCollectorLogger(server);
this._collectors = [];
this._collectors = collectors;

/*
* Helper Factory methods
* Define as instance properties to allow enclosing the server object
*/
this.makeStatsCollector = options => new Collector(server, options);
this.makeUsageCollector = options => new UsageCollector(server, options);
this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray);
Copy link
Contributor

@chrisronline chrisronline Aug 20, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this an instanced variable versus just static function defined at the top?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Having it here let's us enclose the server object that the constructor is called with

}

/*
Expand Down Expand Up @@ -73,12 +72,12 @@ export class CollectorSet {
* Call a bunch of fetch methods and then do them in bulk
* @param {Array} collectors - an array of collectors, default to all registered collectors
*/
bulkFetch(callCluster, collectors = this._collectors) {
if (!Array.isArray(collectors)) {
bulkFetch(callCluster, collectors = this) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we rename collectors to collectorSet to avoid any potential confusion? By collectors, I'd assume we're doing with an array of collectors versus a typed CollectorSet object

if (!(collectors instanceof CollectorSet)) {
throw new Error(`bulkFetch method given bad collectors parameter: ` + typeof collectors);
}

return Promise.map(collectors, collector => {
const fetchPromises = collectors.map(collector => {
const collectorType = collector.type;
this._log.debug(`Fetching data from ${collectorType} collector`);
return Promise.props({
Expand All @@ -90,10 +89,19 @@ export class CollectorSet {
this._log.warn(`Unable to fetch data from ${collectorType} collector`);
});
});
return Promise.all(fetchPromises);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just wondering, is there a specific reason we keep using promises versus using async/await?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really. This lets us not await each fetch one at a time. I could be wrong, but using Promise.props inside the loop has it run all the fetches at the same time.

}

/*
* @return {new CollectorSet}
*/
getFilteredCollectorSet(filter) {
const filtered = this._collectors.filter(filter);
return this._makeCollectorSetFromArray(filtered);
}

async bulkFetchUsage(callCluster) {
const usageCollectors = this._collectors.filter(c => c instanceof UsageCollector);
const usageCollectors = this.getFilteredCollectorSet(c => c instanceof UsageCollector);
return this.bulkFetch(callCluster, usageCollectors);
}

Expand Down Expand Up @@ -137,4 +145,8 @@ export class CollectorSet {
};
}, {});
}

map(mapFn) {
return this._collectors.map(mapFn);
}
}
31 changes: 30 additions & 1 deletion src/server/usage/classes/usage_collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,35 @@
* under the License.
*/

import { KIBANA_STATS_TYPE } from '../../status/constants';
import { Collector } from './collector';

export class UsageCollector extends Collector {}
export class UsageCollector extends Collector {
/*
* @param {Object} server - server object
* @param {String} options.type - property name as the key for the data
* @param {Function} options.init (optional) - initialization function
* @param {Function} options.fetch - function to query data
* @param {Function} options.formatForBulkUpload - optional
* @param {Function} options.rest - optional other properties
*/
constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) {
super(server, { type, init, fetch, formatForBulkUpload, ...options });

/*
* Currently, for internal bulk uploading, usage stats are part of
* `kibana_stats` type, under the `usage` namespace in the document.
*/
const defaultUsageFormatterForBulkUpload = result => {
return {
type: KIBANA_STATS_TYPE,
payload: {
usage: {
[type]: result
}
}
};
};
this._formatForBulkUpload = formatForBulkUpload || defaultUsageFormatterForBulkUpload;
}
}
2 changes: 1 addition & 1 deletion x-pack/plugins/monitoring/common/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export const MONITORING_SYSTEM_API_VERSION = '6';
* The type name used within the Monitoring index to publish Kibana ops stats.
* @type {string}
*/
export const KIBANA_STATS_TYPE_MONITORING = 'kibana_stats_monitoring'; // similar to KIBANA_STATS_TYPE but rolled up into 10s stats from 5s intervals through ops_buffer
export const KIBANA_STATS_TYPE_MONITORING = 'kibana_stats'; // similar to KIBANA_STATS_TYPE but rolled up into 10s stats from 5s intervals through ops_buffer
/**
* The type name used within the Monitoring index to publish Kibana stats.
* @type {string}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@ const CHECK_DELAY = 500;

class MockCollectorSet {
constructor(_mockServer, mockCollectors) {
this.mockServer = _mockServer;
this.mockCollectors = mockCollectors;
}
getCollectorByType(type) {
return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0];
}
getFilteredCollectorSet(filter) {
return new MockCollectorSet(this.mockServer, this.mockCollectors.filter(filter));
}
async bulkFetch() {
return this.mockCollectors.map(({ fetch }) => fetch());
}
Expand Down Expand Up @@ -47,7 +54,8 @@ describe('BulkUploader', () => {
const collectors = new MockCollectorSet(server, [
{
type: 'type_collector_test',
fetch: noop, // empty payloads
fetch: noop, // empty payloads,
formatForBulkUpload: result => result,
}
]);

Expand Down Expand Up @@ -82,7 +90,10 @@ describe('BulkUploader', () => {

it('should run the bulk upload handler', done => {
const collectors = new MockCollectorSet(server, [
{ fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }) }
{
fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }),
formatForBulkUpload: result => result
}
]);
const uploader = new BulkUploader(server, {
interval: FETCH_INTERVAL
Expand Down
Loading