-
Notifications
You must be signed in to change notification settings - Fork 1.8k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Ruben Vargas <ruben.vp8510@gmail.com>
- Loading branch information
1 parent
ea9ab1c
commit eefdd51
Showing
6 changed files
with
294 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,247 @@ | ||
package sarama | ||
|
||
import ( | ||
"encoding/binary" | ||
"encoding/hex" | ||
"fmt" | ||
"github.com/jcmturner/gofork/encoding/asn1" | ||
"gopkg.in/jcmturner/gokrb5.v7/asn1tools" | ||
krb5client "gopkg.in/jcmturner/gokrb5.v7/client" | ||
krb5config "gopkg.in/jcmturner/gokrb5.v7/config" | ||
"gopkg.in/jcmturner/gokrb5.v7/gssapi" | ||
"gopkg.in/jcmturner/gokrb5.v7/iana/chksumtype" | ||
"gopkg.in/jcmturner/gokrb5.v7/iana/keyusage" | ||
"gopkg.in/jcmturner/gokrb5.v7/keytab" | ||
"gopkg.in/jcmturner/gokrb5.v7/messages" | ||
"gopkg.in/jcmturner/gokrb5.v7/types" | ||
"io" | ||
"strings" | ||
"time" | ||
) | ||
|
||
const ( | ||
TOK_ID_KRB_AP_REQ = "0100" | ||
GSS_API_GENERIC_TAG = 0x60 | ||
) | ||
|
||
type GSSAPIConfig struct { | ||
KeyTabPath string | ||
KerberosConfigPath string | ||
ServiceName string | ||
Username string | ||
Realm string | ||
} | ||
|
||
type GSSAPIKerberosAuth struct { | ||
config *GSSAPIConfig | ||
client *krb5client.Client | ||
ticket messages.Ticket | ||
encryptionKey types.EncryptionKey | ||
} | ||
|
||
/* | ||
* | ||
* Appends length in big endian before payload, and send it to kafka | ||
* | ||
*/ | ||
|
||
func (krbAuth *GSSAPIKerberosAuth) writePackage(broker *Broker, payload []byte) (int, error) { | ||
length := len(payload) | ||
finalPackage := make([]byte, length+4) //4 byte length header + payload | ||
copy(finalPackage[4:], payload) | ||
binary.BigEndian.PutUint32(finalPackage, uint32(length)) | ||
bytes, err := broker.conn.Write(finalPackage) | ||
if err != nil { | ||
return bytes, err | ||
} | ||
return bytes, nil | ||
} | ||
|
||
/* | ||
* | ||
* Read length (4 bytes) and then read the payload | ||
* | ||
*/ | ||
|
||
func (krbAuth *GSSAPIKerberosAuth) readPackage(broker *Broker) ([]byte, int, error) { | ||
bytesRead := 0 | ||
lengthInBytes := make([]byte, 4) | ||
bytes, err := io.ReadFull(broker.conn, lengthInBytes) | ||
if err != nil { | ||
return nil, bytesRead, err | ||
} | ||
bytesRead += bytes | ||
payloadLength := binary.BigEndian.Uint32(lengthInBytes) | ||
payloadBytes := make([]byte, payloadLength) // buffer for read.. | ||
bytes, err = io.ReadFull(broker.conn, payloadBytes) // read bytes | ||
if err != nil { | ||
return payloadBytes, bytesRead, err | ||
} | ||
bytesRead += bytes | ||
return payloadBytes, bytesRead, nil | ||
} | ||
|
||
func (krbAuth *GSSAPIKerberosAuth) newAuthenticatorChecksum(flags []int) []byte { | ||
a := make([]byte, 24) | ||
binary.LittleEndian.PutUint32(a[:4], 16) | ||
for _, i := range flags { | ||
if i == gssapi.ContextFlagDeleg { | ||
x := make([]byte, 28-len(a)) | ||
a = append(a, x...) | ||
} | ||
f := binary.LittleEndian.Uint32(a[20:24]) | ||
f |= uint32(i) | ||
binary.LittleEndian.PutUint32(a[20:24], f) | ||
} | ||
return a | ||
} | ||
|
||
/* | ||
* | ||
* Construct Kerberos AP_REQ package, conforming to RFC-4120 | ||
* https://tools.ietf.org/html/rfc4120#page-84 | ||
* | ||
*/ | ||
func (krbAuth *GSSAPIKerberosAuth) createKrb5Token(client *krb5client.Client, ticket messages.Ticket, sessionKey types.EncryptionKey) []byte { | ||
var GSSAPIFlags = []int{gssapi.ContextFlagInteg, gssapi.ContextFlagConf} | ||
auth, _ := types.NewAuthenticator(client.Credentials.Domain(), client.Credentials.CName()) | ||
auth.Cksum = types.Checksum{ | ||
CksumType: chksumtype.GSSAPI, | ||
Checksum: krbAuth.newAuthenticatorChecksum(GSSAPIFlags), | ||
} | ||
APReq, _ := messages.NewAPReq( | ||
ticket, | ||
sessionKey, | ||
auth, | ||
) | ||
aprBytes, _ := hex.DecodeString(TOK_ID_KRB_AP_REQ) | ||
tb, _ := APReq.Marshal() | ||
aprBytes = append(aprBytes, tb...) | ||
return aprBytes | ||
} | ||
|
||
/* | ||
* | ||
* Append the GSS-API header to the payload, conforming to RFC-2743 | ||
* Section 3.1, Mechanism-Independent Token Format | ||
* | ||
* https://tools.ietf.org/html/rfc2743#page-81 | ||
* | ||
* GSSAPIHeader + <specific mechanism payload> | ||
* | ||
*/ | ||
func (krbAuth *GSSAPIKerberosAuth) appendGSSAPIHeader(payload []byte) []byte { | ||
oidBytes, _ := asn1.Marshal(gssapi.OID(gssapi.OIDKRB5)) | ||
tkoLengthBytes := asn1tools.MarshalLengthBytes(len(oidBytes) + len(payload)) | ||
GSSHeader := append([]byte{GSS_API_GENERIC_TAG}, tkoLengthBytes...) | ||
GSSHeader = append(GSSHeader, oidBytes...) | ||
GSSPackage := append(GSSHeader, payload...) | ||
return GSSPackage | ||
} | ||
|
||
/* | ||
* | ||
* Create kerberos client used to obtain TGT and TGS tokens | ||
* used gokrb5 library, which is a pure go kerberos client with | ||
* some GSS-API capabilities, and SPNEGO support. Kafka does not use SPNEGO | ||
* it uses pure Kerberos 5 solution (RFC-4121 and RFC-4120). | ||
* | ||
*/ | ||
func (krbAuth *GSSAPIKerberosAuth) createKerberosClient() error { | ||
kt, err := keytab.Load(krbAuth.config.KeyTabPath) | ||
if err != nil { | ||
return err | ||
} | ||
cfg, err := krb5config.Load(krbAuth.config.KerberosConfigPath) | ||
if err != nil { | ||
return err | ||
} | ||
krbAuth.client = krb5client.NewClientWithKeytab(krbAuth.config.Username, krbAuth.config.Realm, kt, cfg) | ||
return nil | ||
} | ||
|
||
/* This does the handshake for authorization */ | ||
func (krbAuth *GSSAPIKerberosAuth) Authorize(broker *Broker) error { | ||
|
||
err := krbAuth.createKerberosClient() | ||
if err != nil { | ||
Logger.Printf("Kerberos client error: %s", err) | ||
} | ||
// Construct SPN using serviceName and host | ||
// SPN format: <SERVICE>/<FQDN> | ||
|
||
host := strings.SplitN(broker.addr, ":", 2)[0] // Strip port part | ||
spn := fmt.Sprintf("%s/%s", broker.conf.Net.SASL.GSSAPI.ServiceName, host) | ||
|
||
ticket, encryptionKey, err := krbAuth.client.GetServiceTicket(spn) | ||
if err != nil { | ||
Logger.Printf("Error getting Kerberos service ticket : %s", err) | ||
return err | ||
} | ||
|
||
krbAuth.ticket = ticket | ||
krbAuth.encryptionKey = encryptionKey | ||
|
||
aprBytes := krbAuth.createKrb5Token(krbAuth.client, krbAuth.ticket, krbAuth.encryptionKey) | ||
GSSPackage := krbAuth.appendGSSAPIHeader(aprBytes) | ||
|
||
// Send kerberos token with GSSAPI frame. | ||
requestTime := time.Now() | ||
bytesWritten, err := krbAuth.writePackage(broker, GSSPackage) | ||
if err != nil { | ||
Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err) | ||
return err | ||
} | ||
broker.updateOutgoingCommunicationMetrics(bytesWritten) | ||
|
||
wrapBytes, bytesRead, err := krbAuth.readPackage(broker) | ||
|
||
requestLatency := time.Since(requestTime) | ||
broker.updateIncomingCommunicationMetrics(bytesRead, requestLatency) | ||
|
||
if err != nil { | ||
Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err) | ||
return err | ||
} | ||
|
||
wrapTokenReq := gssapi.WrapToken{} | ||
err = wrapTokenReq.Unmarshal(wrapBytes, true) | ||
if err != nil { | ||
Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err) | ||
return err | ||
} | ||
|
||
// Validate response. | ||
isValid, err := wrapTokenReq.Verify(krbAuth.encryptionKey, keyusage.GSSAPI_ACCEPTOR_SEAL) | ||
if !isValid { | ||
Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s", err) | ||
return err | ||
} | ||
|
||
// Reply to server | ||
wrapTokenResponse, err := gssapi.NewInitiatorWrapToken(wrapTokenReq.Payload, krbAuth.encryptionKey) | ||
if err != nil { | ||
Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s", err) | ||
return err | ||
} | ||
wrapResponseBytes, err := wrapTokenResponse.Marshal() | ||
if err != nil { | ||
Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s", err) | ||
return err | ||
} | ||
|
||
bytesWritten, err = krbAuth.writePackage(broker, wrapResponseBytes) | ||
if err != nil { | ||
Logger.Printf("Error while performing GSSAPI Kerberos Authentication: %s\n", err) | ||
return err | ||
} | ||
broker.updateOutgoingCommunicationMetrics(bytesWritten) | ||
// If we reach this, we were already authenticated to kafka broker using kerberos. | ||
return nil | ||
} | ||
|
||
func NewGSSAPIKerberosAuthenticator(config *GSSAPIConfig) *GSSAPIKerberosAuth { | ||
return &GSSAPIKerberosAuth{ | ||
config: config, | ||
} | ||
} |