Skip to content

Commit

Permalink
Retrofit the Bulk Uploader types combiner [ch2198]
Browse files Browse the repository at this point in the history
  • Loading branch information
tsullivan committed Aug 15, 2018
1 parent 595476b commit 0305d48
Show file tree
Hide file tree
Showing 13 changed files with 145 additions and 279 deletions.
3 changes: 2 additions & 1 deletion src/server/status/collectors/get_ops_stats_collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export function getOpsStatsCollector(server, kbnServer) {
kibana: getKibanaInfoForStats(server, kbnServer),
...kbnServer.metrics // latest metrics captured from the ops event listener in src/server/status/index
};
}
},
internalIgnore: true, // Ignore this one from internal uploader. A different stats collector is used there.
});
}
26 changes: 24 additions & 2 deletions src/server/usage/classes/collector.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,48 @@ export class Collector {
* @param {String} options.type - property name as the key for the data
* @param {Function} options.init (optional) - initialization function
* @param {Function} options.fetch - function to query data
* @param {Function} options.formatForBulkUpload - optional
* @param {Function} options.rest - optional other properties
*/
constructor(server, { type, init, fetch } = {}) {
constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) {
if (type === undefined) {
throw new Error('Collector must be instantiated with a options.type string property');
}
if (typeof init !== 'undefined' && typeof init !== 'function') {
throw new Error('If init property is passed, Collector must be instantiated with a options.init as a function property');
}
if (typeof fetch !== 'function') {
throw new Error('Collector must be instantiated with a options.fetch function property');
}

this.log = getCollectorLogger(server);

Object.assign(this, options); // spread in other properties and mutate "this"

this.type = type;
this.init = init;
this.fetch = fetch;

this.log = getCollectorLogger(server);
const defaultFormatterForBulkUpload = result => ({ type, payload: result });
this._formatForBulkUpload = formatForBulkUpload || defaultFormatterForBulkUpload;
}

/*
* @param {Function} callCluster - callCluster function
*/
fetchInternal(callCluster) {
if (typeof callCluster !== 'function') {
throw new Error('A `callCluster` function must be passed to the fetch methods of collectors');
}
return this.fetch(callCluster);
}

/*
* A hook for allowing the fetched data payload to be organized into a typed
* data model for internal bulk upload. See defaultFormatterForBulkUpload for
* a generic example.
*/
formatForBulkUpload(result) {
return this._formatForBulkUpload(result);
}
}
32 changes: 22 additions & 10 deletions src/server/usage/classes/collector_set.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,26 +26,25 @@ import { UsageCollector } from './usage_collector';
/*
* A collector object has types registered into it with the register(type)
* function. Each type that gets registered defines how to fetch its own data
* and combine it into a unified payload for bulk upload.
* and optionally, how to combine it into a unified payload for bulk upload.
*/
export class CollectorSet {

/*
* @param {Object} server - server object
* @param {Number} options.interval - in milliseconds
* @param {Function} options.combineTypes
* @param {Function} options.onPayload
* @param {Array} collectors to initialize, usually as a result of filtering another CollectorSet instance
*/
constructor(server) {
constructor(server, collectors = []) {
this._log = getCollectorLogger(server);
this._collectors = [];
this._collectors = collectors;

/*
* Helper Factory methods
* Define as instance properties to allow enclosing the server object
*/
this.makeStatsCollector = options => new Collector(server, options);
this.makeUsageCollector = options => new UsageCollector(server, options);
this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray);
}

/*
Expand Down Expand Up @@ -73,12 +72,12 @@ export class CollectorSet {
* Call a bunch of fetch methods and then do them in bulk
* @param {Array} collectors - an array of collectors, default to all registered collectors
*/
bulkFetch(callCluster, collectors = this._collectors) {
if (!Array.isArray(collectors)) {
bulkFetch(callCluster, collectors = this) {
if (!(collectors instanceof CollectorSet)) {
throw new Error(`bulkFetch method given bad collectors parameter: ` + typeof collectors);
}

return Promise.map(collectors, collector => {
const fetchPromises = collectors.map(collector => {
const collectorType = collector.type;
this._log.debug(`Fetching data from ${collectorType} collector`);
return Promise.props({
Expand All @@ -90,10 +89,19 @@ export class CollectorSet {
this._log.warn(`Unable to fetch data from ${collectorType} collector`);
});
});
return Promise.all(fetchPromises);
}

/*
* @return {new CollectorSet}
*/
getFilteredCollectorSet(filter) {
const filtered = this._collectors.filter(filter);
return this._makeCollectorSetFromArray(filtered);
}

async bulkFetchUsage(callCluster) {
const usageCollectors = this._collectors.filter(c => c instanceof UsageCollector);
const usageCollectors = this.getFilteredCollectorSet(c => c instanceof UsageCollector);
return this.bulkFetch(callCluster, usageCollectors);
}

Expand Down Expand Up @@ -137,4 +145,8 @@ export class CollectorSet {
};
}, {});
}

map(mapFn) {
return this._collectors.map(mapFn);
}
}
2 changes: 1 addition & 1 deletion x-pack/plugins/monitoring/common/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export const MONITORING_SYSTEM_API_VERSION = '6';
* The type name used within the Monitoring index to publish Kibana ops stats.
* @type {string}
*/
export const KIBANA_STATS_TYPE_MONITORING = 'kibana_stats_monitoring'; // similar to KIBANA_STATS_TYPE but rolled up into 10s stats from 5s intervals through ops_buffer
export const KIBANA_STATS_TYPE_MONITORING = 'kibana_stats_internal'; // similar to KIBANA_STATS_TYPE but rolled up into 10s stats from 5s intervals through ops_buffer
/**
* The type name used within the Monitoring index to publish Kibana stats.
* @type {string}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,15 @@ const CHECK_DELAY = 500;

class MockCollectorSet {
constructor(_mockServer, mockCollectors) {
this.mockServer = _mockServer;
this.mockCollectors = mockCollectors;
}
getCollectorByType(type) {
return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0];
}
getFilteredCollectorSet(filter) {
return new MockCollectorSet(this.mockServer, this.mockCollectors.filter(filter));
}
async bulkFetch() {
return this.mockCollectors.map(({ fetch }) => fetch());
}
Expand Down Expand Up @@ -47,7 +54,8 @@ describe('BulkUploader', () => {
const collectors = new MockCollectorSet(server, [
{
type: 'type_collector_test',
fetch: noop, // empty payloads
fetch: noop, // empty payloads,
formatForBulkUpload: result => result,
}
]);

Expand Down Expand Up @@ -82,7 +90,10 @@ describe('BulkUploader', () => {

it('should run the bulk upload handler', done => {
const collectors = new MockCollectorSet(server, [
{ fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }) }
{
fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }),
formatForBulkUpload: result => result
}
]);
const uploader = new BulkUploader(server, {
interval: FETCH_INTERVAL
Expand Down

This file was deleted.

Loading

0 comments on commit 0305d48

Please sign in to comment.