Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds workflows model and workflow actions for Alerting Plugin #439

Merged
merged 2 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,18 @@ import org.opensearch.commons.alerting.action.AcknowledgeAlertResponse
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.DeleteMonitorRequest
import org.opensearch.commons.alerting.action.DeleteMonitorResponse
import org.opensearch.commons.alerting.action.DeleteWorkflowRequest
import org.opensearch.commons.alerting.action.DeleteWorkflowResponse
import org.opensearch.commons.alerting.action.GetAlertsRequest
import org.opensearch.commons.alerting.action.GetAlertsResponse
import org.opensearch.commons.alerting.action.GetFindingsRequest
import org.opensearch.commons.alerting.action.GetFindingsResponse
import org.opensearch.commons.alerting.action.GetWorkflowRequest
import org.opensearch.commons.alerting.action.GetWorkflowResponse
import org.opensearch.commons.alerting.action.IndexMonitorRequest
import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.action.IndexWorkflowRequest
import org.opensearch.commons.alerting.action.IndexWorkflowResponse
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.notifications.action.BaseResponse
Expand Down Expand Up @@ -55,6 +61,7 @@ object AlertingPluginInterface {
}
)
}

fun deleteMonitor(
client: NodeClient,
request: DeleteMonitorRequest,
Expand All @@ -73,6 +80,49 @@ object AlertingPluginInterface {
)
}

/**
* Index monitor interface.
* @param client Node client for making transport action
* @param request The request object
* @param namedWriteableRegistry Registry for building aggregations
* @param listener The listener for getting response
*/
fun indexWorkflow(
client: NodeClient,
request: IndexWorkflowRequest,
listener: ActionListener<IndexWorkflowResponse>
) {
client.execute(
AlertingActions.INDEX_WORKFLOW_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
IndexWorkflowResponse(
it
)
}
}
)
}

fun deleteWorkflow(
client: NodeClient,
request: DeleteWorkflowRequest,
listener: ActionListener<DeleteWorkflowResponse>
) {
client.execute(
AlertingActions.DELETE_WORKFLOW_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
DeleteWorkflowResponse(
it
)
}
}
)
}

/**
* Get Alerts interface.
* @param client Node client for making transport action
Expand All @@ -97,6 +147,30 @@ object AlertingPluginInterface {
)
}

/**
* Get Workflow interface.
* @param client Node client for making transport action
* @param request The request object
* @param listener The listener for getting response
*/
fun getWorkflow(
client: NodeClient,
request: GetWorkflowRequest,
listener: ActionListener<GetWorkflowResponse>
) {
client.execute(
AlertingActions.GET_WORKFLOW_ACTION_TYPE,
request,
wrapActionListener(listener) { response ->
recreateObject(response) {
GetWorkflowResponse(
it
)
}
}
)
}

/**
* Get Findings interface.
* @param client Node client for making transport action
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import org.opensearch.action.ActionType

object AlertingActions {
const val INDEX_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/write"
const val INDEX_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/write"
const val GET_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/get"
const val GET_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/get"
const val DELETE_MONITOR_ACTION_NAME = "cluster:admin/opendistro/alerting/monitor/delete"
const val DELETE_WORKFLOW_ACTION_NAME = "cluster:admin/opensearch/alerting/workflow/delete"
const val GET_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/get"
const val ACKNOWLEDGE_ALERTS_ACTION_NAME = "cluster:admin/opendistro/alerting/alerts/ack"
const val SUBSCRIBE_FINDINGS_ACTION_NAME = "cluster:admin/opensearch/alerting/findings/subscribe"
Expand All @@ -18,12 +21,22 @@ object AlertingActions {
val INDEX_MONITOR_ACTION_TYPE =
ActionType(INDEX_MONITOR_ACTION_NAME, ::IndexMonitorResponse)
@JvmField
val INDEX_WORKFLOW_ACTION_TYPE =
ActionType(INDEX_WORKFLOW_ACTION_NAME, ::IndexWorkflowResponse)
@JvmField
val GET_ALERTS_ACTION_TYPE =
ActionType(GET_ALERTS_ACTION_NAME, ::GetAlertsResponse)
@JvmField
val GET_WORKFLOW_ACTION_TYPE =
ActionType(GET_WORKFLOW_ACTION_NAME, ::GetWorkflowResponse)

@JvmField
val DELETE_MONITOR_ACTION_TYPE =
ActionType(DELETE_MONITOR_ACTION_NAME, ::DeleteMonitorResponse)
@JvmField
val DELETE_WORKFLOW_ACTION_TYPE =
ActionType(DELETE_WORKFLOW_ACTION_NAME, ::DeleteWorkflowResponse)
@JvmField
val GET_FINDINGS_ACTION_TYPE =
ActionType(GET_FINDINGS_ACTION_NAME, ::GetFindingsResponse)
@JvmField
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import java.io.IOException

class DeleteWorkflowRequest : ActionRequest {

val workflowId: String
/**
* Flag that indicates whether the delegate monitors should be deleted or not.
* If the flag is set to true, Delegate monitors will be deleted only in the case when they are part of the specified workflow and no other.
*/
val deleteDelegateMonitors: Boolean?

