Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Integrate Document Level Alerting changes #410

Merged
merged 8 commits into from
Apr 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ configurations.testImplementation {

dependencies {
compileOnly "org.opensearch.plugin:opensearch-scripting-painless-spi:${versions.opensearch}"
api "org.opensearch.plugin:percolator-client:${opensearch_version}"

// OpenSearch Nanny state
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.percolator;

import org.opensearch.common.settings.Setting;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.plugins.ExtensiblePlugin;
import org.opensearch.plugins.MapperPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.plugins.SearchPlugin;
import org.opensearch.search.fetch.FetchSubPhase;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;

/**
* This is a stop-gap solution & will be removed in future.
*/
// TODO for cleanup
public class PercolatorPluginExt extends Plugin implements MapperPlugin, SearchPlugin, ExtensiblePlugin {
@Override
public List<QuerySpec<?>> getQueries() {
return singletonList(new QuerySpec<>(PercolateQueryBuilderExt.NAME, PercolateQueryBuilderExt::new, PercolateQueryBuilderExt::fromXContent));
}

@Override
public List<FetchSubPhase> getFetchSubPhases(FetchPhaseConstructionContext context) {
return Arrays.asList(new PercolatorMatchedSlotSubFetchPhase(), new PercolatorHighlightSubFetchPhase(context.getHighlighters()));
}

@Override
public List<Setting<?>> getSettings() {
return Arrays.asList(PercolatorFieldMapperExt.INDEX_MAP_UNMAPPED_FIELDS_AS_TEXT_SETTING);
}

@Override
public Map<String, Mapper.TypeParser> getMappers() {
return singletonMap(PercolatorFieldMapperExt.CONTENT_TYPE, new PercolatorFieldMapperExt.TypeParser());
}

@Override
public void loadExtensions(ExtensionLoader loader) {
ExtensiblePlugin.super.loadExtensions(loader);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@ import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.model.Trigger
import org.opensearch.alerting.model.action.AlertCategory
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.getBucketKeysHash
Expand Down Expand Up @@ -166,6 +168,29 @@ class AlertService(
}
}

// TODO: clean this up so it follows the proper alert management for doc monitors
fun composeDocLevelAlert(
findings: List<String>,
relatedDocIds: List<String>,
ctx: DocumentLevelTriggerExecutionContext,
result: DocumentLevelTriggerRunResult,
alertError: AlertError?
): Alert {
val currentTime = Instant.now()

val actionExecutionResults = result.actionResults.map {
ActionExecutionResult(it.key, it.value.executionTime, if (it.value.throttled) 1 else 0)
}

val alertState = if (alertError == null) Alert.State.ACTIVE else Alert.State.ERROR
return Alert(
monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
actionExecutionResults = actionExecutionResults, schemaVersion = IndexUtils.alertIndexSchemaVersion,
findingIds = findings, relatedDocIds = relatedDocIds
)
}

fun updateActionResultsForBucketLevelAlert(
currentAlert: Alert,
actionResults: Map<String, ActionRunResult>,
Expand Down Expand Up @@ -295,8 +320,8 @@ class AlertService(
DeleteRequest(AlertIndices.ALERT_INDEX, alert.id)
.routing(alert.monitorId),
// Only add completed alert to history index if history is enabled
if (alertIndices.isHistoryEnabled()) {
IndexRequest(AlertIndices.HISTORY_WRITE_INDEX)
if (alertIndices.isAlertHistoryEnabled()) {
IndexRequest(AlertIndices.ALERT_HISTORY_WRITE_INDEX)
.routing(alert.monitorId)
.source(alert.toXContentWithUser(XContentFactory.jsonBuilder()))
.id(alert.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.opensearch.alerting.action.GetAlertsAction
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetEmailAccountAction
import org.opensearch.alerting.action.GetEmailGroupAction
import org.opensearch.alerting.action.GetFindingsAction
import org.opensearch.alerting.action.GetMonitorAction
import org.opensearch.alerting.action.IndexDestinationAction
import org.opensearch.alerting.action.IndexEmailAccountAction
Expand All @@ -32,13 +33,15 @@ import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.model.ClusterMetricsInput
import org.opensearch.alerting.core.model.DocLevelMonitorInput
import org.opensearch.alerting.core.model.ScheduledJob
import org.opensearch.alerting.core.model.SearchInput
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
Expand All @@ -51,6 +54,7 @@ import org.opensearch.alerting.resthandler.RestGetAlertsAction
import org.opensearch.alerting.resthandler.RestGetDestinationsAction
import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestIndexDestinationAction
import org.opensearch.alerting.resthandler.RestIndexEmailAccountAction
Expand All @@ -74,6 +78,7 @@ import org.opensearch.alerting.transport.TransportGetAlertsAction
import org.opensearch.alerting.transport.TransportGetDestinationsAction
import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportIndexDestinationAction
import org.opensearch.alerting.transport.TransportIndexEmailAccountAction
Expand All @@ -82,6 +87,7 @@ import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
Expand All @@ -102,8 +108,8 @@ import org.opensearch.index.IndexModule
import org.opensearch.painless.spi.PainlessExtension
import org.opensearch.painless.spi.Whitelist
import org.opensearch.painless.spi.WhitelistLoader
import org.opensearch.percolator.PercolatorPluginExt
import org.opensearch.plugins.ActionPlugin
import org.opensearch.plugins.Plugin
import org.opensearch.plugins.ReloadablePlugin
import org.opensearch.plugins.ScriptPlugin
import org.opensearch.plugins.SearchPlugin
Expand All @@ -122,7 +128,7 @@ import java.util.function.Supplier
* It also adds [Monitor.XCONTENT_REGISTRY], [SearchInput.XCONTENT_REGISTRY], [QueryLevelTrigger.XCONTENT_REGISTRY],
* [BucketLevelTrigger.XCONTENT_REGISTRY], [ClusterMetricsInput.XCONTENT_REGISTRY] to the [NamedXContentRegistry] so that we are able to deserialize the custom named objects.
*/
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, Plugin() {
internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, ReloadablePlugin, SearchPlugin, PercolatorPluginExt() {

override fun getContextWhitelists(): Map<ScriptContext<*>, List<Whitelist>> {
val whitelist = WhitelistLoader.loadFromResourceFiles(javaClass, "org.opensearch.alerting.txt")
Expand All @@ -140,13 +146,15 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
@JvmField val EMAIL_GROUP_BASE_URI = "$DESTINATION_BASE_URI/email_groups"
@JvmField val LEGACY_OPENDISTRO_EMAIL_ACCOUNT_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_accounts"
@JvmField val LEGACY_OPENDISTRO_EMAIL_GROUP_BASE_URI = "$LEGACY_OPENDISTRO_DESTINATION_BASE_URI/email_groups"
@JvmField val FINDING_BASE_URI = "/_plugins/_alerting/findings"
@JvmField val ALERTING_JOB_TYPES = listOf("monitor")
}

lateinit var runner: MonitorRunner
lateinit var runner: MonitorRunnerService
lateinit var scheduler: JobScheduler
lateinit var sweeper: JobSweeper
lateinit var scheduledJobIndices: ScheduledJobIndices
lateinit var docLevelMonitorQueries: DocLevelMonitorQueries
lateinit var threadPool: ThreadPool
lateinit var alertIndices: AlertIndices
lateinit var clusterService: ClusterService
Expand Down Expand Up @@ -180,7 +188,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestSearchEmailGroupAction(),
RestGetEmailGroupAction(),
RestGetDestinationsAction(),
RestGetAlertsAction()
RestGetAlertsAction(),
RestGetFindingsAction()
)
}

Expand All @@ -204,17 +213,21 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(SearchEmailGroupAction.INSTANCE, TransportSearchEmailGroupAction::class.java),
ActionPlugin.ActionHandler(DeleteEmailGroupAction.INSTANCE, TransportDeleteEmailGroupAction::class.java),
ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java),
ActionPlugin.ActionHandler(GetAlertsAction.INSTANCE, TransportGetAlertsAction::class.java)
ActionPlugin.ActionHandler(GetAlertsAction.INSTANCE, TransportGetAlertsAction::class.java),
ActionPlugin.ActionHandler(GetFindingsAction.INSTANCE, TransportGetFindingsSearchAction::class.java)

)
}

override fun getNamedXContent(): List<NamedXContentRegistry.Entry> {
return listOf(
Monitor.XCONTENT_REGISTRY,
SearchInput.XCONTENT_REGISTRY,
DocLevelMonitorInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY,
ClusterMetricsInput.XCONTENT_REGISTRY
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY
)
}

Expand All @@ -234,7 +247,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
val settings = environment.settings()
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
runner = MonitorRunner
runner = MonitorRunnerService
.registerClusterService(clusterService)
.registerClient(client)
.registerNamedXContentRegistry(xContentRegistry)
Expand All @@ -248,12 +261,13 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client.admin(), clusterService)
scheduler = JobScheduler(threadPool, runner)
sweeper = JobSweeper(environment.settings(), client, clusterService, threadPool, xContentRegistry, scheduler, ALERTING_JOB_TYPES)
destinationMigrationCoordinator = DestinationMigrationCoordinator(client, clusterService, threadPool)
this.threadPool = threadPool
this.clusterService = clusterService
return listOf(sweeper, scheduler, runner, scheduledJobIndices, destinationMigrationCoordinator)
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
}

override fun getSettings(): List<Setting<*>> {
Expand Down Expand Up @@ -310,7 +324,12 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
LegacyOpenDistroDestinationSettings.EMAIL_USERNAME,
LegacyOpenDistroDestinationSettings.EMAIL_PASSWORD,
LegacyOpenDistroDestinationSettings.ALLOW_LIST,
LegacyOpenDistroDestinationSettings.HOST_DENY_LIST
LegacyOpenDistroDestinationSettings.HOST_DENY_LIST,
AlertingSettings.FINDING_HISTORY_ENABLED,
AlertingSettings.FINDING_HISTORY_MAX_DOCS,
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD
)
}

Expand Down
Loading