Skip to content

Commit

Permalink
Split networking out from node initialization into its own package
Browse files Browse the repository at this point in the history
Node and networking are now completely independent initialization paths.
  • Loading branch information
smarterclayton committed Sep 15, 2017
1 parent c38a98b commit 9fc4265
Show file tree
Hide file tree
Showing 8 changed files with 410 additions and 403 deletions.
216 changes: 216 additions & 0 deletions pkg/cmd/server/kubernetes/network/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package network

import (
"net"

"github.com/golang/glog"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
kv1core "k8s.io/client-go/kubernetes/typed/core/v1"
kclientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kclientsetcorev1 "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
proxy "k8s.io/kubernetes/pkg/proxy"
pconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/iptables"
"k8s.io/kubernetes/pkg/proxy/userspace"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
kexec "k8s.io/kubernetes/pkg/util/exec"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilnode "k8s.io/kubernetes/pkg/util/node"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"

"github.com/openshift/origin/pkg/proxy/hybrid"
"github.com/openshift/origin/pkg/proxy/unidler"
)

// RunSDN starts the SDN, if the OpenShift SDN network plugin is enabled in configuration.
func (c *NetworkConfig) RunSDN() {
if c.SDNNode == nil {
return
}
if err := c.SDNNode.Start(); err != nil {
glog.Fatalf("error: SDN node startup failed: %v", err)
}
}

// RunDNS starts the DNS server as soon as services are loaded.
func (c *NetworkConfig) RunDNS() {
go func() {
glog.Infof("Starting DNS on %s", c.DNSServer.Config.DnsAddr)
err := c.DNSServer.ListenAndServe()
glog.Fatalf("DNS server failed to start: %v", err)
}()
}

// RunProxy starts the proxy
func (c *NetworkConfig) RunProxy() {
protocol := utiliptables.ProtocolIpv4
bindAddr := net.ParseIP(c.ProxyConfig.BindAddress)
if bindAddr.To4() == nil {
protocol = utiliptables.ProtocolIpv6
}

portRange := utilnet.ParsePortRangeOrDie(c.ProxyConfig.PortRange)

hostname := utilnode.GetHostname(c.ProxyConfig.HostnameOverride)

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: c.KubeClientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, kclientv1.EventSource{Component: "kube-proxy", Host: hostname})

execer := kexec.New()
dbus := utildbus.New()
iptInterface := utiliptables.New(execer, dbus, protocol)

var proxier proxy.ProxyProvider
var servicesHandler pconfig.ServiceHandler
var endpointsHandler pconfig.EndpointsHandler

switch c.ProxyConfig.Mode {
case componentconfig.ProxyModeIPTables:
glog.V(0).Info("Using iptables Proxier.")
if bindAddr.Equal(net.IPv4zero) {
bindAddr = getNodeIP(c.ExternalKubeClientset.CoreV1(), hostname)
}
var healthzServer *healthcheck.HealthzServer
if len(c.ProxyConfig.HealthzBindAddress) > 0 {
healthzServer = healthcheck.NewDefaultHealthzServer(c.ProxyConfig.HealthzBindAddress, 2*c.ProxyConfig.IPTables.SyncPeriod.Duration)
}
if c.ProxyConfig.IPTables.MasqueradeBit == nil {
// IPTablesMasqueradeBit must be specified or defaulted.
glog.Fatalf("Unable to read IPTablesMasqueradeBit from config")
}
proxierIptables, err := iptables.NewProxier(
iptInterface,
utilsysctl.New(),
execer,
c.ProxyConfig.IPTables.SyncPeriod.Duration,
c.ProxyConfig.IPTables.MinSyncPeriod.Duration,
c.ProxyConfig.IPTables.MasqueradeAll,
int(*c.ProxyConfig.IPTables.MasqueradeBit),
c.ProxyConfig.ClusterCIDR,
hostname,
bindAddr,
recorder,
healthzServer,
)

if err != nil {
glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err)
}
proxier = proxierIptables
endpointsHandler = proxierIptables
servicesHandler = proxierIptables
// No turning back. Remove artifacts that might still exist from the userspace Proxier.
glog.V(0).Info("Tearing down userspace rules.")
userspace.CleanupLeftovers(iptInterface)
case componentconfig.ProxyModeUserspace:
glog.V(0).Info("Using userspace Proxier.")
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
// our config.EndpointsHandler.
loadBalancer := userspace.NewLoadBalancerRR()
// set EndpointsHandler to our loadBalancer
endpointsHandler = loadBalancer

