Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yandex S3 output #261

Merged
merged 6 commits into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ It works as a single endpoint for as many as you want `Falco` instances :

- [**AWS S3**](https://aws.amazon.com/s3/features/)
- [**GCP Storage**](https://cloud.google.com/storage)
- [**Yandex S3 Storage**](https://cloud.yandex.com/en-ru/services/storage)

### FaaS / Serverless

Expand Down Expand Up @@ -406,6 +407,16 @@ grafana:

webui:
url: "" # WebUI URL, if not empty, WebUI output is enabled

yandex:
# accesskeyid: "" # yandex access key
# secretaccesskey: "" # yandex secret access key
# region: "" # yandex storage region (default: ru-central-1)
s3:
# endpoint: "" yandex storage endpoint (default: https://storage.yandexcloud.net)
# bucket: "falcosidekick" # Yandex storage, bucket name
# prefix: "" # name of prefix, keys will have format: s3://<bucket>/<prefix>/YYYY-MM-DD/YYYY-MM-DDTHH:mm:ss.s+01:00.json
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|erro
```

Usage :
Expand Down Expand Up @@ -755,6 +766,14 @@ care of lower/uppercases**) : `yaml: a.b --> envvar: A_B` :
- **GRAFANA_CHECKCERT**: check if ssl certificate of the output is valid (default: true)
- **GRAFANA_MINIMUMPRIORITY**: minimum priority of event for using this output, order is
`emergency|alert|critical|error|warning|notice|informational|debug or "" (default)`
- **YANDEX_ACCESSKEYID** : Yandex Access Key Id
- **YANDEX_SECRETACCESSKEY** : Yandex Secret Access Key
- **YANDEX_REGION**: Yandex region (default: ru-central-1)
- **YANDEX_S3_ENDPOINT**: Yandex storage endpoint (default: https://storage.yandexcloud.net)
- **YANDEX_S3_BUCKET**: Yandex storage, bucket name
- **YANDEX_S3_PREFIX**: name of prefix, keys will have format: s3://<bucket>/<prefix>/YYYY-MM-DD/YYYY-MM-DDTHH:mm:ss.s+01:00.json
- **YANDEX_S3_MINIMUMPRIORITY**: # minimum priority of event for using this output, order is emergency|alert|critical|erro


#### Slack/Rocketchat/Mattermost/Googlechat Message Formatting

Expand Down
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,14 @@ func getConfig() *types.Configuration {
v.SetDefault("Grafana.MutualTls", false)
v.SetDefault("Grafana.CheckCert", true)

v.SetDefault("Yandex.AccessKeyID", "")
v.SetDefault("Yandex.SecretAccessKey", "")
v.SetDefault("Yandex.Region", "ru-central1")
v.SetDefault("Yandex.S3.Endpoint", "https://storage.yandexcloud.net")
v.SetDefault("Yandex.S3.Bucket", "")
v.SetDefault("Yandex.S3.Prefix", "falco")
v.SetDefault("Yamdex.S3.MinimumPriority", "")

v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
v.AutomaticEnv()
if *configFile != "" {
Expand Down Expand Up @@ -358,6 +366,7 @@ func getConfig() *types.Configuration {
c.Fission.MinimumPriority = checkPriority(c.Fission.MinimumPriority)
c.Rabbitmq.MinimumPriority = checkPriority(c.Rabbitmq.MinimumPriority)
c.Wavefront.MinimumPriority = checkPriority(c.Wavefront.MinimumPriority)
c.Yandex.S3.MinimumPriority = checkPriority(c.Yandex.S3.MinimumPriority)

c.Slack.MessageFormatTemplate = getMessageFormatTemplate("Slack", c.Slack.MessageFormat)
c.Rocketchat.MessageFormatTemplate = getMessageFormatTemplate("Rocketchat", c.Rocketchat.MessageFormat)
Expand Down
4 changes: 4 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,8 @@ func forwardEvent(falcopayload types.FalcoPayload) {
if config.Fission.Function != "" && (falcopayload.Priority >= types.Priority(config.Fission.MinimumPriority) || falcopayload.Rule == testRule) {
go fissionClient.FissionCall(falcopayload)
}

if config.Yandex.S3.Bucket != "" && (falcopayload.Priority >= types.Priority(config.Yandex.S3.MinimumPriority) || falcopayload.Rule == testRule) {
go yandexClient.UploadYandexS3(falcopayload)
}
}
16 changes: 16 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
wavefrontClient *outputs.Client
fissionClient *outputs.Client
grafanaClient *outputs.Client
yandexClient *outputs.Client

statsdClient, dogstatsdClient *statsd.Client
config *types.Configuration
Expand Down Expand Up @@ -450,6 +451,21 @@ func init() {
}
log.Printf("[INFO] : Enabled Outputs : %s\n", outputs.EnabledOutputs)
nar3k marked this conversation as resolved.
Show resolved Hide resolved
}

if config.Yandex.S3.Bucket != "" {
var err error
yandexClient, err = outputs.NewYandexClient(config, stats, promStats, statsdClient, dogstatsdClient)
if err != nil {
config.Yandex.S3.Bucket = ""
log.Printf("[ERROR] : Yandex - %v\n", err)
nar3k marked this conversation as resolved.
Show resolved Hide resolved
} else {
if config.Yandex.S3.Bucket != "" {
outputs.EnabledOutputs = append(outputs.EnabledOutputs, "YandexS3")
}
}
}
log.Printf("[INFO] : Enabled Outputs : %s\n", outputs.EnabledOutputs)

}

func main() {
Expand Down
70 changes: 70 additions & 0 deletions outputs/yandex.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package outputs

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"log"
"time"

"github.com/DataDog/datadog-go/statsd"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/falcosecurity/falcosidekick/types"
)

// NewYandexClient returns a new output.Client for accessing the Yandex API.
func NewYandexClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics, statsdClient, dogstatsdClient *statsd.Client) (*Client, error) {

sess, err := session.NewSession(&aws.Config{
Region: aws.String(config.Yandex.Region),
Endpoint: aws.String(config.Yandex.S3.Endpoint),
Credentials: credentials.NewStaticCredentials(config.Yandex.AccessKeyID, config.Yandex.SecretAccessKey, ""),
})
if err != nil {
log.Printf("[ERROR] : Yandex - %v\n", "Error while creating Yandex Session")
return nil, errors.New("Error while creating Yandex Session")
}
log.Printf("[INFO] : Yandex Session has been configured successfully")

return &Client{
OutputType: "Yandex",
Config: config,
AWSSession: sess,
Stats: stats,
PromStats: promStats,
StatsdClient: statsdClient,
DogstatsdClient: dogstatsdClient,
}, nil
}

// UploadYandexS3 uploads payload to Yandex S3
func (c *Client) UploadYandexS3(falcopayload types.FalcoPayload) {
f, _ := json.Marshal(falcopayload)
prefix := ""
t := time.Now()
if c.Config.Yandex.S3.Prefix != "" {
prefix = c.Config.Yandex.S3.Prefix
}
key := fmt.Sprintf("%s/%s/%s.json", prefix, t.Format("2006-01-02"), t.Format(time.RFC3339Nano))
_, err := s3.New(c.AWSSession).PutObject(&s3.PutObjectInput{
Bucket: aws.String(c.Config.Yandex.S3.Bucket),
Key: aws.String(key),
Body: bytes.NewReader(f),
})
if err != nil {
go c.CountMetric("outputs", 1, []string{"output:yandexs3", "status:error"})
c.PromStats.Outputs.With(map[string]string{"destination": "yandexs3", "status": Error}).Inc()
log.Printf("[ERROR] : %v S3 - %v\n", c.OutputType, err.Error())
return
}

log.Printf("[INFO] : %v S3 - Upload payload OK\n", c.OutputType)

go c.CountMetric("outputs", 1, []string{"output:yandexs3", "status:ok"})
c.PromStats.Outputs.With(map[string]string{"destination": "yandexs3", "status": "ok"}).Inc()
}
1 change: 1 addition & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func getInitStats() *types.Statistics {
Wavefront: getOutputNewMap("wavefront"),
Fission: getOutputNewMap("fission"),
Grafana: getOutputNewMap("grafana"),
YandexS3: getOutputNewMap("yandexs3"),
}
stats.Falco.Add(outputs.Emergency, 0)
stats.Falco.Add(outputs.Alert, 0)
Expand Down
15 changes: 15 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type Configuration struct {
Wavefront WavefrontOutputConfig
Fission fissionConfig
Grafana grafanaOutputConfig
Yandex YandexOutputConfig
}

// SlackOutputConfig represents parameters for Slack
Expand Down Expand Up @@ -386,6 +387,19 @@ type grafanaOutputConfig struct {
MinimumPriority string
}

type YandexOutputConfig struct {
AccessKeyID string
SecretAccessKey string
Region string
S3 YandexS3Config
}
type YandexS3Config struct {
Endpoint string
Prefix string
Bucket string
MinimumPriority string
}

// Statistics is a struct to store stastics
type Statistics struct {
Requests *expvar.Map
Expand Down Expand Up @@ -430,6 +444,7 @@ type Statistics struct {
Wavefront *expvar.Map
Fission *expvar.Map
Grafana *expvar.Map
YandexS3 *expvar.Map
}

// PromStatistics is a struct to store prometheus metrics
Expand Down