Skip to content

Commit

Permalink
[Search] Add shard delay aggregation (elastic#77423)
Browse files Browse the repository at this point in the history
* [Search] [WIP] Add shard delay aggregation

* Add expression functions

* Register function

* Fix test

* Add comment

Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
  • Loading branch information
lukasolson and elasticmachine committed Sep 29, 2020
1 parent 57d76ed commit c347dfd
Show file tree
Hide file tree
Showing 10 changed files with 355 additions and 9 deletions.
76 changes: 76 additions & 0 deletions src/plugins/data/common/search/aggs/buckets/shard_delay.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { AggConfigs } from '../agg_configs';
import { FieldFormatsGetConfigFn, NumberFormat } from '../../../../common/field_formats';
import { getShardDelayBucketAgg, SHARD_DELAY_AGG_NAME } from './shard_delay';

describe('Shard Delay Agg', () => {
const getConfig = (() => {}) as FieldFormatsGetConfigFn;
const getAggConfigs = () => {
const field = { name: 'bytes' };

const indexPattern = {
id: '1234',
title: 'logstash-*',
fields: {
getByName: () => field,
filter: () => [field],
},
getFormatterForField: () =>
new NumberFormat(
{
pattern: '0,0.[000] b',
},
getConfig
),
} as any;

return new AggConfigs(
indexPattern,
[
{
type: SHARD_DELAY_AGG_NAME,
params: {
duration: 1000,
},
},
],
{
typesRegistry: {
get: getShardDelayBucketAgg,
} as any,
}
);
};

describe('write', () => {
test('writes the delay as the value parameter', () => {
const aggConfigs = getAggConfigs();
const agg = aggConfigs.aggs[0];
expect(agg.write(aggConfigs)).toMatchInlineSnapshot(`
Object {
"params": Object {
"value": "5s",
},
}
`);
});
});
});
50 changes: 50 additions & 0 deletions src/plugins/data/common/search/aggs/buckets/shard_delay.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { BucketAggType } from './bucket_agg_type';
import { BaseAggParams } from '../types';
import { aggShardDelayFnName } from './shard_delay_fn';

export const SHARD_DELAY_AGG_NAME = 'shard_delay';

export interface AggParamsShardDelay extends BaseAggParams {
delay?: number;
}

export const getShardDelayBucketAgg = () =>
new BucketAggType({
name: SHARD_DELAY_AGG_NAME,
title: 'Shard Delay',
expressionName: aggShardDelayFnName,
createFilter: () => ({ match_all: {} }),
customLabels: false,
params: [
{
name: 'delay',
type: 'string',
default: '5s',
write(aggConfig, output) {
output.params = {
...output.params,
value: aggConfig.params.delay,
};
},
},
],
});
65 changes: 65 additions & 0 deletions src/plugins/data/common/search/aggs/buckets/shard_delay_fn.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { functionWrapper } from '../test_helpers';
import { aggShardDelay } from './shard_delay_fn';

describe('agg_expression_functions', () => {
describe('aggShardDelay', () => {
const fn = functionWrapper(aggShardDelay());

test('correctly serializes', () => {
const actual = fn({
delay: 1000,
});
expect(actual).toMatchInlineSnapshot(`
Object {
"type": "agg_type",
"value": Object {
"enabled": true,
"id": undefined,
"params": Object {
"customLabel": undefined,
"delay": 1000,
"json": undefined,
},
"schema": undefined,
"type": "shard_delay",
},
}
`);
});

test('correctly parses json string argument', () => {
const actual = fn({
delay: 1000,
json: '{ "foo": true }',
});

expect(actual.value.params.json).toEqual({ foo: true });

expect(() => {
fn({
delay: 1000,
json: '/// intentionally malformed json ///',
});
}).toThrowErrorMatchingInlineSnapshot(`"Unable to parse json argument string"`);
});
});
});
105 changes: 105 additions & 0 deletions src/plugins/data/common/search/aggs/buckets/shard_delay_fn.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import { i18n } from '@kbn/i18n';
import { Assign } from '@kbn/utility-types';
import { ExpressionFunctionDefinition } from 'src/plugins/expressions/common';
import { AggExpressionType, AggConfigSerialized } from '../';
import { getParsedValue } from '../utils/get_parsed_value';
import { AggParamsShardDelay, SHARD_DELAY_AGG_NAME } from './shard_delay';

export const aggShardDelayFnName = 'aggShardDelay';

type Input = any;
type AggArgs = AggParamsShardDelay & Pick<AggConfigSerialized, 'id' | 'enabled' | 'schema'>;

type Arguments = Assign<AggArgs, { delay?: number }>;

type Output = AggExpressionType;
type FunctionDefinition = ExpressionFunctionDefinition<
typeof aggShardDelayFnName,
Input,
Arguments,
Output
>;

export const aggShardDelay = (): FunctionDefinition => ({
name: aggShardDelayFnName,
help: i18n.translate('data.search.aggs.function.buckets.shardDelay.help', {
defaultMessage: 'Generates a serialized agg config for a Shard Delay agg',
}),
type: 'agg_type',
args: {
id: {
types: ['string'],
help: i18n.translate('data.search.aggs.buckets.shardDelay.id.help', {
defaultMessage: 'ID for this aggregation',
}),
},
enabled: {
types: ['boolean'],
default: true,
help: i18n.translate('data.search.aggs.buckets.shardDelay.enabled.help', {
defaultMessage: 'Specifies whether this aggregation should be enabled',
}),
},
schema: {
types: ['string'],
help: i18n.translate('data.search.aggs.buckets.shardDelay.schema.help', {
defaultMessage: 'Schema to use for this aggregation',
}),
},
delay: {
types: ['number'],
help: i18n.translate('data.search.aggs.buckets.shardDelay.delay.help', {
defaultMessage: 'Delay in ms between shards to process.',
}),
},
json: {
types: ['string'],
help: i18n.translate('data.search.aggs.buckets.shardDelay.json.help', {
defaultMessage: 'Advanced json to include when the agg is sent to Elasticsearch',
}),
},
customLabel: {
types: ['string'],
help: i18n.translate('data.search.aggs.buckets.shardDelay.customLabel.help', {
defaultMessage: 'Represents a custom label for this aggregation',
}),
},
},
fn: (input, args) => {
const { id, enabled, schema, ...rest } = args;

return {
type: 'agg_type',
value: {
id,
enabled,
schema,
type: SHARD_DELAY_AGG_NAME,
params: {
...rest,
json: getParsedValue(args, 'json'),
delay: getParsedValue(args, 'delay'),
},
},
};
},
});
10 changes: 10 additions & 0 deletions src/plugins/data/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,16 @@ export const configSchema = schema.object({
enabled: schema.boolean({ defaultValue: true }),
}),
}),
search: schema.object({
aggs: schema.object({
shardDelay: schema.object({
// Whether or not to register the shard_delay (which is only available in snapshot versions
// of Elasticsearch) agg type/expression function to make it available in the UI for either
// functional or manual testing
enabled: schema.boolean({ defaultValue: false }),
}),
}),
}),
});