execer := utilexec.New()
proxierUserspace, err := userspace.NewProxier(
loadBalancer,
bindAddr,
iptInterface,
execer,
*portRange,
c.ProxyConfig.IPTables.SyncPeriod.Duration,
c.ProxyConfig.IPTables.MinSyncPeriod.Duration,
c.ProxyConfig.UDPIdleTimeout.Duration,
)
if err != nil {
glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err)
}
proxier = proxierUserspace
servicesHandler = proxierUserspace
// Remove artifacts from the pure-iptables Proxier.
glog.V(0).Info("Tearing down pure-iptables proxy rules.")
iptables.CleanupLeftovers(iptInterface)
default:
glog.Fatalf("Unknown proxy mode %q", c.ProxyConfig.Mode)
}

// Create configs (i.e. Watches for Services and Endpoints)
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
serviceConfig := pconfig.NewServiceConfig(
c.InternalKubeInformers.Core().InternalVersion().Services(),
c.ProxyConfig.ConfigSyncPeriod.Duration,
)

if c.EnableUnidling {
unidlingLoadBalancer := userspace.NewLoadBalancerRR()
signaler := unidler.NewEventSignaler(recorder)
unidlingUserspaceProxy, err := unidler.NewUnidlerProxier(unidlingLoadBalancer, bindAddr, iptInterface, execer, *portRange, c.ProxyConfig.IPTables.SyncPeriod.Duration, c.ProxyConfig.IPTables.MinSyncPeriod.Duration, c.ProxyConfig.UDPIdleTimeout.Duration, signaler)
if err != nil {
glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err)
}
hybridProxier, err := hybrid.NewHybridProxier(
unidlingLoadBalancer,
unidlingUserspaceProxy,
endpointsHandler,
servicesHandler,
proxier,
unidlingUserspaceProxy,
c.ProxyConfig.IPTables.SyncPeriod.Duration,
c.InternalKubeInformers.Core().InternalVersion().Services().Lister(),
)
if err != nil {
glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err)
}
endpointsHandler = hybridProxier
servicesHandler = hybridProxier
proxier = hybridProxier
}

iptInterface.AddReloadFunc(proxier.Sync)
serviceConfig.RegisterEventHandler(servicesHandler)
go serviceConfig.Run(utilwait.NeverStop)

endpointsConfig := pconfig.NewEndpointsConfig(
c.InternalKubeInformers.Core().InternalVersion().Endpoints(),
c.ProxyConfig.ConfigSyncPeriod.Duration,
)
// customized handling registration that inserts a filter if needed
if c.SDNProxy != nil {
if err := c.SDNProxy.Start(endpointsHandler); err != nil {
glog.Fatalf("error: node proxy plugin startup failed: %v", err)
}
endpointsHandler = c.SDNProxy
}
endpointsConfig.RegisterEventHandler(endpointsHandler)
go endpointsConfig.Run(utilwait.NeverStop)

// periodically sync k8s iptables rules
go utilwait.Forever(proxier.SyncLoop, 0)
glog.Infof("Started Kubernetes Proxy on %s", c.ProxyConfig.BindAddress)
}

// getNodeIP is copied from the upstream proxy config to retrieve the IP of a node.
func getNodeIP(client kclientsetcorev1.CoreV1Interface, hostname string) net.IP {
var nodeIP net.IP
node, err := client.Nodes().Get(hostname, metav1.GetOptions{})
if err != nil {
glog.Warningf("Failed to retrieve node info: %v", err)
return nil
}
nodeIP, err = utilnode.GetNodeHostIP(node)
if err != nil {
glog.Warningf("Failed to retrieve node IP: %v", err)
return nil
}
return nodeIP
}
170 changes: 170 additions & 0 deletions pkg/cmd/server/kubernetes/network/network_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package network

import (
"fmt"
"net"
"strings"

"github.com/golang/glog"

miekgdns "github.com/miekg/dns"

kerrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kclientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kclientsetexternal "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"

osclient "github.com/openshift/origin/pkg/client"
configapi "github.com/openshift/origin/pkg/cmd/server/api"
"github.com/openshift/origin/pkg/dns"
"github.com/openshift/origin/pkg/network"
networkapi "github.com/openshift/origin/pkg/network/apis/network"
)

// NetworkConfig represents the required parameters to start OpenShift networking
// through Kubernetes. All fields are required.
type NetworkConfig struct {
// External kube client
KubeClientset kclientset.Interface
// External kube client
ExternalKubeClientset kclientsetexternal.Interface
// Internal kubernetes shared informer factory.
InternalKubeInformers kinternalinformers.SharedInformerFactory

// ProxyConfig is the configuration for the kube-proxy, fully initialized
ProxyConfig *componentconfig.KubeProxyConfiguration
// EnableUnidling indicates whether or not the unidling hybrid proxy should be used
EnableUnidling bool

// DNSConfig controls the DNS configuration.
DNSServer *dns.Server

// SDNNode is an optional SDN node interface
SDNNode network.NodeInterface
// SDNProxy is an optional service endpoints filterer
SDNProxy network.ProxyInterface
}

