feat(kafka): SASL authentication and additional configuration #385

Merged 1 commit on Nov 30, 2022
8 changes: 8 additions & 0 deletions config.go
@@ -253,6 +253,14 @@ func getConfig() *types.Configuration {
v.SetDefault("Kafka.HostPort", "")
v.SetDefault("Kafka.Topic", "")
v.SetDefault("Kafka.MinimumPriority", "")
v.SetDefault("Kafka.SASL", "")
v.SetDefault("Kafka.Username", "")
v.SetDefault("Kafka.Password", "")
v.SetDefault("Kafka.Balancer", "round_robin")
v.SetDefault("Kafka.ClientID", "")
v.SetDefault("Kafka.Compression", "NONE")
v.SetDefault("Kafka.Async", false)
v.SetDefault("Kafka.RequiredACKs", "NONE")

v.SetDefault("KafkaRest.Address", "")
v.SetDefault("KafkaRest.Version", 2)
8 changes: 8 additions & 0 deletions config_example.yaml
@@ -239,6 +239,14 @@ kafka:
hostport: "" # Apache Kafka Host:Port (ex: localhost:9092). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
sasl: "" # SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512)
username: "" # use this username to authenticate to Kafka via SASL (default: "")
password: "" # use this password to authenticate to Kafka via SASL (default: "")
# async: false # produce messages without blocking (default: false)
# requiredacks: NONE # number of acknowledgments required from partition replicas before a produce request is considered complete (ALL|ONE|NONE) (default: "NONE")
# compression: "" # enable message compression using this algorithm (GZIP|SNAPPY|LZ4|ZSTD|NONE) (default: "NONE")
# balancer: "" # partition balancing strategy when producing (default: "round_robin")
# clientid: "" # specify a client.id to identify this producer to the broker, useful for tracing (default: "")
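As an illustration only (not part of this diff), a filled-in kafka section enabling SCRAM authentication could look like the following; the broker address, topic, and credentials are placeholders:

kafka:
  hostport: "kafka:9092"
  topic: "falco-events"
  sasl: "SCRAM_SHA512"
  username: "falco"
  password: "changeme"
  async: false
  requiredacks: "ALL"
  compression: "SNAPPY"
  balancer: "least_bytes"
  clientid: "falcosidekick"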

kafkarest:
address: "" # The full URL to the topic (example "http://kafkarest:8082/topics/test")
10 changes: 6 additions & 4 deletions go.mod
@@ -16,11 +16,12 @@ require (
github.com/emersion/go-smtp v0.15.0
github.com/google/uuid v1.3.0
github.com/googleapis/gax-go v1.0.3
github.com/jackc/pgx/v5 v5.0.4
github.com/kubernetes-sigs/wg-policy-prototypes/policy-report/kube-bench-adapter v0.0.0-20210714174227-a3d56502c383
github.com/nats-io/nats.go v1.16.0
github.com/nats-io/stan.go v0.10.3
github.com/prometheus/client_golang v1.13.0
github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go v0.4.38
github.com/spf13/viper v1.12.0
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.8.0
@@ -79,13 +80,12 @@ require (
github.com/imdario/mergo v0.3.13 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgx/v5 v5.0.4 // indirect
github.com/jackc/puddle/v2 v2.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/compress v1.15.12 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
@@ -100,7 +100,7 @@
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
@@ -110,6 +110,8 @@
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
github.com/xdg/scram v1.0.5 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
14 changes: 6 additions & 8 deletions go.sum
@@ -509,8 +509,6 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg=
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
github.com/jackc/pgx/v5 v5.0.3 h1:4flM5ecR/555F0EcnjdaZa6MhBU+nr0QbZIo5vaKjuM=
github.com/jackc/pgx/v5 v5.0.3/go.mod h1:JBbvW3Hdw77jKl9uJrEDATUZIFM2VFPzRq4RWIhkF4o=
github.com/jackc/pgx/v5 v5.0.4 h1:r5O6y84qHX/z/HZV40JBdx2obsHz7/uRj5b+CcYEdeY=
github.com/jackc/pgx/v5 v5.0.4/go.mod h1:U0ynklHtgg43fue9Ly30w3OCSTDPlXjig9ghrNGaguQ=
github.com/jackc/puddle/v2 v2.0.0 h1:Kwk/AlLigcnZsDssc3Zun1dk1tAtQNPaBBxBHWn0Mjc=
@@ -546,9 +544,9 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM=
github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
@@ -682,8 +680,9 @@ github.com/pelletier/go-toml/v2 v2.0.5 h1:ipoSadvV8oGUjnUbMub59IDPPwfxF694nG/jwb
github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -740,8 +739,8 @@ github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBO
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/kafka-go v0.4.34 h1:Dm6YlLMiVSiwwav20KY0AoY63s661FXevwJ3CVHUERo=
github.com/segmentio/kafka-go v0.4.34/go.mod h1:GAjxBQJdQMB5zfNA21AhpaqOB2Mu+w3De4ni3Gbm8y0=
github.com/segmentio/kafka-go v0.4.38 h1:iQdOBbUSdfuYlFpvjuALgj7N6DrdPA0HfB4AhREOdtg=
github.com/segmentio/kafka-go v0.4.38/go.mod h1:ikyuGon/60MN/vXFgykf7Zm8P5Be49gJU6vezwjnnhU=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
@@ -1132,7 +1131,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
138 changes: 120 additions & 18 deletions outputs/kafka.go
@@ -3,31 +3,124 @@ package outputs
import (
"context"
"encoding/json"
"fmt"
"log"
"net"
"strings"
"time"

"github.com/DataDog/datadog-go/statsd"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"

"github.com/falcosecurity/falcosidekick/types"
)

// NewKafkaClient returns a new output.Client for accessing Apache Kafka.
func NewKafkaClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics, statsdClient, dogstatsdClient *statsd.Client) (*Client, error) {

transport := &kafka.Transport{
Dial: (&net.Dialer{
Timeout: 3 * time.Second,
DualStack: true,
}).DialContext,
ClientID: config.Kafka.ClientID,
}

var err error

if config.Kafka.SASL != "" {
saslMode := strings.ToUpper(config.Kafka.SASL)
switch {
case saslMode == "PLAIN":
transport.SASL = plain.Mechanism{
Username: config.Kafka.Username,
Password: config.Kafka.Password,
}
case strings.HasPrefix(saslMode, "SCRAM_"):
algo := strings.TrimPrefix(config.Kafka.SASL, "SCRAM_")
switch algo {
case "SHA256":
transport.SASL, err = scram.Mechanism(scram.SHA256, config.Kafka.Username, config.Kafka.Password)
case "SHA512":
transport.SASL, err = scram.Mechanism(scram.SHA512, config.Kafka.Username, config.Kafka.Password)
default:
err = fmt.Errorf("unsupported SASL SCRAM algorithm %q", algo)
}
if err != nil {
err = fmt.Errorf("failed to initialize SASL SCRAM %q: %w", algo, err)
}
default:
err = fmt.Errorf("unsupported SASL mode: %q", config.Kafka.SASL)
}
}
if err != nil {
log.Printf("[ERROR] : Kafka - %v\n", err)
return nil, err
}

kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(config.Kafka.HostPort),
Topic: config.Kafka.Topic,
Addr: kafka.TCP(config.Kafka.HostPort),
Topic: config.Kafka.Topic,
Async: config.Kafka.Async,
Transport: transport,
}

switch strings.ToLower(config.Kafka.Balancer) {
case "crc32":
kafkaWriter.Balancer = kafka.CRC32Balancer{Consistent: true}
case "crc32_random":
kafkaWriter.Balancer = kafka.CRC32Balancer{Consistent: false}
case "murmur2":
kafkaWriter.Balancer = kafka.Murmur2Balancer{Consistent: true}
case "murmur2_random":
kafkaWriter.Balancer = kafka.Murmur2Balancer{Consistent: false}
case "least_bytes":
kafkaWriter.Balancer = &kafka.LeastBytes{}
case "round_robin":
kafkaWriter.Balancer = &kafka.RoundRobin{}
default:
return nil, fmt.Errorf("unsupported balancer %q", config.Kafka.Balancer)
}

return &Client{
switch strings.ToUpper(config.Kafka.Compression) {
case "GZIP":
kafkaWriter.Compression = kafka.Gzip
case "SNAPPY":
kafkaWriter.Compression = kafka.Snappy
case "LZ4":
kafkaWriter.Compression = kafka.Lz4
case "ZSTD":
kafkaWriter.Compression = kafka.Zstd
case "NONE":
// leave as default, none
default:
return nil, fmt.Errorf("unsupported compression %q", config.Kafka.Compression)
}

switch strings.ToUpper(config.Kafka.RequiredACKs) {
case "ALL":
kafkaWriter.RequiredAcks = kafka.RequireAll
case "ONE":
kafkaWriter.RequiredAcks = kafka.RequireOne
case "NONE":
kafkaWriter.RequiredAcks = kafka.RequireNone
default:
return nil, fmt.Errorf("unsupported required ACKs %q", config.Kafka.Compression)
}

client := &Client{
OutputType: "Kafka",
Config: config,
Stats: stats,
PromStats: promStats,
StatsdClient: statsdClient,
DogstatsdClient: dogstatsdClient,
KafkaProducer: kafkaWriter,
}, nil
}
kafkaWriter.Completion = client.handleKafkaCompletion
return client, nil
}

// KafkaProduce sends a message to an Apache Kafka topic
@@ -36,7 +129,7 @@ func (c *Client) KafkaProduce(falcopayload types.FalcoPayload) {

falcoMsg, err := json.Marshal(falcopayload)
if err != nil {
c.setKafkaErrorMetrics()
c.incrKafkaErrorMetrics(1)
log.Printf("[ERROR] : Kafka - %v - %v\n", "failed to marshalling message", err.Error())
return
}
@@ -45,22 +138,31 @@
Value: falcoMsg,
}

err = c.KafkaProducer.WriteMessages(context.Background(), kafkaMsg)
// Errors are logged/captured via handleKafkaCompletion function, ignore here
_ = c.KafkaProducer.WriteMessages(context.Background(), kafkaMsg)
}

// handleKafkaCompletion is called when a message is produced
func (c *Client) handleKafkaCompletion(messages []kafka.Message, err error) {
if err != nil {
c.setKafkaErrorMetrics()
log.Printf("[ERROR] : Kafka - %v\n", err)
return
c.incrKafkaErrorMetrics(len(messages))
log.Printf("[ERROR] : Kafka (%d) - %v\n", len(messages), err)
} else {
c.incrKafkaSuccessMetrics(len(messages))
log.Printf("[INFO] : Kafka (%d) - Publish OK\n", len(messages))
}
}

go c.CountMetric("outputs", 1, []string{"output:kafka", "status:ok"})
c.Stats.Kafka.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": OK}).Inc()
log.Printf("[INFO] : Kafka - Publish OK\n")
// incrKafkaSuccessMetrics increments the success stats
func (c *Client) incrKafkaSuccessMetrics(add int) {
go c.CountMetric("outputs", int64(add), []string{"output:kafka", "status:ok"})
c.Stats.Kafka.Add(OK, int64(add))
c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": OK}).Add(float64(add))
}

// setKafkaErrorMetrics set the error stats
func (c *Client) setKafkaErrorMetrics() {
go c.CountMetric(Outputs, 1, []string{"output:kafka", "status:error"})
c.Stats.Kafka.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": Error}).Inc()
// incrKafkaErrorMetrics increments the error stats
func (c *Client) incrKafkaErrorMetrics(add int) {
go c.CountMetric(Outputs, int64(add), []string{"output:kafka", "status:error"})
c.Stats.Kafka.Add(Error, int64(add))
c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": Error}).Add(float64(add))
}
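For reference, a minimal standalone sketch (not part of this PR) of how the options added here map onto segmentio/kafka-go: a SASL/SCRAM transport, writer tuning, and a completion callback. The broker address, topic, client ID, and credentials are placeholders.

package main

import (
	"context"
	"log"
	"net"
	"time"

	"github.com/segmentio/kafka-go"
	"github.com/segmentio/kafka-go/sasl/scram"
)

func main() {
	// SCRAM-SHA-512 mechanism, as selected by kafka.sasl: "SCRAM_SHA512"
	mechanism, err := scram.Mechanism(scram.SHA512, "falco", "changeme")
	if err != nil {
		log.Fatalf("SASL setup failed: %v", err)
	}

	writer := &kafka.Writer{
		Addr:         kafka.TCP("kafka:9092"),
		Topic:        "falco-events",
		Async:        false,
		RequiredAcks: kafka.RequireAll,    // kafka.requiredacks: "ALL"
		Compression:  kafka.Snappy,        // kafka.compression: "SNAPPY"
		Balancer:     &kafka.LeastBytes{}, // kafka.balancer: "least_bytes"
		Transport: &kafka.Transport{
			Dial:     (&net.Dialer{Timeout: 3 * time.Second, DualStack: true}).DialContext,
			ClientID: "falcosidekick",
			SASL:     mechanism,
		},
		// Called for each produced batch; with Async enabled, errors surface
		// here rather than from WriteMessages.
		Completion: func(messages []kafka.Message, err error) {
			if err != nil {
				log.Printf("kafka error (%d messages): %v", len(messages), err)
			}
		},
	}
	defer writer.Close()

	msg := kafka.Message{Value: []byte(`{"rule":"test"}`)}
	if err := writer.WriteMessages(context.Background(), msg); err != nil {
		log.Printf("write failed: %v", err)
	}
}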
8 changes: 8 additions & 0 deletions types/types.go
@@ -421,6 +421,14 @@ type kafkaConfig struct {
HostPort string
Topic string
MinimumPriority string
SASL string
Username string
Password string
Balancer string
ClientID string
Compression string
Async bool
RequiredACKs string
}

type KafkaRestConfig struct {