From fb76429a8f14f4aaa81b5ddcacb13d70384e1cc6 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Wed, 15 Aug 2018 10:27:25 -0700 Subject: [PATCH 1/6] Retrofit the Bulk Uploader types combiner [ch2198] fix usage collector, add comments to formatForBulk remove unnecessary customizations --- .../collectors/get_ops_stats_collector.js | 3 +- src/server/usage/classes/collector.js | 26 ++- src/server/usage/classes/collector_set.js | 32 ++- x-pack/plugins/monitoring/common/constants.js | 2 +- .../__tests__/bulk_uploader.js | 15 +- ...bulk_uploader.combine_stats_legacy.test.js | 187 ------------------ .../server/kibana_monitoring/bulk_uploader.js | 96 ++++----- .../collectors/get_kibana_usage_collector.js | 17 +- .../collectors/get_ops_stats_collector.js | 8 +- .../collectors/get_settings_collector.js | 10 +- .../collectors/ops_buffer/ops_buffer.js | 8 +- .../server/kibana_monitoring/init.js | 4 +- .../usage/get_reporting_usage_collector.js | 19 ++ 13 files changed, 149 insertions(+), 278 deletions(-) delete mode 100644 x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.combine_stats_legacy.test.js diff --git a/src/server/status/collectors/get_ops_stats_collector.js b/src/server/status/collectors/get_ops_stats_collector.js index ba4a3d92699ae..a11c5bf9b1dc9 100644 --- a/src/server/status/collectors/get_ops_stats_collector.js +++ b/src/server/status/collectors/get_ops_stats_collector.js @@ -44,6 +44,7 @@ export function getOpsStatsCollector(server, kbnServer) { kibana: getKibanaInfoForStats(server, kbnServer), ...kbnServer.metrics // latest metrics captured from the ops event listener in src/server/status/index }; - } + }, + internalIgnore: true, // Ignore this one from internal uploader. A different stats collector is used there. }); } diff --git a/src/server/usage/classes/collector.js b/src/server/usage/classes/collector.js index 7e433068911ea..f103126740e13 100644 --- a/src/server/usage/classes/collector.js +++ b/src/server/usage/classes/collector.js @@ -25,26 +25,48 @@ export class Collector { * @param {String} options.type - property name as the key for the data * @param {Function} options.init (optional) - initialization function * @param {Function} options.fetch - function to query data + * @param {Function} options.formatForBulkUpload - optional + * @param {Function} options.rest - optional other properties */ - constructor(server, { type, init, fetch } = {}) { + constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) { if (type === undefined) { throw new Error('Collector must be instantiated with a options.type string property'); } + if (typeof init !== 'undefined' && typeof init !== 'function') { + throw new Error('If init property is passed, Collector must be instantiated with a options.init as a function property'); + } if (typeof fetch !== 'function') { throw new Error('Collector must be instantiated with a options.fetch function property'); } + this.log = getCollectorLogger(server); + + Object.assign(this, options); // spread in other properties and mutate "this" + this.type = type; this.init = init; this.fetch = fetch; - this.log = getCollectorLogger(server); + const defaultFormatterForBulkUpload = result => ({ type, payload: result }); + this._formatForBulkUpload = formatForBulkUpload || defaultFormatterForBulkUpload; } + /* + * @param {Function} callCluster - callCluster function + */ fetchInternal(callCluster) { if (typeof callCluster !== 'function') { throw new Error('A `callCluster` function must be passed to the fetch methods of collectors'); } return this.fetch(callCluster); } + + /* + * A hook for allowing the fetched data payload to be organized into a typed + * data model for internal bulk upload. See defaultFormatterForBulkUpload for + * a generic example. + */ + formatForBulkUpload(result) { + return this._formatForBulkUpload(result); + } } diff --git a/src/server/usage/classes/collector_set.js b/src/server/usage/classes/collector_set.js index 2b6d3717cfdb9..19cefde55687c 100644 --- a/src/server/usage/classes/collector_set.js +++ b/src/server/usage/classes/collector_set.js @@ -26,19 +26,17 @@ import { UsageCollector } from './usage_collector'; /* * A collector object has types registered into it with the register(type) * function. Each type that gets registered defines how to fetch its own data - * and combine it into a unified payload for bulk upload. + * and optionally, how to combine it into a unified payload for bulk upload. */ export class CollectorSet { /* * @param {Object} server - server object - * @param {Number} options.interval - in milliseconds - * @param {Function} options.combineTypes - * @param {Function} options.onPayload + * @param {Array} collectors to initialize, usually as a result of filtering another CollectorSet instance */ - constructor(server) { + constructor(server, collectors = []) { this._log = getCollectorLogger(server); - this._collectors = []; + this._collectors = collectors; /* * Helper Factory methods @@ -46,6 +44,7 @@ export class CollectorSet { */ this.makeStatsCollector = options => new Collector(server, options); this.makeUsageCollector = options => new UsageCollector(server, options); + this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray); } /* @@ -73,12 +72,12 @@ export class CollectorSet { * Call a bunch of fetch methods and then do them in bulk * @param {Array} collectors - an array of collectors, default to all registered collectors */ - bulkFetch(callCluster, collectors = this._collectors) { - if (!Array.isArray(collectors)) { + bulkFetch(callCluster, collectors = this) { + if (!(collectors instanceof CollectorSet)) { throw new Error(`bulkFetch method given bad collectors parameter: ` + typeof collectors); } - return Promise.map(collectors, collector => { + const fetchPromises = collectors.map(collector => { const collectorType = collector.type; this._log.debug(`Fetching data from ${collectorType} collector`); return Promise.props({ @@ -90,10 +89,19 @@ export class CollectorSet { this._log.warn(`Unable to fetch data from ${collectorType} collector`); }); }); + return Promise.all(fetchPromises); + } + + /* + * @return {new CollectorSet} + */ + getFilteredCollectorSet(filter) { + const filtered = this._collectors.filter(filter); + return this._makeCollectorSetFromArray(filtered); } async bulkFetchUsage(callCluster) { - const usageCollectors = this._collectors.filter(c => c instanceof UsageCollector); + const usageCollectors = this.getFilteredCollectorSet(c => c instanceof UsageCollector); return this.bulkFetch(callCluster, usageCollectors); } @@ -137,4 +145,8 @@ export class CollectorSet { }; }, {}); } + + map(mapFn) { + return this._collectors.map(mapFn); + } } diff --git a/x-pack/plugins/monitoring/common/constants.js b/x-pack/plugins/monitoring/common/constants.js index 30b700c48ae0e..718606d8b6be2 100644 --- a/x-pack/plugins/monitoring/common/constants.js +++ b/x-pack/plugins/monitoring/common/constants.js @@ -22,7 +22,7 @@ export const MONITORING_SYSTEM_API_VERSION = '6'; * The type name used within the Monitoring index to publish Kibana ops stats. * @type {string} */ -export const KIBANA_STATS_TYPE_MONITORING = 'kibana_stats_monitoring'; // similar to KIBANA_STATS_TYPE but rolled up into 10s stats from 5s intervals through ops_buffer +export const KIBANA_STATS_TYPE_MONITORING = 'kibana_stats'; // similar to KIBANA_STATS_TYPE but rolled up into 10s stats from 5s intervals through ops_buffer /** * The type name used within the Monitoring index to publish Kibana stats. * @type {string} diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js b/x-pack/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js index 0b69370b8e4c1..fe17f205c65b9 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/__tests__/bulk_uploader.js @@ -14,8 +14,15 @@ const CHECK_DELAY = 500; class MockCollectorSet { constructor(_mockServer, mockCollectors) { + this.mockServer = _mockServer; this.mockCollectors = mockCollectors; } + getCollectorByType(type) { + return this.mockCollectors.find(collector => collector.type === type) || this.mockCollectors[0]; + } + getFilteredCollectorSet(filter) { + return new MockCollectorSet(this.mockServer, this.mockCollectors.filter(filter)); + } async bulkFetch() { return this.mockCollectors.map(({ fetch }) => fetch()); } @@ -47,7 +54,8 @@ describe('BulkUploader', () => { const collectors = new MockCollectorSet(server, [ { type: 'type_collector_test', - fetch: noop, // empty payloads + fetch: noop, // empty payloads, + formatForBulkUpload: result => result, } ]); @@ -82,7 +90,10 @@ describe('BulkUploader', () => { it('should run the bulk upload handler', done => { const collectors = new MockCollectorSet(server, [ - { fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }) } + { + fetch: () => ({ type: 'type_collector_test', result: { testData: 12345 } }), + formatForBulkUpload: result => result + } ]); const uploader = new BulkUploader(server, { interval: FETCH_INTERVAL diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.combine_stats_legacy.test.js b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.combine_stats_legacy.test.js deleted file mode 100644 index 2ecd9a8329eab..0000000000000 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.combine_stats_legacy.test.js +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License; - * you may not use this file except in compliance with the Elastic License. - */ - -import { KIBANA_STATS_TYPE_MONITORING, KIBANA_USAGE_TYPE, KIBANA_SETTINGS_TYPE } from '../../common/constants'; -import { KIBANA_REPORTING_TYPE } from '../../../reporting/common/constants'; -import { BulkUploader } from './bulk_uploader'; - -const getInitial = () => { - return [ - [ - { 'index': { '_type': KIBANA_STATS_TYPE_MONITORING } }, - { - 'host': 'tsullivan.local', - 'concurrent_connections': 0, - 'os': { - 'load': { '1m': 2.28857421875, '5m': 2.45068359375, '15m': 2.29248046875 }, - 'memory': { 'total_in_bytes': 17179869184, 'free_in_bytes': 527749120, 'used_in_bytes': 16652120064 }, - 'uptime_in_millis': 1211027000 - }, - 'process': { - 'event_loop_delay': 4.222616970539093, - 'memory': { - 'heap': { 'total_in_bytes': 219455488, 'used_in_bytes': 152622064, 'size_limit': 1501560832 }, - 'resident_set_size_in_bytes': 245923840 - }, - 'uptime_in_millis': 18467 - }, - 'requests': { - 'disconnects': 0, - 'total': 2, - }, - 'response_times': { 'average': 47, 'max': 47 }, - 'timestamp': '2017-07-26T00:14:20.771Z', - } - ], - [ - { 'index': { '_type': KIBANA_USAGE_TYPE } }, - { - 'dashboard': { 'total': 0 }, - 'visualization': { 'total': 0 }, - 'search': { 'total': 0 }, - 'index_pattern': { 'total': 2 }, - 'index': '.kibana' - } - ], - [ - { 'index': { '_type': KIBANA_REPORTING_TYPE } }, - { - 'available': true, - 'enabled': false, - '_all': 55, - 'csv': { - 'available': true, - 'count': 25 - }, - 'printable_pdf': { - 'available': true, - 'count': 30 - } - } - ], - [ - { 'index': { '_type': KIBANA_SETTINGS_TYPE } }, - { 'xpack': { 'defaultAdminEmail': 'tim@elastic.co' } } - ] - ]; -}; - -// TODO use jest snapshotting -const getResult = () => { - return [ - [ - { 'index': { '_type': 'kibana_stats' } }, - { - 'host': 'tsullivan.local', - 'concurrent_connections': 0, - 'os': { - 'load': { '1m': 2.28857421875, '5m': 2.45068359375, '15m': 2.29248046875 }, - 'memory': { 'total_in_bytes': 17179869184, 'free_in_bytes': 527749120, 'used_in_bytes': 16652120064 }, - 'uptime_in_millis': 1211027000 - }, - 'process': { - 'event_loop_delay': 4.222616970539093, - 'memory': { - 'heap': { 'total_in_bytes': 219455488, 'used_in_bytes': 152622064, 'size_limit': 1501560832 }, - 'resident_set_size_in_bytes': 245923840 - }, - 'uptime_in_millis': 18467 - }, - 'requests': { - 'disconnects': 0, - 'total': 2, - }, - 'response_times': { 'average': 47, 'max': 47 }, - 'timestamp': '2017-07-26T00:14:20.771Z', - 'usage': { - 'dashboard': { 'total': 0 }, - 'visualization': { 'total': 0 }, - 'search': { 'total': 0 }, - 'index_pattern': { 'total': 2 }, - 'index': '.kibana', - 'xpack': { - 'reporting': { - '_all': 55, - 'available': true, - 'csv': { - 'available': true, - 'count': 25, - }, - 'enabled': false, - 'printable_pdf': { - 'available': true, - 'count': 30, - } - } - } - } - } - ], - [ - { 'index': { '_type': 'kibana_settings' } }, - { - 'xpack': { 'defaultAdminEmail': 'tim@elastic.co' }, - } - ] - ]; -}; - -describe('Collector Types Combiner', () => { - describe('with all the data types present', () => { - it('provides settings, and combined stats/usage data', () => { - // default gives all the data types - const initial = getInitial(); - const result = BulkUploader.combineStatsLegacy(initial); - expect(result).toEqual(getResult()); - }); - }); - describe('with settings data missing', () => { - it('provides combined stats/usage data', () => { - // default gives all the data types - const initial = getInitial(); - const trimmedInitial = [ initial[0], initial[1], initial[2] ]; // just stats, usage and reporting, no settings - const result = BulkUploader.combineStatsLegacy(trimmedInitial); - const expectedResult = getResult(); - const trimmedExpectedResult = [ expectedResult[0] ]; // single combined item - expect(result).toEqual(trimmedExpectedResult); - }); - }); - describe('with usage data missing', () => { - it('provides settings, and stats data', () => { - // default gives all the data types - const initial = getInitial(); - const trimmedInitial = [ initial[0], initial[3] ]; // just stats and settings, no usage or reporting - const result = BulkUploader.combineStatsLegacy(trimmedInitial); - const expectedResult = getResult(); - delete expectedResult[0][1].usage; // usage stats should not be present in the result - const trimmedExpectedResult = [ expectedResult[0], expectedResult[1] ]; - expect(result).toEqual(trimmedExpectedResult); - }); - }); - describe('with stats data missing', () => { - it('provides settings data', () => { - // default gives all the data types - const initial = getInitial(); - const trimmedInitial = [ initial[3] ]; // just settings - const result = BulkUploader.combineStatsLegacy(trimmedInitial); - const expectedResult = getResult(); - const trimmedExpectedResult = [ expectedResult[1] ]; // just settings - expect(result).toEqual(trimmedExpectedResult); - }); - }); - - it('throws an error if duplicate types are registered', () => { - const combineWithDuplicate = () => { - const initial = getInitial(); - const withDuplicate = [ initial[0] ].concat(initial); - return BulkUploader.combineStatsLegacy(withDuplicate); - }; - expect(combineWithDuplicate).toThrow( - 'Duplicate collector type identifiers found in payload! ' + - 'kibana_stats_monitoring,kibana_stats_monitoring,kibana,reporting,kibana_settings' - ); - }); -}); diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js index c95baa056c8d6..66b3351a71d30 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js @@ -4,19 +4,16 @@ * you may not use this file except in compliance with the Elastic License. */ -import { get, set, isEmpty, flatten, uniq } from 'lodash'; +import { defaultsDeep, isEmpty, uniq, compact } from 'lodash'; import { callClusterFactory } from '../../../xpack_main'; import { LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG, - KIBANA_STATS_TYPE_MONITORING, - KIBANA_SETTINGS_TYPE, - KIBANA_USAGE_TYPE, } from '../../common/constants'; -import { KIBANA_REPORTING_TYPE } from '../../../reporting/common/constants'; import { sendBulkPayload, monitoringBulk, + getKibanaInfoForStats, } from './lib'; const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG]; @@ -38,7 +35,7 @@ const LOGGING_TAGS = [LOGGING_TAG, KIBANA_MONITORING_LOGGING_TAG]; * @param {Object} xpackInfo server.plugins.xpack_main.info object */ export class BulkUploader { - constructor(server, { interval }) { + constructor(server, { kbnServer, interval }) { if (typeof interval !== 'number') { throw new Error('interval number of milliseconds is required'); } @@ -56,7 +53,7 @@ export class BulkUploader { }); this._callClusterWithInternalUser = callClusterFactory(server).getCallClusterInternal(); - + this._getKibanaInfoForStats = () => getKibanaInfoForStats(server, kbnServer); } /* @@ -66,9 +63,12 @@ export class BulkUploader { */ start(collectorSet) { this._log.info('Starting monitoring stats collection'); - this._fetchAndUpload(collectorSet); // initial fetch + + // this is internal bulk upload, so filter out API-only collectors + const filterThem = _collectorSet => _collectorSet.getFilteredCollectorSet(c => c.internalIgnore !== true); + this._fetchAndUpload(filterThem(collectorSet)); // initial fetch this._timer = setInterval(() => { - this._fetchAndUpload(collectorSet); + this._fetchAndUpload(filterThem(collectorSet)); }, this._interval); } @@ -98,7 +98,7 @@ export class BulkUploader { */ async _fetchAndUpload(collectorSet) { const data = await collectorSet.bulkFetch(this._callClusterWithInternalUser); - const payload = BulkUploader.toBulkUploadFormat(data); + const payload = this.toBulkUploadFormat(compact(data), collectorSet); if (payload) { try { @@ -121,14 +121,35 @@ export class BulkUploader { * Bulk stats are transformed into a bulk upload format * Non-legacy transformation is done in CollectorSet.toApiStats */ - static toBulkUploadFormat(uploadData) { - const payload = uploadData - .filter(d => Boolean(d) && !isEmpty(d.result)) - .map(({ result, type }) => [{ index: { _type: type } }, result]); - if (payload.length > 0) { - const combinedData = BulkUploader.combineStatsLegacy(payload); // arrange the usage data into the stats - return flatten(combinedData); + toBulkUploadFormat(rawData, collectorSet) { + if (rawData.length === 0) { + return; } + + // convert the raw data to a nested object by taking each payload through + // its formatter, organizing it per-type + const typesNested = rawData.reduce((accum, { type, result }) => { + if (isEmpty(result)) { + return accum; + } + const { type: uploadType, payload: uploadData } = collectorSet.getCollectorByType(type).formatForBulkUpload(result); + return defaultsDeep(accum, { [uploadType]: uploadData }); + }, {}); + + // convert the nested object into a flat array, with each payload prefixed + // with an 'index' instruction, for bulk upload + const flat = Object.keys(typesNested).reduce((accum, type) => { + return [ + ...accum, + { index: { _type: type } }, + { + kibana: this._getKibanaInfoForStats(), + ...typesNested[type], + } + ]; + }, []); + + return flat; } static checkPayloadTypesUnique(payload) { @@ -138,45 +159,4 @@ export class BulkUploader { throw new Error('Duplicate collector type identifiers found in payload! ' + ids.join(',')); } } - - static combineStatsLegacy(payload) { - BulkUploader.checkPayloadTypesUnique(payload); - - // default the item to [] to allow destructuring - const findItem = type => payload.find(item => get(item, '[0].index._type') === type) || []; - - // kibana usage and stats - let statsResult; - const [ statsHeader, statsPayload ] = findItem(KIBANA_STATS_TYPE_MONITORING); - const [ reportingHeader, reportingPayload ] = findItem(KIBANA_REPORTING_TYPE); - - if (statsHeader && statsPayload) { - statsHeader.index._type = 'kibana_stats'; // HACK to convert kibana_stats_monitoring to just kibana_stats for bwc - const [ usageHeader, usagePayload ] = findItem(KIBANA_USAGE_TYPE); - const kibanaUsage = (usageHeader && usagePayload) ? usagePayload : null; - const reportingUsage = (reportingHeader && reportingPayload) ? reportingPayload : null; - statsResult = [ statsHeader, statsPayload ]; - if (kibanaUsage) { - set(statsResult, '[1].usage', kibanaUsage); - } - if (reportingUsage) { - set(statsResult, '[1].usage.xpack.reporting', reportingUsage); - } - } - - // kibana settings - let settingsResult; - const [ settingsHeader, settingsPayload ] = findItem(KIBANA_SETTINGS_TYPE); - if (settingsHeader && settingsPayload) { - settingsResult = [ settingsHeader, settingsPayload ]; - } - - // return new payload with the combined data - // adds usage data to stats data - // strips usage out as a top-level type - const result = [ statsResult, settingsResult ]; - - // remove result items that are undefined - return result.filter(Boolean); - } } diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_kibana_usage_collector.js b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_kibana_usage_collector.js index 2742ea6da5714..46178c9bfc020 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_kibana_usage_collector.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_kibana_usage_collector.js @@ -17,12 +17,13 @@ const TYPES = [ ]; /** - * Fetches saved object client counts by querying the saved object index + * Fetches saved object counts by querying the .kibana index */ export function getKibanaUsageCollector(server) { const { collectorSet } = server.usage; return collectorSet.makeUsageCollector({ type: KIBANA_USAGE_TYPE, + async fetch(callCluster) { const index = server.config().get('kibana.index'); const savedObjectCountSearchParams = { @@ -60,6 +61,20 @@ export function getKibanaUsageCollector(server) { } }), {}) }; + }, + + /* + * Format the response data into a model for internal upload + * 1. Make this data part of the "kibana_stats" type + * 2. Organize the payload in the usage namespace of the data payload (usage.index, etc) + */ + formatForBulkUpload: result => { + return { + type: 'kibana_stats', + payload: { + usage: result + } + }; } }); } diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_ops_stats_collector.js b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_ops_stats_collector.js index 1c79de29dea40..baad391693451 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_ops_stats_collector.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_ops_stats_collector.js @@ -6,12 +6,11 @@ import { KIBANA_STATS_TYPE_MONITORING } from '../../../common/constants'; import { opsBuffer } from './ops_buffer'; -import { getKibanaInfoForStats } from '../lib'; /* * Initialize a collector for Kibana Ops Stats */ -export function getOpsStatsCollector(server, kbnServer) { +export function getOpsStatsCollector(server) { let monitor; const buffer = opsBuffer(server); const onOps = event => buffer.push(event); @@ -48,10 +47,7 @@ export function getOpsStatsCollector(server, kbnServer) { type: KIBANA_STATS_TYPE_MONITORING, init: start, fetch: () => { - return { - kibana: getKibanaInfoForStats(server, kbnServer), - ...buffer.flush() - }; + return buffer.flush(); } }); } diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_settings_collector.js b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_settings_collector.js index 1382abb4ea293..f3978b54d31d4 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_settings_collector.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_settings_collector.js @@ -7,7 +7,6 @@ import { get } from 'lodash'; import { XPACK_DEFAULT_ADMIN_EMAIL_UI_SETTING } from '../../../../../server/lib/constants'; import { KIBANA_SETTINGS_TYPE } from '../../../common/constants'; -import { getKibanaInfoForStats } from '../lib'; /* * Check if Cluster Alert email notifications is enabled in config @@ -54,10 +53,10 @@ export async function checkForEmailValue( } } -export function getSettingsCollector(server, kbnServer) { +export function getSettingsCollector(server) { const config = server.config(); - const { collectorSet } = server.usage; + return collectorSet.makeStatsCollector({ type: KIBANA_SETTINGS_TYPE, async fetch(callCluster) { @@ -79,10 +78,7 @@ export function getSettingsCollector(server, kbnServer) { // remember the current email so that we can mark it as successful if the bulk does not error out shouldUseNull = !!defaultAdminEmail; - return { - kibana: getKibanaInfoForStats(server, kbnServer), - ...kibanaSettingsData - }; + return kibanaSettingsData; } }); } diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/ops_buffer/ops_buffer.js b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/ops_buffer/ops_buffer.js index bb7683899200c..7799f27c87bd8 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/ops_buffer/ops_buffer.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/ops_buffer/ops_buffer.js @@ -27,8 +27,14 @@ export function opsBuffer(server) { }, flush() { + let cloud; // a property that will be left out of the result if the details are undefined + const cloudDetails = cloudDetector.getCloudDetails(); + if (cloudDetails != null) { + cloud = { cloud: cloudDetails }; + } + return { - cloud: cloudDetector.getCloudDetails(), + ...cloud, ...eventRoller.flush() }; } diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/init.js b/x-pack/plugins/monitoring/server/kibana_monitoring/init.js index 6dcb0ae1fc076..90d9a6b5de7d1 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/init.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/init.js @@ -15,11 +15,11 @@ import { BulkUploader } from './bulk_uploader'; * @param {Object} kbnServer manager of Kibana services - see `src/server/kbn_server` in Kibana core * @param {Object} server HapiJS server instance */ -export function initBulkUploader(_kbnServer, server) { - +export function initBulkUploader(kbnServer, server) { const config = server.config(); const interval = config.get('xpack.monitoring.kibana.collection.interval'); return new BulkUploader(server, { + kbnServer, interval }); } diff --git a/x-pack/plugins/reporting/server/usage/get_reporting_usage_collector.js b/x-pack/plugins/reporting/server/usage/get_reporting_usage_collector.js index b4a76767b3adf..206363b5b303a 100644 --- a/x-pack/plugins/reporting/server/usage/get_reporting_usage_collector.js +++ b/x-pack/plugins/reporting/server/usage/get_reporting_usage_collector.js @@ -117,6 +117,7 @@ export function getReportingUsageCollector(server) { const { collectorSet } = server.usage; return collectorSet.makeUsageCollector({ type: KIBANA_REPORTING_TYPE, + fetch: async callCluster => { const xpackInfo = server.plugins.xpack_main.info; const config = server.config(); @@ -147,6 +148,24 @@ export function getReportingUsageCollector(server) { ...statsOverLast7Days } }; + }, + + /* + * Format the response data into a model for internal upload + * 1. Make this data part of the "kibana_stats" type + * 2. Organize the payload in the usage.xpack.reporting namespace of the data payload + */ + formatForBulkUpload: result => { + return { + type: 'kibana_stats', + payload: { + usage: { + xpack: { + reporting: result + } + } + } + }; } }); } From c940ee16aa954355c8975cca6e8fa514f5a9ce72 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Wed, 15 Aug 2018 17:22:53 -0700 Subject: [PATCH 2/6] override default format for bulk upload for usage type collectors --- src/server/usage/classes/usage_collector.js | 31 ++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/src/server/usage/classes/usage_collector.js b/src/server/usage/classes/usage_collector.js index d8ddd8a255252..559deaef2ce15 100644 --- a/src/server/usage/classes/usage_collector.js +++ b/src/server/usage/classes/usage_collector.js @@ -17,6 +17,35 @@ * under the License. */ +import { KIBANA_STATS_TYPE } from '../../status/constants'; import { Collector } from './collector'; -export class UsageCollector extends Collector {} +export class UsageCollector extends Collector { + /* + * @param {Object} server - server object + * @param {String} options.type - property name as the key for the data + * @param {Function} options.init (optional) - initialization function + * @param {Function} options.fetch - function to query data + * @param {Function} options.formatForBulkUpload - optional + * @param {Function} options.rest - optional other properties + */ + constructor(server, { type, init, fetch, formatForBulkUpload = null, ...options } = {}) { + super(server, { type, init, fetch, formatForBulkUpload, ...options }); + + /* + * Currently, for internal bulk uploading, usage stats are part of + * `kibana_stats` type, under the `usage` namespace in the document. + */ + const defaultUsageFormatterForBulkUpload = result => { + return { + type: KIBANA_STATS_TYPE, + payload: { + usage: { + [type]: result + } + } + }; + }; + this._formatForBulkUpload = formatForBulkUpload || defaultUsageFormatterForBulkUpload; + } +} From 5213838acbbd505c4e080d93ca34bac272e6805a Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Mon, 20 Aug 2018 13:12:32 -0700 Subject: [PATCH 3/6] rename to ignoreForInternalUploader --- src/server/status/collectors/get_ops_stats_collector.js | 2 +- .../monitoring/server/kibana_monitoring/bulk_uploader.js | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server/status/collectors/get_ops_stats_collector.js b/src/server/status/collectors/get_ops_stats_collector.js index a11c5bf9b1dc9..d7e8e154b3bc2 100644 --- a/src/server/status/collectors/get_ops_stats_collector.js +++ b/src/server/status/collectors/get_ops_stats_collector.js @@ -45,6 +45,6 @@ export function getOpsStatsCollector(server, kbnServer) { ...kbnServer.metrics // latest metrics captured from the ops event listener in src/server/status/index }; }, - internalIgnore: true, // Ignore this one from internal uploader. A different stats collector is used there. + ignoreForInternalUploader: true, // Ignore this one from internal uploader. A different stats collector is used there. }); } diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js index 66b3351a71d30..737360c5e6b5c 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js @@ -65,7 +65,7 @@ export class BulkUploader { this._log.info('Starting monitoring stats collection'); // this is internal bulk upload, so filter out API-only collectors - const filterThem = _collectorSet => _collectorSet.getFilteredCollectorSet(c => c.internalIgnore !== true); + const filterThem = _collectorSet => _collectorSet.getFilteredCollectorSet(c => c.ignoreForInternalUploader !== true); this._fetchAndUpload(filterThem(collectorSet)); // initial fetch this._timer = setInterval(() => { this._fetchAndUpload(filterThem(collectorSet)); From 7f1833702e5fd375139376414646ff57d32ddab9 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Mon, 20 Aug 2018 13:17:21 -0700 Subject: [PATCH 4/6] collectors -> collectorSet --- src/server/usage/classes/collector_set.js | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/server/usage/classes/collector_set.js b/src/server/usage/classes/collector_set.js index 19cefde55687c..8bcb75e39959f 100644 --- a/src/server/usage/classes/collector_set.js +++ b/src/server/usage/classes/collector_set.js @@ -70,14 +70,14 @@ export class CollectorSet { /* * Call a bunch of fetch methods and then do them in bulk - * @param {Array} collectors - an array of collectors, default to all registered collectors + * @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors */ - bulkFetch(callCluster, collectors = this) { - if (!(collectors instanceof CollectorSet)) { - throw new Error(`bulkFetch method given bad collectors parameter: ` + typeof collectors); + bulkFetch(callCluster, collectorSet = this) { + if (!(collectorSet instanceof CollectorSet)) { + throw new Error(`bulkFetch method given bad collectorSet parameter: ` + typeof collectorSet); } - const fetchPromises = collectors.map(collector => { + const fetchPromises = collectorSet.map(collector => { const collectorType = collector.type; this._log.debug(`Fetching data from ${collectorType} collector`); return Promise.props({ From 3a11d4f847d8755804119e94855eca3b4ab6d1fb Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Mon, 20 Aug 2018 13:21:55 -0700 Subject: [PATCH 5/6] use constant for kibana_stats type --- .../collectors/get_kibana_usage_collector.js | 4 ++-- .../reporting/server/usage/get_reporting_usage_collector.js | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_kibana_usage_collector.js b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_kibana_usage_collector.js index 46178c9bfc020..a81db5aca586a 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_kibana_usage_collector.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/collectors/get_kibana_usage_collector.js @@ -5,7 +5,7 @@ */ import { get, snakeCase } from 'lodash'; -import { KIBANA_USAGE_TYPE } from '../../../common/constants'; +import { KIBANA_USAGE_TYPE, KIBANA_STATS_TYPE_MONITORING } from '../../../common/constants'; const TYPES = [ 'dashboard', @@ -70,7 +70,7 @@ export function getKibanaUsageCollector(server) { */ formatForBulkUpload: result => { return { - type: 'kibana_stats', + type: KIBANA_STATS_TYPE_MONITORING, payload: { usage: result } diff --git a/x-pack/plugins/reporting/server/usage/get_reporting_usage_collector.js b/x-pack/plugins/reporting/server/usage/get_reporting_usage_collector.js index 206363b5b303a..36a24a3c7b1b6 100644 --- a/x-pack/plugins/reporting/server/usage/get_reporting_usage_collector.js +++ b/x-pack/plugins/reporting/server/usage/get_reporting_usage_collector.js @@ -8,6 +8,7 @@ import { uniq } from 'lodash'; import { getExportTypesHandler } from './get_export_type_handler'; import { getReportCountsByParameter } from './get_reporting_type_counts'; import { KIBANA_REPORTING_TYPE } from '../../common/constants'; +import { KIBANA_STATS_TYPE_MONITORING } from '../../../monitoring/common/constants'; /** * @typedef {Object} ReportingUsageStats Almost all of these stats are optional. @@ -157,7 +158,7 @@ export function getReportingUsageCollector(server) { */ formatForBulkUpload: result => { return { - type: 'kibana_stats', + type: KIBANA_STATS_TYPE_MONITORING, payload: { usage: { xpack: { From d4b73c7495f7b38b75ff102cdd8d9f0f5e56dda0 Mon Sep 17 00:00:00 2001 From: Timothy Sullivan Date: Mon, 20 Aug 2018 13:29:14 -0700 Subject: [PATCH 6/6] example of data formatting for bulk in function comment --- .../server/kibana_monitoring/bulk_uploader.js | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js index 737360c5e6b5c..b838ada1a86ca 100644 --- a/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js +++ b/x-pack/plugins/monitoring/server/kibana_monitoring/bulk_uploader.js @@ -120,6 +120,39 @@ export class BulkUploader { /* * Bulk stats are transformed into a bulk upload format * Non-legacy transformation is done in CollectorSet.toApiStats + * + * Example: + * Before: + * [ + * { + * "type": "kibana_stats", + * "result": { + * "process": { ... }, + * "requests": { ... }, + * ... + * } + * }, + * ] + * + * After: + * [ + * { + * "index": { + * "_type": "kibana_stats" + * } + * }, + * { + * "kibana": { + * "host": "localhost", + * "uuid": "d619c5d1-4315-4f35-b69d-a3ac805489fb", + * "version": "7.0.0-alpha1", + * ... + * }, + * "process": { ... }, + * "requests": { ... }, + * ... + * } + * ] */ toBulkUploadFormat(rawData, collectorSet) { if (rawData.length === 0) {