Skip to content

Commit

Permalink
feat: support hdfs kerberos (#277)
Browse files Browse the repository at this point in the history
  • Loading branch information
veezhang authored Jun 14, 2023
1 parent 6b7e05c commit dd462b5
Show file tree
Hide file tree
Showing 8 changed files with 441 additions and 99 deletions.
16 changes: 15 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -264,14 +264,28 @@ sftp:
It only needs to be configured for hdfs data sources.

```yaml
sftp:
hdfs:
address: 192.168.0.10:8020
user: <user>
servicePrincipalName: <Kerberos Service Principal Name>
krb5ConfigFile: <Kerberos config file>
ccacheFile: <Kerberos ccache file>
keyTabFile: <Kerberos keytab file>
password: <Kerberos password>
dataTransferProtection: <Kerberos Data Transfer Protection>
disablePAFXFAST: false
path: <path of file>
```

* `address`: **Required**. The address of hdfs service.
* `user`: **Optional**. The user of hdfs service.
* `servicePrincipalName`: **Optional**. The kerberos service principal name of hdfs service when enable kerberos.
* `krb5ConfigFile`: **Optional**. The kerberos config file of hdfs service when enable kerberos, default is `/etc/krb5.conf`.
* `ccacheFile`: **Optional**. The ccache file of hdfs service when enable kerberos.
* `keyTabFile`: **Optional**. The keytab file of hdfs service when enable kerberos.
* `password`: **Optional**. The kerberos password of hdfs service when enable kerberos.
* `dataTransferProtection`: **Optional**. The data transfer protection of hdfs service.
* `disablePAFXFAST`: **Optional**. Whether to prohibit the client to use PA_FX_FAST.
* `path`: **Required**. The path of file in the sftp service.

#### batch
Expand Down
190 changes: 101 additions & 89 deletions docs/configuration-reference.md

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/dustin/go-humanize v1.0.0
github.com/fclairamb/ftpserverlib v0.21.0
github.com/golang/mock v1.6.0
github.com/jcmturner/gokrb5/v8 v8.4.2
github.com/jlaffaye/ftp v0.1.0
github.com/onsi/ginkgo/v2 v2.4.0
github.com/onsi/gomega v1.24.0
Expand Down Expand Up @@ -40,7 +41,6 @@ require (
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/jcmturner/goidentity/v6 v6.0.1 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/fs v0.1.0 // indirect
Expand Down
100 changes: 95 additions & 5 deletions pkg/source/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,34 @@ package source

import (
"fmt"
"os"
"os/user"
"strings"

"github.com/colinmarc/hdfs/v2"
"github.com/colinmarc/hdfs/v2/hadoopconf"
krb "github.com/jcmturner/gokrb5/v8/client"
"github.com/jcmturner/gokrb5/v8/config"
"github.com/jcmturner/gokrb5/v8/credentials"
"github.com/jcmturner/gokrb5/v8/keytab"
)

const defaultKrb5ConfigFile = "/etc/krb5.conf"

var _ Source = (*hdfsSource)(nil)

type (
HDFSConfig struct {
Address string `yaml:"address,omitempty"`
User string `yaml:"user,omitempty"`
Path string `yaml:"path,omitempty"`
Address string `yaml:"address,omitempty"`
User string `yaml:"user,omitempty"`
ServicePrincipalName string `yaml:"servicePrincipalName,omitempty"`
Krb5ConfigFile string `yaml:"krb5ConfigFile,omitempty"`
CCacheFile string `yaml:"ccacheFile,omitempty"`
KeyTabFile string `yaml:"keyTabFile,omitempty"`
Password string `yaml:"password,omitempty"`
DataTransferProtection string `yaml:"dataTransferProtection,omitempty"`
DisablePAFXFAST bool `yaml:"disablePAFXFAST,omitempty"`
Path string `yaml:"path,omitempty"`
}

hdfsSource struct {
Expand All @@ -35,7 +50,6 @@ func (s *hdfsSource) Name() string {
}

func (s *hdfsSource) Open() error {
// TODO: support kerberos
conf, err := hadoopconf.LoadFromEnvironment()
if err != nil {
return err
Expand All @@ -45,7 +59,20 @@ func (s *hdfsSource) Open() error {
if s.c.HDFS.Address != "" {
options.Addresses = strings.Split(s.c.HDFS.Address, ",")
}
options.User = s.c.HDFS.User

if s.c.HDFS.ServicePrincipalName != "" {
options.KerberosClient, err = s.c.HDFS.getKerberosClient()
if err != nil {
return err
}

options.KerberosServicePrincipleName = s.c.HDFS.ServicePrincipalName
if s.c.HDFS.DataTransferProtection != "" {
options.DataTransferProtection = s.c.HDFS.DataTransferProtection
}
} else {
options.User = s.c.HDFS.User
}

cli, err := hdfs.NewClient(options)
if err != nil {
Expand Down Expand Up @@ -85,3 +112,66 @@ func (s *hdfsSource) Close() error {
func (c *HDFSConfig) String() string {
return fmt.Sprintf("hdfs %s %s", c.Address, c.Path)
}
func (c *HDFSConfig) getKerberosClient() (*krb.Client, error) {
krb5ConfigFile := c.Krb5ConfigFile
if krb5ConfigFile == "" {
krb5ConfigFile = os.Getenv("KRB5_CONFIG")
}
if krb5ConfigFile == "" {
krb5ConfigFile = defaultKrb5ConfigFile
}
krb5conf, err := config.Load(krb5ConfigFile)
if err != nil {
return nil, err
}

settings := []func(*krb.Settings){
krb.DisablePAFXFAST(c.DisablePAFXFAST),
}

var krb5client *krb.Client
var needLogin = true
if c.Password != "" {
krb5client = krb.NewWithPassword(c.User, krb5conf.LibDefaults.DefaultRealm, c.Password, krb5conf, settings...)
} else if c.KeyTabFile != "" {
var kt *keytab.Keytab
if kt, err = keytab.Load(c.KeyTabFile); err != nil {
return nil, err
}
krb5client = krb.NewWithKeytab(c.User, krb5conf.LibDefaults.DefaultRealm, kt, krb5conf, settings...)
} else {
ccacheFile := c.CCacheFile
if ccacheFile == "" {
ccacheFile = os.Getenv("KRB5CCNAME")
if strings.Contains(ccacheFile, ":") {
if strings.HasPrefix(ccacheFile, "FILE:") {
ccacheFile = strings.SplitN(ccacheFile, ":", 2)[1]
}
}
}

if ccacheFile == "" {
var u *user.User
if u, err = user.Current(); err != nil {
return nil, err
}
ccacheFile = fmt.Sprintf("/tmp/krb5cc_%s", u.Uid)
}
var ccache *credentials.CCache
if ccache, err = credentials.LoadCCache(ccacheFile); err != nil {
return nil, err
}
krb5client, err = krb.NewFromCCache(ccache, krb5conf, settings...)
if err != nil {
return nil, err
}
needLogin = false
}

if needLogin {
if err = krb5client.Login(); err != nil {
return nil, err
}
}
return krb5client, nil
}
Loading

0 comments on commit dd462b5

Please sign in to comment.