-
Notifications
You must be signed in to change notification settings - Fork 8.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Retrofit the Bulk Uploader types combiner [ch2198] #22030
Changes from all commits
fb76429
c940ee1
ee1fa5d
5a7525a
5213838
7f18337
3a11d4f
d4b73c7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,26 +26,25 @@ import { UsageCollector } from './usage_collector'; | |
/* | ||
* A collector object has types registered into it with the register(type) | ||
* function. Each type that gets registered defines how to fetch its own data | ||
* and combine it into a unified payload for bulk upload. | ||
* and optionally, how to combine it into a unified payload for bulk upload. | ||
*/ | ||
export class CollectorSet { | ||
|
||
/* | ||
* @param {Object} server - server object | ||
* @param {Number} options.interval - in milliseconds | ||
* @param {Function} options.combineTypes | ||
* @param {Function} options.onPayload | ||
* @param {Array} collectors to initialize, usually as a result of filtering another CollectorSet instance | ||
*/ | ||
constructor(server) { | ||
constructor(server, collectors = []) { | ||
this._log = getCollectorLogger(server); | ||
this._collectors = []; | ||
this._collectors = collectors; | ||
|
||
/* | ||
* Helper Factory methods | ||
* Define as instance properties to allow enclosing the server object | ||
*/ | ||
this.makeStatsCollector = options => new Collector(server, options); | ||
this.makeUsageCollector = options => new UsageCollector(server, options); | ||
this._makeCollectorSetFromArray = collectorsArray => new CollectorSet(server, collectorsArray); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this an instanced variable versus just static function defined at the top? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Having it here let's us enclose the |
||
} | ||
|
||
/* | ||
|
@@ -71,14 +70,14 @@ export class CollectorSet { | |
|
||
/* | ||
* Call a bunch of fetch methods and then do them in bulk | ||
* @param {Array} collectors - an array of collectors, default to all registered collectors | ||
* @param {CollectorSet} collectorSet - a set of collectors to fetch. Default to all registered collectors | ||
*/ | ||
bulkFetch(callCluster, collectors = this._collectors) { | ||
if (!Array.isArray(collectors)) { | ||
throw new Error(`bulkFetch method given bad collectors parameter: ` + typeof collectors); | ||
bulkFetch(callCluster, collectorSet = this) { | ||
if (!(collectorSet instanceof CollectorSet)) { | ||
throw new Error(`bulkFetch method given bad collectorSet parameter: ` + typeof collectorSet); | ||
} | ||
|
||
return Promise.map(collectors, collector => { | ||
const fetchPromises = collectorSet.map(collector => { | ||
const collectorType = collector.type; | ||
this._log.debug(`Fetching data from ${collectorType} collector`); | ||
return Promise.props({ | ||
|
@@ -90,10 +89,19 @@ export class CollectorSet { | |
this._log.warn(`Unable to fetch data from ${collectorType} collector`); | ||
}); | ||
}); | ||
return Promise.all(fetchPromises); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just wondering, is there a specific reason we keep using promises versus using async/await? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not really. This lets us not await each fetch one at a time. I could be wrong, but using |
||
} | ||
|
||
/* | ||
* @return {new CollectorSet} | ||
*/ | ||
getFilteredCollectorSet(filter) { | ||
const filtered = this._collectors.filter(filter); | ||
return this._makeCollectorSetFromArray(filtered); | ||
} | ||
|
||
async bulkFetchUsage(callCluster) { | ||
const usageCollectors = this._collectors.filter(c => c instanceof UsageCollector); | ||
const usageCollectors = this.getFilteredCollectorSet(c => c instanceof UsageCollector); | ||
return this.bulkFetch(callCluster, usageCollectors); | ||
} | ||
|
||
|
@@ -137,4 +145,8 @@ export class CollectorSet { | |
}; | ||
}, {}); | ||
} | ||
|
||
map(mapFn) { | ||
return this._collectors.map(mapFn); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm doing some testing, and it looks like I want to override this in the UsageCollector class. Usage collectors should organize their data in the
kibana_stats
type in theusage
namespace. - c940ee1There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, that's snazzy!