Skip to content

Commit

Permalink
Add kerberos support
Browse files Browse the repository at this point in the history
Signed-off-by: Ruben Vargas <ruben.vp8510@gmail.com>
  • Loading branch information
rubenvp8510 committed May 12, 2019
1 parent a5ecebc commit 3ec45e5
Show file tree
Hide file tree
Showing 11 changed files with 916 additions and 53 deletions.
16 changes: 14 additions & 2 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"encoding/binary"
"fmt"
metrics "github.com/rcrowley/go-metrics"
"io"
"net"
"sort"
Expand All @@ -12,8 +13,6 @@ import (
"sync"
"sync/atomic"
"time"

metrics "github.com/rcrowley/go-metrics"
)

// Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe.
Expand Down Expand Up @@ -47,6 +46,8 @@ type Broker struct {
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram

kerberosAuthenticator GSSAPIKerberosAuth
}

// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
Expand All @@ -61,6 +62,7 @@ const (
SASLTypeSCRAMSHA256 = "SCRAM-SHA-256"
// SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism.
SASLTypeSCRAMSHA512 = "SCRAM-SHA-512"
SASLTypeGSSAPI = "GSSAPI"
// SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and
// server negotiate SASL auth using opaque packets.
SASLHandshakeV0 = int16(0)
Expand Down Expand Up @@ -844,11 +846,21 @@ func (b *Broker) authenticateViaSASL() error {
return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider)
case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512:
return b.sendAndReceiveSASLSCRAMv1()
case SASLTypeGSSAPI:
return b.sendAndReceiveKerberos()
default:
return b.sendAndReceiveSASLPlainAuth()
}
}

func (b *Broker) sendAndReceiveKerberos() error {
b.kerberosAuthenticator.Config = &b.conf.Net.SASL.GSSAPI
if b.kerberosAuthenticator.NewKerberosClientFunc == nil {
b.kerberosAuthenticator.NewKerberosClientFunc = NewKerberosClient
}
return b.kerberosAuthenticator.Authorize(b)
}

func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error {
rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version}

Expand Down
123 changes: 122 additions & 1 deletion broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ package sarama
import (
"errors"
"fmt"
"gopkg.in/jcmturner/gokrb5.v7/krberror"
"net"
"reflect"
"testing"
"time"

metrics "github.com/rcrowley/go-metrics"
"github.com/rcrowley/go-metrics"
)

func ExampleBroker() {
Expand Down Expand Up @@ -477,6 +478,126 @@ func TestSASLPlainAuth(t *testing.T) {
}
}

func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {

testTable := []struct {
name string
error error
mockKerberosClient bool
errorStage string
badResponse bool
badKeyChecksum bool
}{
{
name: "Kerberos authentication success",
error: nil,
mockKerberosClient: true,
},
{
name: "Kerberos login fails",
error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
"kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
"cation information was invalid - PREAUTH_FAILED"),
mockKerberosClient: true,
errorStage: "login",
},
{
name: "Kerberos service ticket fails",
error: krberror.NewErrorf(krberror.KDCError, "KDC_Error: AS Exchange Error: "+
"kerberos error response from KDC: KRB Error: (24) KDC_ERR_PREAUTH_FAILED Pre-authenti"+
"cation information was invalid - PREAUTH_FAILED"),
mockKerberosClient: true,
errorStage: "service_ticket",
},
{
name: "Kerberos client creation fails",
error: errors.New("configuration file could not be opened: krb5.conf open krb5.conf: no such file or directory"),
},
{
name: "Bad server response, unmarshall key error",
error: errors.New("bytes shorter than header length"),
badResponse: true,
mockKerberosClient: true,
},
{
name: "Bad token checksum",
error: errors.New("checksum mismatch. Computed: 39feb88ac2459f2b77738493, Contained in token: ffffffffffffffff00000000"),
badResponse: false,
badKeyChecksum: true,
mockKerberosClient: true,
},
}
for i, test := range testTable {
mockBroker := NewMockBroker(t, 0)
// broker executes SASL requests against mockBroker

mockBroker.SetGSSAPIHandler(func(bytes []byte) []byte {
return nil
})
broker := NewBroker(mockBroker.Addr())
broker.requestRate = metrics.NilMeter{}
broker.outgoingByteRate = metrics.NilMeter{}
broker.incomingByteRate = metrics.NilMeter{}
broker.requestSize = metrics.NilHistogram{}
broker.responseSize = metrics.NilHistogram{}
broker.responseRate = metrics.NilMeter{}
broker.requestLatency = metrics.NilHistogram{}
conf := NewConfig()
conf.Net.SASL.Mechanism = SASLTypeGSSAPI
conf.Net.SASL.GSSAPI.ServiceName = "kafka"
conf.Net.SASL.GSSAPI.KerberosConfigPath = "krb5.conf"
conf.Net.SASL.GSSAPI.Realm = "EXAMPLE.COM"
conf.Net.SASL.GSSAPI.Username = "kafka"
conf.Net.SASL.GSSAPI.Password = "kafka"
conf.Net.SASL.GSSAPI.KeyTabPath = "kafka.keytab"
conf.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
broker.conf = conf
broker.conf.Version = V1_0_0_0
dialer := net.Dialer{
Timeout: conf.Net.DialTimeout,
KeepAlive: conf.Net.KeepAlive,
LocalAddr: conf.Net.LocalAddr,
}

conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())

