Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

The DNS subsystem should manage keeping dnsmasq in sync #16740

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions images/node/system-container/service.template
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ Type=notify
EnvironmentFile=/etc/sysconfig/$NAME
EnvironmentFile=/etc/sysconfig/$NAME-dep

ExecStartPre=/usr/bin/cp /etc/origin/node/node-dnsmasq.conf /etc/dnsmasq.d/
ExecStartPre=/usr/bin/dbus-send --system --dest=uk.org.thekelleys.dnsmasq /uk/org/thekelleys/dnsmasq uk.org.thekelleys.SetDomainServers array:string:/in-addr.arpa/127.0.0.1,/${DNS_DOMAIN}/127.0.0.1
ExecStopPost=/usr/bin/rm /etc/dnsmasq.d/node-dnsmasq.conf
ExecStopPost=/usr/bin/dbus-send --system --dest=uk.org.thekelleys.dnsmasq /uk/org/thekelleys/dnsmasq uk.org.thekelleys.SetDomainServers array:string:

ExecStartPre=/bin/bash -c 'export -p > /run/$NAME-env'
ExecStart=$EXEC_START
ExecStop=$EXEC_STOP
Expand Down
127 changes: 127 additions & 0 deletions pkg/dns/dnsmasq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package dns

import (
"fmt"
"sync"
"time"

godbus "github.com/godbus/dbus"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
)

const (
// dnsmasqRetryInterval is the duration between attempts to register and listen to DBUS.
dnsmasqRetryInterval = 2 * time.Second
// dnsmasqRefreshInterval is the maximum time between refreshes of the current dnsmasq configuration.
dnsmasqRefreshInterval = 30 * time.Second
// dbusDnsmasqPath is the DBus object path used for dnsmasq. It appears in the signal
// match rule, is compared against the path of incoming signals, and addresses the
// object that SetDomainServers is invoked on.
dbusDnsmasqPath = "/uk/org/thekelleys/dnsmasq"
// dbusDnsmasqInterface is used both as the interface in the signal match rule and as
// the name passed to conn.Object when invoking dnsmasq.
dbusDnsmasqInterface = "uk.org.thekelleys.dnsmasq"
)

// dnsmasqMonitor keeps dnsmasq configured to send queries for dnsDomain and
// in-addr.arpa to dnsIP. It refreshes that configuration periodically and
// whenever dnsmasq signals over DBus that it has come up.
type dnsmasqMonitor struct {
// metricsName is the prefix to apply to registered prometheus metrics. If unset no
// metrics will be registered.
metricsName string
// metricError counts refresh failures, partitioned by a "type" label
// ("restart" or "periodic" in this file).
metricError *prometheus.CounterVec
// metricRestart counts the dnsmasq "Up" signals that have been observed.
metricRestart prometheus.Counter

// dnsIP is the IP address this DNS server is reachable at from dnsmasq
dnsIP string
// dnsDomain is the domain name for this DNS server that dnsmasq should forward to
dnsDomain string
// lock controls sending a dnsmasq refresh
lock sync.Mutex
}

// initMetrics creates the error and restart counters for the monitor. The
// counters are always created so callers may increment them unconditionally,
// but they are only registered with prometheus when metricsName is set.
func (m *dnsmasqMonitor) initMetrics() {
	const subsystem = "dnsmasq_sync"

	errorOpts := prometheus.CounterOpts{
		Namespace: m.metricsName,
		Subsystem: subsystem,
		Name:      "error_count_total",
		Help:      "Counter of sync failures with dnsmasq.",
	}
	restartOpts := prometheus.CounterOpts{
		Namespace: m.metricsName,
		Subsystem: subsystem,
		Name:      "restart_count_total",
		Help:      "Counter of restarts detected from dnsmasq.",
	}

	m.metricError = prometheus.NewCounterVec(errorOpts, []string{"type"})
	m.metricRestart = prometheus.NewCounter(restartOpts)

	// without a metrics name the counters stay private to this monitor
	if len(m.metricsName) == 0 {
		return
	}
	prometheus.MustRegister(m.metricError)
	prometheus.MustRegister(m.metricRestart)
}

// Start initializes the metrics, connects to the system DBus, installs a match
// rule for signals on the dnsmasq path and interface, and launches the monitor
// loop in a goroutine that is never stopped. It returns an error if the bus
// cannot be reached or the match rule cannot be added.
func (m *dnsmasqMonitor) Start() error {
	m.initMetrics()

	bus, err := utildbus.New().SystemBus()
	if err != nil {
		return fmt.Errorf("cannot connect to DBus: %v", err)
	}

	matchRule := fmt.Sprintf("type='signal',path='%s',interface='%s'", dbusDnsmasqPath, dbusDnsmasqInterface)
	err = bus.BusObject().Call("org.freedesktop.DBus.AddMatch", 0, matchRule).Store()
	if err != nil {
		return fmt.Errorf("unable to add a match rule to the system DBus: %v", err)
	}

	go m.run(bus, utilwait.NeverStop)
	return nil
}