constructor(workflowId: String, deleteDelegateMonitors: Boolean?) : super() {
this.workflowId = workflowId
this.deleteDelegateMonitors = deleteDelegateMonitors
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
workflowId = sin.readString(),
deleteDelegateMonitors = sin.readOptionalBoolean()
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(workflowId)
out.writeOptionalBoolean(deleteDelegateMonitors)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.opensearch.commons.alerting.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.commons.alerting.util.IndexUtils
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder

class DeleteWorkflowResponse : BaseResponse {
var id: String
var version: Long
var nonDeletedMonitors: List<String>? = null

constructor(
id: String,
version: Long,
nonDeletedMonitors: List<String>? = null
) : super() {
this.id = id
this.version = version
this.nonDeletedMonitors = nonDeletedMonitors
}

constructor(sin: StreamInput) : this(
sin.readString(), // id
sin.readLong(), // version
sin.readOptionalStringList()
)

override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeLong(version)
out.writeOptionalStringCollection(nonDeletedMonitors)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(IndexUtils._ID, id)
.field(IndexUtils._VERSION, version)
.field(NON_DELETED_MONITORS, nonDeletedMonitors)
.endObject()
}

companion object {
const val NON_DELETED_MONITORS = "NON_DELETED_MONITORS"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.rest.RestRequest
import java.io.IOException

class GetWorkflowRequest : ActionRequest {
val workflowId: String
val method: RestRequest.Method

constructor(
workflowId: String,
method: RestRequest.Method
) : super() {
this.workflowId = workflowId
this.method = method
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(), // workflowId
sin.readEnum(RestRequest.Method::class.java) // method
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(workflowId)
out.writeEnum(method)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.commons.alerting.action

import org.opensearch.common.io.stream.StreamInput
import org.opensearch.common.io.stream.StreamOutput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.util.IndexUtils.Companion._ID
import org.opensearch.commons.alerting.util.IndexUtils.Companion._PRIMARY_TERM
import org.opensearch.commons.alerting.util.IndexUtils.Companion._SEQ_NO
import org.opensearch.commons.alerting.util.IndexUtils.Companion._VERSION
import org.opensearch.commons.notifications.action.BaseResponse
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.rest.RestStatus
import java.io.IOException

class GetWorkflowResponse : BaseResponse {
var id: String
var version: Long
var seqNo: Long
var primaryTerm: Long
private var status: RestStatus
var workflow: Workflow?

constructor(
id: String,
version: Long,
seqNo: Long,
primaryTerm: Long,
status: RestStatus,
workflow: Workflow?
) : super() {
this.id = id
this.version = version
this.seqNo = seqNo
this.primaryTerm = primaryTerm
this.status = status
this.workflow = workflow
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(), // id
sin.readLong(), // version
sin.readLong(), // seqNo
sin.readLong(), // primaryTerm
sin.readEnum(RestStatus::class.java), // RestStatus
if (sin.readBoolean()) {
Workflow.readFrom(sin) // monitor
} else null
)

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(id)
out.writeLong(version)
out.writeLong(seqNo)
out.writeLong(primaryTerm)
out.writeEnum(status)
if (workflow != null) {
out.writeBoolean(true)
workflow?.writeTo(out)
} else {
out.writeBoolean(false)
}
}

@Throws(IOException::class)
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(_ID, id)
.field(_VERSION, version)
.field(_SEQ_NO, seqNo)
.field(_PRIMARY_TERM, primaryTerm)
if (workflow != null)
builder.field("workflow", workflow)

return builder.endObject()
}

override fun getStatus(): RestStatus {
return this.status
}
}
Loading