Skip to content

Commit

Permalink
added Yandex Data Streams support
Browse files Browse the repository at this point in the history
Signed-off-by: Ildar Valiullin <preved.911@gmail.com>
  • Loading branch information
preved911 committed Jun 21, 2022
1 parent b036954 commit 5879d22
Show file tree
Hide file tree
Showing 8 changed files with 102 additions and 6 deletions.
14 changes: 11 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ It works as a single endpoint for as many as you want `Falco` instances :
- [**Kafka Rest Proxy**](https://docs.confluent.io/platform/current/kafka-rest/index.html)
- [**RabbitMQ**](https://www.rabbitmq.com/)
- [**Azure Event Hubs**](https://azure.microsoft.com/en-in/services/event-hubs/)
- [**Yandex Data Streams**](https://cloud.yandex.com/en/docs/data-streams/)

### Email

Expand Down Expand Up @@ -454,10 +455,14 @@ yandex:
# secretaccesskey: "" # yandex secret access key
# region: "" # yandex storage region (default: ru-central-1)
s3:
# endpoint: "" yandex storage endpoint (default: https://storage.yandexcloud.net)
# endpoint: "" # yandex storage endpoint (default: https://storage.yandexcloud.net)
# bucket: "falcosidekick" # Yandex storage, bucket name
# prefix: "" # name of prefix, keys will have format: s3://<bucket>/<prefix>/YYYY-MM-DD/YYYY-MM-DDTHH:mm:ss.s+01:00.json
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|erro
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug
datastreams:
# endpoint: "" # Yandex Data Streams endpoint (default: https://yds.serverless.yandexcloud.net)
# streamname: "" # stream name in format /${region}/${folder_id}/${ydb_id}/${stream_name}
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug

syslog:
# host: "" # Syslog host, if not empty, Syslog output is enabled
Expand Down Expand Up @@ -851,7 +856,10 @@ care of lower/uppercases**) : `yaml: a.b --> envvar: A_B` :
- **YANDEX_S3_ENDPOINT**: Yandex storage endpoint (default: https://storage.yandexcloud.net)
- **YANDEX_S3_BUCKET**: Yandex storage, bucket name
- **YANDEX_S3_PREFIX**: name of prefix, keys will have format: s3://<bucket>/<prefix>/YYYY-MM-DD/YYYY-MM-DDTHH:mm:ss.s+01:00.json
- **YANDEX_S3_MINIMUMPRIORITY**: # minimum priority of event for using this output, order is emergency|alert|critical|erro
- **YANDEX_S3_MINIMUMPRIORITY**: # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug
- **YANDEX_DATASTREAMS_ENDPOINT**: Yandex Data Streams endpoint (default: https://yds.serverless.yandexcloud.net)
- **YANDEX_DATASTREAMS_STREAMNAME**: Stream name in format /${region}/${folder_id}/${ydb_id}/${stream_name}
- **YANDEX_DATASTREAMS_MINIMUMPRIORITY**: # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug
- **SYSLOG_HOST**: Syslog Host, if not empty, Syslog output is enabled
- **SYSLOG_PORT**: Syslog endpoint port number
- **SYSLOG_PROTOCOL**: Syslog transport protocol. It can be either "tcp" or "udp"
Expand Down
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,11 +309,16 @@ func getConfig() *types.Configuration {
v.SetDefault("Yandex.AccessKeyID", "")
v.SetDefault("Yandex.SecretAccessKey", "")
v.SetDefault("Yandex.Region", "ru-central1")

v.SetDefault("Yandex.S3.Endpoint", "https://storage.yandexcloud.net")
v.SetDefault("Yandex.S3.Bucket", "")
v.SetDefault("Yandex.S3.Prefix", "falco")
v.SetDefault("Yamdex.S3.MinimumPriority", "")

v.SetDefault("Yandex.DataStreams.Endpoint", "https://yds.serverless.yandexcloud.net")
v.SetDefault("Yandex.DataStreams.StreamName", "")
v.SetDefault("Yandex.DataStreams.MinimumPriority", "")

v.SetDefault("Syslog.Host", "")
v.SetDefault("Syslog.Port", "")
v.SetDefault("Syslog.Protocol", "")
Expand Down
14 changes: 14 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,17 @@ policyreport:
failthreshold: 4 # events with priority above this threshold are mapped to fail in PolicyReport Summary and lower that those are mapped to warn (default=4)
maxevents: 1000 # the max number of events per report(default: 1000)
prunebypriority: false # if true; the events with lowest severity are pruned first, in FIFO order (default: false)

yandex:
# accesskeyid: "" # yandex access key
# secretaccesskey: "" # yandex secret access key
# region: "" # yandex storage region (default: ru-central-1)
s3:
# endpoint: "" # yandex storage endpoint (default: https://storage.yandexcloud.net)
# bucket: "falcosidekick" # Yandex storage, bucket name
# prefix: "" # name of prefix, keys will have format: s3://<bucket>/<prefix>/YYYY-MM-DD/YYYY-MM-DDTHH:mm:ss.s+01:00.json
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug
datastreams:
# endpoint: "" # Yandex Data Streams endpoint (default: https://yds.serverless.yandexcloud.net)
# streamname: "" # stream name in format /${region}/${folder_id}/${ydb_id}/${stream_name}
# minimumpriority: "" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug
4 changes: 4 additions & 0 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,10 @@ func forwardEvent(falcopayload types.FalcoPayload) {
go yandexClient.UploadYandexS3(falcopayload)
}

if config.Yandex.DataStreams.StreamName != "" && (falcopayload.Priority >= types.Priority(config.Yandex.DataStreams.MinimumPriority) || falcopayload.Rule == testRule) {
go yandexClient.UploadYandexDataStreams(falcopayload)
}

if config.Syslog.Host != "" && (falcopayload.Priority >= types.Priority(config.Syslog.MinimumPriority) || falcopayload.Rule == testRule) {
go syslogClient.SyslogPost(falcopayload)
}
Expand Down
13 changes: 13 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,19 @@ func init() {
}
}

if config.Yandex.DataStreams.StreamName != "" {
var err error
yandexClient, err = outputs.NewYandexClient(config, stats, promStats, statsdClient, dogstatsdClient)
if err != nil {
config.Yandex.DataStreams.StreamName = ""
log.Printf("[ERROR] : Yandex - %v\n", err)
} else {
if config.Yandex.DataStreams.StreamName != "" {
outputs.EnabledOutputs = append(outputs.EnabledOutputs, "YandexDataStreams")
}
}
}

if config.Syslog.Host != "" {
var err error
syslogClient, err = outputs.NewSyslogClient(config, stats, promStats, statsdClient, dogstatsdClient)
Expand Down
50 changes: 47 additions & 3 deletions outputs/yandex.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,39 @@ import (

"github.com/DataDog/datadog-go/statsd"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/endpoints"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/google/uuid"

"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/falcosecurity/falcosidekick/types"
)

// NewYandexClient returns a new output.Client for accessing the Yandex API.
func NewYandexClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics, statsdClient, dogstatsdClient *statsd.Client) (*Client, error) {
resolverFn := func(service, region string, optFns ...func(*endpoints.Options)) (endpoints.ResolvedEndpoint, error) {
switch service {
case endpoints.S3ServiceID:
return endpoints.ResolvedEndpoint{
URL: config.Yandex.S3.Endpoint,
SigningRegion: "ru-central1",
}, nil
case endpoints.KinesisServiceID:
return endpoints.ResolvedEndpoint{
URL: config.Yandex.DataStreams.Endpoint,
SigningRegion: "ru-central1",
}, nil
}

return endpoints.DefaultResolver().EndpointFor(service, region, optFns...)
}

sess, err := session.NewSession(&aws.Config{
Region: aws.String(config.Yandex.Region),
Endpoint: aws.String(config.Yandex.S3.Endpoint),
Credentials: credentials.NewStaticCredentials(config.Yandex.AccessKeyID, config.Yandex.SecretAccessKey, ""),
Region: aws.String(config.Yandex.Region),
Credentials: credentials.NewStaticCredentials(config.Yandex.AccessKeyID, config.Yandex.SecretAccessKey, ""),
EndpointResolver: endpoints.ResolverFunc(resolverFn),
})
if err != nil {
log.Printf("[ERROR] : Yandex - %v\n", "Error while creating Yandex Session")
Expand Down Expand Up @@ -68,3 +87,28 @@ func (c *Client) UploadYandexS3(falcopayload types.FalcoPayload) {
go c.CountMetric("outputs", 1, []string{"output:yandexs3", "status:ok"})
c.PromStats.Outputs.With(map[string]string{"destination": "yandexs3", "status": "ok"}).Inc()
}

// UploadYandexDataStreams uploads payload to Yandex Data Streams
func (c *Client) UploadYandexDataStreams(falcoPayLoad types.FalcoPayload) {
svc := kinesis.New(c.AWSSession)

f, _ := json.Marshal(falcoPayLoad)
input := &kinesis.PutRecordInput{
Data: f,
PartitionKey: aws.String(uuid.NewString()),
StreamName: aws.String(c.Config.Yandex.DataStreams.StreamName),
}

resp, err := svc.PutRecord(input)
if err != nil {
go c.CountMetric("outputs", 1, []string{"output:yandexdatastreams", "status:error"})
c.PromStats.Outputs.With(map[string]string{"destination": "yandexdatastreams", "status": Error}).Inc()
log.Printf("[ERROR] : %v Data Streams - %v\n", c.OutputType, err.Error())
return
}

log.Printf("[INFO] : %v Data Streams - Put Record OK (%v)\n", c.OutputType, resp.SequenceNumber)
go c.CountMetric("outputs", 1, []string{"output:yandexdatastreams", "status:ok"})
c.Stats.YandexDataStreams.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "yandexdatastreams", "status": "ok"}).Inc()
}
1 change: 1 addition & 0 deletions stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func getInitStats() *types.Statistics {
Fission: getOutputNewMap("fission"),
Grafana: getOutputNewMap("grafana"),
YandexS3: getOutputNewMap("yandexs3"),
YandexDataStreams: getOutputNewMap("yandexdatastreams"),
Syslog: getOutputNewMap("syslog"),
PolicyReport: getOutputNewMap("policyreport"),
}
Expand Down
7 changes: 7 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,13 +446,19 @@ type YandexOutputConfig struct {
SecretAccessKey string
Region string
S3 YandexS3Config
DataStreams YandexDataStreamsConfig
}
type YandexS3Config struct {
Endpoint string
Prefix string
Bucket string
MinimumPriority string
}
type YandexDataStreamsConfig struct {
Endpoint string
StreamName string
MinimumPriority string
}

// SyslogConfig represents config parameters for the syslog client
// Host: the remote syslog host. It can be either an IP address or a domain.
Expand Down Expand Up @@ -512,6 +518,7 @@ type Statistics struct {
Fission *expvar.Map
Grafana *expvar.Map
YandexS3 *expvar.Map
YandexDataStreams *expvar.Map
Syslog *expvar.Map
Cliq *expvar.Map
PolicyReport *expvar.Map
Expand Down

0 comments on commit 5879d22

Please sign in to comment.