// New creates a new network config object for running the networking components of the OpenShift node.
func New(options configapi.NodeConfig, clusterDomain string, proxyConfig *componentconfig.KubeProxyConfiguration, enableProxy, enableDNS bool) (*NetworkConfig, error) {
originClient, _, err := configapi.GetOpenShiftClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides)
if err != nil {
return nil, err
}
internalKubeClient, kubeConfig, err := configapi.GetInternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides)
if err != nil {
return nil, err
}
externalKubeClient, _, err := configapi.GetExternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides)
if err != nil {
return nil, err
}
kubeClient, err := kclientset.NewForConfig(kubeConfig)
if err != nil {
return nil, err
}

if err = validateNetworkPluginName(originClient, options.NetworkConfig.NetworkPluginName); err != nil {
return nil, err
}

internalKubeInformers := kinternalinformers.NewSharedInformerFactory(internalKubeClient, proxyConfig.ConfigSyncPeriod.Duration)

var sdnNode network.NodeInterface
var sdnProxy network.ProxyInterface
if network.IsOpenShiftNetworkPlugin(options.NetworkConfig.NetworkPluginName) {
sdnNode, sdnProxy, err = NewSDNInterfaces(options, originClient, internalKubeClient, internalKubeInformers, proxyConfig)
if err != nil {
return nil, fmt.Errorf("SDN initialization failed: %v", err)
}
}

config := &NetworkConfig{
KubeClientset: kubeClient,
ExternalKubeClientset: externalKubeClient,
InternalKubeInformers: internalKubeInformers,

ProxyConfig: proxyConfig,
EnableUnidling: options.EnableUnidling,

SDNNode: sdnNode,
SDNProxy: sdnProxy,
}

if enableDNS {
dnsConfig, err := dns.NewServerDefaults()
if err != nil {
return nil, fmt.Errorf("DNS configuration was not possible: %v", err)
}
if len(options.DNSBindAddress) > 0 {
dnsConfig.DnsAddr = options.DNSBindAddress
}
dnsConfig.Domain = clusterDomain + "."
dnsConfig.Local = "openshift.default.svc." + dnsConfig.Domain

// identify override nameservers
var nameservers []string
for _, s := range options.DNSNameservers {
nameservers = append(nameservers, s)
}
if len(options.DNSRecursiveResolvConf) > 0 {
c, err := miekgdns.ClientConfigFromFile(options.DNSRecursiveResolvConf)
if err != nil {
return nil, fmt.Errorf("could not start DNS, unable to read config file: %v", err)
}
for _, s := range c.Servers {
nameservers = append(nameservers, net.JoinHostPort(s, c.Port))
}
}

if len(nameservers) > 0 {
dnsConfig.Nameservers = nameservers
}

services, err := dns.NewCachedServiceAccessor(internalKubeInformers.Core().InternalVersion().Services())
if err != nil {
return nil, fmt.Errorf("could not start DNS: failed to add ClusterIP index: %v", err)
}

endpoints, err := dns.NewCachedEndpointsAccessor(internalKubeInformers.Core().InternalVersion().Endpoints())
if err != nil {
return nil, fmt.Errorf("could not start DNS: failed to add HostnameIP index: %v", err)
}

// TODO: use kubeletConfig.ResolverConfig as an argument to etcd in the event the
// user sets it, instead of passing it to the kubelet.
glog.Infof("DNS Bind to %s", options.DNSBindAddress)
config.DNSServer = dns.NewServer(
dnsConfig,
services,
endpoints,
"node",
)
}

return config, nil
}

func validateNetworkPluginName(originClient *osclient.Client, pluginName string) error {
if network.IsOpenShiftNetworkPlugin(pluginName) {
// Detect any plugin mismatches between node and master
clusterNetwork, err := originClient.ClusterNetwork().Get(networkapi.ClusterNetworkDefault, metav1.GetOptions{})
if kerrs.IsNotFound(err) {
return fmt.Errorf("master has not created a default cluster network, network plugin %q can not start", pluginName)
} else if err != nil {
return fmt.Errorf("cannot fetch %q cluster network: %v", networkapi.ClusterNetworkDefault, err)
}

if clusterNetwork.PluginName != strings.ToLower(pluginName) {
if len(clusterNetwork.PluginName) != 0 {
return fmt.Errorf("detected network plugin mismatch between OpenShift node(%q) and master(%q)", pluginName, clusterNetwork.PluginName)
} else {
// Do not return error in this case
glog.Warningf(`either there is network plugin mismatch between OpenShift node(%q) and master or OpenShift master is running an older version where we did not persist plugin name`, pluginName)
}
}
}
return nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package node
package network

import (
"k8s.io/kubernetes/pkg/apis/componentconfig"
Expand Down
Loading

0 comments on commit 9fc4265

Please sign in to comment.