if err != nil {
t.Fatal(err)
}

gssapiHandler := KafkaGSSAPIHandler{
client: &MockKerberosClient{},
badResponse: test.badResponse,
badKeyChecksum: test.badKeyChecksum,
}
mockBroker.SetGSSAPIHandler(gssapiHandler.MockKafkaGSSAPI)
broker.conn = conn
if test.mockKerberosClient {
broker.kerberosAuthenticator.NewKerberosClientFunc = func(config *GSSAPIConfig) (KerberosClient, error) {
return &MockKerberosClient{
mockError: test.error,
errorStage: test.errorStage,
}, nil
}
} else {
broker.kerberosAuthenticator.NewKerberosClientFunc = nil
}

err = broker.authenticateViaSASL()

if err != nil && test.error != nil {
if test.error.Error() != err.Error() {
t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
}
} else if (err == nil && test.error != nil) || (err != nil && test.error == nil) {
t.Errorf("[%d] Expected error:%s, got:%s.", i, test.error, err)
}

mockBroker.Close()
}

}

func TestBuildClientInitialResponse(t *testing.T) {

testTable := []struct {
Expand Down
33 changes: 31 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type Config struct {
// AccessTokenProvider interface docs for proper implementation
// guidelines.
TokenProvider AccessTokenProvider

GSSAPI GSSAPIConfig
}

// KeepAlive specifies the keep-alive period for an active network connection.
Expand Down Expand Up @@ -527,9 +529,36 @@ func (c *Config) Validate() error {
if c.Net.SASL.SCRAMClientGeneratorFunc == nil {
return ConfigurationError("A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc")
}
case SASLTypeGSSAPI:
if c.Net.SASL.GSSAPI.ServiceName == "" {
return ConfigurationError("Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used")
}

if c.Net.SASL.GSSAPI.AuthType == KRB5_USER_AUTH {
if c.Net.SASL.GSSAPI.Password == "" {
return ConfigurationError("Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
"mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH")
}
} else if c.Net.SASL.GSSAPI.AuthType == KRB5_KEYTAB_AUTH {
if c.Net.SASL.GSSAPI.KeyTabPath == "" {
return ConfigurationError("Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
" and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH")
}
} else {
return ConfigurationError("Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH")
}
if c.Net.SASL.GSSAPI.KerberosConfigPath == "" {
return ConfigurationError("Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used")
}
if c.Net.SASL.GSSAPI.Username == "" {
return ConfigurationError("Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used")
}
if c.Net.SASL.GSSAPI.Realm == "" {
return ConfigurationError("Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used")
}
default:
msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s` and `%s`",
SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512)
msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s`, `%s` and `%s`",
SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512, SASLTypeGSSAPI)
return ConfigurationError(msg)
}
}
Expand Down
82 changes: 81 additions & 1 deletion config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func TestNetConfigValidates(t *testing.T) {
cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism"
cfg.Net.SASL.TokenProvider = &DummyTokenProvider{}
},
"The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256` and `SCRAM-SHA-512`"},
"The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`"},
{"SASL.Mechanism.OAUTHBEARER - Missing token provider",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
Expand All @@ -117,6 +117,86 @@ func TestNetConfigValidates(t *testing.T) {
cfg.Net.SASL.Password = "stong_password"
},
"A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc"},
{"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing password field",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
cfg.Net.SASL.GSSAPI.Username = "sarama"
cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
cfg.Net.SASL.GSSAPI.Realm = "kafka"
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
},
"Net.SASL.GSSAPI.Password must not be empty when GSS-API " +
"mechanism is used and Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH"},
{"SASL.Mechanism GSSAPI (Kerberos) - Using User/Password, Missing KeyTabPath field",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH
cfg.Net.SASL.GSSAPI.Username = "sarama"
cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
cfg.Net.SASL.GSSAPI.Realm = "kafka"
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
},
"Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used" +
" and Net.SASL.GSSAPI.AuthType = KRB5_KEYTAB_AUTH"},
{"SASL.Mechanism GSSAPI (Kerberos) - Missing username",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
cfg.Net.SASL.GSSAPI.Password = "sarama"
cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
cfg.Net.SASL.GSSAPI.Realm = "kafka"
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
},
"Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used"},
{"SASL.Mechanism GSSAPI (Kerberos) - Missing ServiceName",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
cfg.Net.SASL.GSSAPI.Username = "sarama"
cfg.Net.SASL.GSSAPI.Password = "sarama"
cfg.Net.SASL.GSSAPI.Realm = "kafka"
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
},
"Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used"},
{"SASL.Mechanism GSSAPI (Kerberos) - Missing AuthType",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.Username = "sarama"
cfg.Net.SASL.GSSAPI.Password = "sarama"
cfg.Net.SASL.GSSAPI.Realm = "kafka"
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"
},
"Net.SASL.GSSAPI.AuthType is invalid. Possible values are KRB5_USER_AUTH and KRB5_KEYTAB_AUTH"},
{"SASL.Mechanism GSSAPI (Kerberos) - Missing KerberosConfigPath",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
cfg.Net.SASL.GSSAPI.Username = "sarama"
cfg.Net.SASL.GSSAPI.Password = "sarama"
cfg.Net.SASL.GSSAPI.Realm = "kafka"
},
"Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used"},
{"SASL.Mechanism GSSAPI (Kerberos) - Missing Realm",
func(cfg *Config) {
cfg.Net.SASL.Enable = true
cfg.Net.SASL.GSSAPI.ServiceName = "kafka"
cfg.Net.SASL.Mechanism = SASLTypeGSSAPI
cfg.Net.SASL.GSSAPI.AuthType = KRB5_USER_AUTH
cfg.Net.SASL.GSSAPI.Username = "sarama"
cfg.Net.SASL.GSSAPI.Password = "sarama"
cfg.Net.SASL.GSSAPI.KerberosConfigPath = "/etc/krb5.conf"

},
"Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used"},
}

for i, test := range tests {
Expand Down
6 changes: 6 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ require (
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
github.com/eapache/queue v1.1.0
github.com/golang/snappy v0.0.1 // indirect
github.com/hashicorp/go-uuid v1.0.1 // indirect
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
github.com/stretchr/testify v1.3.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/xdg/stringprep v1.0.0 // indirect
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 // indirect
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect
gopkg.in/jcmturner/gokrb5.v7 v7.2.3
gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect
)
12 changes: 12 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU=
github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
Expand All @@ -37,3 +41,11 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010=
gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
Loading

0 comments on commit 3ec45e5

Please sign in to comment.