Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to talk to Kafka over TLS rather than plaintext #21

Merged
merged 3 commits into from
Feb 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
language: go

go:
- 1.6.4
- 1.9.2
- tip

Expand Down
22 changes: 20 additions & 2 deletions Godeps/Godeps.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,26 @@ type Kafka struct {
RepartitionMax int `toml:"repartition_max"`

Compression string `toml:"compression"` // ("gzip", "snappy" or "none", default: "none")

// EnableTLS, if set, will connect to Kafka with TLS instead of plaintext.
EnableTLS bool `toml:"enable_tls"`

// CACerts is a list of CAs certificates used to verify the host.
// Usually there is only one, however multiple can be specified to allow
// for rotation. These should be PEM encoded CERTIFICATEs.
// If none are specified, then the system CA pool is used.
// Ignored unless enable_tls is set.
CACerts []string `toml:"ca_certificates"`

// ClientKey is used with the client certificate to identify this client
// to Kafka. This should be a PEM encoded RSA PRIVATE KEY.
// Ignored unless enable_tls is set.
ClientKey string `toml:"private_key"`

// ClientCertificate is used with the client key to identify this client
// to Kafka. This should be a PEM encoded CERTIFICATE.
// Ignored unless enable_tls is set.
ClientCert string `toml:"certificate"`
}

type Topic struct {
Expand Down
40 changes: 40 additions & 0 deletions kafka.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package main

import (
"crypto/tls"
"crypto/x509"
"encoding/binary"
"errors"
"fmt"
"log"
"sync"
Expand All @@ -27,6 +30,43 @@ func NewKafkaProducer(logger *log.Logger, stats *Stats, config *Config) (NozzleP
// TODO (tcnksm): Enable to configure more properties.
producerConfig := sarama.NewConfig()

if config.Kafka.EnableTLS {
if config.Kafka.ClientCert == "" {
return nil, errors.New("please specify client_certificate")
}
if config.Kafka.ClientKey == "" {
return nil, errors.New("please specify private_key")
}

producerConfig.Net.TLS.Enable = true
if producerConfig.Net.TLS.Config == nil {
producerConfig.Net.TLS.Config = &tls.Config{}
}

if len(config.Kafka.CACerts) == 0 {
var err error
producerConfig.Net.TLS.Config.RootCAs, err = x509.SystemCertPool()
if err != nil {
return nil, err
}
} else {
producerConfig.Net.TLS.Config.RootCAs = x509.NewCertPool()
for _, certString := range config.Kafka.CACerts {
if !producerConfig.Net.TLS.Config.RootCAs.AppendCertsFromPEM([]byte(certString)) {
return nil, errors.New("no certs in ca pem")
}
}
}

cert, err := tls.X509KeyPair([]byte(config.Kafka.ClientCert), []byte(config.Kafka.ClientKey))
if err != nil {
return nil, err
}

producerConfig.Net.TLS.Config.Certificates = []tls.Certificate{cert}
producerConfig.Net.TLS.Config.BuildNameToCertificate()
}

producerConfig.Producer.Partitioner = sarama.NewRoundRobinPartitioner
producerConfig.Producer.Return.Successes = true
producerConfig.Producer.RequiredAcks = sarama.WaitForAll
Expand Down
273 changes: 273 additions & 0 deletions tls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,273 @@
package main

import (
"crypto/rand"
"crypto/rsa"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/pem"
"math/big"
"net"
"testing"
"time"

"github.com/Shopify/sarama"
)

func TestTLS(t *testing.T) {
_, err := NewKafkaProducer(nil, NewStats(), &Config{
Kafka: Kafka{
EnableTLS: true,
ClientCert: "",
ClientKey: "",
},
})
if err == nil || err.Error() != "please specify client_certificate" {
t.Fatal("expected fail:", err)
}
_, err = NewKafkaProducer(nil, NewStats(), &Config{
Kafka: Kafka{
EnableTLS: true,
ClientCert: "foo",
ClientKey: "",
},
})
if err == nil || err.Error() != "please specify private_key" {
t.Fatal("expected fail:", err)
}
_, err = NewKafkaProducer(nil, NewStats(), &Config{
Kafka: Kafka{
EnableTLS: true,
ClientCert: "foo",
ClientKey: "bar",
},
})
if err == nil || err.Error() != "tls: failed to find any PEM data in certificate input" {
t.Fatal("expected fail:", err)
}

cakey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
t.Fatal(err)
}

clientkey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
t.Fatal(err)
}

hostkey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
t.Fatal(err)
}

nvb := time.Now().Add(-1 * time.Hour)
nva := time.Now().Add(1 * time.Hour)

caTemplate := &x509.Certificate{
Subject: pkix.Name{CommonName: "ca"},
Issuer: pkix.Name{CommonName: "ca"},
SerialNumber: big.NewInt(0),
NotAfter: nva,
NotBefore: nvb,
IsCA: true,
BasicConstraintsValid: true,
KeyUsage: x509.KeyUsageCertSign,
}
caDer, err := x509.CreateCertificate(rand.Reader, caTemplate, caTemplate, &cakey.PublicKey, cakey)
if err != nil {
t.Fatal(err)
}
caFinalCert, err := x509.ParseCertificate(caDer)
if err != nil {
t.Fatal(err)
}

hostDer, err := x509.CreateCertificate(rand.Reader, &x509.Certificate{
Subject: pkix.Name{CommonName: "host"},
Issuer: pkix.Name{CommonName: "ca"},
IPAddresses: []net.IP{net.IPv4(127, 0, 0, 1)},
SerialNumber: big.NewInt(0),
NotAfter: nva,
NotBefore: nvb,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth},
}, caFinalCert, &hostkey.PublicKey, cakey)
if err != nil {
t.Fatal(err)
}