// run registers a signal channel on conn and keeps dnsmasq configured until
// stopCh is closed. Two goroutines are started: one refreshes dnsmasq whenever
// it announces that it has come up, the other refreshes it every
// dnsmasqRefreshInterval regardless. run blocks until stopCh is closed and then
// unregisters the signal channel.
func (m *dnsmasqMonitor) run(conn utildbus.Connection, stopCh <-chan struct{}) {
	ch := make(chan *godbus.Signal, 20)
	conn.Signal(ch)
	// unregister the handler on the way out (a second Signal call with the same
	// channel is how this file removes the registration)
	defer conn.Signal(ch)
	// HandleCrash relies on recover(), which only has an effect when the function
	// calling it is itself the deferred function, so it must be deferred directly
	// rather than invoked from inside another deferred closure.
	defer utilruntime.HandleCrash()

	// watch for dnsmasq restart
	go utilwait.Until(func() {
		for {
			var s *godbus.Signal
			select {
			case <-stopCh:
				// ch is never closed, so stopCh is the only way for this
				// goroutine to terminate
				return
			case sig, ok := <-ch:
				if !ok {
					return
				}
				s = sig
			}
			if s == nil || s.Path != dbusDnsmasqPath {
				continue
			}
			switch s.Name {
			case "uk.org.thekelleys.dnsmasq.Up":
				m.metricRestart.Inc()
				glog.V(2).Infof("dnsmasq restarted, refreshing server configuration")
				if err := m.refresh(conn); err != nil {
					utilruntime.HandleError(fmt.Errorf("unable to refresh dnsmasq status on dnsmasq startup: %v", err))
					m.metricError.WithLabelValues("restart").Inc()
				} else {
					// make sure the label exists with a zero value even when nothing failed
					m.metricError.WithLabelValues("restart").Add(0)
				}
			}
		}
	}, dnsmasqRetryInterval, stopCh)

	// no matter what, always keep trying to refresh dnsmasq
	go utilwait.Until(func() {
		if err := m.refresh(conn); err != nil {
			utilruntime.HandleError(fmt.Errorf("unable to periodically refresh dnsmasq status: %v", err))
			m.metricError.WithLabelValues("periodic").Inc()
		} else {
			// make sure the label exists with a zero value even when nothing failed
			m.metricError.WithLabelValues("periodic").Add(0)
		}
	}, dnsmasqRefreshInterval, stopCh)

	<-stopCh
}

// refresh asks dnsmasq, through conn, to use dnsIP as the server for
// in-addr.arpa and for dnsDomain. Calls are serialized by m.lock. The error from
// the DBus call, if any, is returned.
func (m *dnsmasqMonitor) refresh(conn utildbus.Connection) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	servers := []string{
		"/in-addr.arpa/" + m.dnsIP,
		"/" + m.dnsDomain + "/" + m.dnsIP,
	}
	glog.V(5).Infof("Instructing dnsmasq to set the following servers: %v", servers)

	dnsmasq := conn.Object(dbusDnsmasqInterface, dbusDnsmasqPath)
	return dnsmasq.Call("uk.org.thekelleys.SetDomainServers", 0, servers).Store()
}
197 changes: 197 additions & 0 deletions pkg/dns/dnsmasq_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package dns

import (
"fmt"
"reflect"
"sync"
"testing"
"time"

godbus "github.com/godbus/dbus"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"

utildbus "k8s.io/kubernetes/pkg/util/dbus"
)