export type ConfigSchema = TypeOf<typeof configSchema>;
2 changes: 1 addition & 1 deletion src/plugins/data/public/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ export class DataPublicPlugin
private readonly storage: IStorageWrapper;

constructor(initializerContext: PluginInitializerContext<ConfigSchema>) {
this.searchService = new SearchService();
this.searchService = new SearchService(initializerContext);
this.queryService = new QueryService();
this.fieldFormatsService = new FieldFormatsService();
this.autocomplete = new AutocompleteService(initializerContext);
Expand Down
6 changes: 5 additions & 1 deletion src/plugins/data/public/search/search_service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ describe('Search service', () => {
let searchService: SearchService;
let mockCoreSetup: MockedKeys<CoreSetup>;
let mockCoreStart: MockedKeys<CoreStart>;
const initializerContext = coreMock.createPluginInitializerContext();
initializerContext.config.get = jest.fn().mockReturnValue({
search: { aggs: { shardDelay: { enabled: false } } },
});

beforeEach(() => {
searchService = new SearchService();
mockCoreSetup = coreMock.createSetup();
mockCoreStart = coreMock.createStart();
searchService = new SearchService(initializerContext);
});

describe('setup()', () => {
Expand Down
25 changes: 20 additions & 5 deletions src/plugins/data/public/search/search_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

import { Plugin, CoreSetup, CoreStart } from 'src/core/public';
import { Plugin, CoreSetup, CoreStart, PluginInitializerContext } from 'src/core/public';
import { BehaviorSubject } from 'rxjs';
import { ISearchSetup, ISearchStart, SearchEnhancements } from './types';

Expand All @@ -36,6 +36,12 @@ import { SearchUsageCollector, createUsageCollector } from './collectors';
import { UsageCollectionSetup } from '../../../usage_collection/public';
import { esdsl, esRawResponse } from './expressions';
import { ExpressionsSetup } from '../../../expressions/public';
import { ConfigSchema } from '../../config';
import {
SHARD_DELAY_AGG_NAME,
getShardDelayBucketAgg,
} from '../../common/search/aggs/buckets/shard_delay';
import { aggShardDelay } from '../../common/search/aggs/buckets/shard_delay_fn';

/** @internal */
export interface SearchServiceSetupDependencies {
Expand All @@ -54,6 +60,8 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
private searchInterceptor!: ISearchInterceptor;
private usageCollector?: SearchUsageCollector;

constructor(private initializerContext: PluginInitializerContext<ConfigSchema>) {}

public setup(
{ http, getStartServices, notifications, uiSettings }: CoreSetup,
{ expressions, usageCollection }: SearchServiceSetupDependencies
Expand All @@ -75,11 +83,18 @@ export class SearchService implements Plugin<ISearchSetup, ISearchStart> {
expressions.registerFunction(esdsl);
expressions.registerType(esRawResponse);

const aggs = this.aggsService.setup({
registerFunction: expressions.registerFunction,
uiSettings,
});

if (this.initializerContext.config.get().search.aggs.shardDelay.enabled) {
aggs.types.registerBucket(SHARD_DELAY_AGG_NAME, getShardDelayBucketAgg);
expressions.registerFunction(aggShardDelay);
}

return {
aggs: this.aggsService.setup({
registerFunction: expressions.registerFunction,
uiSettings,
}),
aggs,
usageCollector: this.usageCollector!,
__enhance: (enhancements: SearchEnhancements) => {
this.searchInterceptor = enhancements.searchInterceptor;
Expand Down
Loading

0 comments on commit c347dfd

Please sign in to comment.