Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding CloudEvents output option. #169

Merged
merged 1 commit into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ falcosidekick
.idea
*.swp
/hack/tools/bin/*

tmp/
556 changes: 355 additions & 201 deletions README.md

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func getConfig() *types.Configuration {
c := &types.Configuration{
Customfields: make(map[string]string),
Webhook: types.WebhookOutputConfig{CustomHeaders: make(map[string]string)},
CloudEvents: types.CloudEventsOutputConfig{Extensions: make(map[string]string)},
}

configFile := kingpin.Flag("config-file", "config file").Short('c').ExistingFile()
Expand Down Expand Up @@ -112,6 +113,8 @@ func getConfig() *types.Configuration {
v.SetDefault("Customfields", map[string]string{})
v.SetDefault("Webhook.Address", "")
v.SetDefault("Webhook.MinimumPriority", "")
v.SetDefault("CloudEvents.Address", "")
v.SetDefault("CloudEvents.MinimumPriority", "")
v.SetDefault("Azure.eventHub.Namespace", "")
v.SetDefault("Azure.eventHub.Name", "")
v.SetDefault("Azure.eventHub.MinimumPriority", "")
Expand Down Expand Up @@ -155,6 +158,7 @@ func getConfig() *types.Configuration {

v.GetStringMapString("customfields")
v.GetStringMapString("Webhook.CustomHeaders")
v.GetStringMapString("CloudEvents.Extensions")
v.Unmarshal(c)

if value, present := os.LookupEnv("CUSTOMFIELDS"); present {
Expand All @@ -177,6 +181,16 @@ func getConfig() *types.Configuration {
}
}

if value, present := os.LookupEnv("CLOUDEVENTS_EXTENSIONS"); present {
customfields := strings.Split(value, ",")
for _, label := range customfields {
tagkeys := strings.Split(label, ":")
if len(tagkeys) == 2 {
c.CloudEvents.Extensions[tagkeys[0]] = tagkeys[1]
}
}
}

if c.ListenPort == 0 || c.ListenPort > 65536 {
log.Fatalf("[ERROR] : Bad port number\n")
}
Expand All @@ -198,6 +212,7 @@ func getConfig() *types.Configuration {
c.AWS.CloudWatchLogs.MinimumPriority = checkPriority(c.AWS.CloudWatchLogs.MinimumPriority)
c.Opsgenie.MinimumPriority = checkPriority(c.Opsgenie.MinimumPriority)
c.Webhook.MinimumPriority = checkPriority(c.Webhook.MinimumPriority)
c.CloudEvents.MinimumPriority = checkPriority(c.CloudEvents.MinimumPriority)
c.Azure.EventHub.MinimumPriority = checkPriority(c.Azure.EventHub.MinimumPriority)
c.GCP.PubSub.MinimumPriority = checkPriority(c.GCP.PubSub.MinimumPriority)
c.Googlechat.MinimumPriority = checkPriority(c.Googlechat.MinimumPriority)
Expand Down
6 changes: 6 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,12 @@ webhook:
# key: value
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)

cloudevents:
# address: "" # CloudEvents consumer http address, if not empty, CloudEvents output is enabled
# extensions: # Extensions to add in the outbound Event, useful for routing
# key: value
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)

azure:
eventHub:
name: "" # Name of the Hub, if not empty, EventHub is enabled
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21
github.com/emersion/go-smtp v0.14.0
github.com/gkarthiks/k8s-discovery v0.19.0
github.com/cloudevents/sdk-go/v2 v2.3.1
github.com/google/uuid v1.1.2
github.com/imdario/mergo v0.3.7 // indirect
github.com/nats-io/nats-streaming-server v0.19.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ func forwardEvent(falcopayload types.FalcoPayload) {
go webhookClient.WebhookPost(falcopayload)
}

if config.CloudEvents.Address != "" && (falcopayload.Priority >= types.Priority(config.CloudEvents.MinimumPriority) || falcopayload.Rule == TestRule) {
go cloudeventsClient.CloudEventsSend(falcopayload)
}

if config.Azure.EventHub.Name != "" && (falcopayload.Priority >= types.Priority(config.Azure.EventHub.MinimumPriority) || falcopayload.Rule == TestRule) {
go azureClient.EventHubPost(falcopayload)
}
Expand Down
11 changes: 11 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
smtpClient *outputs.Client
opsgenieClient *outputs.Client
webhookClient *outputs.Client
cloudeventsClient *outputs.Client
azureClient *outputs.Client
gcpClient *outputs.Client
googleChatClient *outputs.Client
Expand Down Expand Up @@ -271,6 +272,16 @@ func init() {
}
}

if config.CloudEvents.Address != "" {
var err error
cloudeventsClient, err = outputs.NewClient("CloudEvents", config.CloudEvents.Address, config, stats, promStats, statsdClient, dogstatsdClient)
if err != nil {
config.CloudEvents.Address = ""
} else {
enabledOutputsText += "CloudEvents "
}
}

if config.Azure.EventHub.Name != "" {
var err error
azureClient, err = outputs.NewEventHubClient(config, stats, promStats, statsdClient, dogstatsdClient)
Expand Down
26 changes: 14 additions & 12 deletions outputs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/DataDog/datadog-go/statsd"
"github.com/PagerDuty/go-pagerduty"
"github.com/aws/aws-sdk-go/aws/session"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
"github.com/segmentio/kafka-go"
"k8s.io/client-go/kubernetes"
Expand Down Expand Up @@ -47,18 +48,19 @@ var ErrClientCreation = errors.New("Client creation Error")

// Client communicates with the different API.
type Client struct {
OutputType string
EndpointURL *url.URL
Config *types.Configuration
Stats *types.Statistics
PromStats *types.PromStatistics
AWSSession *session.Session
StatsdClient *statsd.Client
DogstatsdClient *statsd.Client
GCPTopicClient *pubsub.Topic
KafkaProducer *kafka.Conn
PagerdutyClient *pagerduty.Client
KubernetesClient kubernetes.Interface
OutputType string
EndpointURL *url.URL
Config *types.Configuration
Stats *types.Statistics
PromStats *types.PromStatistics
AWSSession *session.Session
StatsdClient *statsd.Client
DogstatsdClient *statsd.Client
GCPTopicClient *pubsub.Topic
KafkaProducer *kafka.Conn
PagerdutyClient *pagerduty.Client
CloudEventsClient cloudevents.Client
Issif marked this conversation as resolved.
Show resolved Hide resolved
KubernetesClient kubernetes.Interface
}

// NewClient returns a new output.Client for accessing the different API.
Expand Down
56 changes: 56 additions & 0 deletions outputs/cloudevents.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package outputs

import (
"context"
cloudevents "github.com/cloudevents/sdk-go/v2"
"log"

"github.com/falcosecurity/falcosidekick/types"
)

// CloudEventsSend produces a CloudEvent and sends to the CloudEvents consumers.
func (c *Client) CloudEventsSend(falcopayload types.FalcoPayload) {
c.Stats.CloudEvents.Add(Total, 1)

if c.CloudEventsClient == nil {
client, err := cloudevents.NewDefaultClient()
Issif marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
go c.CountMetric(Outputs, 1, []string{"output:cloudevents", "status:error"})
log.Printf("[ERROR] : CloudEvents - NewDefaultClient : %v\n", err)
return
}
c.CloudEventsClient = client
Issif marked this conversation as resolved.
Show resolved Hide resolved
}

ctx := cloudevents.ContextWithTarget(context.Background(), c.EndpointURL.String())

event := cloudevents.NewEvent()
event.SetTime(falcopayload.Time)
event.SetSource("falco.org") // TODO: this should have some info on the falco server that made the event.
Issif marked this conversation as resolved.
Show resolved Hide resolved
event.SetType("falco.rule.output.v1")
event.SetExtension("priority", falcopayload.Priority.String())
event.SetExtension("rule", falcopayload.Rule)

// Set Extensions.
for k, v := range c.Config.CloudEvents.Extensions {
event.SetExtension(k, v)
}

if err := event.SetData(cloudevents.ApplicationJSON, falcopayload); err != nil {
log.Printf("[ERROR] : CloudEvents, failed to set data : %v\n", err)
}

if result := c.CloudEventsClient.Send(ctx, event); cloudevents.IsUndelivered(result) {
go c.CountMetric(Outputs, 1, []string{"output:cloudevents", "status:error"})
c.Stats.CloudEvents.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "cloudevents", "status": Error}).Inc()
log.Printf("[ERROR] : CloudEvents - %v\n", result)
return
}

// Setting the success status
go c.CountMetric(Outputs, 1, []string{"output:cloudevents", "status:ok"})
c.Stats.CloudEvents.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "cloudevents", "status": OK}).Inc()
log.Printf("[INFO] : CloudEvents - Send OK\n")
}
1 change: 1 addition & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func getInitStats() *types.Statistics {
Statsd: getOutputNewMap("statsd"),
Dogstatsd: getOutputNewMap("dogstatsd"),
Webhook: getOutputNewMap("webhook"),
CloudEvents: getOutputNewMap("cloudevents"),
AzureEventHub: getOutputNewMap("azureeventhub"),
GCPPubSub: getOutputNewMap("gcppubsub"),
GoogleChat: getOutputNewMap("googlechat"),
Expand Down
9 changes: 9 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Configuration struct {
Statsd statsdOutputConfig
Dogstatsd statsdOutputConfig
Webhook WebhookOutputConfig
CloudEvents CloudEventsOutputConfig
Azure azureConfig
GCP gcpOutputConfig
Googlechat GooglechatConfig
Expand Down Expand Up @@ -200,6 +201,13 @@ type WebhookOutputConfig struct {
MinimumPriority string
}

// CloudEventsOutputConfig represents parameters for CloudEvents
type CloudEventsOutputConfig struct {
Address string
Extensions map[string]string
MinimumPriority string
}

type statsdOutputConfig struct {
Forwarder string
Namespace string
Expand Down Expand Up @@ -291,6 +299,7 @@ type Statistics struct {
GoogleChat *expvar.Map
Kafka *expvar.Map
Pagerduty *expvar.Map
CloudEvents *expvar.Map
Kubeless *expvar.Map
}

Expand Down