// Test_dnsmasqMonitor_run verifies that the monitor sets the dnsmasq servers on
// startup and again after an "Up" signal, and that unrelated signals (before and
// after shutdown) do not trigger a call.
func Test_dnsmasqMonitor_run(t *testing.T) {
	const setServers = "uk.org.thekelleys.SetDomainServers"
	wantServers := []string{"/in-addr.arpa/127.0.0.1", "/test.domain/127.0.0.1"}

	monitor := &dnsmasqMonitor{dnsIP: "127.0.0.1", dnsDomain: "test.domain"}
	monitor.initMetrics()
	bus := utildbus.NewFakeConnection()

	// every invocation on the fake dnsmasq object is reported on calls
	calls := make(chan string, 1)
	bus.AddObject(dbusDnsmasqInterface, dbusDnsmasqPath, func(method string, args ...interface{}) ([]interface{}, error) {
		defer func() { calls <- method }()
		if method != setServers {
			t.Errorf("unexpected method: %v", method)
			return nil, fmt.Errorf("unexpected method")
		}
		if len(args) != 1 {
			t.Errorf("unexpected args: %v", args)
			return nil, fmt.Errorf("unexpected args")
		}
		got, isStrings := args[0].([]string)
		if !isStrings || !reflect.DeepEqual(wantServers, got) {
			t.Errorf("unexpected args: %v", args)
			return nil, fmt.Errorf("unexpected args")
		}
		return nil, nil
	})

	// expectSet blocks until the next call and checks that it set the servers
	expectSet := func() {
		if s := <-calls; s != setServers {
			t.Errorf("unexpected call: %s", s)
		}
	}
	// expectIdle fails the test if a call is already pending
	expectIdle := func() {
		select {
		case s := <-calls:
			t.Fatalf("got an unexpected second call: %s", s)
		default:
		}
	}

	stop := make(chan struct{})
	go monitor.run(bus, stop)

	// should always set on startup
	expectSet()
	expectIdle()

	// restart and ensure we get a set
	bus.EmitSignal(dbusDnsmasqInterface, dbusDnsmasqPath, dbusDnsmasqInterface, "Up")
	expectSet()

	// send a bogus signal and check whether anything was invoked
	bus.EmitSignal(dbusDnsmasqInterface, dbusDnsmasqPath, dbusDnsmasqInterface, "Ignore")
	expectIdle()

	// shutdown, send one more bogus signal to ensure the channel is empty and the goroutines are done
	close(stop)
	bus.EmitSignal(dbusDnsmasqInterface, dbusDnsmasqPath, dbusDnsmasqInterface, "Ignore")
	expectIdle()
}

// threadsafeDBusConn wraps a DBusFakeConnection and holds lock around every
// call it forwards, so that the test goroutine and the monitor goroutines do
// not use the fake connection at the same time.
type threadsafeDBusConn struct {
// lock is held for the duration of each forwarded call
lock sync.Mutex
// conn is the fake connection that calls are forwarded to
conn *utildbus.DBusFakeConnection
}

// BusObject returns the wrapped connection's BusObject while holding lock.
func (c *threadsafeDBusConn) BusObject() utildbus.Object {
c.lock.Lock()
defer c.lock.Unlock()
return c.conn.BusObject()
}

// Object returns the wrapped connection's object for name and path while holding lock.
func (c *threadsafeDBusConn) Object(name, path string) utildbus.Object {
c.lock.Lock()
defer c.lock.Unlock()
return c.conn.Object(name, path)
}

// Signal forwards ch to the wrapped connection's Signal while holding lock.
func (c *threadsafeDBusConn) Signal(ch chan<- *godbus.Signal) {
c.lock.Lock()
defer c.lock.Unlock()
c.conn.Signal(ch)
}

// EmitSignal forwards the signal to the wrapped connection's EmitSignal while holding lock.
func (c *threadsafeDBusConn) EmitSignal(name, path, iface, signal string, args ...interface{}) {
c.lock.Lock()
defer c.lock.Unlock()
c.conn.EmitSignal(name, path, iface, signal, args...)
}

// Test_dnsmasqMonitor_run_metrics makes every SetDomainServers call fail and
// checks, once the monitor has stopped, that two restarts, one periodic error
// and two restart errors were counted.
func Test_dnsmasqMonitor_run_metrics(t *testing.T) {
	const setServers = "uk.org.thekelleys.SetDomainServers"

	monitor := &dnsmasqMonitor{dnsIP: "127.0.0.1", dnsDomain: "test.domain"}
	monitor.initMetrics()
	fake := utildbus.NewFakeConnection()
	bus := &threadsafeDBusConn{conn: fake}

	// every invocation on the fake dnsmasq object is reported on calls
	calls := make(chan string, 1)
	fake.AddObject(dbusDnsmasqInterface, dbusDnsmasqPath, func(method string, args ...interface{}) ([]interface{}, error) {
		defer func() { calls <- method }()
		if method == setServers {
			return nil, fmt.Errorf("unable to send error")
		}
		t.Errorf("unexpected method: %v", method)
		return nil, fmt.Errorf("unexpected method")
	})

	// stop ends the monitor; done keeps the test alive until all values are checked
	stop := make(chan struct{})
	done := make(chan struct{})
	go func() {
		monitor.run(bus, stop)
		expectCounterValue(t, 2, monitor.metricRestart)
		expectCounterVecValue(t, 1, monitor.metricError, "periodic")
		expectCounterVecValue(t, 2, monitor.metricError, "restart")
		close(done)
	}()

	// round 0 is the set that always happens on startup, rounds 1 and 2 follow a restart
	for round := 0; round < 3; round++ {
		if round > 0 {
			bus.EmitSignal(dbusDnsmasqInterface, dbusDnsmasqPath, dbusDnsmasqInterface, "Up")
		}
		if s := <-calls; s != setServers {
			t.Errorf("unexpected call: %s", s)
		}
	}

	// shutdown, send one more bogus signal to ensure the channel is empty and the goroutines are done
	close(stop)
	bus.EmitSignal(dbusDnsmasqInterface, dbusDnsmasqPath, dbusDnsmasqInterface, "Ignore")
	select {
	case s := <-calls:
		t.Fatalf("got an unexpected second call: %s", s)
	default:
	}
	<-done
}

