Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for polling a Kubernetes Master server #91

Merged
merged 7 commits into from
Oct 16, 2014
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ install:
- go get github.com/rcrowley/go-metrics
- go get github.com/rcrowley/go-metrics/influxdb
- go get github.com/rcrowley/go-metrics/stathat
- go get github.com/GoogleCloudPlatform/kubernetes/pkg/client
- go get github.com/GoogleCloudPlatform/kubernetes/pkg/api
- go get github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config
- go get github.com/GoogleCloudPlatform/kubernetes/pkg/util

before_script:
- go build -o $HOME/gopath/src/github.com/coreos/etcd/etcd.run github.com/coreos/etcd
Expand Down
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,37 @@ in the etcd backend so a restart of SkyDNS with the same unique value will give

;; ANSWER SECTION:
local.dns.skydns.local. 3600 IN A 192.0.2.1
## Kubernetes
SkyDNS now has primitive support for watching the API of a Kubernetes master and
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why primitive

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because their service definitions are evolving, eventually they will have more things published other than services, we'll support those as they come.

inserting DNS records to represent the services running in a Kubernetes cluster.

The service name in Kubernetes will be registered as a host (A) record under the SkyDNS
domain. For example, if you use the default `skydns.local` configuration, a service called
`redismaster` will be available at `redismaster.skydns.local`. Additionally, SRV records
are created for each service that is registered, so queries for SRV records will return all
information necessary to connect to your service:

```
;; ANSWER SECTION:
redismaster.skydns.local. 30 IN SRV 10 100 10000 10.0.2.17
```
In the query above, you can see the IP address, the weight and the port have been set
by SkyDNS.

Kubernets support is experimental and will improve with time. To enable it, start SkyDNS
with the `-kubernetes` flag and the client configuration parameters that you would use to connect
to an APIServer instance. At a minimum you need to pass the -master flag. A common example to
start a SkyDNS server:

```
sudo skydns -kubernetes -domain kubernetes.local. -master="http://127.0.0.1:8080"
```
This command starts a SkyDNS service listening on port 53/udp, connecting to the
Kubernetes APIServer on localhost, and serving the domain `kubernetes.local`, meaning all
services in Kubernetes will be resolved in the form `servicename.kubernetes.local`

For questions on SkyDNS/Kubernetes integration please see the #google-containers channel
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are in that channel a lot?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

on freenode, or open tickets in the SkyDNS repository.

# FAQ

Expand Down
179 changes: 179 additions & 0 deletions kubernetes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package main

import (
"flag"
"log"
"net"
"sync"
"time"

"encoding/json"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
pconfig "github.com/GoogleCloudPlatform/kubernetes/pkg/proxy/config"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/coreos/go-etcd/etcd"
"github.com/skynetservices/skydns/msg"
)

// The periodic interval for checking the state of things.
const syncInterval = 5 * time.Second
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no way to get callbacks as of yet, I presume?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct.


type KubernetesSync struct {
mu sync.Mutex // protects serviceMap
serviceMap map[string]*serviceInfo
eclient *etcd.Client
}

func NewKubernetesSync(client *etcd.Client) *KubernetesSync {
ks := &KubernetesSync{
serviceMap: make(map[string]*serviceInfo),
eclient: client,
}
return ks
}

// SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really long comment line here.

func (ksync *KubernetesSync) SyncLoop() {
for {
select {
case <-time.After(syncInterval):
log.Println("Periodic sync")
ksync.ensureDNS()
}
}
}

// Ensure that dns records exist for all services.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm confused by the first line of this comment and then again by the second line? If not sure, remove it and see what breaks?

// This seems a bit redundant. TBD - remove?
func (ksync *KubernetesSync) ensureDNS() {
ksync.mu.Lock()
defer ksync.mu.Unlock()
for name, info := range ksync.serviceMap {
err := ksync.addDNS(name, info)
if err != nil {
log.Println("Failed to ensure dns for %q: %s", name, err)
}
}
}

// OnUpdate manages the active set of service records.
// Active service records get ttl bumps if found in the update set or
// removed if missing from the update set.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove empty line here.

func (ksync *KubernetesSync) OnUpdate(services []api.Service) {
activeServices := util.StringSet{}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

util.StringSet? Is the some uber cool thing better than []string?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the Kubernetes team, yes :) Some of this code is modeled directly from their internal polling routines.

for _, service := range services {
activeServices.Insert(service.ID)
info, exists := ksync.getServiceInfo(service.ID)
serviceIP := net.ParseIP(service.PortalIP)
if exists && (info.portalPort != service.Port || !info.portalIP.Equal(serviceIP)) {
err := ksync.removeDNS(service.ID, info)
if err != nil {
log.Printf("Failed to remove dns for %q: %s\n", service.ID, err)
}
}
log.Printf("Adding new service %q at %s:%d/%s (local :%d)\n", service.ID, serviceIP, service.Port, service.Protocol, service.ProxyPort)
si := &serviceInfo{
proxyPort: service.ProxyPort,
protocol: service.Protocol,
active: true,
}
ksync.setServiceInfo(service.ID, si)
si.portalIP = serviceIP
si.portalPort = service.Port
err := ksync.addDNS(service.ID, si)
if err != nil {
log.Println("Failed to add dns %q: %s", service.ID, err)
}
}
ksync.mu.Lock()
defer ksync.mu.Unlock()
for name, info := range ksync.serviceMap {
if !activeServices.Has(name) {
err := ksync.removeDNS(name, info)
if err != nil {
log.Println("Failed to remove dns for %q: %s", name, err)
}
delete(ksync.serviceMap, name)
}
}
}

