diff --git a/config.go b/config.go index 66e5ec738..c3e36cbf2 100644 --- a/config.go +++ b/config.go @@ -253,6 +253,14 @@ func getConfig() *types.Configuration { v.SetDefault("Kafka.HostPort", "") v.SetDefault("Kafka.Topic", "") v.SetDefault("Kafka.MinimumPriority", "") + v.SetDefault("Kafka.SASL", "") + v.SetDefault("Kafka.Username", "") + v.SetDefault("Kafka.Password", "") + v.SetDefault("Kafka.Balancer", "round_robin") + v.SetDefault("Kafka.ClientID", "") + v.SetDefault("Kafka.Compression", "NONE") + v.SetDefault("Kafka.Async", false) + v.SetDefault("Kafka.RequiredACKs", "NONE") v.SetDefault("KafkaRest.Address", "") v.SetDefault("KafkaRest.Version", 2) diff --git a/config_example.yaml b/config_example.yaml index 1ce4c34b9..1b40dd5e1 100644 --- a/config_example.yaml +++ b/config_example.yaml @@ -239,6 +239,14 @@ kafka: hostport: "" # Apache Kafka Host:Port (ex: localhost:9092). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled topic: "" # Name of the topic, if not empty, Kafka output is enabled # minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default) + sasl: "" # SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512) + username: "" # use this username to authenticate to Kafka via SASL (default: "") + password: "" # use this password to authenticate to Kafka via SASL (default: "") + # async: false # produce messages without blocking (default: false) + # requiredacks: NONE # number of acknowledges from partition replicas required before receiving (default: "NONE") + # compression: "" # enable message compression using this algorithm, no compression (GZIP|SNAPPY|LZ4|ZSTD|NONE) (default: "NONE") + # balancer: "" # partition balancing strategy when producing, (default: "round_robin") + # clientid: "" # specify a client.id when communicating with the broker for tracing kafkarest: address: "" # The full URL to the topic (example "http://kafkarest:8082/topics/test") diff --git a/go.mod b/go.mod index bbb22a58d..458ef18b9 100644 --- a/go.mod +++ b/go.mod @@ -16,11 +16,12 @@ require ( github.com/emersion/go-smtp v0.15.0 github.com/google/uuid v1.3.0 github.com/googleapis/gax-go v1.0.3 + github.com/jackc/pgx/v5 v5.0.4 github.com/kubernetes-sigs/wg-policy-prototypes/policy-report/kube-bench-adapter v0.0.0-20210714174227-a3d56502c383 github.com/nats-io/nats.go v1.16.0 github.com/nats-io/stan.go v0.10.3 github.com/prometheus/client_golang v1.13.0 - github.com/segmentio/kafka-go v0.4.34 + github.com/segmentio/kafka-go v0.4.38 github.com/spf13/viper v1.12.0 github.com/streadway/amqp v1.0.0 github.com/stretchr/testify v1.8.0 @@ -79,13 +80,12 @@ require ( github.com/imdario/mergo v0.3.13 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect - github.com/jackc/pgx/v5 v5.0.4 // indirect github.com/jackc/puddle/v2 v2.0.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.15.9 // indirect + github.com/klauspost/compress v1.15.12 // indirect github.com/magiconair/properties v1.8.6 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect @@ -100,7 +100,7 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.0.5 // indirect - github.com/pierrec/lz4/v4 v4.1.15 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect @@ -110,6 +110,8 @@ require ( github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.4.1 // indirect + github.com/xdg/scram v1.0.5 // indirect + github.com/xdg/stringprep v1.0.3 // indirect go.opencensus.io v0.23.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect diff --git a/go.sum b/go.sum index 64a98f1bf..1036d13f9 100644 --- a/go.sum +++ b/go.sum @@ -509,8 +509,6 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg= github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= -github.com/jackc/pgx/v5 v5.0.3 h1:4flM5ecR/555F0EcnjdaZa6MhBU+nr0QbZIo5vaKjuM= -github.com/jackc/pgx/v5 v5.0.3/go.mod h1:JBbvW3Hdw77jKl9uJrEDATUZIFM2VFPzRq4RWIhkF4o= github.com/jackc/pgx/v5 v5.0.4 h1:r5O6y84qHX/z/HZV40JBdx2obsHz7/uRj5b+CcYEdeY= github.com/jackc/pgx/v5 v5.0.4/go.mod h1:U0ynklHtgg43fue9Ly30w3OCSTDPlXjig9ghrNGaguQ= github.com/jackc/puddle/v2 v2.0.0 h1:Kwk/AlLigcnZsDssc3Zun1dk1tAtQNPaBBxBHWn0Mjc= @@ -546,9 +544,9 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= +github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM= +github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= @@ -682,8 +680,9 @@ github.com/pelletier/go-toml/v2 v2.0.5 h1:ipoSadvV8oGUjnUbMub59IDPPwfxF694nG/jwb github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -740,8 +739,8 @@ github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBO github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= -github.com/segmentio/kafka-go v0.4.34 h1:Dm6YlLMiVSiwwav20KY0AoY63s661FXevwJ3CVHUERo= -github.com/segmentio/kafka-go v0.4.34/go.mod h1:GAjxBQJdQMB5zfNA21AhpaqOB2Mu+w3De4ni3Gbm8y0= +github.com/segmentio/kafka-go v0.4.38 h1:iQdOBbUSdfuYlFpvjuALgj7N6DrdPA0HfB4AhREOdtg= +github.com/segmentio/kafka-go v0.4.38/go.mod h1:ikyuGon/60MN/vXFgykf7Zm8P5Be49gJU6vezwjnnhU= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= @@ -1132,7 +1131,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY= golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= diff --git a/outputs/kafka.go b/outputs/kafka.go index 109bdb3ae..a4deb6d6e 100644 --- a/outputs/kafka.go +++ b/outputs/kafka.go @@ -3,10 +3,16 @@ package outputs import ( "context" "encoding/json" + "fmt" "log" + "net" + "strings" + "time" "github.com/DataDog/datadog-go/statsd" "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/sasl/plain" + "github.com/segmentio/kafka-go/sasl/scram" "github.com/falcosecurity/falcosidekick/types" ) @@ -14,12 +20,97 @@ import ( // NewKafkaClient returns a new output.Client for accessing the Apache Kafka. func NewKafkaClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics, statsdClient, dogstatsdClient *statsd.Client) (*Client, error) { + transport := &kafka.Transport{ + Dial: (&net.Dialer{ + Timeout: 3 * time.Second, + DualStack: true, + }).DialContext, + ClientID: config.Kafka.ClientID, + } + + var err error + + if config.Kafka.SASL != "" { + saslMode := strings.ToUpper(config.Kafka.SASL) + switch { + case saslMode == "PLAIN": + transport.SASL = plain.Mechanism{ + Username: config.Kafka.Username, + Password: config.Kafka.Password, + } + case strings.HasPrefix(saslMode, "SCRAM_"): + algo := strings.TrimPrefix(config.Kafka.SASL, "SCRAM_") + switch algo { + case "SHA256": + transport.SASL, err = scram.Mechanism(scram.SHA256, config.Kafka.Username, config.Kafka.Password) + case "SHA512": + transport.SASL, err = scram.Mechanism(scram.SHA512, config.Kafka.Username, config.Kafka.Password) + default: + err = fmt.Errorf("unsupported SASL SCRAM algorithm %q", algo) + } + if err != nil { + err = fmt.Errorf("failed to initialize SASL SCRAM %q: %w", algo, err) + } + default: + err = fmt.Errorf("unsupported SASL mode: %q", config.Kafka.SASL) + } + } + if err != nil { + log.Printf("[ERROR] : Kafka - %v\n", err) + return nil, err + } + kafkaWriter := &kafka.Writer{ - Addr: kafka.TCP(config.Kafka.HostPort), - Topic: config.Kafka.Topic, + Addr: kafka.TCP(config.Kafka.HostPort), + Topic: config.Kafka.Topic, + Async: config.Kafka.Async, + Transport: transport, + } + + switch strings.ToLower(config.Kafka.Balancer) { + case "crc32": + kafkaWriter.Balancer = kafka.CRC32Balancer{Consistent: true} + case "crc32_random": + kafkaWriter.Balancer = kafka.CRC32Balancer{Consistent: false} + case "murmur2": + kafkaWriter.Balancer = kafka.Murmur2Balancer{Consistent: true} + case "murmur2_random": + kafkaWriter.Balancer = kafka.Murmur2Balancer{Consistent: false} + case "least_bytes": + kafkaWriter.Balancer = &kafka.LeastBytes{} + case "round_robin": + kafkaWriter.Balancer = &kafka.RoundRobin{} + default: + return nil, fmt.Errorf("unsupported balancer %q", config.Kafka.Balancer) } - return &Client{ + switch strings.ToUpper(config.Kafka.Compression) { + case "GZIP": + kafkaWriter.Compression = kafka.Gzip + case "SNAPPY": + kafkaWriter.Compression = kafka.Snappy + case "LZ4": + kafkaWriter.Compression = kafka.Lz4 + case "ZSTD": + kafkaWriter.Compression = kafka.Zstd + case "NONE": + // leave as default, none + default: + return nil, fmt.Errorf("unsupported compression %q", config.Kafka.Compression) + } + + switch strings.ToUpper(config.Kafka.RequiredACKs) { + case "ALL": + kafkaWriter.RequiredAcks = kafka.RequireAll + case "ONE": + kafkaWriter.RequiredAcks = kafka.RequireOne + case "NONE": + kafkaWriter.RequiredAcks = kafka.RequireNone + default: + return nil, fmt.Errorf("unsupported required ACKs %q", config.Kafka.Compression) + } + + client := &Client{ OutputType: "Kafka", Config: config, Stats: stats, @@ -27,7 +118,9 @@ func NewKafkaClient(config *types.Configuration, stats *types.Statistics, promSt StatsdClient: statsdClient, DogstatsdClient: dogstatsdClient, KafkaProducer: kafkaWriter, - }, nil + } + kafkaWriter.Completion = client.handleKafkaCompletion + return client, nil } // KafkaProduce sends a message to a Apach Kafka Topic @@ -36,7 +129,7 @@ func (c *Client) KafkaProduce(falcopayload types.FalcoPayload) { falcoMsg, err := json.Marshal(falcopayload) if err != nil { - c.setKafkaErrorMetrics() + c.incrKafkaErrorMetrics(1) log.Printf("[ERROR] : Kafka - %v - %v\n", "failed to marshalling message", err.Error()) return } @@ -45,22 +138,31 @@ func (c *Client) KafkaProduce(falcopayload types.FalcoPayload) { Value: falcoMsg, } - err = c.KafkaProducer.WriteMessages(context.Background(), kafkaMsg) + // Errors are logged/captured via handleKafkaCompletion function, ignore here + _ = c.KafkaProducer.WriteMessages(context.Background(), kafkaMsg) +} + +// handleKafkaCompletion is called when a message is produced +func (c *Client) handleKafkaCompletion(messages []kafka.Message, err error) { if err != nil { - c.setKafkaErrorMetrics() - log.Printf("[ERROR] : Kafka - %v\n", err) - return + c.incrKafkaErrorMetrics(len(messages)) + log.Printf("[ERROR] : Kafka (%d) - %v\n", len(messages), err) + } else { + c.incrKafkaSuccessMetrics(len(messages)) + log.Printf("[INFO] : Kafka (%d) - Publish OK\n", len(messages)) } +} - go c.CountMetric("outputs", 1, []string{"output:kafka", "status:ok"}) - c.Stats.Kafka.Add(OK, 1) - c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": OK}).Inc() - log.Printf("[INFO] : Kafka - Publish OK\n") +// incrKafkaSuccessMetrics increments the error stats +func (c *Client) incrKafkaSuccessMetrics(add int) { + go c.CountMetric("outputs", int64(add), []string{"output:kafka", "status:ok"}) + c.Stats.Kafka.Add(OK, int64(add)) + c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": OK}).Add(float64(add)) } -// setKafkaErrorMetrics set the error stats -func (c *Client) setKafkaErrorMetrics() { - go c.CountMetric(Outputs, 1, []string{"output:kafka", "status:error"}) - c.Stats.Kafka.Add(Error, 1) - c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": Error}).Inc() +// incrKafkaErrorMetrics increments the error stats +func (c *Client) incrKafkaErrorMetrics(add int) { + go c.CountMetric(Outputs, int64(add), []string{"output:kafka", "status:error"}) + c.Stats.Kafka.Add(Error, int64(add)) + c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": Error}).Add(float64(add)) } diff --git a/types/types.go b/types/types.go index 896b82a9b..0dee169fd 100644 --- a/types/types.go +++ b/types/types.go @@ -421,6 +421,14 @@ type kafkaConfig struct { HostPort string Topic string MinimumPriority string + SASL string + Username string + Password string + Balancer string + ClientID string + Compression string + Async bool + RequiredACKs string } type KafkaRestConfig struct {