// expectCounterVecValue reports a test error unless the counter selected by
// labels in vec reaches expect. The metric is incremented in a goroutine the
// test cannot signal from, so the value is polled roughly once per millisecond
// for a bounded number of attempts to let it stabilize. Polling stops as soon as
// the value matches, overshoots expect, cannot be read, or the attempts run out.
func expectCounterVecValue(t *testing.T, expect float64, vec *prometheus.CounterVec, labels ...string) {
	c, err := vec.GetMetricWithLabelValues(labels...)
	if err != nil {
		// without a counter there is nothing to poll
		t.Error(err)
		return
	}
	for i := 0; ; i++ {
		m := &dto.Metric{}
		if err := c.Write(m); err != nil {
			t.Error(err)
			return
		}
		got := m.Counter.GetValue()
		if got == expect {
			return
		}
		if got > expect || i > 100 {
			t.Errorf("%v: value %f != expected %f", labels, got, expect)
			// give up instead of polling (and reporting) forever
			return
		}
		time.Sleep(time.Millisecond)
	}
}

// expectCounterValue reads the current value of c once and reports a test error
// if the read fails or the value differs from expect.
func expectCounterValue(t *testing.T, expect float64, c prometheus.Counter) {
	var snapshot dto.Metric
	if err := c.Write(&snapshot); err != nil {
		t.Error(err)
	}
	if got := snapshot.Counter.GetValue(); got != expect {
		t.Errorf("value %f != expected %f", got, expect)
	}
}
33 changes: 33 additions & 0 deletions pkg/dns/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package dns

import (
"net"
"strings"

"github.com/golang/glog"

"github.com/skynetservices/skydns/metrics"
Expand Down Expand Up @@ -41,6 +44,8 @@ func NewServer(config *server.Config, services ServiceAccessor, endpoints Endpoi
// ListenAndServe starts a DNS server that exposes services and values stored in etcd (if etcdclient
// is not nil). It will block until the server exits.
func (s *Server) ListenAndServe() error {
monitorDnsmasq(s.Config, s.MetricsName)

resolver := NewServiceResolver(s.Config, s.Services, s.Endpoints, openshiftFallback)
resolvers := server.FirstBackend{resolver}
if len(s.MetricsName) > 0 {
Expand All @@ -53,6 +58,34 @@ func (s *Server) ListenAndServe() error {
return dns.Run()
}

// monitorDnsmasq attempts to start the dnsmasq monitoring goroutines to keep dnsmasq
// in sync with this server. It only logs a warning and takes no other action if the
// current config DnsAddr cannot be split into host and port or does not point to
// port 53 (dnsmasq does not support alternate upstream ports). A bind host that is
// empty or unspecified (":53", "0.0.0.0:53", "[::]:53") is converted to the
// BindNetwork appropriate loopback address, because dnsmasq needs a concrete
// address to forward to. A failure to start the monitor is logged, not returned.
func monitorDnsmasq(config *server.Config, metricsName string) {
	host, port, err := net.SplitHostPort(config.DnsAddr)
	if err != nil || port != "53" {
		glog.Warningf("Unable to keep dnsmasq up to date, %s must point to port 53", config.DnsAddr)
		return
	}

	// An empty host listens on every interface just like the unspecified address
	// does; passing it through would hand dnsmasq entries without a server.
	if ip := net.ParseIP(host); len(host) == 0 || (ip != nil && ip.IsUnspecified()) {
		if config.BindNetwork == "ipv6" {
			host = "::1"
		} else {
			host = "127.0.0.1"
		}
	}

	monitor := &dnsmasqMonitor{
		metricsName: metricsName,
		dnsIP:       host,
		// drop the trailing dot of a fully qualified domain
		dnsDomain: strings.TrimSuffix(config.Domain, "."),
	}
	if err := monitor.Start(); err != nil {
		glog.Warningf("Unable to start dnsmasq monitor: %v", err)
		return
	}
	glog.V(2).Infof("Monitoring dnsmasq to point cluster queries to %s", host)
}

func openshiftFallback(name string, exact bool) (string, bool) {
if name == "openshift.default.svc" {
return "kubernetes.default.svc.", true
Expand Down
Loading