Skip to content

Commit

Permalink
Add a healthcheck to detect when OVS is restarted
Browse files Browse the repository at this point in the history
A periodic background process watches for when OVS is reset to the
default state and causes the entire process to restart. This avoids the
need to order the SDN process with OVS, and makes it easier to run the
process in a pod.

In the future it should be possible to avoid restarting the process to
perform this check.
  • Loading branch information
smarterclayton committed Oct 10, 2017
1 parent b0073eb commit 189e581
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 12 deletions.
100 changes: 100 additions & 0 deletions pkg/network/node/healthcheck.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package node

import (
"fmt"
"time"

"github.com/golang/glog"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilwait "k8s.io/apimachinery/pkg/util/wait"

"github.com/openshift/origin/pkg/util/ovs/ovsclient"
)

const (
	// ovsDialTimeout bounds each attempt to connect to the OVS server. It is
	// also used as the retry period of the disconnect-watching loop in
	// runOVSHealthCheck.
	ovsDialTimeout = 5 * time.Second
	// ovsHealthcheckInterval is the period of the background loop that
	// re-verifies OVS connectivity and configuration.
	ovsHealthcheckInterval = 30 * time.Second
	// ovsRecoveryTimeout is how long the health check polls for OVS to come
	// back and pass the health function after a disconnect is observed.
	ovsRecoveryTimeout = 10 * time.Second
	// ovsDialDefaultNetwork is the network type of the default OVS database
	// socket.
	ovsDialDefaultNetwork = "unix"
	// ovsDialDefaultAddress is the path of the default OVS database socket.
	ovsDialDefaultAddress = "/var/run/openvswitch/db.sock"
)

// waitForOVS polls until the OVS server responds to a connection and an 'echo'
// command.
func waitForOVS(network, addr string) error {
return utilwait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
c, err := ovsclient.DialTimeout(network, addr, ovsDialTimeout)
if err != nil {
glog.V(2).Infof("waiting for OVS to start: %v", err)
return false, nil
}
defer c.Close()
if err := c.Ping(); err != nil {
glog.V(2).Infof("waiting for OVS to start, ping failed: %v", err)
return false, nil
}
return true, nil
})
}

// runOVSHealthCheck starts two background loops that run for the lifetime of
// the process (both use utilwait.NeverStop):
//
//   - one holds a connection to the OVS server open, waits for it to be
//     disconnected, and then polls for up to ovsRecoveryTimeout until OVS can
//     be dialed and pinged again, at which point healthFn is checked;
//   - one dials, pings and checks healthFn every ovsHealthcheckInterval.
//
// If healthFn returns false in either loop while the OVS server is responsive,
// the node process terminates via glog.Fatal. The first loop also terminates
// the process when the recovery poll ends with an error without OVS becoming
// reachable (PollImmediate reports a timeout as an error). The function itself
// returns immediately after starting the goroutines.
func runOVSHealthCheck(network, addr string, healthFn func() bool) {
	// this loop holds an open socket connection to OVS until it is
	// disconnected, then checks for health
	go utilwait.Until(func() {
		c, err := ovsclient.DialTimeout(network, addr, ovsDialTimeout)
		if err != nil {
			utilruntime.HandleError(fmt.Errorf("SDN healthcheck unable to connect to OVS server: %v", err))
			return
		}
		defer c.Close()

		// Blocks until the connection drops. The disconnect is reported even
		// when err is nil, because the disconnect itself is the event of
		// interest here.
		err = c.WaitForDisconnect()
		utilruntime.HandleError(fmt.Errorf("SDN healthcheck disconnected from OVS server: %v", err))

		// Give OVS a bounded amount of time to come back, using a fresh
		// connection for every attempt. Dial and ping failures just retry;
		// a failed healthFn aborts the poll with an error.
		err = utilwait.PollImmediate(100*time.Millisecond, ovsRecoveryTimeout, func() (bool, error) {
			c, err := ovsclient.DialTimeout(network, addr, ovsDialTimeout)
			if err != nil {
				glog.V(2).Infof("SDN healthcheck unable to reconnect to OVS server: %v", err)
				return false, nil
			}
			defer c.Close()
			if err := c.Ping(); err != nil {
				glog.V(2).Infof("SDN healthcheck unable to ping OVS server: %v", err)
				return false, nil
			}
			if !healthFn() {
				return false, fmt.Errorf("OVS health check failed")
			}
			return true, nil
		})
		if err != nil {
			// If OVS restarts and our health check fails, we exit
			// TODO: make openshift-sdn able to reconcile without a restart
			glog.Fatalf("SDN healthcheck detected unhealthy OVS server, restarting: %v", err)
		}
	}, ovsDialTimeout, utilwait.NeverStop)

	// this loop periodically verifies we can still connect to the OVS server and
	// is an upper bound on the time we wait before detecting a failed OVS configuration
	go utilwait.Until(func() {
		c, err := ovsclient.DialTimeout(network, addr, ovsDialTimeout)
		if err != nil {
			glog.V(2).Infof("SDN healthcheck unable to reconnect to OVS server: %v", err)
			return
		}
		defer c.Close()
		if err := c.Ping(); err != nil {
			glog.V(2).Infof("SDN healthcheck unable to ping OVS server: %v", err)
			return
		}
		if !healthFn() {
			// There is no error value to report at this point (the dial and
			// the ping both succeeded), so state the reason explicitly
			// instead of formatting a nil error.
			glog.Fatal("SDN healthcheck detected unhealthy OVS server, restarting: OVS health check failed")
		}
		glog.V(4).Infof("SDN healthcheck succeeded")
	}, ovsHealthcheckInterval, utilwait.NeverStop)
}
45 changes: 33 additions & 12 deletions pkg/network/node/sdn_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,11 @@ func (plugin *OsdnNode) SetupSDN() (bool, error) {
clusterNetworkCIDRs = append(clusterNetworkCIDRs, cn.ClusterCIDR.String())
}

