From 31acdef1d71f69428f6094591da8ddb98496e4ad Mon Sep 17 00:00:00 2001 From: yuvraj Date: Sat, 13 Mar 2021 14:33:50 +0530 Subject: [PATCH 1/4] wip: Added rabbitmq type Signed-off-by: yuvraj --- README.md | 5 +++- config.go | 4 +++ config_example.yaml | 6 +++++ main.go | 11 ++++++++ outputs/client.go | 2 ++ outputs/rabbitmq.go | 64 +++++++++++++++++++++++++++++++++++++++++++++ stats.go | 1 + types/types.go | 9 +++++++ 8 files changed, 101 insertions(+), 1 deletion(-) create mode 100644 outputs/rabbitmq.go diff --git a/README.md b/README.md index fe3b6ae70..64be85925 100644 --- a/README.md +++ b/README.md @@ -577,7 +577,10 @@ care of lower/uppercases**) : `yaml: a.b --> envvar: A_B` : `emergency|alert|critical|error|warning|notice|informational|debug or "" (default)` - **WEBUI_URL** : WebUI URL, if not empty, WebUI output is _enabled_ - +- **RABBITMQ_URL**: Rabbitmq URL, if not empty, Rabbitmq output is enabled +- **RABBITMQ_QUEUE**: # Rabbitmq Queue name +- **RABBITMQ_MINIMUMPRIORITY**: "debug" # minimum priority of event for using + this output, order is #### Slack/Rocketchat/Mattermost/Googlechat Message Formatting The `SLACK_MESSAGEFORMAT` environment variable and `slack.messageformat` YAML diff --git a/config.go b/config.go index 9c118a2d0..00a618fa8 100644 --- a/config.go +++ b/config.go @@ -150,6 +150,9 @@ func getConfig() *types.Configuration { v.SetDefault("Openfaas.MinimumPriority", "") v.SetDefault("Webui.URL", "") + v.SetDefault("Rabbitmq.MinimumPriority", "") + v.SetDefault("Rabbitmq.URL", "") + v.SetDefault("Rabbitmq.Queue", "") v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) v.AutomaticEnv() @@ -236,6 +239,7 @@ func getConfig() *types.Configuration { c.Pagerduty.MinimumPriority = checkPriority(c.Pagerduty.MinimumPriority) c.Kubeless.MinimumPriority = checkPriority(c.Kubeless.MinimumPriority) c.Openfaas.MinimumPriority = checkPriority(c.Openfaas.MinimumPriority) + c.Rabbitmq.MinimumPriority = checkPriority(c.Rabbitmq.MinimumPriority) c.Slack.MessageFormatTemplate = getMessageFormatTemplate("Slack", c.Slack.MessageFormat) c.Rocketchat.MessageFormatTemplate = getMessageFormatTemplate("Rocketchat", c.Rocketchat.MessageFormat) diff --git a/config_example.yaml b/config_example.yaml index 1f05a4a94..8a14c57cb 100644 --- a/config_example.yaml +++ b/config_example.yaml @@ -176,3 +176,9 @@ openfaas: webui: url: "" # WebUI URL, if not empty, WebUI output is enabled + +rabbitmq: + url: "" # Rabbitmq URL, if not empty, Rabbitmq output is enabled + queue: "" # Rabbitmq Queue name + minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) + diff --git a/main.go b/main.go index a4ca29e5e..8d039263e 100644 --- a/main.go +++ b/main.go @@ -42,6 +42,7 @@ var ( kubelessClient *outputs.Client openfaasClient *outputs.Client webUIClient *outputs.Client + rabbitmqClient *outputs.Client statsdClient, dogstatsdClient *statsd.Client config *types.Configuration @@ -378,6 +379,16 @@ func init() { } } + if config.Rabbitmq.URL != "" && config.Rabbitmq.Queue != "" { + var err error + rabbitmqClient, err = outputs.NewRabbitmqClient(config, stats, promStats, statsdClient, dogstatsdClient) + if err != nil { + config.Rabbitmq.URL = "" + } else { + outputs.EnabledOutputs = append(outputs.EnabledOutputs, "Rabbitmq") + } + } + log.Printf("[INFO] : Enabled Outputs : %s\n", outputs.EnabledOutputs) } diff --git a/outputs/client.go b/outputs/client.go index 57538283a..5da1eca0e 100644 --- a/outputs/client.go +++ b/outputs/client.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/streadway/amqp" "io/ioutil" "log" "net/http" @@ -62,6 +63,7 @@ type Client struct { KafkaProducer *kafka.Writer CloudEventsClient cloudevents.Client KubernetesClient kubernetes.Interface + RabbitmqClient *amqp.Channel } // NewClient returns a new output.Client for accessing the different API. diff --git a/outputs/rabbitmq.go b/outputs/rabbitmq.go new file mode 100644 index 000000000..e4a53351d --- /dev/null +++ b/outputs/rabbitmq.go @@ -0,0 +1,64 @@ +package outputs + +import ( + "errors" + "log" + "encoding/json" + "github.com/streadway/amqp" + "github.com/falcosecurity/falcosidekick/types" +) + +// NewRabbitmqClient returns a new output.Client for accessing the Rabbitmqs API. +func NewRabbitmqClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics, statsdClient, dogstatsdClient *statsd.Client) (*Client, error) { + + var channel *amqp.Channel + if config.Rabbitmq.URL != "" && config.Rabbitmq.Queue != "" { + conn, err := amqp.Dial(config.Rabbitmq.URL) + if err != nil { + log.Printf("[ERROR] : Rabbitmq - %v\n", "Error while connecting rabbitmq") + return nil, errors.New("Error while connecting Rabbitmq") + } + ch, err := conn.Channel() + if err != nil { + log.Printf("[ERROR] : Rabbitmq Channel - %v\n", "Error while creating rabbitmq channel") + return nil, errors.New("Error while creating rabbitmq channel") + } + channel = ch + } + + return &Client{ + OutputType: "GCP", + Config: config, + RabbitmqClient: channel, + Stats: stats, + PromStats: promStats, + StatsdClient: statsdClient, + DogstatsdClient: dogstatsdClient, + }, nil +} + +// Publish sends a message to a Rabbitmq +func (c *Client) Publish(falcopayload types.FalcoPayload) { + c.Stats.Rabbitmq.Add(Total, 1) + + payload, _ := json.Marshal(falcopayload) + + err := c.RabbitmqClient.Publish("", c.Config.Rabbitmq.Queue, false,false, amqp.Publishing { + ContentType: "text/plain", + Body: payload, + }) + + if err != nil { + log.Printf("[ERROR] : RabbitMQ - %v - %v\n", "Error while publishing message", err.Error()) + c.Stats.Rabbitmq.Add(Error, 1) + go c.CountMetric("outputs", 1, []string{"output:rabbitmq", "status:error"}) + c.PromStats.Outputs.With(map[string]string{"destination": "rabbitmq", "status": Error}).Inc() + + return + } + + log.Printf("[INFO] : rabbitmq - Send to message OK \n") + c.Stats.Rabbitmq.Add(OK, 1) + go c.CountMetric("outputs", 1, []string{"output:rabbitmq", "status:ok"}) + c.PromStats.Outputs.With(map[string]string{"destination": "rabbitmq", "status": OK}).Inc() +} \ No newline at end of file diff --git a/stats.go b/stats.go index 2791a9d42..21c2f5364 100644 --- a/stats.go +++ b/stats.go @@ -53,6 +53,7 @@ func getInitStats() *types.Statistics { Kubeless: getOutputNewMap("kubeless"), Openfaas: getOutputNewMap("openfaas"), WebUI: getOutputNewMap("webui"), + Rabbitmq: getOutputNewMap("rabbitmq"), } stats.Falco.Add(outputs.Emergency, 0) stats.Falco.Add(outputs.Alert, 0) diff --git a/types/types.go b/types/types.go index 3e84366ae..3b4f66e7e 100644 --- a/types/types.go +++ b/types/types.go @@ -52,6 +52,7 @@ type Configuration struct { Kubeless kubelessConfig Openfaas openfaasConfig WebUI WebUIOutputConfig + Rabbitmq RabbitmqConfig } // SlackOutputConfig represents parameters for Slack @@ -289,6 +290,13 @@ type WebUIOutputConfig struct { URL string } +// RabbitmqConfig represents parameters for rabbitmq +type RabbitmqConfig struct { + URL string + Queue string + MinimumPriority string +} + // Statistics is a struct to store stastics type Statistics struct { Requests *expvar.Map @@ -326,6 +334,7 @@ type Statistics struct { Kubeless *expvar.Map Openfaas *expvar.Map WebUI *expvar.Map + Rabbitmq *expvar.Map } // PromStatistics is a struct to store prometheus metrics From 1a34e3b3ff1b02ce7eceb2cc8da84794ed08af62 Mon Sep 17 00:00:00 2001 From: yuvraj Date: Thu, 25 Mar 2021 22:52:20 +0530 Subject: [PATCH 2/4] wip: small fix Signed-off-by: yuvraj --- config.go | 2 +- main.go | 2 +- outputs/rabbitmq.go | 20 +++++++++++--------- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/config.go b/config.go index 00a618fa8..76c1c1028 100644 --- a/config.go +++ b/config.go @@ -150,9 +150,9 @@ func getConfig() *types.Configuration { v.SetDefault("Openfaas.MinimumPriority", "") v.SetDefault("Webui.URL", "") - v.SetDefault("Rabbitmq.MinimumPriority", "") v.SetDefault("Rabbitmq.URL", "") v.SetDefault("Rabbitmq.Queue", "") + v.SetDefault("Rabbitmq.MinimumPriority", "") v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) v.AutomaticEnv() diff --git a/main.go b/main.go index 8d039263e..500276623 100644 --- a/main.go +++ b/main.go @@ -385,7 +385,7 @@ func init() { if err != nil { config.Rabbitmq.URL = "" } else { - outputs.EnabledOutputs = append(outputs.EnabledOutputs, "Rabbitmq") + outputs.EnabledOutputs = append(outputs.EnabledOutputs, "RabbitMQ") } } diff --git a/outputs/rabbitmq.go b/outputs/rabbitmq.go index e4a53351d..309add403 100644 --- a/outputs/rabbitmq.go +++ b/outputs/rabbitmq.go @@ -1,14 +1,16 @@ package outputs import ( + "encoding/json" "errors" "log" - "encoding/json" - "github.com/streadway/amqp" + + "github.com/DataDog/datadog-go/statsd" "github.com/falcosecurity/falcosidekick/types" + "github.com/streadway/amqp" ) -// NewRabbitmqClient returns a new output.Client for accessing the Rabbitmqs API. +// NewRabbitmqClient returns a new output.Client for accessing the RabbitmMQ API. func NewRabbitmqClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics, statsdClient, dogstatsdClient *statsd.Client) (*Client, error) { var channel *amqp.Channel @@ -27,7 +29,7 @@ func NewRabbitmqClient(config *types.Configuration, stats *types.Statistics, pro } return &Client{ - OutputType: "GCP", + OutputType: "RabbitMQ", Config: config, RabbitmqClient: channel, Stats: stats, @@ -43,9 +45,9 @@ func (c *Client) Publish(falcopayload types.FalcoPayload) { payload, _ := json.Marshal(falcopayload) - err := c.RabbitmqClient.Publish("", c.Config.Rabbitmq.Queue, false,false, amqp.Publishing { - ContentType: "text/plain", - Body: payload, + err := c.RabbitmqClient.Publish("", c.Config.Rabbitmq.Queue, false, false, amqp.Publishing{ + ContentType: "text/plain", + Body: payload, }) if err != nil { @@ -57,8 +59,8 @@ func (c *Client) Publish(falcopayload types.FalcoPayload) { return } - log.Printf("[INFO] : rabbitmq - Send to message OK \n") + log.Printf("[INFO] : RabbitMQ - Send to message OK \n") c.Stats.Rabbitmq.Add(OK, 1) go c.CountMetric("outputs", 1, []string{"output:rabbitmq", "status:ok"}) c.PromStats.Outputs.With(map[string]string{"destination": "rabbitmq", "status": OK}).Inc() -} \ No newline at end of file +} From 04d7e4d4217cb5ccb7a1e6c59001d348b06bd500 Mon Sep 17 00:00:00 2001 From: yuvraj Date: Thu, 25 Mar 2021 23:01:33 +0530 Subject: [PATCH 3/4] wip: fix lint issue Signed-off-by: yuvraj --- go.mod | 1 + go.sum | 2 ++ outputs/client.go | 2 +- types/types.go | 6 +++--- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 4a4d0a23c..fd35a893e 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/prometheus/client_golang v1.9.0 github.com/segmentio/kafka-go v0.4.10 github.com/spf13/viper v1.7.1 + github.com/streadway/amqp v1.0.0 // indirect github.com/stretchr/testify v1.7.0 golang.org/x/oauth2 v0.0.0-20210220000619-9bb904979d93 google.golang.org/api v0.40.0 diff --git a/go.sum b/go.sum index 55b3580e4..964b46ebb 100644 --- a/go.sum +++ b/go.sum @@ -594,6 +594,8 @@ github.com/spf13/viper v1.7.1 h1:pM5oEahlgWv/WnHXpgbKz7iLIxRf65tye2Ci+XFK5sk= github.com/spf13/viper v1.7.1/go.mod h1:8WkrPz2fc9jxqZNCJI/76HCieCp4Q8HaLFoCha5qpdg= github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/streadway/handy v0.0.0-20190108123426-d5acb3125c2a/go.mod h1:qNTQ5P5JnDBl6z3cMAg/SywNDC5ABu5ApDIw6lUbRmI= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/outputs/client.go b/outputs/client.go index 5da1eca0e..a1e1c594c 100644 --- a/outputs/client.go +++ b/outputs/client.go @@ -63,7 +63,7 @@ type Client struct { KafkaProducer *kafka.Writer CloudEventsClient cloudevents.Client KubernetesClient kubernetes.Interface - RabbitmqClient *amqp.Channel + RabbitmqClient *amqp.Channel } // NewClient returns a new output.Client for accessing the different API. diff --git a/types/types.go b/types/types.go index 3b4f66e7e..08da3e74f 100644 --- a/types/types.go +++ b/types/types.go @@ -292,8 +292,8 @@ type WebUIOutputConfig struct { // RabbitmqConfig represents parameters for rabbitmq type RabbitmqConfig struct { - URL string - Queue string + URL string + Queue string MinimumPriority string } @@ -334,7 +334,7 @@ type Statistics struct { Kubeless *expvar.Map Openfaas *expvar.Map WebUI *expvar.Map - Rabbitmq *expvar.Map + Rabbitmq *expvar.Map } // PromStatistics is a struct to store prometheus metrics From d869b439e3f3d32cc0c72291c3a4984fc8374852 Mon Sep 17 00:00:00 2001 From: yuvraj Date: Thu, 25 Mar 2021 23:18:49 +0530 Subject: [PATCH 4/4] tested rabbitmq Signed-off-by: yuvraj --- handlers.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/handlers.go b/handlers.go index 6f2b5e3fa..670975c31 100644 --- a/handlers.go +++ b/handlers.go @@ -229,6 +229,10 @@ func forwardEvent(falcopayload types.FalcoPayload) { go openfaasClient.OpenfaasCall(falcopayload) } + if config.Rabbitmq.URL != "" && config.Rabbitmq.Queue != "" && (falcopayload.Priority >= types.Priority(config.Openfaas.MinimumPriority) || falcopayload.Rule == testRule) { + go rabbitmqClient.Publish(falcopayload) + } + if config.WebUI.URL != "" { go webUIClient.WebUIPost(falcopayload) }