func (ksync *KubernetesSync) getServiceInfo(service string) (*serviceInfo, bool) {
ksync.mu.Lock()
defer ksync.mu.Unlock()
info, ok := ksync.serviceMap[service]
return info, ok
}

func (ksync *KubernetesSync) setServiceInfo(service string, info *serviceInfo) {
ksync.mu.Lock()
defer ksync.mu.Unlock()
ksync.serviceMap[service] = info
}

func (ksync *KubernetesSync) removeDNS(service string, info *serviceInfo) error {
record := service + "." + config.Domain
// Remove from SkyDNS registration
log.Printf("removing %s from DNS", record)
_, err := ksync.eclient.Delete(msg.Path(record), true)
return err
}

func (ksync *KubernetesSync) addDNS(service string, info *serviceInfo) error {
// ADD to SkyDNS registry
svc := msg.Service{
Host: info.portalIP.String(),
Port: info.portalPort,
Priority: 10,
Weight: 10,
Ttl: 30,
}
b, err := json.Marshal(svc)
record := service + "." + config.Domain
//Set with no TTL, and hope that kubernetes events are accurate.
//TODO(BJK) Think this through a little more

log.Printf("Setting dns record: %v\n", record)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use smaller case 'S' more inline with the rest of the logging.

_, err = ksync.eclient.Set(msg.Path(record), string(b), uint64(0))
return err
}

type serviceInfo struct {
portalIP net.IP
portalPort int
protocol api.Protocol
proxyPort int
mu sync.Mutex // protects active
active bool
}

func init() {
client.BindClientConfigFlags(flag.CommandLine, clientConfig)
}

func WatchKubernetes(eclient *etcd.Client) {
serviceConfig := pconfig.NewServiceConfig()
endpointsConfig := pconfig.NewEndpointsConfig()

// define api config source
if clientConfig.Host != "" {
log.Println("Using api calls to get Kubernetes config %v", clientConfig.Host)
client, err := client.New(clientConfig)
if err != nil {
log.Fatalf("Kubernetes requested, but received invalid API configuration: %v", err)
}
pconfig.NewSourceAPI(
client,
30*time.Second,
serviceConfig.Channel("api"),
endpointsConfig.Channel("api"),
)
}
ks := NewKubernetesSync(eclient)
// Wire skydns to handle changes to services
serviceConfig.RegisterHandler(ks)
ks.SyncLoop()
}
26 changes: 16 additions & 10 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,24 @@ import (
"strings"
"time"

kclient "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/coreos/go-etcd/etcd"
"github.com/miekg/dns"
)

const Version = "2.0.0g"

var (
tlskey = ""
tlspem = ""
cacert = ""
config = &Config{ReadTimeout: 0, Domain: "", DnsAddr: "", DNSSEC: ""}
nameserver = ""
machine = ""
discover = false
verbose = false
tlskey = ""
tlspem = ""
cacert = ""
config = &Config{ReadTimeout: 0, Domain: "", DnsAddr: "", DNSSEC: ""}
nameserver = ""
machine = ""
discover = false
verbose = false
kubernetes = false
clientConfig = &kclient.Config{}
)

const (
Expand Down Expand Up @@ -59,6 +62,7 @@ func init() {
flag.BoolVar(&discover, "discover", false, "discover new machines by watching /v2/_etcd/machines")
flag.BoolVar(&verbose, "verbose", false, "log queries")
flag.BoolVar(&config.Systemd, "systemd", false, "bind to socket(s) activated by systemd (ignore -addr)")
flag.BoolVar(&kubernetes, "kubernetes", false, "read endpoints from a kubernetes master")

// TTl
// Minttl
Expand Down Expand Up @@ -108,7 +112,7 @@ func main() {
s.config.log.Infof("ectd machine cluster update failed, sleeping %s", duration)
time.Sleep(duration)
duration *= 2
if duration > 32 * time.Second {
if duration > 32*time.Second {
duration = 32 * time.Second
}
}
Expand All @@ -118,7 +122,9 @@ func main() {
}

statsCollect()

if kubernetes {
go WatchKubernetes(client)
}
if err := s.Run(); err != nil {
log.Fatal(err)
}
Expand Down
1 change: 0 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ func (s *server) ServeDNS(w dns.ResponseWriter, req *dns.Msg) {
dnssec := false
tcp := false

// fuck ANY queries
if req.Question[0].Qtype == dns.TypeANY {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahhhh, sad face :)

m.Authoritative = false
m.Rcode = dns.RcodeRefused
Expand Down