clientDer, err := x509.CreateCertificate(rand.Reader, &x509.Certificate{
Subject: pkix.Name{CommonName: "client"},
Issuer: pkix.Name{CommonName: "ca"},
SerialNumber: big.NewInt(0),
NotAfter: nva,
NotBefore: nvb,
ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
}, caFinalCert, &clientkey.PublicKey, cakey)
if err != nil {
t.Fatal(err)
}

clientCertPem := string(pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Bytes: clientDer,
}))
clientKeyPem := string(pem.EncodeToMemory(&pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(clientkey),
}))

hostCertPem := string(pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Bytes: hostDer,
}))
hostKeyPem := string(pem.EncodeToMemory(&pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(hostkey),
}))

_, err = NewKafkaProducer(nil, NewStats(), &Config{
Kafka: Kafka{
EnableTLS: true,
ClientCert: clientCertPem,
ClientKey: clientKeyPem,
},
})
if err == nil || err.Error() != "brokers are not provided" {
t.Fatal("expected fail:", err)
}

caPem := string(pem.EncodeToMemory(&pem.Block{
Type: "CERTIFICATE",
Bytes: caDer,
}))

pool := x509.NewCertPool()
pool.AddCert(caFinalCert)

// Fail with system CAs
doListenerTLSTest(t, false, &tls.Config{
Certificates: []tls.Certificate{tls.Certificate{
Certificate: [][]byte{hostDer},
PrivateKey: hostkey,
}},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: pool,
}, func(addr string) {
_, err = NewKafkaProducer(nil, NewStats(), &Config{
Kafka: Kafka{
EnableTLS: true,
ClientCert: clientCertPem,
ClientKey: clientKeyPem,
Brokers: []string{addr},
Topic: Topic{
LogMessage: "foo",
},
},
})
if err == nil {
t.Fatal("Should fail as we have the wrong system CA")
}
})

// Fail with no TLS
doListenerTLSTest(t, false, &tls.Config{
Certificates: []tls.Certificate{tls.Certificate{
Certificate: [][]byte{hostDer},
PrivateKey: hostkey,
}},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: pool,
}, func(addr string) {
_, err = NewKafkaProducer(nil, NewStats(), &Config{
Kafka: Kafka{
EnableTLS: false,
Brokers: []string{addr},
Topic: Topic{
LogMessage: "foo",
},
},
})
if err == nil {
t.Fatal("Should fail as we have the wrong system CA")
}
})

// Fail with wrong key for cert
doListenerTLSTest(t, false, &tls.Config{
Certificates: []tls.Certificate{tls.Certificate{
Certificate: [][]byte{hostDer},
PrivateKey: hostkey,
}},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: pool,
}, func(addr string) {
_, err = NewKafkaProducer(nil, NewStats(), &Config{
Kafka: Kafka{
EnableTLS: true,
ClientCert: hostCertPem,
ClientKey: hostKeyPem,
CACerts: []string{caPem},
Brokers: []string{addr},
Topic: Topic{
LogMessage: "foo",
},
},
})
if err == nil {
t.Fatal("wrong type of cert")
}
})

// Try to actually work
doListenerTLSTest(t, true, &tls.Config{
Certificates: []tls.Certificate{tls.Certificate{
Certificate: [][]byte{hostDer},
PrivateKey: hostkey,
}},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: pool,
}, func(addr string) {
_, err = NewKafkaProducer(nil, NewStats(), &Config{
Kafka: Kafka{
EnableTLS: true,
ClientCert: clientCertPem,
ClientKey: clientKeyPem,
CACerts: []string{caPem},
Brokers: []string{addr},
Topic: Topic{
LogMessage: "foo",
},
},
})
if err != nil {
t.Fatal("Expecting to work:", err)
}
})
}

func doListenerTLSTest(t *testing.T, willWork bool, tlsConf *tls.Config, f func(addr string)) {
//sarama.Logger = log.New(os.Stderr, "", log.LstdFlags)

seedListener, err := tls.Listen("tcp", "127.0.0.1:0", tlsConf)
if err != nil {
t.Fatal("cannot open listener", err)
}

var childT *testing.T
if willWork {
childT = t
} else {
childT = &testing.T{} // we want to swallow errors
}

seed := sarama.NewMockBrokerListener(childT, int32(0), seedListener)
defer seed.Close()

if willWork {
seed.Returns(new(sarama.MetadataResponse))
}

f(seed.Addr())
}
Loading