diff --git a/broker.go b/broker.go index e6da68f71f..4055575d91 100644 --- a/broker.go +++ b/broker.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "encoding/binary" "fmt" + metrics "github.com/rcrowley/go-metrics" "io" "net" "sort" @@ -12,8 +13,6 @@ import ( "sync" "sync/atomic" "time" - - metrics "github.com/rcrowley/go-metrics" ) // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe. @@ -61,6 +60,7 @@ const ( SASLTypeSCRAMSHA256 = "SCRAM-SHA-256" // SASLTypeSCRAMSHA512 represents the SCRAM-SHA-512 mechanism. SASLTypeSCRAMSHA512 = "SCRAM-SHA-512" + SASLTypeGSSAPI = "GSSAPI" // SASLHandshakeV0 is v0 of the Kafka SASL handshake protocol. Client and // server negotiate SASL auth using opaque packets. SASLHandshakeV0 = int16(0) @@ -844,11 +844,17 @@ func (b *Broker) authenticateViaSASL() error { return b.sendAndReceiveSASLOAuth(b.conf.Net.SASL.TokenProvider) case SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512: return b.sendAndReceiveSASLSCRAMv1() + case SASLTypeGSSAPI: + return b.sendAndReceiveKerberos() default: return b.sendAndReceiveSASLPlainAuth() } } +func (b *Broker) sendAndReceiveKerberos() error { + return NewGSSAPIKerberosAuthenticator(&b.conf.Net.SASL.GSSAPI).Authorize(b) +} + func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int16) error { rb := &SaslHandshakeRequest{Mechanism: string(saslType), Version: version} diff --git a/config.go b/config.go index d0cfa62e72..d74d35ea92 100644 --- a/config.go +++ b/config.go @@ -75,6 +75,8 @@ type Config struct { // AccessTokenProvider interface docs for proper implementation // guidelines. TokenProvider AccessTokenProvider + + GSSAPI GSSAPIConfig } // KeepAlive specifies the keep-alive period for an active network connection. @@ -520,9 +522,25 @@ func (c *Config) Validate() error { if c.Net.SASL.SCRAMClientGeneratorFunc == nil { return ConfigurationError("A SCRAMClientGeneratorFunc function must be provided to Net.SASL.SCRAMClientGeneratorFunc") } + case SASLTypeGSSAPI: + if c.Net.SASL.GSSAPI.ServiceName == "" { + return ConfigurationError("Net.SASL.GSSAPI.ServiceName must not be empty when GSS-API mechanism is used") + } + if c.Net.SASL.GSSAPI.KeyTabPath == "" { + return ConfigurationError("Net.SASL.GSSAPI.KeyTabPath must not be empty when GSS-API mechanism is used") + } + if c.Net.SASL.GSSAPI.KerberosConfigPath == "" { + return ConfigurationError("Net.SASL.GSSAPI.KerberosConfigPath must not be empty when GSS-API mechanism is used") + } + if c.Net.SASL.GSSAPI.Username == "" { + return ConfigurationError("Net.SASL.GSSAPI.Username must not be empty when GSS-API mechanism is used") + } + if c.Net.SASL.GSSAPI.Realm == "" { + return ConfigurationError("Net.SASL.GSSAPI.Realm must not be empty when GSS-API mechanism is used") + } default: - msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s` and `%s`", - SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512) + msg := fmt.Sprintf("The SASL mechanism configuration is invalid. Possible values are `%s`, `%s`, `%s`, `%s` and `%s`", + SASLTypeOAuth, SASLTypePlaintext, SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512, SASLTypeGSSAPI) return ConfigurationError(msg) } } diff --git a/config_test.go b/config_test.go index 4aa16860e3..7e1568e1a7 100644 --- a/config_test.go +++ b/config_test.go @@ -91,7 +91,7 @@ func TestNetConfigValidates(t *testing.T) { cfg.Net.SASL.Mechanism = "AnIncorrectSASLMechanism" cfg.Net.SASL.TokenProvider = &DummyTokenProvider{} }, - "The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256` and `SCRAM-SHA-512`"}, + "The SASL mechanism configuration is invalid. Possible values are `OAUTHBEARER`, `PLAIN`, `SCRAM-SHA-256`, `SCRAM-SHA-512` and `GSSAPI`"}, {"SASL.Mechanism.OAUTHBEARER - Missing token provider", func(cfg *Config) { cfg.Net.SASL.Enable = true diff --git a/go.mod b/go.mod index 8cb2e4900b..8c45155ddd 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,8 @@ require ( github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 github.com/eapache/queue v1.1.0 github.com/golang/snappy v0.0.1 // indirect + github.com/hashicorp/go-uuid v1.0.1 // indirect + github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/stretchr/testify v1.3.0 @@ -15,4 +17,8 @@ require ( github.com/xdg/stringprep v1.0.0 // indirect golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5 // indirect golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3 + gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect + gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect + gopkg.in/jcmturner/gokrb5.v7 v7.2.3 + gopkg.in/jcmturner/rpc.v1 v1.1.0 // indirect ) diff --git a/go.sum b/go.sum index 7f6c26fe1d..4dbc6d2276 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,10 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/hashicorp/go-uuid v1.0.1 h1:fv1ep09latC32wFoVwnqcnKJGnMSdBanPczbHAYm1BE= +github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM= +github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41 h1:GeinFsrjWz97fAxVUEd748aV0cYL+I6k44gFJTCVvpU= github.com/pierrec/lz4 v0.0.0-20190327172049-315a67e90e41/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= @@ -37,3 +41,11 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw= +gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM= +gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3 h1:hHMV/yKPwMnJhPuPx7pH2Uw/3Qyf+thJYlisUc44010= +gopkg.in/jcmturner/gokrb5.v7 v7.2.3/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM= +gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU= +gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8= diff --git a/gssapi_kerberos.go b/gssapi_kerberos.go new file mode 100644 index 0000000000..d09a9fa916 --- /dev/null +++ b/gssapi_kerberos.go @@ -0,0 +1,247 @@ +package sarama + +import ( + "encoding/binary" + "encoding/hex" + "fmt" + "github.com/jcmturner/gofork/encoding/asn1" + "gopkg.in/jcmturner/gokrb5.v7/asn1tools" + krb5client "gopkg.in/jcmturner/gokrb5.v7/client" + krb5config "gopkg.in/jcmturner/gokrb5.v7/config" + "gopkg.in/jcmturner/gokrb5.v7/gssapi" + "gopkg.in/jcmturner/gokrb5.v7/iana/chksumtype" + "gopkg.in/jcmturner/gokrb5.v7/iana/keyusage" + "gopkg.in/jcmturner/gokrb5.v7/keytab" + "gopkg.in/jcmturner/gokrb5.v7/messages" + "gopkg.in/jcmturner/gokrb5.v7/types" + "io" + "strings" + "time" +) + +const ( + TOK_ID_KRB_AP_REQ = "0100" + GSS_API_GENERIC_TAG = 0x60 +) + +type GSSAPIConfig struct { + KeyTabPath string + KerberosConfigPath string + ServiceName string + Username string + Realm string +} + +type GSSAPIKerberosAuth struct { + config *GSSAPIConfig + client *krb5client.Client + ticket messages.Ticket + encryptionKey types.EncryptionKey +} + +/* +* +* Appends length in big endian before payload, and send it to kafka +* + */ + +func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) { + length := len(payload) + finalPackage := make([]byte, length+4) //4 byte length header + payload + copy(finalPackage[4:], payload) + binary.BigEndian.PutUint32(finalPackage, uint32(length)) + bytes, err := broker.conn.Write(finalPackage) + if err != nil { + return bytes, err + } + return bytes, nil +} + +/* +* +* Read length (4 bytes) and then read the payload +* + */ + +func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) { + bytesRead := 0 + lengthInBytes := make([]byte, 4) + bytes, err := io.ReadFull(broker.conn, lengthInBytes) + if err != nil { + return nil, bytesRead, err + } + bytesRead += bytes + payloadLength := binary.BigEndian.Uint32(lengthInBytes) + payloadBytes := make([]byte, payloadLength) // buffer for read.. + bytes, err = io.ReadFull(broker.conn, payloadBytes) // read bytes + if err != nil { + return payloadBytes, bytesRead, err + } + bytesRead += bytes + return payloadBytes, bytesRead, nil +} + +func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum(flags []int) []byte { + a := make([]byte, 24) + binary.LittleEndian.PutUint32(a[:4], 16) + for _, i := range flags { + if i == gssapi.ContextFlagDeleg { + x := make([]byte, 28-len(a)) + a = append(a, x...) + } + f := binary.LittleEndian.Uint32(a[20:24]) + f |= uint32(i) + binary.LittleEndian.PutUint32(a[20:24], f) + } + return a +} + +/* +* +* Construct Kerberos AP_REQ package, conforming to RFC-4120 +* https://tools.ietf.org/html/rfc4120#page-84 +* + */ +func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(client *krb5client.Client, ticket messages.Ticket, sessionKey types.EncryptionKey) []byte { + var GSSAPIFlags = []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf} + auth, _ := types.NewAuthenticator(client.Credentials.Domain(), client.Credentials.CName()) + auth.Cksum = types.Checksum{ + CksumType: chksumtype.GSSAPI, + Checksum: krbAuth.newAuthenticatorChecksum(GSSAPIFlags), + } + APReq, _ := messages.NewAPReq( + ticket, + sessionKey, + auth, + ) + aprBytes, _ := hex.DecodeString(TOK_ID_KRB_AP_REQ) + tb, _ := APReq.Marshal() + aprBytes = append(aprBytes, tb...) + return aprBytes +} + +/* +* +* Append the GSS-API header to the payload, conforming to RFC-2743 +* Section 3.1, Mechanism-Independent Token Format +* +* https://tools.ietf.org/html/rfc2743#page-81 +* +* GSSAPIHeader + +* + */ +func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) []byte { + oidBytes, _ := asn1.Marshal(gssapi.OID(gssapi.OIDKRB5)) + tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload)) + GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...) + GSSHeader = append(GSSHeader, oidBytes...) + GSSPackage := append(GSSHeader, payload...) + return GSSPackage +} + +/* +* +* Create kerberos client used to obtain TGT and TGS tokens +* used gokrb5 library, which is a pure go kerberos client with +* some GSS-API capabilities, and SPNEGO support. Kafka does not use SPNEGO +* it uses pure Kerberos 5 solution (RFC-4121 and RFC-4120). +* + */ +func (krbAuth *GSSAPIKerberosAuth) createKerberosClient() error { + kt, err := keytab.Load(krbAuth.config.KeyTabPath) + if err != nil { + return err + } + cfg, err := krb5config.Load(krbAuth.config.KerberosConfigPath) + if err != nil { + return err + } + krbAuth.client = krb5client.NewClientWithKeytab(krbAuth.config.Username, krbAuth.config.Realm, kt, cfg) + return nil +} + +/* This does the handshake for authorization */ +func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error { + + err := krbAuth.createKerberosClient() + if err != nil { + Logger.Printf("Kerberos client error: %s", err) + } + // Construct SPN using serviceName and host + // SPN format: / + + host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part + spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host) + + ticket, encryptionKey, err := krbAuth.client.GetServiceTicket(spn) + if err != nil { + Logger.Printf("Error getting Kerberos service ticket : %s", err) + return err + } + + krbAuth.ticket = ticket + krbAuth.encryptionKey = encryptionKey + + aprBytes := krbAuth.createKrb5Token(krbAuth.client, krbAuth.ticket, krbAuth.encryptionKey) + GSSPackage := krbAuth.appendGSSAPIHeader(aprBytes) + + // Send kerberos token with GSSAPI frame. + requestTime := time.Now() + bytesWritten, err := krbAuth.writePackage(broker, GSSPackage) + if err != nil { + Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err) + return err + } + broker.updateOutgoingCommunicationMetrics(bytesWritten) + + wrapBytes, bytesRead, err := krbAuth.readPackage(broker) + + requestLatency := time.Since(requestTime) + broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency) + + if err != nil { + Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err) + return err + } + + wrapTokenReq := gssapi.WrapToken{} + err = wrapTokenReq.Unmarshal(wrapBytes, true) + if err != nil { + Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err) + return err + } + + // Validate response. + isValid, err := wrapTokenReq.Verify(krbAuth.encryptionKey, keyusage.GSSAPI_ACCEPTOR_SEAL) + if !isValid { + Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s", err) + return err + } + + // Reply to server + wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encryptionKey) + if err != nil { + Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s", err) + return err + } + wrapResponseBytes, err := wrapTokenResponse.Marshal() + if err != nil { + Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s", err) + return err + } + + bytesWritten, err = krbAuth.writePackage(broker, wrapResponseBytes) + if err != nil { + Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err) + return err + } + broker.updateOutgoingCommunicationMetrics(bytesWritten) + // If we reach this, we were already authenticated to kafka broker using kerberos. + return nil +} + +func NewGSSAPIKerberosAuthenticator(config *GSSAPIConfig) *GSSAPIKerberosAuth { + return &GSSAPIKerberosAuth{ + config: config, + } +}