Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Etcdv3 pr686 #761

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 102 additions & 65 deletions Gopkg.lock

Large diffs are not rendered by default.

4 changes: 0 additions & 4 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ ignored = ["github.com/kubernetes/repo-infra/kazel"]
name = "github.com/cloudflare/cloudflare-go"
version = "0.7.3"

[[constraint]]
name = "github.com/coreos/etcd"
version = "~3.2.15"

[[constraint]]
name = "github.com/digitalocean/godo"
version = "~1.1.0"
Expand Down
4 changes: 2 additions & 2 deletions endpoint/labels.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func NewLabels() Labels {
// NewLabelsFromString constructs endpoints labels from a provided format string
// if heritage set to another value is found then error is returned
// no heritage automatically assumes is not owned by external-dns and returns invalidHeritage error
func NewLabelsFromString(labelText string) (Labels, error) {
endpointLabels := map[string]string{}
func NewLabelsFromString(existingLabels map[string]string, labelText string) (Labels, error) {
endpointLabels := existingLabels
labelText = strings.Trim(labelText, "\"") // drop quotes
tokens := strings.Split(labelText, ",")
foundExternalDNSHeritage := false
Expand Down
12 changes: 6 additions & 6 deletions endpoint/labels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,27 +62,27 @@ func (suite *LabelsSuite) TestSerialize() {
}

func (suite *LabelsSuite) TestDeserialize() {
foo, err := NewLabelsFromString(suite.fooAsText)
foo, err := NewLabelsFromString(NewLabels(), suite.fooAsText)
suite.NoError(err, "should succeed for valid label text")
suite.Equal(suite.foo, foo, "should reconstruct original label map")

foo, err = NewLabelsFromString(suite.fooAsTextWithQuotes)
foo, err = NewLabelsFromString(NewLabels(), suite.fooAsTextWithQuotes)
suite.NoError(err, "should succeed for valid label text")
suite.Equal(suite.foo, foo, "should reconstruct original label map")

bar, err := NewLabelsFromString(suite.barText)
bar, err := NewLabelsFromString(NewLabels(), suite.barText)
suite.NoError(err, "should succeed for valid label text")
suite.Equal(suite.barTextAsMap, bar, "should reconstruct original label map")

noHeritage, err := NewLabelsFromString(suite.noHeritageText)
noHeritage, err := NewLabelsFromString(NewLabels(), suite.noHeritageText)
suite.Equal(ErrInvalidHeritage, err, "should fail if no heritage is found")
suite.Nil(noHeritage, "should return nil")

wrongHeritage, err := NewLabelsFromString(suite.wrongHeritageText)
wrongHeritage, err := NewLabelsFromString(NewLabels(), suite.wrongHeritageText)
suite.Equal(ErrInvalidHeritage, err, "should fail if wrong heritage is found")
suite.Nil(wrongHeritage, "if error should return nil")

multipleHeritage, err := NewLabelsFromString(suite.multipleHeritageText)
multipleHeritage, err := NewLabelsFromString(NewLabels(), suite.multipleHeritageText)
suite.Equal(ErrInvalidHeritage, err, "should fail if multiple heritage is found")
suite.Nil(multipleHeritage, "if error should return nil")
}
Expand Down
142 changes: 73 additions & 69 deletions provider/coredns.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package provider

import (
"container/list"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand All @@ -26,14 +26,12 @@ import (
"io/ioutil"
"math/rand"
"net"
"net/http"
"os"
"strings"
"time"

etcd "github.com/coreos/etcd/client"
etcdcv3 "github.com/coreos/etcd/clientv3"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"

"github.com/kubernetes-incubator/external-dns/endpoint"
"github.com/kubernetes-incubator/external-dns/plan"
Expand All @@ -43,8 +41,17 @@ func init() {
rand.Seed(time.Now().UnixNano())
}

// skyDNSClient is an interface to work with SkyDNS service records in etcd
type skyDNSClient interface {
const (
priority = 10 // default priority when nothing is set
etcdTimeout = 5 * time.Second

coreDNSPrefix = "/skydns/"

randomPrefixLabel = "prefix"
)

// coreDNSClient is an interface to work with CoreDNS service records in etcd
type coreDNSClient interface {
GetServices(prefix string) ([]*Service, error)
SaveService(value *Service) error
DeleteService(key string) error
Expand All @@ -53,10 +60,10 @@ type skyDNSClient interface {
type coreDNSProvider struct {
dryRun bool
domainFilter DomainFilter
client skyDNSClient
client coreDNSClient
}

// Service represents SkyDNS/CoreDNS etcd record
// Service represents CoreDNS etcd record
type Service struct {
Host string `json:"host,omitempty"`
Port int `json:"port,omitempty"`
Expand All @@ -83,52 +90,56 @@ type Service struct {
}

type etcdClient struct {
api etcd.KeysAPI
client *etcdcv3.Client
ctx context.Context
}

var _ skyDNSClient = etcdClient{}
var _ coreDNSClient = etcdClient{}

// GetService return all Service records stored in etcd stored anywhere under the given key (recursively)
func (c etcdClient) GetServices(prefix string) ([]*Service, error) {
var result []*Service
opts := &etcd.GetOptions{Recursive: true}
data, err := c.api.Get(context.Background(), prefix, opts)
ctx, cancel := context.WithTimeout(c.ctx, etcdTimeout)
defer cancel()

path := prefix
r, err := c.client.Get(ctx, path, etcdcv3.WithPrefix())
if err != nil {
if etcd.IsKeyNotFound(err) {
return nil, nil
}
return nil, err
}

queue := list.New()
queue.PushFront(data.Node)
for queueNode := queue.Front(); queueNode != nil; queueNode = queueNode.Next() {
node := queueNode.Value.(*etcd.Node)
if node.Dir {
for _, childNode := range node.Nodes {
queue.PushBack(childNode)
}
continue
var sx []*Service
bx := make(map[Service]bool)
for _, n := range r.Kvs {
serv := new(Service)
if err := json.Unmarshal(n.Value, serv); err != nil {
return nil, fmt.Errorf("%s: %s", n.Key, err.Error())
}
service := &Service{}
err = json.Unmarshal([]byte(node.Value), service)
if err != nil {
log.Error("Cannot parse JSON value ", node.Value)
b := Service{Host: serv.Host, Port: serv.Port, Priority: serv.Priority, Weight: serv.Weight, Text: serv.Text, Key: string(n.Key)}
if _, ok := bx[b]; ok {
continue
}
service.Key = node.Key
result = append(result, service)
bx[b] = true

serv.Key = string(n.Key)
if serv.Priority == 0 {
serv.Priority = priority
}
sx = append(sx, serv)
}
return result, nil

return sx, nil
}

// SaveService persists service data into etcd
func (c etcdClient) SaveService(service *Service) error {
ctx, cancel := context.WithTimeout(c.ctx, etcdTimeout)
defer cancel()

value, err := json.Marshal(&service)
if err != nil {
return err
}
_, err = c.api.Set(context.Background(), service.Key, string(value), nil)
_, err = c.client.Put(ctx, service.Key, string(value))
if err != nil {
return err
}
Expand All @@ -137,9 +148,11 @@ func (c etcdClient) SaveService(service *Service) error {

// DeleteService deletes service record from etcd
func (c etcdClient) DeleteService(key string) error {
_, err := c.api.Delete(context.Background(), key, nil)
return err
ctx, cancel := context.WithTimeout(c.ctx, etcdTimeout)
defer cancel()

_, err := c.client.Delete(ctx, key)
return err
}

// loads TLS artifacts and builds tls.Clonfig object
Expand Down Expand Up @@ -186,29 +199,16 @@ func loadRoots(caPath string) (*x509.CertPool, error) {
return roots, nil
}

// constructs http.Transport object for https protocol
func newHTTPSTransport(cc *tls.Config) *http.Transport {
return &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).Dial,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: cc,
}
}

// builds etcd client config depending on connection scheme and TLS parameters
func getETCDConfig() (*etcd.Config, error) {
func getETCDConfig() (*etcdcv3.Config, error) {
etcdURLsStr := os.Getenv("ETCD_URLS")
if etcdURLsStr == "" {
etcdURLsStr = "http://localhost:2379"
}
etcdURLs := strings.Split(etcdURLsStr, ",")
firstURL := strings.ToLower(etcdURLs[0])
if strings.HasPrefix(firstURL, "http://") {
return &etcd.Config{Endpoints: etcdURLs}, nil
return &etcdcv3.Config{Endpoints: etcdURLs}, nil
} else if strings.HasPrefix(firstURL, "https://") {
caFile := os.Getenv("ETCD_CA_FILE")
certFile := os.Getenv("ETCD_CERT_FILE")
Expand All @@ -220,26 +220,26 @@ func getETCDConfig() (*etcd.Config, error) {
if err != nil {
return nil, err
}
return &etcd.Config{
return &etcdcv3.Config{
Endpoints: etcdURLs,
Transport: newHTTPSTransport(tlsConfig),
TLS: tlsConfig,
}, nil
} else {
return nil, errors.New("etcd URLs must start with either http:// or https://")
}
}

//newETCDClient is an etcd client constructor
func newETCDClient() (skyDNSClient, error) {
func newETCDClient() (coreDNSClient, error) {
cfg, err := getETCDConfig()
if err != nil {
return nil, err
}
c, err := etcd.New(*cfg)
c, err := etcdcv3.New(*cfg)
if err != nil {
return nil, err
}
return etcdClient{etcd.NewKeysAPI(c)}, nil
return etcdClient{c, context.Background()}, nil
}

// NewCoreDNSProvider is a CoreDNS provider constructor
Expand All @@ -255,30 +255,31 @@ func NewCoreDNSProvider(domainFilter DomainFilter, dryRun bool) (Provider, error
}, nil
}

// Records returns all DNS records found in SkyDNS/CoreDNS etcd backend. Depending on the record fields
// Records returns all DNS records found in CoreDNS etcd backend. Depending on the record fields
// it may be mapped to one or two records of type A, CNAME, TXT, A+TXT, CNAME+TXT
func (p coreDNSProvider) Records() ([]*endpoint.Endpoint, error) {
var result []*endpoint.Endpoint
services, err := p.client.GetServices("/skydns")
services, err := p.client.GetServices(coreDNSPrefix)
if err != nil {
return nil, err
}
for _, service := range services {
domains := strings.Split(strings.TrimPrefix(service.Key, "/skydns/"), "/")
domains := strings.Split(strings.TrimPrefix(service.Key, coreDNSPrefix), "/")
reverse(domains)
dnsName := strings.Join(domains[service.TargetStrip:], ".")
if !p.domainFilter.Match(dnsName) {
continue
}
prefix := strings.Join(domains[:service.TargetStrip], ".")
if service.Host != "" {
ep := endpoint.NewEndpoint(
ep := endpoint.NewEndpointWithTTL(
dnsName,
guessRecordType(service.Host),
endpoint.TTL(service.TTL),
service.Host,
)
ep.Labels["originalText"] = service.Text
ep.Labels["prefix"] = prefix
ep.Labels[randomPrefixLabel] = prefix
result = append(result, ep)
}
if service.Text != "" {
Expand All @@ -287,20 +288,21 @@ func (p coreDNSProvider) Records() ([]*endpoint.Endpoint, error) {
endpoint.RecordTypeTXT,
service.Text,
)
ep.Labels["prefix"] = prefix
ep.Labels[randomPrefixLabel] = prefix
result = append(result, ep)
}
}
return result, nil
}

// ApplyChanges stores changes back to etcd converting them to SkyDNS format and aggregating A/CNAME and TXT records
// ApplyChanges stores changes back to etcd converting them to CoreDNS format and aggregating A/CNAME and TXT records
func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error {
grouped := map[string][]*endpoint.Endpoint{}
for _, ep := range changes.Create {
grouped[ep.DNSName] = append(grouped[ep.DNSName], ep)
}
for _, ep := range changes.UpdateNew {
for i, ep := range changes.UpdateNew {
ep.Labels[randomPrefixLabel] = changes.UpdateOld[i].Labels[randomPrefixLabel]
grouped[ep.DNSName] = append(grouped[ep.DNSName], ep)
}
for dnsName, group := range grouped {
Expand All @@ -313,7 +315,7 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error {
if ep.RecordType == endpoint.RecordTypeTXT {
continue
}
prefix := ep.Labels["prefix"]
prefix := ep.Labels[randomPrefixLabel]
if prefix == "" {
prefix = fmt.Sprintf("%08x", rand.Int31())
}
Expand All @@ -322,6 +324,7 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error {
Text: ep.Labels["originalText"],
Key: etcdKeyFor(prefix + "." + dnsName),
TargetStrip: strings.Count(prefix, ".") + 1,
TTL: uint32(ep.RecordTTL),
}
services = append(services, service)
}
Expand All @@ -331,13 +334,14 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error {
continue
}
if index >= len(services) {
prefix := ep.Labels["prefix"]
prefix := ep.Labels[randomPrefixLabel]
if prefix == "" {
prefix = fmt.Sprintf("%08x", rand.Int31())
}
services = append(services, Service{
Key: etcdKeyFor(prefix + "." + dnsName),
TargetStrip: strings.Count(prefix, ".") + 1,
TTL: uint32(ep.RecordTTL),
})
}
services[index].Text = ep.Targets[0]
Expand All @@ -349,7 +353,7 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error {
}

for _, service := range services {
log.Infof("Add/set key %s to Host=%s, Text=%s", service.Key, service.Host, service.Text)
log.Infof("Add/set key %s to Host=%s, Text=%s, TTL=%d", service.Key, service.Host, service.Text, service.TTL)
if !p.dryRun {
err := p.client.SaveService(&service)
if err != nil {
Expand All @@ -361,8 +365,8 @@ func (p coreDNSProvider) ApplyChanges(changes *plan.Changes) error {

for _, ep := range changes.Delete {
dnsName := ep.DNSName
if ep.Labels["prefix"] != "" {
dnsName = ep.Labels["prefix"] + "." + dnsName
if ep.Labels[randomPrefixLabel] != "" {
dnsName = ep.Labels[randomPrefixLabel] + "." + dnsName
}
key := etcdKeyFor(dnsName)
log.Infof("Delete key %s", key)
Expand All @@ -387,7 +391,7 @@ func guessRecordType(target string) string {
func etcdKeyFor(dnsName string) string {
domains := strings.Split(dnsName, ".")
reverse(domains)
return "/skydns/" + strings.Join(domains, "/")
return coreDNSPrefix + strings.Join(domains, "/")
}

func reverse(slice []string) {
Expand Down
Loading