Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added rabbitmq type #210

Merged
merged 4 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -577,7 +577,10 @@ care of lower/uppercases**) : `yaml: a.b --> envvar: A_B` :
`emergency|alert|critical|error|warning|notice|informational|debug or "" (default)`
- **WEBUI_URL** : WebUI URL, if not empty, WebUI output is
_enabled_

- **RABBITMQ_URL**: Rabbitmq URL, if not empty, Rabbitmq output is enabled
- **RABBITMQ_QUEUE**: # Rabbitmq Queue name
- **RABBITMQ_MINIMUMPRIORITY**: "debug" # minimum priority of event for using
this output, order is
#### Slack/Rocketchat/Mattermost/Googlechat Message Formatting

The `SLACK_MESSAGEFORMAT` environment variable and `slack.messageformat` YAML
Expand Down
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ func getConfig() *types.Configuration {
v.SetDefault("Openfaas.MinimumPriority", "")

v.SetDefault("Webui.URL", "")
v.SetDefault("Rabbitmq.URL", "")
v.SetDefault("Rabbitmq.Queue", "")
v.SetDefault("Rabbitmq.MinimumPriority", "")

v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
v.AutomaticEnv()
Expand Down Expand Up @@ -236,6 +239,7 @@ func getConfig() *types.Configuration {
c.Pagerduty.MinimumPriority = checkPriority(c.Pagerduty.MinimumPriority)
c.Kubeless.MinimumPriority = checkPriority(c.Kubeless.MinimumPriority)
c.Openfaas.MinimumPriority = checkPriority(c.Openfaas.MinimumPriority)
c.Rabbitmq.MinimumPriority = checkPriority(c.Rabbitmq.MinimumPriority)

c.Slack.MessageFormatTemplate = getMessageFormatTemplate("Slack", c.Slack.MessageFormat)
c.Rocketchat.MessageFormatTemplate = getMessageFormatTemplate("Rocketchat", c.Rocketchat.MessageFormat)
Expand Down
6 changes: 6 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -176,3 +176,9 @@ openfaas:

webui:
url: "" # WebUI URL, if not empty, WebUI output is enabled

rabbitmq:
url: "" # Rabbitmq URL, if not empty, Rabbitmq output is enabled
queue: "" # Rabbitmq Queue name
minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)

1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/prometheus/client_golang v1.9.0
github.com/segmentio/kafka-go v0.4.10
github.com/spf13/viper v1.7.1
github.com/streadway/amqp v1.0.0 // indirect
github.com/stretchr/testify v1.7.0
golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93
google.golang.org/api v0.40.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,8 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk=
github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg=
github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down
4 changes: 4 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ func forwardEvent(falcopayload types.FalcoPayload) {
go openfaasClient.OpenfaasCall(falcopayload)
}

if config.Rabbitmq.URL != "" && config.Rabbitmq.Queue != "" && (falcopayload.Priority >= types.Priority(config.Openfaas.MinimumPriority) || falcopayload.Rule == testRule) {
go rabbitmqClient.Publish(falcopayload)
}

if config.WebUI.URL != "" {
go webUIClient.WebUIPost(falcopayload)
}
Expand Down
11 changes: 11 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ var (
kubelessClient *outputs.Client
openfaasClient *outputs.Client
webUIClient *outputs.Client
rabbitmqClient *outputs.Client

statsdClient, dogstatsdClient *statsd.Client
config *types.Configuration
Expand Down Expand Up @@ -378,6 +379,16 @@ func init() {
}
}

if config.Rabbitmq.URL != "" && config.Rabbitmq.Queue != "" {
var err error
rabbitmqClient, err = outputs.NewRabbitmqClient(config, stats, promStats, statsdClient, dogstatsdClient)
if err != nil {
config.Rabbitmq.URL = ""
} else {
outputs.EnabledOutputs = append(outputs.EnabledOutputs, "RabbitMQ")
}
}

log.Printf("[INFO] : Enabled Outputs : %s\n", outputs.EnabledOutputs)
}

