Skip to content

Commit

Permalink
feat(kafka): SASL authentication and additional configuration
Browse files Browse the repository at this point in the history
Co-authored-by: Luke Young <lyoung@confluent.io>
Co-authored-by: Lyonel Martinez <lyonel.martinez@numberly.com>
  • Loading branch information
Lowaiz and lyoung-confluent committed Nov 28, 2022
1 parent 97a5fce commit 29571d2
Show file tree
Hide file tree
Showing 6 changed files with 156 additions and 30 deletions.
8 changes: 8 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,14 @@ func getConfig() *types.Configuration {
v.SetDefault("Kafka.HostPort", "")
v.SetDefault("Kafka.Topic", "")
v.SetDefault("Kafka.MinimumPriority", "")
v.SetDefault("Kafka.SASL", "")
v.SetDefault("Kafka.Username", "")
v.SetDefault("Kafka.Password", "")
v.SetDefault("Kafka.Balancer", "round_robin")
v.SetDefault("Kafka.ClientID", "")
v.SetDefault("Kafka.Compression", "NONE")
v.SetDefault("Kafka.Async", false)
v.SetDefault("Kafka.RequiredACKs", "NONE")

v.SetDefault("KafkaRest.Address", "")
v.SetDefault("KafkaRest.Version", 2)
Expand Down
8 changes: 8 additions & 0 deletions config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,14 @@ kafka:
hostport: "" # Apache Kafka Host:Port (ex: localhost:9092). Defaults to port 9092 if no port is specified after the domain, if not empty, Kafka output is enabled
topic: "" # Name of the topic, if not empty, Kafka output is enabled
# minimumpriority: "debug" # minimum priority of event for using this output, order is emergency|alert|critical|error|warning|notice|informational|debug or "" (default)
sasl: "" # SASL authentication mechanism, if empty, no authentication (PLAIN|SCRAM_SHA256|SCRAM_SHA512)
username: "" # use this username to authenticate to Kafka via SASL (default: "")
password: "" # use this password to authenticate to Kafka via SASL (default: "")
# async: false # produce messages without blocking (default: false)
# requiredacks: NONE # number of acknowledges from partition replicas required before receiving (default: "NONE")
# compression: "" # enable message compression using this algorithm, no compression (GZIP|SNAPPY|LZ4|ZSTD|NONE) (default: "NONE")
# balancer: "" # partition balancing strategy when producing, (default: "round_robin")
# clientid: "" # specify a client.id when communicating with the broker for tracing

kafkarest:
address: "" # The full URL to the topic (example "http://kafkarest:8082/topics/test")
Expand Down
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@ require (
github.com/emersion/go-smtp v0.15.0
github.com/google/uuid v1.3.0
github.com/googleapis/gax-go v1.0.3
github.com/jackc/pgx/v5 v5.0.4
github.com/kubernetes-sigs/wg-policy-prototypes/policy-report/kube-bench-adapter v0.0.0-20210714174227-a3d56502c383
github.com/nats-io/nats.go v1.16.0
github.com/nats-io/stan.go v0.10.3
github.com/prometheus/client_golang v1.13.0
github.com/segmentio/kafka-go v0.4.34
github.com/segmentio/kafka-go v0.4.38
github.com/spf13/viper v1.12.0
github.com/streadway/amqp v1.0.0
github.com/stretchr/testify v1.8.0
Expand Down Expand Up @@ -79,13 +80,12 @@ require (
github.com/imdario/mergo v0.3.13 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/pgx/v5 v5.0.4 // indirect
github.com/jackc/puddle/v2 v2.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/jpillora/backoff v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/klauspost/compress v1.15.12 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
Expand All @@ -100,7 +100,7 @@ require (
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
Expand All @@ -110,6 +110,8 @@ require (
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
github.com/xdg/scram v1.0.5 // indirect
github.com/xdg/stringprep v1.0.3 // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
Expand Down
14 changes: 6 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -509,8 +509,6 @@ github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsI
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b h1:C8S2+VttkHFdOOCXJe+YGfa4vHYwlt4Zx+IVXQ97jYg=
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
github.com/jackc/pgx/v5 v5.0.3 h1:4flM5ecR/555F0EcnjdaZa6MhBU+nr0QbZIo5vaKjuM=
github.com/jackc/pgx/v5 v5.0.3/go.mod h1:JBbvW3Hdw77jKl9uJrEDATUZIFM2VFPzRq4RWIhkF4o=
github.com/jackc/pgx/v5 v5.0.4 h1:r5O6y84qHX/z/HZV40JBdx2obsHz7/uRj5b+CcYEdeY=
github.com/jackc/pgx/v5 v5.0.4/go.mod h1:U0ynklHtgg43fue9Ly30w3OCSTDPlXjig9ghrNGaguQ=
github.com/jackc/puddle/v2 v2.0.0 h1:Kwk/AlLigcnZsDssc3Zun1dk1tAtQNPaBBxBHWn0Mjc=
Expand Down Expand Up @@ -546,9 +544,9 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.12 h1:YClS/PImqYbn+UILDnqxQCZ3RehC9N318SU3kElDUEM=
github.com/klauspost/compress v1.15.12/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
Expand Down Expand Up @@ -682,8 +680,9 @@ github.com/pelletier/go-toml/v2 v2.0.5 h1:ipoSadvV8oGUjnUbMub59IDPPwfxF694nG/jwb
github.com/pelletier/go-toml/v2 v2.0.5/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
Expand Down Expand Up @@ -740,8 +739,8 @@ github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBO
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/segmentio/kafka-go v0.4.34 h1:Dm6YlLMiVSiwwav20KY0AoY63s661FXevwJ3CVHUERo=
github.com/segmentio/kafka-go v0.4.34/go.mod h1:GAjxBQJdQMB5zfNA21AhpaqOB2Mu+w3De4ni3Gbm8y0=
github.com/segmentio/kafka-go v0.4.38 h1:iQdOBbUSdfuYlFpvjuALgj7N6DrdPA0HfB4AhREOdtg=
github.com/segmentio/kafka-go v0.4.38/go.mod h1:ikyuGon/60MN/vXFgykf7Zm8P5Be49gJU6vezwjnnhU=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
Expand Down Expand Up @@ -1132,7 +1131,6 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7 h1:olpwvP2KacW1ZWvsR7uQhoyTYvKAupfQrRGBFM352Gk=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.3.8 h1:nAL+RVCQ9uMn3vJZbV+MRnydTJFPf8qqY42YiA6MrqY=
golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ=
Expand Down
138 changes: 120 additions & 18 deletions outputs/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,31 +3,124 @@ package outputs
import (
"context"
"encoding/json"
"fmt"
"log"
"net"
"strings"
"time"

"github.com/DataDog/datadog-go/statsd"
"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl/plain"
"github.com/segmentio/kafka-go/sasl/scram"

"github.com/falcosecurity/falcosidekick/types"
)

// NewKafkaClient returns a new output.Client for accessing the Apache Kafka.
func NewKafkaClient(config *types.Configuration, stats *types.Statistics, promStats *types.PromStatistics, statsdClient, dogstatsdClient *statsd.Client) (*Client, error) {

transport := &kafka.Transport{
Dial: (&net.Dialer{
Timeout: 3 * time.Second,
DualStack: true,
}).DialContext,
ClientID: config.Kafka.ClientID,
}

var err error

if config.Kafka.SASL != "" {
saslMode := strings.ToUpper(config.Kafka.SASL)
switch {
case saslMode == "PLAIN":
transport.SASL = plain.Mechanism{
Username: config.Kafka.Username,
Password: config.Kafka.Password,
}
case strings.HasPrefix(saslMode, "SCRAM_"):
algo := strings.TrimPrefix(config.Kafka.SASL, "SCRAM_")
switch algo {
case "SHA256":
transport.SASL, err = scram.Mechanism(scram.SHA256, config.Kafka.Username, config.Kafka.Password)
case "SHA512":
transport.SASL, err = scram.Mechanism(scram.SHA512, config.Kafka.Username, config.Kafka.Password)
default:
err = fmt.Errorf("unsupported SASL SCRAM algorithm %q", algo)
}
if err != nil {
err = fmt.Errorf("failed to initialize SASL SCRAM %q: %w", algo, err)
}
default:
err = fmt.Errorf("unsupported SASL mode: %q", config.Kafka.SASL)
}
}
if err != nil {
log.Printf("[ERROR] : Kafka - %v\n", err)
return nil, err
}

kafkaWriter := &kafka.Writer{
Addr: kafka.TCP(config.Kafka.HostPort),
Topic: config.Kafka.Topic,
Addr: kafka.TCP(config.Kafka.HostPort),
Topic: config.Kafka.Topic,
Async: config.Kafka.Async,
Transport: transport,
}

switch strings.ToLower(config.Kafka.Balancer) {
case "crc32":
kafkaWriter.Balancer = kafka.CRC32Balancer{Consistent: true}
case "crc32_random":
kafkaWriter.Balancer = kafka.CRC32Balancer{Consistent: false}
case "murmur2":
kafkaWriter.Balancer = kafka.Murmur2Balancer{Consistent: true}
case "murmur2_random":
kafkaWriter.Balancer = kafka.Murmur2Balancer{Consistent: false}
case "least_bytes":
kafkaWriter.Balancer = &kafka.LeastBytes{}
case "round_robin":
kafkaWriter.Balancer = &kafka.RoundRobin{}
default:
return nil, fmt.Errorf("unsupported balancer %q", config.Kafka.Balancer)
}

return &Client{
switch strings.ToUpper(config.Kafka.Compression) {
case "GZIP":
kafkaWriter.Compression = kafka.Gzip
case "SNAPPY":
kafkaWriter.Compression = kafka.Snappy
case "LZ4":
kafkaWriter.Compression = kafka.Lz4
case "ZSTD":
kafkaWriter.Compression = kafka.Zstd
case "NONE":
// leave as default, none
default:
return nil, fmt.Errorf("unsupported compression %q", config.Kafka.Compression)
}

switch strings.ToUpper(config.Kafka.RequiredACKs) {
case "ALL":
kafkaWriter.RequiredAcks = kafka.RequireAll
case "ONE":
kafkaWriter.RequiredAcks = kafka.RequireOne
case "NONE":
kafkaWriter.RequiredAcks = kafka.RequireNone
default:
return nil, fmt.Errorf("unsupported required ACKs %q", config.Kafka.Compression)
}

client := &Client{
OutputType: "Kafka",
Config: config,
Stats: stats,
PromStats: promStats,
StatsdClient: statsdClient,
DogstatsdClient: dogstatsdClient,
KafkaProducer: kafkaWriter,
}, nil
}
kafkaWriter.Completion = client.handleKafkaCompletion
return client, nil
}

// KafkaProduce sends a message to a Apach Kafka Topic
Expand All @@ -36,7 +129,7 @@ func (c *Client) KafkaProduce(falcopayload types.FalcoPayload) {

falcoMsg, err := json.Marshal(falcopayload)
if err != nil {
c.setKafkaErrorMetrics()
c.incrKafkaErrorMetrics(1)
log.Printf("[ERROR] : Kafka - %v - %v\n", "failed to marshalling message", err.Error())
return
}
Expand All @@ -45,22 +138,31 @@ func (c *Client) KafkaProduce(falcopayload types.FalcoPayload) {
Value: falcoMsg,
}

err = c.KafkaProducer.WriteMessages(context.Background(), kafkaMsg)
// Errors are logged/captured via handleKafkaCompletion function, ignore here
_ = c.KafkaProducer.WriteMessages(context.Background(), kafkaMsg)
}

// handleKafkaCompletion is called when a message is produced
func (c *Client) handleKafkaCompletion(messages []kafka.Message, err error) {
if err != nil {
c.setKafkaErrorMetrics()
log.Printf("[ERROR] : Kafka - %v\n", err)
return
c.incrKafkaErrorMetrics(len(messages))
log.Printf("[ERROR] : Kafka (%d) - %v\n", len(messages), err)
} else {
c.incrKafkaSuccessMetrics(len(messages))
log.Printf("[INFO] : Kafka (%d) - Publish OK\n", len(messages))
}
}

go c.CountMetric("outputs", 1, []string{"output:kafka", "status:ok"})
c.Stats.Kafka.Add(OK, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": OK}).Inc()
log.Printf("[INFO] : Kafka - Publish OK\n")
// incrKafkaSuccessMetrics increments the error stats
func (c *Client) incrKafkaSuccessMetrics(add int) {
go c.CountMetric("outputs", int64(add), []string{"output:kafka", "status:ok"})
c.Stats.Kafka.Add(OK, int64(add))
c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": OK}).Add(float64(add))
}

// setKafkaErrorMetrics set the error stats
func (c *Client) setKafkaErrorMetrics() {
go c.CountMetric(Outputs, 1, []string{"output:kafka", "status:error"})
c.Stats.Kafka.Add(Error, 1)
c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": Error}).Inc()
// incrKafkaErrorMetrics increments the error stats
func (c *Client) incrKafkaErrorMetrics(add int) {
go c.CountMetric(Outputs, int64(add), []string{"output:kafka", "status:error"})
c.Stats.Kafka.Add(Error, int64(add))
c.PromStats.Outputs.With(map[string]string{"destination": "kafka", "status": Error}).Add(float64(add))
}
8 changes: 8 additions & 0 deletions types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,14 @@ type kafkaConfig struct {
HostPort string
Topic string
MinimumPriority string
SASL string
Username string
Password string
Balancer string
ClientID string
Compression string
Async bool
RequiredACKs string
}

type KafkaRestConfig struct {
Expand Down

0 comments on commit 29571d2

Please sign in to comment.