serviceNetworkCIDR := plugin.networkInfo.ServiceNetwork.String()

localSubnetCIDR := plugin.localSubnetCIDR
_, ipnet, err := net.ParseCIDR(localSubnetCIDR)
if err != nil {
return false, fmt.Errorf("invalid local subnet CIDR: %v", err)
}
localSubnetMaskLength, _ := ipnet.Mask.Size()
localSubnetGateway := netutils.GenerateDefaultGateway(ipnet).String()

Expand All @@ -167,15 +168,35 @@ func (plugin *OsdnNode) SetupSDN() (bool, error) {
}

gwCIDR := fmt.Sprintf("%s/%d", localSubnetGateway, localSubnetMaskLength)

if err := waitForOVS(ovsDialDefaultNetwork, ovsDialDefaultAddress); err != nil {
return false, err
}

var changed bool
if plugin.alreadySetUp(gwCIDR, clusterNetworkCIDRs) {
glog.V(5).Infof("[SDN setup] no SDN setup required")
return false, nil
} else {
glog.Infof("[SDN setup] full SDN setup required")
if err := plugin.setup(clusterNetworkCIDRs, localSubnetCIDR, localSubnetGateway, gwCIDR); err != nil {
return false, err
}
changed = true
}
glog.V(5).Infof("[SDN setup] full SDN setup required")

err = plugin.oc.SetupOVS(clusterNetworkCIDRs, serviceNetworkCIDR, localSubnetCIDR, localSubnetGateway)
if err != nil {
return false, err
// TODO: make it possible to safely reestablish node configuration after restart
// If OVS goes down and fails the health check, restart the entire process
healthFn := func() bool { return plugin.alreadySetUp(gwCIDR, clusterNetworkCIDRs) }
runOVSHealthCheck(ovsDialDefaultNetwork, ovsDialDefaultAddress, healthFn)

return changed, nil
}

func (plugin *OsdnNode) setup(clusterNetworkCIDRs []string, localSubnetCIDR, localSubnetGateway, gwCIDR string) error {
serviceNetworkCIDR := plugin.networkInfo.ServiceNetwork.String()

if err := plugin.oc.SetupOVS(clusterNetworkCIDRs, serviceNetworkCIDR, localSubnetCIDR, localSubnetGateway); err != nil {
return err
}

l, err := netlink.LinkByName(Tun0)
Expand All @@ -200,7 +221,7 @@ func (plugin *OsdnNode) SetupSDN() (bool, error) {
Dst: clusterNetwork.ClusterCIDR,
}
if err = netlink.RouteAdd(route); err != nil {
return false, err
return err
}
}
}
Expand All @@ -212,21 +233,21 @@ func (plugin *OsdnNode) SetupSDN() (bool, error) {
err = netlink.RouteAdd(route)
}
if err != nil {
return false, err
return err
}

sysctl := sysctl.New()

// Make sure IPv4 forwarding state is 1
val, err := sysctl.GetSysctl("net/ipv4/ip_forward")
if err != nil {
return false, fmt.Errorf("could not get IPv4 forwarding state: %s", err)
return fmt.Errorf("could not get IPv4 forwarding state: %s", err)
}
if val != 1 {
return false, fmt.Errorf("net/ipv4/ip_forward=0, it must be set to 1")
return fmt.Errorf("net/ipv4/ip_forward=0, it must be set to 1")
}

return true, nil
return nil
}

func (plugin *OsdnNode) updateEgressNetworkPolicyRules(vnid uint32) {
Expand Down
57 changes: 57 additions & 0 deletions pkg/util/ovs/ovsclient/ovsclient.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package ovsclient

import (
"fmt"
"io"
"io/ioutil"
"net"
"net/rpc"
"net/rpc/jsonrpc"
"time"
)

// Client is an RPC client for communicating with OVS.
type Client struct {
*rpc.Client
conn net.Conn
}

// New creates a new Client from a connection.
func New(conn net.Conn) *Client {
return &Client{
Client: jsonrpc.NewClient(conn),
conn: conn,
}
}

// DialTimeout dials the provided network and address, and if it responds within
// timeout will return a valid Client.
func DialTimeout(network, addr string, timeout time.Duration) (*Client, error) {
conn, err := net.DialTimeout(network, addr, timeout)
if err != nil {
return nil, err
}
return New(conn), nil
}

// Ping returns nil if the OVS server responded to an "echo" command.
func (c *Client) Ping() error {
var result interface{}
if err := c.Call("echo", []string{"hello"}, &result); err != nil {
return err
}
return nil
}

// WaitForDisconnect will block until the provided connection is closed
// and return an error. This consumes the connection.
func (c *Client) WaitForDisconnect() error {
n, err := io.Copy(ioutil.Discard, c.conn)
if err != nil && err != io.EOF {
return err
}
if n > 0 {
return fmt.Errorf("unexpected bytes read waiting for disconnect: %d", n)
}
return nil
}

0 comments on commit 189e581

Please sign in to comment.