Expand Down
2 changes: 2 additions & 0 deletions outputs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/streadway/amqp"
"io/ioutil"
"log"
"net/http"
Expand Down Expand Up @@ -62,6 +63,7 @@ type Client struct {
KafkaProducer *kafka.Writer
CloudEventsClient cloudevents.Client
KubernetesClient kubernetes.Interface
RabbitmqClient *amqp.Channel
}

// NewClient returns a new output.Client for accessing the different API.
Expand Down
66 changes: 66 additions & 0 deletions outputs/rabbitmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package outputs

import (
"encoding/json"
"errors"
"log"

"github.com/DataDog/datadog-go/statsd"
"github.com/falcosecurity/falcosidekick/types"
"github.com/streadway/amqp"
)

// NewRabbitmqClient returns a new output.Client for accessing the RabbitmMQ API.
func NewRabbitmqClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics, statsdClient, dogstatsdClient *statsd.Client) (*Client, error) {

var channel *amqp.Channel
if config.Rabbitmq.URL != "" && config.Rabbitmq.Queue != "" {
conn, err := amqp.Dial(config.Rabbitmq.URL)
if err != nil {
log.Printf("[ERROR] : Rabbitmq - %v\n", "Error while connecting rabbitmq")
yindia marked this conversation as resolved.
Show resolved Hide resolved
return nil, errors.New("Error while connecting Rabbitmq")
}
ch, err := conn.Channel()
if err != nil {
log.Printf("[ERROR] : Rabbitmq Channel - %v\n", "Error while creating rabbitmq channel")
return nil, errors.New("Error while creating rabbitmq channel")
}
channel = ch
}

return &Client{
OutputType: "RabbitMQ",
Config: config,
RabbitmqClient: channel,
Stats: stats,
PromStats: promStats,
StatsdClient: statsdClient,
DogstatsdClient: dogstatsdClient,
}, nil
}

// Publish sends a message to a Rabbitmq
func (c *Client) Publish(falcopayload types.FalcoPayload) {
c.Stats.Rabbitmq.Add(Total, 1)

payload, _ := json.Marshal(falcopayload)

err := c.RabbitmqClient.Publish("", c.Config.Rabbitmq.Queue, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: payload,
})

if err != nil {
log.Printf("[ERROR] : RabbitMQ - %v - %v\n", "Error while publishing message", err.Error())
c.Stats.Rabbitmq.Add(Error, 1)
go c.CountMetric("outputs", 1, []string{"output:rabbitmq", "status:error"})
c.PromStats.Outputs.With(map[string]string{"destination": "rabbitmq", "status": Error}).Inc()

return
}

log.Printf("[INFO] : RabbitMQ - Send to message OK \n")
c.Stats.Rabbitmq.Add(OK, 1)
go c.CountMetric("outputs", 1, []string{"output:rabbitmq", "status:ok"})
c.PromStats.Outputs.With(map[string]string{"destination": "rabbitmq", "status": OK}).Inc()
}
1 change: 1 addition & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func getInitStats() *types.Statistics {
Kubeless: getOutputNewMap("kubeless"),
Openfaas: getOutputNewMap("openfaas"),
WebUI: getOutputNewMap("webui"),
Rabbitmq: getOutputNewMap("rabbitmq"),
}
stats.Falco.Add(outputs.Emergency, 0)
stats.Falco.Add(outputs.Alert, 0)
Expand Down
9 changes: 9 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Configuration struct {
Kubeless kubelessConfig
Openfaas openfaasConfig
WebUI WebUIOutputConfig
Rabbitmq RabbitmqConfig
}

// SlackOutputConfig represents parameters for Slack
Expand Down Expand Up @@ -289,6 +290,13 @@ type WebUIOutputConfig struct {
URL string
}

// RabbitmqConfig represents parameters for rabbitmq
type RabbitmqConfig struct {
URL string
Queue string
MinimumPriority string
}

// Statistics is a struct to store stastics
type Statistics struct {
Requests *expvar.Map
Expand Down Expand Up @@ -326,6 +334,7 @@ type Statistics struct {
Kubeless *expvar.Map
Openfaas *expvar.Map
WebUI *expvar.Map
Rabbitmq *expvar.Map
}

// PromStatistics is a struct to store prometheus metrics
Expand Down