From 96e40d6edec484ea321192b294bd459210d0d729 Mon Sep 17 00:00:00 2001 From: Chris Roberson Date: Wed, 2 Oct 2019 15:09:25 -0400 Subject: [PATCH] [Monitoring] Use server side pagination for Logstash Pipelines page (#46587) * Basic version working for cluster pipelines * More support * Refactoring * Fixes * Fix sorting issues * Reduce the number of buckets too * Fix tests * This is actually not helping - it seems that the filter in the query doesn't work as expected - maybe related to the fact that are using nested fields * Add more data for metric.debug * Support sorting on throughput and node count * Fix broken test * Use getMetrics and support with numOfBuckets parameter * Fix test for realz * Fix logstash management pages by introducing a new api to just retrieve ids * We need this to go back to 1000 but it doesn't affect the number of created buckets * Fix issue with pagination data when filtering * Fix sorting by id not working * Make this a little more sturdy --- .../services/monitoring/monitoring_service.js | 4 +- .../pipeline_listing/pipeline_listing.js | 18 +- .../public/components/table/eui_table_ssp.js | 79 ++++++++ .../public/components/table/index.js | 1 + .../monitoring/public/views/alerts/index.js | 1 + .../public/views/base_controller.js | 11 +- .../public/views/base_eui_table_controller.js | 75 +++++++- .../views/logstash/node/pipelines/index.js | 19 +- .../public/views/logstash/pipelines/index.js | 18 +- .../lib/cluster/get_clusters_from_request.js | 2 +- .../server/lib/details/get_metrics.js | 11 +- .../server/lib/details/get_series.js | 59 ++++-- .../lib/logstash/__tests__/get_pipelines.js | 6 +- .../lib/logstash/get_paginated_pipelines.js | 69 +++++++ .../server/lib/logstash/get_pipeline_ids.js | 78 ++++++++ .../server/lib/logstash/get_pipelines.js | 37 ++-- .../server/lib/logstash/sort_pipelines.js | 14 ++ .../__snapshots__/metrics.test.js.snap | 172 +----------------- .../server/lib/metrics/logstash/classes.js | 58 +++--- 
.../server/lib/metrics/logstash/metrics.js | 2 +- .../server/lib/pagination/filter.js | 23 +++ .../server/lib/pagination/paginate.js | 10 + .../server/routes/api/v1/logstash/index.js | 1 + .../pipelines/cluster_pipeline_ids.js | 49 +++++ .../logstash/pipelines/cluster_pipelines.js | 42 ++++- .../v1/logstash/pipelines/node_pipelines.js | 43 ++++- .../monitoring/server/routes/api/v1/ui.js | 3 +- .../apps/monitoring/logstash/pipelines.js | 11 +- .../page_objects/monitoring_page.js | 3 +- .../services/monitoring/logstash_pipelines.js | 7 + 30 files changed, 653 insertions(+), 273 deletions(-) create mode 100644 x-pack/legacy/plugins/monitoring/public/components/table/eui_table_ssp.js create mode 100644 x-pack/legacy/plugins/monitoring/server/lib/logstash/get_paginated_pipelines.js create mode 100644 x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipeline_ids.js create mode 100644 x-pack/legacy/plugins/monitoring/server/lib/logstash/sort_pipelines.js create mode 100644 x-pack/legacy/plugins/monitoring/server/lib/pagination/filter.js create mode 100644 x-pack/legacy/plugins/monitoring/server/lib/pagination/paginate.js create mode 100644 x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipeline_ids.js diff --git a/x-pack/legacy/plugins/logstash/public/services/monitoring/monitoring_service.js b/x-pack/legacy/plugins/logstash/public/services/monitoring/monitoring_service.js index a160408bcaa02..6a948ad2e39ed 100755 --- a/x-pack/legacy/plugins/logstash/public/services/monitoring/monitoring_service.js +++ b/x-pack/legacy/plugins/logstash/public/services/monitoring/monitoring_service.js @@ -32,7 +32,7 @@ export class MonitoringService { return this.clusterService.loadCluster() .then(cluster => { - const url = `${this.basePath}/v1/clusters/${cluster.uuid}/logstash/pipelines`; + const url = `${this.basePath}/v1/clusters/${cluster.uuid}/logstash/pipeline_ids`; const now = moment.utc(); const body = { timeRange: { @@ -42,7 +42,7 @@ export 
class MonitoringService { }; return this.$http.post(url, body); }) - .then(response => response.data.pipelines.map(pipeline => PipelineListItem.fromUpstreamMonitoringJSON(pipeline))) + .then(response => response.data.map(pipeline => PipelineListItem.fromUpstreamMonitoringJSON(pipeline))) .catch(() => []); } } diff --git a/x-pack/legacy/plugins/monitoring/public/components/logstash/pipeline_listing/pipeline_listing.js b/x-pack/legacy/plugins/monitoring/public/components/logstash/pipeline_listing/pipeline_listing.js index 9dc336f24a40b..ef306a9a2f06c 100644 --- a/x-pack/legacy/plugins/monitoring/public/components/logstash/pipeline_listing/pipeline_listing.js +++ b/x-pack/legacy/plugins/monitoring/public/components/logstash/pipeline_listing/pipeline_listing.js @@ -11,7 +11,7 @@ import { EuiPage, EuiLink, EuiPageBody, EuiPageContent, EuiPanel, EuiSpacer, Eui import { formatMetric } from '../../../lib/format_number'; import { ClusterStatus } from '../cluster_status'; import { Sparkline } from 'plugins/monitoring/components/sparkline'; -import { EuiMonitoringTable } from '../../table'; +import { EuiMonitoringSSPTable } from '../../table'; import { i18n } from '@kbn/i18n'; export class PipelineListing extends Component { @@ -137,6 +137,7 @@ export class PipelineListing extends Component { sorting, pagination, onTableChange, + fetchMoreData, upgradeMessage, className } = this.props; @@ -151,31 +152,22 @@ export class PipelineListing extends Component { - diff --git a/x-pack/legacy/plugins/monitoring/public/components/table/eui_table_ssp.js b/x-pack/legacy/plugins/monitoring/public/components/table/eui_table_ssp.js new file mode 100644 index 0000000000000..868464e8703be --- /dev/null +++ b/x-pack/legacy/plugins/monitoring/public/components/table/eui_table_ssp.js @@ -0,0 +1,79 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. 
Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import React from 'react'; +import { + EuiBasicTable, + EuiSpacer, + EuiSearchBar +} from '@elastic/eui'; + +export function EuiMonitoringSSPTable({ + rows: items, + search = {}, + pagination, + columns: _columns, + onTableChange, + fetchMoreData, + ...props +}) { + const [isLoading, setIsLoading] = React.useState(false); + const [queryText, setQueryText] = React.useState(''); + const [page, setPage] = React.useState({ + index: pagination.pageIndex, + size: pagination.pageSize + }); + const [sort, setSort] = React.useState(props.sorting); + + if (search.box && !search.box['data-test-subj']) { + search.box['data-test-subj'] = 'monitoringTableToolBar'; + } + + const columns = _columns.map(column => { + if (!column['data-test-subj']) { + column['data-test-subj'] = 'monitoringTableHasData'; + } + + if (!('sortable' in column)) { + column.sortable = true; + } + + return column; + }); + + const onChange = async ({ page, sort }) => { + setPage(page); + setSort({ sort }); + setIsLoading(true); + await fetchMoreData({ page, sort: { sort }, queryText }); + setIsLoading(false); + onTableChange({ page, sort }); + }; + + const onQueryChange = async ({ queryText }) => { + const newPage = { ...page, index: 0 }; + setPage(newPage); + setQueryText(queryText); + setIsLoading(true); + await fetchMoreData({ page: newPage, sort, queryText }); + setIsLoading(false); + }; + + return ( +
+ + + +
+ ); +} diff --git a/x-pack/legacy/plugins/monitoring/public/components/table/index.js b/x-pack/legacy/plugins/monitoring/public/components/table/index.js index d807352ff14c8..66bdb46904dba 100644 --- a/x-pack/legacy/plugins/monitoring/public/components/table/index.js +++ b/x-pack/legacy/plugins/monitoring/public/components/table/index.js @@ -5,4 +5,5 @@ */ export { EuiMonitoringTable } from './eui_table'; +export { EuiMonitoringSSPTable } from './eui_table_ssp'; export { tableStorageGetter, tableStorageSetter, euiTableStorageGetter, euiTableStorageSetter } from './storage'; diff --git a/x-pack/legacy/plugins/monitoring/public/views/alerts/index.js b/x-pack/legacy/plugins/monitoring/public/views/alerts/index.js index 5f278d661db1d..8ffb30cee9623 100644 --- a/x-pack/legacy/plugins/monitoring/public/views/alerts/index.js +++ b/x-pack/legacy/plugins/monitoring/public/views/alerts/index.js @@ -66,6 +66,7 @@ uiRoutes.when('/alerts', { getPageData, $scope, $injector, + storageKey: 'alertsTable', reactNodeId: 'monitoringAlertsApp' }); diff --git a/x-pack/legacy/plugins/monitoring/public/views/base_controller.js b/x-pack/legacy/plugins/monitoring/public/views/base_controller.js index 15abe5ed3e949..600e229b031bf 100644 --- a/x-pack/legacy/plugins/monitoring/public/views/base_controller.js +++ b/x-pack/legacy/plugins/monitoring/public/views/base_controller.js @@ -70,7 +70,8 @@ export class MonitoringViewBaseController { reactNodeId = null, // WIP: https://github.com/elastic/x-pack-kibana/issues/5198 $scope, $injector, - options = {} + options = {}, + fetchDataImmediately = true }) { const titleService = $injector.get('title'); const $executor = $injector.get('$executor'); @@ -119,7 +120,7 @@ export class MonitoringViewBaseController { this.updateDataPromise = null; } const _api = apiUrlFn ? 
apiUrlFn() : api; - const promises = [_getPageData($injector, _api)]; + const promises = [_getPageData($injector, _api, this.getPaginationRouteOptions())]; const setupMode = getSetupModeState(); if (setupMode.enabled) { promises.push(updateSetupModeData()); @@ -132,7 +133,7 @@ export class MonitoringViewBaseController { }); }); }; - this.updateData(); + fetchDataImmediately && this.updateData(); $executor.register({ execute: () => this.updateData() @@ -175,4 +176,8 @@ export class MonitoringViewBaseController { render(component, document.getElementById(this.reactNodeId)); } } + + getPaginationRouteOptions() { + return {}; + } } diff --git a/x-pack/legacy/plugins/monitoring/public/views/base_eui_table_controller.js b/x-pack/legacy/plugins/monitoring/public/views/base_eui_table_controller.js index dfc548aeb97f2..fb712fa3e7c6c 100644 --- a/x-pack/legacy/plugins/monitoring/public/views/base_eui_table_controller.js +++ b/x-pack/legacy/plugins/monitoring/public/views/base_eui_table_controller.js @@ -8,6 +8,8 @@ import { MonitoringViewBaseController } from './'; import { euiTableStorageGetter, euiTableStorageSetter } from 'plugins/monitoring/components/table'; import { EUI_SORT_ASCENDING } from '../../common/constants'; +const PAGE_SIZE_OPTIONS = [5, 10, 20, 50]; + /** * Class to manage common instantiation behaviors in a view controller * And add persistent state to a table: @@ -42,17 +44,22 @@ export class MonitoringViewBaseEuiTableController extends MonitoringViewBaseCont const setLocalStorageData = euiTableStorageSetter(storageKey); const { page, sort } = getLocalStorageData(storage); - this.pagination = page || { + this.pagination = { + pageSize: 20, initialPageSize: 20, - pageSizeOptions: [5, 10, 20, 50] + pageIndex: 0, + initialPageIndex: 0, + pageSizeOptions: PAGE_SIZE_OPTIONS }; - this.sorting = sort || { - sort: { - field: 'name', - direction: EUI_SORT_ASCENDING + if (page) { + if (!PAGE_SIZE_OPTIONS.includes(page.size)) { + page.size = 20; } - }; + 
this.setPagination(page); + } + + this.setSorting(sort); this.onTableChange = ({ page, sort }) => { setLocalStorageData(storage, { @@ -62,5 +69,59 @@ export class MonitoringViewBaseEuiTableController extends MonitoringViewBaseCont } }); }; + + this.updateData(); + } + + setPagination(page) { + this.pagination = { + pageSize: page.size, + pageIndex: page.index, + pageSizeOptions: PAGE_SIZE_OPTIONS + }; + } + + setSorting(sort) { + this.sorting = sort || { sort: {} }; + + if (!this.sorting.sort.field) { + this.sorting.sort.field = 'name'; + } + if (!this.sorting.sort.direction) { + this.sorting.sort.direction = EUI_SORT_ASCENDING; + } + } + + setQueryText(queryText) { + this.queryText = queryText; + } + + getPaginationRouteOptions() { + if (!this.pagination || !this.sorting) { + return {}; + } + + return { + pagination: { + size: this.pagination.pageSize, + index: this.pagination.pageIndex + }, + ...this.sorting, + queryText: this.queryText, + }; + } + + getPaginationTableProps(pagination) { + return { + sorting: this.sorting, + pagination: pagination, + onTableChange: this.onTableChange, + fetchMoreData: async ({ page, sort, queryText }) => { + this.setPagination(page); + this.setSorting(sort); + this.setQueryText(queryText); + this.updateData(); + } + }; } } diff --git a/x-pack/legacy/plugins/monitoring/public/views/logstash/node/pipelines/index.js b/x-pack/legacy/plugins/monitoring/public/views/logstash/node/pipelines/index.js index 80fcde7642758..b8bdccab34977 100644 --- a/x-pack/legacy/plugins/monitoring/public/views/logstash/node/pipelines/index.js +++ b/x-pack/legacy/plugins/monitoring/public/views/logstash/node/pipelines/index.js @@ -24,7 +24,7 @@ import { PipelineListing } from '../../../../components/logstash/pipeline_listin import { DetailStatus } from '../../../../components/logstash/detail_status'; import { CODE_PATH_LOGSTASH } from '../../../../../common/constants'; -const getPageData = ($injector) => { +const getPageData = ($injector, _api = undefined, 
routeOptions = {}) => { const $route = $injector.get('$route'); const $http = $injector.get('$http'); const globalState = $injector.get('globalState'); @@ -39,7 +39,8 @@ const getPageData = ($injector) => { timeRange: { min: timeBounds.min.toISOString(), max: timeBounds.max.toISOString() - } + }, + ...routeOptions }) .then(response => response.data) .catch((err) => { @@ -70,7 +71,6 @@ uiRoutes const routeInit = Private(routeInitProvider); return routeInit({ codePaths: [CODE_PATH_LOGSTASH] }); }, - pageData: getPageData }, controller: class extends MonitoringViewBaseEuiTableController { constructor($injector, $scope) { @@ -82,9 +82,11 @@ uiRoutes getPageData, reactNodeId: 'monitoringLogstashNodePipelinesApp', $scope, - $injector + $injector, + fetchDataImmediately: false // We want to apply pagination before sending the first request }); + $scope.$watch(() => this.data, data => { if (!data || !data.nodeSummary) { return; @@ -97,6 +99,11 @@ uiRoutes } })); + const pagination = { + ...this.pagination, + totalItemCount: data.totalPipelineCount + }; + this.renderReact( { +const getPageData = ($injector, _api = undefined, routeOptions = {}) => { const $http = $injector.get('$http'); const globalState = $injector.get('globalState'); const Private = $injector.get('Private'); @@ -37,7 +37,8 @@ const getPageData = ($injector) => { timeRange: { min: timeBounds.min.toISOString(), max: timeBounds.max.toISOString() - } + }, + ...routeOptions }) .then(response => response.data) .catch((err) => { @@ -64,7 +65,6 @@ uiRoutes const routeInit = Private(routeInitProvider); return routeInit({ codePaths: [CODE_PATH_LOGSTASH] }); }, - pageData: getPageData }, controller: class LogstashPipelinesList extends MonitoringViewBaseEuiTableController { constructor($injector, $scope) { @@ -74,7 +74,8 @@ uiRoutes getPageData, reactNodeId: 'monitoringLogstashPipelinesApp', $scope, - $injector + $injector, + fetchDataImmediately: false // We want to apply pagination before sending the first request 
}); const $route = $injector.get('$route'); @@ -93,6 +94,11 @@ uiRoutes ? makeUpgradeMessage(pageData.clusterStatus.versions, i18n) : null; + const pagination = { + ...this.pagination, + totalItemCount: pageData.totalPipelineCount + }; + super.renderReact( this.onBrush({ xaxis })} stats={pageData.clusterStatus} data={pageData.pipelines} - sorting={this.sorting} - pagination={this.pagination} - onTableChange={this.onTableChange} + {...this.getPaginationTableProps(pagination)} upgradeMessage={upgradeMessage} dateFormat={config.get('dateFormat')} angular={{ diff --git a/x-pack/legacy/plugins/monitoring/server/lib/cluster/get_clusters_from_request.js b/x-pack/legacy/plugins/monitoring/server/lib/cluster/get_clusters_from_request.js index 9b6a2380bf4ec..9fc6866f0f579 100644 --- a/x-pack/legacy/plugins/monitoring/server/lib/cluster/get_clusters_from_request.js +++ b/x-pack/legacy/plugins/monitoring/server/lib/cluster/get_clusters_from_request.js @@ -150,7 +150,7 @@ export async function getClustersFromRequest(req, indexPatterns, { clusterUuid, : []; const clusterPipelineNodesCount = isInCodePath(codePaths, [CODE_PATH_LOGSTASH]) - ? await getPipelines(req, lsIndexPattern, ['logstash_cluster_pipeline_nodes_count']) + ? 
await getPipelines(req, lsIndexPattern, null, ['logstash_cluster_pipeline_nodes_count']) : []; // add the logstash data to each cluster diff --git a/x-pack/legacy/plugins/monitoring/server/lib/details/get_metrics.js b/x-pack/legacy/plugins/monitoring/server/lib/details/get_metrics.js index c5d2ee2032b01..e11de68b55c1f 100644 --- a/x-pack/legacy/plugins/monitoring/server/lib/details/get_metrics.js +++ b/x-pack/legacy/plugins/monitoring/server/lib/details/get_metrics.js @@ -12,18 +12,23 @@ import { getSeries } from './get_series'; import { calculateTimeseriesInterval } from '../calculate_timeseries_interval'; import { getTimezone } from '../get_timezone'; -export async function getMetrics(req, indexPattern, metricSet = [], filters = []) { +export async function getMetrics(req, indexPattern, metricSet = [], filters = [], metricOptions = {}, numOfBuckets = 0) { checkParam(indexPattern, 'indexPattern in details/getMetrics'); checkParam(metricSet, 'metricSet in details/getMetrics'); const config = req.server.config(); // TODO: Pass in req parameters as explicit function parameters - const min = moment.utc(req.payload.timeRange.min).valueOf(); + let min = moment.utc(req.payload.timeRange.min).valueOf(); const max = moment.utc(req.payload.timeRange.max).valueOf(); const minIntervalSeconds = config.get('xpack.monitoring.min_interval_seconds'); const bucketSize = calculateTimeseriesInterval(min, max, minIntervalSeconds); const timezone = await getTimezone(req); + // If specified, adjust the time period to ensure we only return this many buckets + if (numOfBuckets > 0) { + min = max - (numOfBuckets * bucketSize * 1000); + } + return Promise.map(metricSet, metric => { // metric names match the literal metric name, but they can be supplied in groups or individually let metricNames; @@ -35,7 +40,7 @@ export async function getMetrics(req, indexPattern, metricSet = [], filters = [] } return Promise.map(metricNames, metricName => { - return getSeries(req, indexPattern, metricName, 
filters, { min, max, bucketSize, timezone }); + return getSeries(req, indexPattern, metricName, metricOptions, filters, { min, max, bucketSize, timezone }); }); }) .then(rows => { diff --git a/x-pack/legacy/plugins/monitoring/server/lib/details/get_series.js b/x-pack/legacy/plugins/monitoring/server/lib/details/get_series.js index e66878f522ecb..059b02980a427 100644 --- a/x-pack/legacy/plugins/monitoring/server/lib/details/get_series.js +++ b/x-pack/legacy/plugins/monitoring/server/lib/details/get_series.js @@ -71,18 +71,27 @@ function createMetricAggs(metric) { return metric.aggs; } -function fetchSeries(req, indexPattern, metric, min, max, bucketSize, filters) { +function fetchSeries(req, indexPattern, metric, metricOptions, min, max, bucketSize, filters) { // if we're using a derivative metric, offset the min (also @see comment on offsetMinForDerivativeMetric function) const adjustedMin = metric.derivative ? offsetMinForDerivativeMetric(min, bucketSize) : min; - const dateHistogramSubAggs = metric.dateHistogramSubAggs || { - metric: { - [metric.metricAgg]: { - field: metric.field - } - }, - ...createMetricAggs(metric) - }; + let dateHistogramSubAggs = null; + if (metric.getDateHistogramSubAggs) { + dateHistogramSubAggs = metric.getDateHistogramSubAggs(metricOptions); + } + else if (metric.dateHistogramSubAggs) { + dateHistogramSubAggs = metric.dateHistogramSubAggs; + } + else { + dateHistogramSubAggs = { + metric: { + [metric.metricAgg]: { + field: metric.field + } + }, + ...createMetricAggs(metric) + }; + } const params = { index: indexPattern, @@ -178,6 +187,30 @@ const formatBucketSize = bucketSizeInSeconds => { return formatTimestampToDuration(timestamp, CALCULATE_DURATION_UNTIL, now); }; +function isObject(value) { + return typeof value === 'object' && !!value && !Array.isArray(value); +} + +function countBuckets(data, count = 0) { + if (data.buckets) { + count += data.buckets.length; + for (const bucket of data.buckets) { + for (const key of 
Object.keys(bucket)) { + if (isObject(bucket[key])) { + count = countBuckets(bucket[key], count); + } + } + } + } else { + for (const key of Object.keys(data)) { + if (isObject(data[key])) { + count = countBuckets(data[key], count); + } + } + } + return count; +} + function handleSeries(metric, min, max, bucketSizeInSeconds, timezone, response) { const { derivative, calculation: customCalculation } = metric; const buckets = get(response, 'aggregations.check.buckets', []); @@ -185,6 +218,10 @@ function handleSeries(metric, min, max, bucketSizeInSeconds, timezone, response) const lastUsableBucketIndex = findLastUsableBucketIndex(buckets, max, firstUsableBucketIndex, bucketSizeInSeconds * 1000); let data = []; + if (metric.debug) { + console.log(`metric.debug field=${metric.field} bucketsCreated: ${countBuckets(get(response, 'aggregations.check'))}`); + console.log(`metric.debug`, { bucketsLength: buckets.length, firstUsableBucketIndex, lastUsableBucketIndex }); + } if (firstUsableBucketIndex <= lastUsableBucketIndex) { // map buckets to values for charts @@ -221,14 +258,14 @@ function handleSeries(metric, min, max, bucketSizeInSeconds, timezone, response) * @param {Array} filters Any filters that should be applied to the query. * @return {Promise} The object response containing the {@code timeRange}, {@code metric}, and {@code data}. 
*/ -export async function getSeries(req, indexPattern, metricName, filters, { min, max, bucketSize, timezone }) { +export async function getSeries(req, indexPattern, metricName, metricOptions, filters, { min, max, bucketSize, timezone }) { checkParam(indexPattern, 'indexPattern in details/getSeries'); const metric = metrics[metricName]; if (!metric) { throw new Error(`Not a valid metric: ${metricName}`); } - const response = await fetchSeries(req, indexPattern, metric, min, max, bucketSize, filters); + const response = await fetchSeries(req, indexPattern, metric, metricOptions, min, max, bucketSize, filters); return handleSeries(metric, min, max, bucketSize, timezone, response); } diff --git a/x-pack/legacy/plugins/monitoring/server/lib/logstash/__tests__/get_pipelines.js b/x-pack/legacy/plugins/monitoring/server/lib/logstash/__tests__/get_pipelines.js index 47735e08c470a..d4327049b2b41 100644 --- a/x-pack/legacy/plugins/monitoring/server/lib/logstash/__tests__/get_pipelines.js +++ b/x-pack/legacy/plugins/monitoring/server/lib/logstash/__tests__/get_pipelines.js @@ -6,7 +6,7 @@ import expect from '@kbn/expect'; import { - _handleResponse, + handleGetPipelinesResponse, processPipelinesAPIResponse } from '../get_pipelines'; describe('processPipelinesAPIResponse', () => { @@ -71,7 +71,7 @@ describe('get_pipelines', () => { }); it ('returns an empty array', () => { - const result = _handleResponse(fetchPipelinesWithMetricsResult); + const result = handleGetPipelinesResponse(fetchPipelinesWithMetricsResult); expect(result).to.eql([]); }); }); @@ -97,7 +97,7 @@ describe('get_pipelines', () => { }); it ('returns the correct structure for a non-empty response', () => { - const result = _handleResponse(fetchPipelinesWithMetricsResult); + const result = handleGetPipelinesResponse(fetchPipelinesWithMetricsResult); expect(result).to.eql([ { id: 'apache_logs', diff --git a/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_paginated_pipelines.js 
b/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_paginated_pipelines.js new file mode 100644 index 0000000000000..9d988c2da2224 --- /dev/null +++ b/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_paginated_pipelines.js @@ -0,0 +1,69 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import { get } from 'lodash'; +import { filter } from '../pagination/filter'; +import { getLogstashPipelineIds } from './get_pipeline_ids'; +import { handleGetPipelinesResponse } from './get_pipelines'; +import { sortPipelines } from './sort_pipelines'; +import { paginate } from '../pagination/paginate'; +import { getMetrics } from '../details/get_metrics'; + +/** + * This function performs an optimization around the pipeline listing tables in the UI. To avoid + * query performance issues in Elasticsearch (mainly thinking of `search.max_buckets` overflows), we do + * not want to fetch all time-series data for all pipelines. Instead, we only want to fetch the + * time-series data for the pipelines visible in the listing table. This function accepts + * pagination/sorting/filtering data to determine which pipelines will be visible in the table + * and returns that so the caller can perform their normal call to get the time-series data. 
+ * + * @param {*} req - Server request object + * @param {*} lsIndexPattern - The index pattern to search against (`.monitoring-logstash-*`) + * @param {*} uuids - The optional `clusterUuid` and `logstashUuid` to filter the results from + * @param {*} metricSet - The array of metrics that are sortable in the UI + * @param {*} pagination - ({ index, size }) + * @param {*} sort - ({ field, direction }) + * @param {*} queryText - Text that will be used to filter out pipelines + */ +export async function getPaginatedPipelines(req, lsIndexPattern, { clusterUuid, logstashUuid }, metricSet, pagination, sort, queryText) { + const config = req.server.config(); + const size = config.get('xpack.monitoring.max_bucket_size'); + const pipelines = await getLogstashPipelineIds(req, lsIndexPattern, { clusterUuid, logstashUuid }, size); + + // `metricSet` defines a list of metrics that are sortable in the UI + // but we don't need to fetch all the data for these metrics to perform + // the necessary sort - we only need the last bucket of data so we + // fetch the last two buckets of data (to ensure we have a single full bucket), + // then return the value from that last bucket + const metricSeriesData = await getMetrics(req, lsIndexPattern, metricSet, [], { pageOfPipelines: pipelines }, 2); + const pipelineAggregationsData = handleGetPipelinesResponse(metricSeriesData, pipelines.map(p => p.id)); + for (const pipelineAggregationData of pipelineAggregationsData) { + for (const pipeline of pipelines) { + if (pipelineAggregationData.id === pipeline.id) { + for (const metric of metricSet) { + const dataSeries = get(pipelineAggregationData, `metrics.${metric}.data`, [[]]); + pipeline[metric] = dataSeries[dataSeries.length - 1][1]; + } + } + } + } + + // Manually apply pagination/sorting/filtering concerns + + // Filtering + const filteredPipelines = filter(pipelines, queryText, ['id']); // We only support filtering by id right now + + // Sorting + const sortedPipelines = 
sortPipelines(filteredPipelines, sort); + + // Pagination + const pageOfPipelines = paginate(pagination, sortedPipelines); + + return { + pageOfPipelines, + totalPipelineCount: filteredPipelines.length + }; +} diff --git a/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipeline_ids.js b/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipeline_ids.js new file mode 100644 index 0000000000000..f99925425abd3 --- /dev/null +++ b/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipeline_ids.js @@ -0,0 +1,78 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +import moment from 'moment'; +import { get } from 'lodash'; +import { createQuery } from '../create_query'; +import { LogstashMetric } from '../metrics'; + +export async function getLogstashPipelineIds(req, logstashIndexPattern, { clusterUuid, logstashUuid }, size) { + const start = moment.utc(req.payload.timeRange.min).valueOf(); + const end = moment.utc(req.payload.timeRange.max).valueOf(); + + const filters = []; + if (logstashUuid) { + filters.push({ term: { 'logstash_stats.logstash.uuid': logstashUuid } }); + } + + const params = { + index: logstashIndexPattern, + size: 0, + ignoreUnavailable: true, + filterPath: [ + 'aggregations.nested_context.composite_data.buckets' + ], + body: { + query: createQuery({ + start, + end, + metric: LogstashMetric.getMetricFields(), + clusterUuid, + filters, + }), + aggs: { + nested_context: { + nested: { + path: 'logstash_stats.pipelines' + }, + aggs: { + composite_data: { + composite: { + size, + sources: [ + { + id: { + terms: { + field: 'logstash_stats.pipelines.id', + } + } + }, + { + hash: { + terms: { + field: 'logstash_stats.pipelines.hash', + } + } + }, + { + ephemeral_id: { + terms: { + field: 'logstash_stats.pipelines.ephemeral_id', + } + } + } + 
] + } + } + } + } + } + } + }; + + const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring'); + const response = await callWithRequest(req, 'search', params); + return get(response, 'aggregations.nested_context.composite_data.buckets', []).map(bucket => bucket.key); +} diff --git a/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipelines.js b/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipelines.js index 8cec101477ecf..c059e62815917 100644 --- a/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipelines.js +++ b/x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipelines.js @@ -3,12 +3,11 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ - import { cloneDeep, last, omit } from 'lodash'; import { checkParam } from '../error_missing_required'; import { getMetrics } from '../details/get_metrics'; -export function _handleResponse(response) { +export function handleGetPipelinesResponse(response, exclusivePipelineIds) { const pipelinesById = {}; const metrics = Object.keys(response); @@ -16,6 +15,9 @@ export function _handleResponse(response) { response[metric][0].data.forEach(([x, y]) => { const pipelineIds = Object.keys(y); pipelineIds.forEach(pipelineId => { + if (exclusivePipelineIds && !exclusivePipelineIds.includes(pipelineId)) { + return; + } // Create new pipeline object if necessary if (!pipelinesById.hasOwnProperty(pipelineId)) { pipelinesById[pipelineId] = { @@ -40,14 +42,24 @@ export function _handleResponse(response) { }); }); - // Convert pipelinesById map to array + // Convert pipelinesById map to array and preserve sorting const pipelines = []; - Object.keys(pipelinesById).forEach(pipelineId => { - pipelines.push({ - id: pipelineId, - ...pipelinesById[pipelineId] + if (exclusivePipelineIds) { + for (const exclusivePipelineId of exclusivePipelineIds) { + pipelines.push({ + id: 
exclusivePipelineId, + ...pipelinesById[exclusivePipelineId] + }); + } + } + else { + Object.keys(pipelinesById).forEach(pipelineId => { + pipelines.push({ + id: pipelineId, + ...pipelinesById[pipelineId] + }); }); - }); + } return pipelines; } @@ -71,10 +83,13 @@ export async function processPipelinesAPIResponse(response, throughputMetricKey, return processedResponse; } -export async function getPipelines(req, logstashIndexPattern, metricSet) { + +export async function getPipelines(req, logstashIndexPattern, pipelineIds, metricSet, metricOptions = {}) { checkParam(logstashIndexPattern, 'logstashIndexPattern in logstash/getPipelines'); checkParam(metricSet, 'metricSet in logstash/getPipelines'); - const metricsResponse = await getMetrics(req, logstashIndexPattern, metricSet); - return _handleResponse(metricsResponse); + const filters = []; + + const metricsResponse = await getMetrics(req, logstashIndexPattern, metricSet, filters, metricOptions); + return handleGetPipelinesResponse(metricsResponse, pipelineIds); } diff --git a/x-pack/legacy/plugins/monitoring/server/lib/logstash/sort_pipelines.js b/x-pack/legacy/plugins/monitoring/server/lib/logstash/sort_pipelines.js new file mode 100644 index 0000000000000..994e910c8ec4b --- /dev/null +++ b/x-pack/legacy/plugins/monitoring/server/lib/logstash/sort_pipelines.js @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +import { sortByOrder } from 'lodash'; + +export function sortPipelines(pipelines, sort) { + if (!sort) { + return pipelines; + } + + return sortByOrder(pipelines, pipeline => pipeline[sort.field], sort.direction); +} diff --git a/x-pack/legacy/plugins/monitoring/server/lib/metrics/__test__/__snapshots__/metrics.test.js.snap b/x-pack/legacy/plugins/monitoring/server/lib/metrics/__test__/__snapshots__/metrics.test.js.snap index a899dc9dfc748..21fce7e64376c 100644 --- a/x-pack/legacy/plugins/monitoring/server/lib/metrics/__test__/__snapshots__/metrics.test.js.snap +++ b/x-pack/legacy/plugins/monitoring/server/lib/metrics/__test__/__snapshots__/metrics.test.js.snap @@ -2996,37 +2996,11 @@ Object { "logstash_cluster_pipeline_nodes_count": LogstashPipelineNodeCountMetric { "app": "logstash", "calculation": [Function], - "dateHistogramSubAggs": Object { - "pipelines_nested": Object { - "aggs": Object { - "by_pipeline_id": Object { - "aggs": Object { - "to_root": Object { - "aggs": Object { - "node_count": Object { - "cardinality": Object { - "field": "logstash_stats.logstash.uuid", - }, - }, - }, - "reverse_nested": Object {}, - }, - }, - "terms": Object { - "field": "logstash_stats.pipelines.id", - "size": 1000, - }, - }, - }, - "nested": Object { - "path": "logstash_stats.pipelines", - }, - }, - }, "derivative": false, "description": "Number of nodes on which the Logstash pipeline is running.", "field": "logstash_stats.logstash.uuid", "format": "0,0.[00]", + "getDateHistogramSubAggs": [Function], "label": "Pipeline Node Count", "timestampField": "logstash_stats.timestamp", "units": "", @@ -3035,67 +3009,11 @@ Object { "logstash_cluster_pipeline_throughput": LogstashPipelineThroughputMetric { "app": "logstash", "calculation": [Function], - "dateHistogramSubAggs": Object { - "pipelines_nested": Object { - "aggs": Object { - "by_pipeline_id": Object { - "aggs": Object { - "by_pipeline_hash": Object { - "aggs": Object { - "by_ephemeral_id": Object { - "aggs": Object { 
- "events_stats": Object { - "stats": Object { - "field": "logstash_stats.pipelines.events.out", - }, - }, - "throughput": Object { - "bucket_script": Object { - "buckets_path": Object { - "max": "events_stats.max", - "min": "events_stats.min", - }, - "script": "params.max - params.min", - }, - }, - }, - "terms": Object { - "field": "logstash_stats.pipelines.ephemeral_id", - "size": 1000, - }, - }, - "throughput": Object { - "sum_bucket": Object { - "buckets_path": "by_ephemeral_id>throughput", - }, - }, - }, - "terms": Object { - "field": "logstash_stats.pipelines.hash", - "size": 1000, - }, - }, - "throughput": Object { - "sum_bucket": Object { - "buckets_path": "by_pipeline_hash>throughput", - }, - }, - }, - "terms": Object { - "field": "logstash_stats.pipelines.id", - "size": 1000, - }, - }, - }, - "nested": Object { - "path": "logstash_stats.pipelines", - }, - }, - }, "derivative": false, "description": "Number of events emitted per second by the Logstash pipeline at the outputs stage.", "field": "logstash_stats.pipelines.events.out", "format": "0,0.[00]", + "getDateHistogramSubAggs": [Function], "label": "Pipeline Throughput", "timestampField": "logstash_stats.timestamp", "units": "e/s", @@ -3354,37 +3272,11 @@ Object { "logstash_node_pipeline_nodes_count": LogstashPipelineNodeCountMetric { "app": "logstash", "calculation": [Function], - "dateHistogramSubAggs": Object { - "pipelines_nested": Object { - "aggs": Object { - "by_pipeline_id": Object { - "aggs": Object { - "to_root": Object { - "aggs": Object { - "node_count": Object { - "cardinality": Object { - "field": "logstash_stats.logstash.uuid", - }, - }, - }, - "reverse_nested": Object {}, - }, - }, - "terms": Object { - "field": "logstash_stats.pipelines.id", - "size": 1000, - }, - }, - }, - "nested": Object { - "path": "logstash_stats.pipelines", - }, - }, - }, "derivative": false, "description": "Number of nodes on which the Logstash pipeline is running.", "field": "logstash_stats.logstash.uuid", 
"format": "0,0.[00]", + "getDateHistogramSubAggs": [Function], "label": "Pipeline Node Count", "timestampField": "logstash_stats.timestamp", "units": "", @@ -3393,67 +3285,11 @@ Object { "logstash_node_pipeline_throughput": LogstashPipelineThroughputMetric { "app": "logstash", "calculation": [Function], - "dateHistogramSubAggs": Object { - "pipelines_nested": Object { - "aggs": Object { - "by_pipeline_id": Object { - "aggs": Object { - "by_pipeline_hash": Object { - "aggs": Object { - "by_ephemeral_id": Object { - "aggs": Object { - "events_stats": Object { - "stats": Object { - "field": "logstash_stats.pipelines.events.out", - }, - }, - "throughput": Object { - "bucket_script": Object { - "buckets_path": Object { - "max": "events_stats.max", - "min": "events_stats.min", - }, - "script": "params.max - params.min", - }, - }, - }, - "terms": Object { - "field": "logstash_stats.pipelines.ephemeral_id", - "size": 1000, - }, - }, - "throughput": Object { - "sum_bucket": Object { - "buckets_path": "by_ephemeral_id>throughput", - }, - }, - }, - "terms": Object { - "field": "logstash_stats.pipelines.hash", - "size": 1000, - }, - }, - "throughput": Object { - "sum_bucket": Object { - "buckets_path": "by_pipeline_hash>throughput", - }, - }, - }, - "terms": Object { - "field": "logstash_stats.pipelines.id", - "size": 1000, - }, - }, - }, - "nested": Object { - "path": "logstash_stats.pipelines", - }, - }, - }, "derivative": false, "description": "Number of events emitted per second by the Logstash pipeline at the outputs stage.", "field": "logstash_stats.pipelines.events.out", "format": "0,0.[00]", + "getDateHistogramSubAggs": [Function], "label": "Pipeline Throughput", "timestampField": "logstash_stats.timestamp", "units": "e/s", diff --git a/x-pack/legacy/plugins/monitoring/server/lib/metrics/logstash/classes.js b/x-pack/legacy/plugins/monitoring/server/lib/metrics/logstash/classes.js index dade736cd53f8..3af726328aca8 100644 --- 
a/x-pack/legacy/plugins/monitoring/server/lib/metrics/logstash/classes.js +++ b/x-pack/legacy/plugins/monitoring/server/lib/metrics/logstash/classes.js @@ -270,7 +270,7 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric { derivative: false }); - this.dateHistogramSubAggs = { + this.getDateHistogramSubAggs = ({ pageOfPipelines }) => ({ pipelines_nested: { nested: { path: 'logstash_stats.pipelines' @@ -279,7 +279,8 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric { by_pipeline_id: { terms: { field: 'logstash_stats.pipelines.id', - size: 1000 + size: 1000, + include: pageOfPipelines.map(pipeline => pipeline.id), }, aggs: { throughput: { @@ -290,7 +291,8 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric { by_pipeline_hash: { terms: { field: 'logstash_stats.pipelines.hash', - size: 1000 + size: 1000, + include: pageOfPipelines.map(pipeline => pipeline.hash), }, aggs: { throughput: { @@ -301,7 +303,8 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric { by_ephemeral_id: { terms: { field: 'logstash_stats.pipelines.ephemeral_id', - size: 1000 + size: 1000, + include: pageOfPipelines.map(pipeline => pipeline.ephemeral_id), }, aggs: { events_stats: { @@ -326,7 +329,7 @@ export class LogstashPipelineThroughputMetric extends LogstashMetric { } } } - }; + }); this.calculation = (bucket, _key, _metric, bucketSizeInSeconds) => { const pipelineThroughputs = {}; @@ -353,24 +356,31 @@ export class LogstashPipelineNodeCountMetric extends LogstashMetric { derivative: false }); - this.dateHistogramSubAggs = { - pipelines_nested: { - nested: { - path: 'logstash_stats.pipelines' - }, - aggs: { - by_pipeline_id: { - terms: { - field: 'logstash_stats.pipelines.id', - size: 1000 - }, - aggs: { - to_root: { - reverse_nested: {}, - aggs: { - node_count: { - cardinality: { - field: this.field + this.getDateHistogramSubAggs = ({ pageOfPipelines }) => { + const termAggExtras = {}; + if (pageOfPipelines) { + 
termAggExtras.include = pageOfPipelines.map(pipeline => pipeline.id); + } + return { + pipelines_nested: { + nested: { + path: 'logstash_stats.pipelines' + }, + aggs: { + by_pipeline_id: { + terms: { + field: 'logstash_stats.pipelines.id', + size: 1000, + ...termAggExtras + }, + aggs: { + to_root: { + reverse_nested: {}, + aggs: { + node_count: { + cardinality: { + field: this.field + } } } } @@ -378,7 +388,7 @@ export class LogstashPipelineNodeCountMetric extends LogstashMetric { } } } - } + }; }; this.calculation = bucket => { diff --git a/x-pack/legacy/plugins/monitoring/server/lib/metrics/logstash/metrics.js b/x-pack/legacy/plugins/monitoring/server/lib/metrics/logstash/metrics.js index a87f79533da3b..6c9bc31585806 100644 --- a/x-pack/legacy/plugins/monitoring/server/lib/metrics/logstash/metrics.js +++ b/x-pack/legacy/plugins/monitoring/server/lib/metrics/logstash/metrics.js @@ -324,7 +324,7 @@ export const metrics = { label: pipelineThroughputLabel, description: pipelineThroughputDescription, format: LARGE_FLOAT, - units: eventsPerSecondUnitLabel + units: eventsPerSecondUnitLabel, }), logstash_node_pipeline_throughput: new LogstashPipelineThroughputMetric({ uuidField: 'logstash_stats.logstash.uuid', // TODO: add comment explaining why diff --git a/x-pack/legacy/plugins/monitoring/server/lib/pagination/filter.js b/x-pack/legacy/plugins/monitoring/server/lib/pagination/filter.js new file mode 100644 index 0000000000000..7cc91d8deeb32 --- /dev/null +++ b/x-pack/legacy/plugins/monitoring/server/lib/pagination/filter.js @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +import { get } from 'lodash'; + +function defaultFilterFn(value, query) { + if (value.toLowerCase().includes(query.toLowerCase())) { + return true; + } + return false; +} + +export function filter(data, queryText, fields, filterFn = defaultFilterFn) { + return data.filter(item => { + for (const field of fields) { + if (filterFn(get(item, field), queryText)) { + return true; + } + } + }); +} diff --git a/x-pack/legacy/plugins/monitoring/server/lib/pagination/paginate.js b/x-pack/legacy/plugins/monitoring/server/lib/pagination/paginate.js new file mode 100644 index 0000000000000..b5e63fb862fe7 --- /dev/null +++ b/x-pack/legacy/plugins/monitoring/server/lib/pagination/paginate.js @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +export function paginate({ size, index }, data) { + const start = index * size; + return data.slice(start, Math.min(data.length, start + size)); +} diff --git a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/index.js b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/index.js index 7f9e6d71621b9..796b5f29cef6c 100644 --- a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/index.js +++ b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/index.js @@ -10,3 +10,4 @@ export { logstashOverviewRoute } from './overview'; export { logstashPipelineRoute } from './pipeline'; export { logstashNodePipelinesRoute } from './pipelines/node_pipelines'; export { logstashClusterPipelinesRoute } from './pipelines/cluster_pipelines'; +export { logstashClusterPipelineIdsRoute } from './pipelines/cluster_pipeline_ids'; diff --git a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipeline_ids.js 
b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipeline_ids.js new file mode 100644 index 0000000000000..066b89fc325bb --- /dev/null +++ b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipeline_ids.js @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ + +import Joi from 'joi'; +import { handleError } from '../../../../../lib/errors'; +import { prefixIndexPattern } from '../../../../../lib/ccs_utils'; +import { INDEX_PATTERN_LOGSTASH } from '../../../../../../common/constants'; +import { getLogstashPipelineIds } from '../../../../../lib/logstash/get_pipeline_ids'; + +/** + * Retrieve pipelines for a cluster + */ +export function logstashClusterPipelineIdsRoute(server) { + server.route({ + method: 'POST', + path: '/api/monitoring/v1/clusters/{clusterUuid}/logstash/pipeline_ids', + config: { + validate: { + params: Joi.object({ + clusterUuid: Joi.string().required() + }), + payload: Joi.object({ + ccs: Joi.string().optional(), + timeRange: Joi.object({ + min: Joi.date().required(), + max: Joi.date().required() + }).required() + }) + } + }, + handler: async (req) => { + const config = server.config(); + const { ccs } = req.payload; + const clusterUuid = req.params.clusterUuid; + const lsIndexPattern = prefixIndexPattern(config, INDEX_PATTERN_LOGSTASH, ccs); + const size = config.get('xpack.monitoring.max_bucket_size'); + + try { + const pipelines = await getLogstashPipelineIds(req, lsIndexPattern, { clusterUuid }, size); + return pipelines; + } catch (err) { + throw handleError(err, req); + } + } + }); +} diff --git a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipelines.js 
b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipelines.js index 247e734360dc9..c55f8c19037d5 100644 --- a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipelines.js +++ b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/cluster_pipelines.js @@ -10,6 +10,7 @@ import { getPipelines, processPipelinesAPIResponse } from '../../../../../lib/lo import { handleError } from '../../../../../lib/errors'; import { prefixIndexPattern } from '../../../../../lib/ccs_utils'; import { INDEX_PATTERN_LOGSTASH } from '../../../../../../common/constants'; +import { getPaginatedPipelines } from '../../../../../lib/logstash/get_paginated_pipelines'; /** * Retrieve pipelines for a cluster @@ -28,13 +29,22 @@ export function logstashClusterPipelinesRoute(server) { timeRange: Joi.object({ min: Joi.date().required(), max: Joi.date().required() - }).required() + }).required(), + pagination: Joi.object({ + index: Joi.number().required(), + size: Joi.number().required() + }).required(), + sort: Joi.object({ + field: Joi.string().required(), + direction: Joi.string().required() + }).optional(), + queryText: Joi.string().default('').allow('').optional(), }) } }, handler: async (req) => { const config = server.config(); - const { ccs } = req.payload; + const { ccs, pagination, sort, queryText } = req.payload; const clusterUuid = req.params.clusterUuid; const lsIndexPattern = prefixIndexPattern(config, INDEX_PATTERN_LOGSTASH, ccs); @@ -46,16 +56,40 @@ export function logstashClusterPipelinesRoute(server) { nodesCountMetric ]; + // The client side fields do not match the server side metric names + // so adjust that here. 
See processPipelinesAPIResponse + const sortMetricSetMap = { + latestThroughput: throughputMetric, + latestNodesCount: nodesCountMetric + }; + if (sort) { + sort.field = sortMetricSetMap[sort.field] || sort.field; + } + + const { pageOfPipelines, totalPipelineCount } = + await getPaginatedPipelines(req, lsIndexPattern, {}, metricSet, pagination, sort, queryText); + + // Just the IDs for the rest + const pipelineIds = pageOfPipelines.map(pipeline => pipeline.id); + + const metricOptions = { + pageOfPipelines, + }; + try { + const pipelineData = await getPipelines(req, lsIndexPattern, pipelineIds, metricSet, metricOptions); const response = await processPipelinesAPIResponse( { - pipelines: await getPipelines(req, lsIndexPattern, metricSet), + pipelines: pipelineData, clusterStatus: await getClusterStatus(req, lsIndexPattern, { clusterUuid }) }, throughputMetric, nodesCountMetric ); - return response; + return { + ...response, + totalPipelineCount + }; } catch (err) { throw handleError(err, req); } diff --git a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/node_pipelines.js b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/node_pipelines.js index faec0791d7c32..84f1626770127 100644 --- a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/node_pipelines.js +++ b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/logstash/pipelines/node_pipelines.js @@ -10,6 +10,7 @@ import { getPipelines, processPipelinesAPIResponse } from '../../../../../lib/lo import { handleError } from '../../../../../lib/errors'; import { prefixIndexPattern } from '../../../../../lib/ccs_utils'; import { INDEX_PATTERN_LOGSTASH } from '../../../../../../common/constants'; +import { getPaginatedPipelines } from '../../../../../lib/logstash/get_paginated_pipelines'; /** * Retrieve pipelines for a node @@ -29,15 +30,25 @@ export function logstashNodePipelinesRoute(server) { timeRange: Joi.object({ min: Joi.date().required(), 
max: Joi.date().required() - }).required() + }).required(), + pagination: Joi.object({ + index: Joi.number().required(), + size: Joi.number().required() + }).required(), + sort: Joi.object({ + field: Joi.string().required(), + direction: Joi.string().required() + }).optional(), + queryText: Joi.string().default('').allow('').optional(), }) } }, handler: async (req) => { const config = server.config(); - const { ccs } = req.payload; + const { ccs, pagination, sort, queryText } = req.payload; const { clusterUuid, logstashUuid } = req.params; const lsIndexPattern = prefixIndexPattern(config, INDEX_PATTERN_LOGSTASH, ccs); + const throughputMetric = 'logstash_node_pipeline_throughput'; const nodesCountMetric = 'logstash_node_pipeline_nodes_count'; const metricSet = [ @@ -45,16 +56,40 @@ export function logstashNodePipelinesRoute(server) { nodesCountMetric ]; + // The client side fields do not match the server side metric names + // so adjust that here. See processPipelinesAPIResponse + const sortMetricSetMap = { + latestThroughput: throughputMetric, + latestNodesCount: nodesCountMetric + }; + if (sort) { + sort.field = sortMetricSetMap[sort.field] || sort.field; + } + + const { pageOfPipelines, totalPipelineCount } + = await getPaginatedPipelines(req, lsIndexPattern, { clusterUuid, logstashUuid }, metricSet, pagination, sort, queryText); + + // Just the IDs for the rest + const pipelineIds = pageOfPipelines.map(pipeline => pipeline.id); + + const metricOptions = { + pageOfPipelines, + }; + try { + const pipelineData = await getPipelines(req, lsIndexPattern, pipelineIds, metricSet, metricOptions); const response = await processPipelinesAPIResponse( { - pipelines: await getPipelines(req, lsIndexPattern, metricSet), + pipelines: pipelineData, nodeSummary: await getNodeInfo(req, lsIndexPattern, { clusterUuid, logstashUuid }) }, throughputMetric, nodesCountMetric ); - return response; + return { + ...response, + totalPipelineCount + }; } catch (err) { throw handleError(err, 
req); } diff --git a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/ui.js b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/ui.js index 302cdafc72561..dc0549a283972 100644 --- a/x-pack/legacy/plugins/monitoring/server/routes/api/v1/ui.js +++ b/x-pack/legacy/plugins/monitoring/server/routes/api/v1/ui.js @@ -52,6 +52,7 @@ export { logstashNodeRoute, logstashNodesRoute, logstashOverviewRoute, - logstashPipelineRoute + logstashPipelineRoute, + logstashClusterPipelineIdsRoute } from './logstash'; export * from './setup'; diff --git a/x-pack/test/functional/apps/monitoring/logstash/pipelines.js b/x-pack/test/functional/apps/monitoring/logstash/pipelines.js index 4d2a7dbc68887..fb9fcf8bab8bb 100644 --- a/x-pack/test/functional/apps/monitoring/logstash/pipelines.js +++ b/x-pack/test/functional/apps/monitoring/logstash/pipelines.js @@ -8,6 +8,8 @@ import expect from '@kbn/expect'; import { getLifecycleMethods } from '../_get_lifecycle_methods'; export default function ({ getService, getPageObjects }) { + const PageObjects = getPageObjects(['common']); + const retry = getService('retry'); const overview = getService('monitoringClusterOverview'); const pipelinesList = getService('monitoringLogstashPipelines'); const lsClusterSummaryStatus = getService('monitoringLogstashSummaryStatus'); @@ -43,6 +45,8 @@ export default function ({ getService, getPageObjects }) { const rows = await pipelinesList.getRows(); expect(rows.length).to.be(4); + await pipelinesList.clickIdCol(); + const pipelinesAll = await pipelinesList.getPipelinesAll(); const tableData = [ @@ -85,8 +89,11 @@ export default function ({ getService, getPageObjects }) { it('should filter for specific pipelines', async () => { await pipelinesList.setFilter('la'); - const rows = await pipelinesList.getRows(); - expect(rows.length).to.be(2); + await PageObjects.common.pressEnterKey(); + await retry.try(async () => { + const rows = await pipelinesList.getRows(); + expect(rows.length).to.be(2); + }); await 
pipelinesList.clearFilter(); }); diff --git a/x-pack/test/functional/page_objects/monitoring_page.js b/x-pack/test/functional/page_objects/monitoring_page.js index 9eef3dff01bba..84c9981a1bcc9 100644 --- a/x-pack/test/functional/page_objects/monitoring_page.js +++ b/x-pack/test/functional/page_objects/monitoring_page.js @@ -50,7 +50,8 @@ export function MonitoringPageProvider({ getPageObjects, getService }) { } async tableSetFilter(subj, text) { - return await testSubjects.setValue(subj, text); + await testSubjects.setValue(subj, text); + await PageObjects.common.pressEnterKey(); } async tableClearFilter(subj) { diff --git a/x-pack/test/functional/services/monitoring/logstash_pipelines.js b/x-pack/test/functional/services/monitoring/logstash_pipelines.js index f2c3fb839f7af..d4d367665ef5b 100644 --- a/x-pack/test/functional/services/monitoring/logstash_pipelines.js +++ b/x-pack/test/functional/services/monitoring/logstash_pipelines.js @@ -18,6 +18,7 @@ export function MonitoringLogstashPipelinesProvider({ getService, getPageObjects const SUBJ_SEARCH_BAR = `${SUBJ_TABLE_CONTAINER} > monitoringTableToolBar`; const SUBJ_TABLE_SORT_EVENTS_EMITTED_RATE_COL = `${SUBJ_TABLE_CONTAINER} > tableHeaderCell_latestThroughput_1`; + const SUBJ_TABLE_SORT_ID_COL = `${SUBJ_TABLE_CONTAINER} > tableHeaderCell_id_0`; const SUBJ_PIPELINES_IDS = `${SUBJ_TABLE_CONTAINER} > id`; const SUBJ_PIPELINES_EVENTS_EMITTED_RATES = `${SUBJ_TABLE_CONTAINER} > eventsEmittedRate`; @@ -53,6 +54,12 @@ export function MonitoringLogstashPipelinesProvider({ getService, getPageObjects }, []); } + async clickIdCol() { + const headerCell = await testSubjects.find(SUBJ_TABLE_SORT_ID_COL); + const button = await headerCell.findByTagName('button'); + return button.click(); + } + async clickEventsEmittedRateCol() { const headerCell = await testSubjects.find(SUBJ_TABLE_SORT_EVENTS_EMITTED_RATE_COL); const button = await headerCell.findByTagName('button');