Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

config: add possibility to specify tls as bytes #147

Merged
merged 1 commit into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/reference-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ kafka:
caFilepath: ""
certFilepath: ""
keyFilepath: ""
# base64 encoded tls CA, cannot be set if 'caFilepath' is set
ca: ""
# base64 encoded tls cert, cannot be set if 'certFilepath' is set
cert: ""
# base64 encoded tls key, cannot be set if 'keyFilepath' is set
key: ""
passphrase: ""
insecureSkipTlsVerify: false

Expand Down
36 changes: 25 additions & 11 deletions kafka/client_config_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,14 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {
var caCertPool *x509.CertPool
if cfg.TLS.Enabled {
// Root CA
if cfg.TLS.CaFilepath != "" {
ca, err := ioutil.ReadFile(cfg.TLS.CaFilepath)
if err != nil {
return nil, err
if cfg.TLS.CaFilepath != "" || len(cfg.TLS.Ca) > 0 {
ca := cfg.TLS.Ca
if cfg.TLS.CaFilepath != "" {
caBytes, err := ioutil.ReadFile(cfg.TLS.CaFilepath)
if err != nil {
return nil, fmt.Errorf("failed to load ca cert: %w", err)
}
ca = caBytes
}
caCertPool = x509.NewCertPool()
isSuccessful := caCertPool.AppendCertsFromPEM(ca)
Expand All @@ -128,16 +132,26 @@ func NewKgoConfig(cfg Config, logger *zap.Logger) ([]kgo.Opt, error) {

// If configured load TLS cert & key - Mutual TLS
var certificates []tls.Certificate
if cfg.TLS.CertFilepath != "" && cfg.TLS.KeyFilepath != "" {
hasCertFile := cfg.TLS.CertFilepath != "" || len(cfg.TLS.Cert) > 0
hasKeyFile := cfg.TLS.KeyFilepath != "" || len(cfg.TLS.Key) > 0
if hasCertFile || hasKeyFile {
cert := cfg.TLS.Cert
privateKey := cfg.TLS.Key
// 1. Read certificates
cert, err := ioutil.ReadFile(cfg.TLS.CertFilepath)
if err != nil {
return nil, fmt.Errorf("failed to TLS certificate: %w", err)
if cfg.TLS.CertFilepath != "" {
certBytes, err := ioutil.ReadFile(cfg.TLS.CertFilepath)
if err != nil {
return nil, fmt.Errorf("failed to TLS certificate: %w", err)
}
cert = certBytes
}

privateKey, err := ioutil.ReadFile(cfg.TLS.KeyFilepath)
if err != nil {
return nil, fmt.Errorf("failed to read TLS key: %w", err)
if cfg.TLS.KeyFilepath != "" {
keyBytes, err := ioutil.ReadFile(cfg.TLS.KeyFilepath)
if err != nil {
return nil, fmt.Errorf("failed to read TLS key: %w", err)
}
privateKey = keyBytes
}

// 2. Check if private key needs to be decrypted. Decrypt it if passphrase is given, otherwise return error
Expand Down
15 changes: 15 additions & 0 deletions kafka/config_tls.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package kafka

import "fmt"

// TLSConfig to connect to Kafka via TLS
type TLSConfig struct {
Enabled bool `koanf:"enabled"`
CaFilepath string `koanf:"caFilepath"`
CertFilepath string `koanf:"certFilepath"`
KeyFilepath string `koanf:"keyFilepath"`
Ca []byte `koanf:"ca"`
Cert []byte `koanf:"cert"`
Key []byte `koanf:"key"`
Passphrase string `koanf:"passphrase"`
InsecureSkipTLSVerify bool `koanf:"insecureSkipTlsVerify"`
}
Expand All @@ -15,5 +20,15 @@ func (c *TLSConfig) SetDefaults() {
}

func (c *TLSConfig) Validate() error {
if len(c.CaFilepath) > 0 && len(c.Ca) > 0 {
return fmt.Errorf("config keys 'caFilepath' and 'ca' are both set. only one can be used at the same time")
}
if len(c.CertFilepath) > 0 && len(c.Cert) > 0 {
return fmt.Errorf("config keys 'certFilepath' and 'cert' are both set. only one can be used at the same time")
}

if len(c.KeyFilepath) > 0 && len(c.Key) > 0 {
return fmt.Errorf("config keys 'keyFilepath' and 'key' are both set. only one can be used at the same time")
}
return nil
}