Skip to content

Commit

Permalink
Merge pull request #16269 from smarterclayton/snip_network
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 16269, 13282, 16386)

Split networking out from node initialization into its own package

Node and networking are now completely independent initialization paths.

Builds off of #16268, only second commit is new

@deads2k second step towards removing the need for node to be tied together
  • Loading branch information
openshift-merge-robot authored Sep 16, 2017
2 parents a462de4 + 9fc4265 commit 0a33b4c
Show file tree
Hide file tree
Showing 10 changed files with 628 additions and 622 deletions.
216 changes: 216 additions & 0 deletions pkg/cmd/server/kubernetes/network/network.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package network

import (
"net"

"github.com/golang/glog"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilnet "k8s.io/apimachinery/pkg/util/net"
utilwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/scheme"
kv1core "k8s.io/client-go/kubernetes/typed/core/v1"
kclientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kclientsetcorev1 "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
proxy "k8s.io/kubernetes/pkg/proxy"
pconfig "k8s.io/kubernetes/pkg/proxy/config"
"k8s.io/kubernetes/pkg/proxy/healthcheck"
"k8s.io/kubernetes/pkg/proxy/iptables"
"k8s.io/kubernetes/pkg/proxy/userspace"
utildbus "k8s.io/kubernetes/pkg/util/dbus"
kexec "k8s.io/kubernetes/pkg/util/exec"
utilexec "k8s.io/kubernetes/pkg/util/exec"
utiliptables "k8s.io/kubernetes/pkg/util/iptables"
utilnode "k8s.io/kubernetes/pkg/util/node"
utilsysctl "k8s.io/kubernetes/pkg/util/sysctl"

"github.com/openshift/origin/pkg/proxy/hybrid"
"github.com/openshift/origin/pkg/proxy/unidler"
)

// RunSDN starts the SDN, if the OpenShift SDN network plugin is enabled in configuration.
func (c *NetworkConfig) RunSDN() {
if c.SDNNode == nil {
return
}
if err := c.SDNNode.Start(); err != nil {
glog.Fatalf("error: SDN node startup failed: %v", err)
}
}

// RunDNS starts the DNS server as soon as services are loaded.
func (c *NetworkConfig) RunDNS() {
go func() {
glog.Infof("Starting DNS on %s", c.DNSServer.Config.DnsAddr)
err := c.DNSServer.ListenAndServe()
glog.Fatalf("DNS server failed to start: %v", err)
}()
}

// RunProxy starts the proxy
func (c *NetworkConfig) RunProxy() {
protocol := utiliptables.ProtocolIpv4
bindAddr := net.ParseIP(c.ProxyConfig.BindAddress)
if bindAddr.To4() == nil {
protocol = utiliptables.ProtocolIpv6
}

portRange := utilnet.ParsePortRangeOrDie(c.ProxyConfig.PortRange)

hostname := utilnode.GetHostname(c.ProxyConfig.HostnameOverride)

eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: c.KubeClientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, kclientv1.EventSource{Component: "kube-proxy", Host: hostname})

execer := kexec.New()
dbus := utildbus.New()
iptInterface := utiliptables.New(execer, dbus, protocol)

var proxier proxy.ProxyProvider
var servicesHandler pconfig.ServiceHandler
var endpointsHandler pconfig.EndpointsHandler

switch c.ProxyConfig.Mode {
case componentconfig.ProxyModeIPTables:
glog.V(0).Info("Using iptables Proxier.")
if bindAddr.Equal(net.IPv4zero) {
bindAddr = getNodeIP(c.ExternalKubeClientset.CoreV1(), hostname)
}
var healthzServer *healthcheck.HealthzServer
if len(c.ProxyConfig.HealthzBindAddress) > 0 {
healthzServer = healthcheck.NewDefaultHealthzServer(c.ProxyConfig.HealthzBindAddress, 2*c.ProxyConfig.IPTables.SyncPeriod.Duration)
}
if c.ProxyConfig.IPTables.MasqueradeBit == nil {
// IPTablesMasqueradeBit must be specified or defaulted.
glog.Fatalf("Unable to read IPTablesMasqueradeBit from config")
}
proxierIptables, err := iptables.NewProxier(
iptInterface,
utilsysctl.New(),
execer,
c.ProxyConfig.IPTables.SyncPeriod.Duration,
c.ProxyConfig.IPTables.MinSyncPeriod.Duration,
c.ProxyConfig.IPTables.MasqueradeAll,
int(*c.ProxyConfig.IPTables.MasqueradeBit),
c.ProxyConfig.ClusterCIDR,
hostname,
bindAddr,
recorder,
healthzServer,
)

if err != nil {
glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err)
}
proxier = proxierIptables
endpointsHandler = proxierIptables
servicesHandler = proxierIptables
// No turning back. Remove artifacts that might still exist from the userspace Proxier.
glog.V(0).Info("Tearing down userspace rules.")
userspace.CleanupLeftovers(iptInterface)
case componentconfig.ProxyModeUserspace:
glog.V(0).Info("Using userspace Proxier.")
// This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for
// our config.EndpointsHandler.
loadBalancer := userspace.NewLoadBalancerRR()
// set EndpointsHandler to our loadBalancer
endpointsHandler = loadBalancer

execer := utilexec.New()
proxierUserspace, err := userspace.NewProxier(
loadBalancer,
bindAddr,
iptInterface,
execer,
*portRange,
c.ProxyConfig.IPTables.SyncPeriod.Duration,
c.ProxyConfig.IPTables.MinSyncPeriod.Duration,
c.ProxyConfig.UDPIdleTimeout.Duration,
)
if err != nil {
glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err)
}
proxier = proxierUserspace
servicesHandler = proxierUserspace
// Remove artifacts from the pure-iptables Proxier.
glog.V(0).Info("Tearing down pure-iptables proxy rules.")
iptables.CleanupLeftovers(iptInterface)
default:
glog.Fatalf("Unknown proxy mode %q", c.ProxyConfig.Mode)
}

// Create configs (i.e. Watches for Services and Endpoints)
// Note: RegisterHandler() calls need to happen before creation of Sources because sources
// only notify on changes, and the initial update (on process start) may be lost if no handlers
// are registered yet.
serviceConfig := pconfig.NewServiceConfig(
c.InternalKubeInformers.Core().InternalVersion().Services(),
c.ProxyConfig.ConfigSyncPeriod.Duration,
)

if c.EnableUnidling {
unidlingLoadBalancer := userspace.NewLoadBalancerRR()
signaler := unidler.NewEventSignaler(recorder)
unidlingUserspaceProxy, err := unidler.NewUnidlerProxier(unidlingLoadBalancer, bindAddr, iptInterface, execer, *portRange, c.ProxyConfig.IPTables.SyncPeriod.Duration, c.ProxyConfig.IPTables.MinSyncPeriod.Duration, c.ProxyConfig.UDPIdleTimeout.Duration, signaler)
if err != nil {
glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err)
}
hybridProxier, err := hybrid.NewHybridProxier(
unidlingLoadBalancer,
unidlingUserspaceProxy,
endpointsHandler,
servicesHandler,
proxier,
unidlingUserspaceProxy,
c.ProxyConfig.IPTables.SyncPeriod.Duration,
c.InternalKubeInformers.Core().InternalVersion().Services().Lister(),
)
if err != nil {
glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err)
}
endpointsHandler = hybridProxier
servicesHandler = hybridProxier
proxier = hybridProxier
}

iptInterface.AddReloadFunc(proxier.Sync)
serviceConfig.RegisterEventHandler(servicesHandler)
go serviceConfig.Run(utilwait.NeverStop)

endpointsConfig := pconfig.NewEndpointsConfig(
c.InternalKubeInformers.Core().InternalVersion().Endpoints(),
c.ProxyConfig.ConfigSyncPeriod.Duration,
)
// customized handling registration that inserts a filter if needed
if c.SDNProxy != nil {
if err := c.SDNProxy.Start(endpointsHandler); err != nil {
glog.Fatalf("error: node proxy plugin startup failed: %v", err)
}
endpointsHandler = c.SDNProxy
}
endpointsConfig.RegisterEventHandler(endpointsHandler)
go endpointsConfig.Run(utilwait.NeverStop)

// periodically sync k8s iptables rules
go utilwait.Forever(proxier.SyncLoop, 0)
glog.Infof("Started Kubernetes Proxy on %s", c.ProxyConfig.BindAddress)
}

// getNodeIP is copied from the upstream proxy config to retrieve the IP of a node.
func getNodeIP(client kclientsetcorev1.CoreV1Interface, hostname string) net.IP {
var nodeIP net.IP
node, err := client.Nodes().Get(hostname, metav1.GetOptions{})
if err != nil {
glog.Warningf("Failed to retrieve node info: %v", err)
return nil
}
nodeIP, err = utilnode.GetNodeHostIP(node)
if err != nil {
glog.Warningf("Failed to retrieve node IP: %v", err)
return nil
}
return nodeIP
}
170 changes: 170 additions & 0 deletions pkg/cmd/server/kubernetes/network/network_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package network

import (
"fmt"
"net"
"strings"

"github.com/golang/glog"

miekgdns "github.com/miekg/dns"

kerrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kclientset "k8s.io/client-go/kubernetes"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kclientsetexternal "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion"

osclient "github.com/openshift/origin/pkg/client"
configapi "github.com/openshift/origin/pkg/cmd/server/api"
"github.com/openshift/origin/pkg/dns"
"github.com/openshift/origin/pkg/network"
networkapi "github.com/openshift/origin/pkg/network/apis/network"
)

// NetworkConfig represents the required parameters to start OpenShift networking
// through Kubernetes. All fields are required.
type NetworkConfig struct {
// External kube client
KubeClientset kclientset.Interface
// External kube client
ExternalKubeClientset kclientsetexternal.Interface
// Internal kubernetes shared informer factory.
InternalKubeInformers kinternalinformers.SharedInformerFactory

// ProxyConfig is the configuration for the kube-proxy, fully initialized
ProxyConfig *componentconfig.KubeProxyConfiguration
// EnableUnidling indicates whether or not the unidling hybrid proxy should be used
EnableUnidling bool

// DNSConfig controls the DNS configuration.
DNSServer *dns.Server

// SDNNode is an optional SDN node interface
SDNNode network.NodeInterface
// SDNProxy is an optional service endpoints filterer
SDNProxy network.ProxyInterface
}

// New creates a new network config object for running the networking components of the OpenShift node.
func New(options configapi.NodeConfig, clusterDomain string, proxyConfig *componentconfig.KubeProxyConfiguration, enableProxy, enableDNS bool) (*NetworkConfig, error) {
originClient, _, err := configapi.GetOpenShiftClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides)
if err != nil {
return nil, err
}
internalKubeClient, kubeConfig, err := configapi.GetInternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides)
if err != nil {
return nil, err
}
externalKubeClient, _, err := configapi.GetExternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides)
if err != nil {
return nil, err
}
kubeClient, err := kclientset.NewForConfig(kubeConfig)
if err != nil {
return nil, err
}

if err = validateNetworkPluginName(originClient, options.NetworkConfig.NetworkPluginName); err != nil {
return nil, err
}

internalKubeInformers := kinternalinformers.NewSharedInformerFactory(internalKubeClient, proxyConfig.ConfigSyncPeriod.Duration)

var sdnNode network.NodeInterface
var sdnProxy network.ProxyInterface
if network.IsOpenShiftNetworkPlugin(options.NetworkConfig.NetworkPluginName) {
sdnNode, sdnProxy, err = NewSDNInterfaces(options, originClient, internalKubeClient, internalKubeInformers, proxyConfig)
if err != nil {
return nil, fmt.Errorf("SDN initialization failed: %v", err)
}
}

config := &NetworkConfig{
KubeClientset: kubeClient,
ExternalKubeClientset: externalKubeClient,
InternalKubeInformers: internalKubeInformers,

ProxyConfig: proxyConfig,
EnableUnidling: options.EnableUnidling,

SDNNode: sdnNode,
SDNProxy: sdnProxy,
}

if enableDNS {
dnsConfig, err := dns.NewServerDefaults()
if err != nil {
return nil, fmt.Errorf("DNS configuration was not possible: %v", err)
}
if len(options.DNSBindAddress) > 0 {
dnsConfig.DnsAddr = options.DNSBindAddress
}
dnsConfig.Domain = clusterDomain + "."
dnsConfig.Local = "openshift.default.svc." + dnsConfig.Domain

// identify override nameservers
var nameservers []string
for _, s := range options.DNSNameservers {
nameservers = append(nameservers, s)
}
if len(options.DNSRecursiveResolvConf) > 0 {
c, err := miekgdns.ClientConfigFromFile(options.DNSRecursiveResolvConf)
if err != nil {
return nil, fmt.Errorf("could not start DNS, unable to read config file: %v", err)
}
for _, s := range c.Servers {
nameservers = append(nameservers, net.JoinHostPort(s, c.Port))
}
}

if len(nameservers) > 0 {
dnsConfig.Nameservers = nameservers
}

services, err := dns.NewCachedServiceAccessor(internalKubeInformers.Core().InternalVersion().Services())
if err != nil {
return nil, fmt.Errorf("could not start DNS: failed to add ClusterIP index: %v", err)
}

endpoints, err := dns.NewCachedEndpointsAccessor(internalKubeInformers.Core().InternalVersion().Endpoints())
if err != nil {
return nil, fmt.Errorf("could not start DNS: failed to add HostnameIP index: %v", err)
}

// TODO: use kubeletConfig.ResolverConfig as an argument to etcd in the event the
// user sets it, instead of passing it to the kubelet.
glog.Infof("DNS Bind to %s", options.DNSBindAddress)
config.DNSServer = dns.NewServer(
dnsConfig,
services,
endpoints,
"node",
)
}

return config, nil
}

func validateNetworkPluginName(originClient *osclient.Client, pluginName string) error {
if network.IsOpenShiftNetworkPlugin(pluginName) {
// Detect any plugin mismatches between node and master
clusterNetwork, err := originClient.ClusterNetwork().Get(networkapi.ClusterNetworkDefault, metav1.GetOptions{})
if kerrs.IsNotFound(err) {
return fmt.Errorf("master has not created a default cluster network, network plugin %q can not start", pluginName)
} else if err != nil {
return fmt.Errorf("cannot fetch %q cluster network: %v", networkapi.ClusterNetworkDefault, err)
}

if clusterNetwork.PluginName != strings.ToLower(pluginName) {
if len(clusterNetwork.PluginName) != 0 {
return fmt.Errorf("detected network plugin mismatch between OpenShift node(%q) and master(%q)", pluginName, clusterNetwork.PluginName)
} else {
// Do not return error in this case
glog.Warningf(`either there is network plugin mismatch between OpenShift node(%q) and master or OpenShift master is running an older version where we did not persist plugin name`, pluginName)
}
}
}
return nil
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package node
package network

import (
"k8s.io/kubernetes/pkg/apis/componentconfig"
Expand Down
Loading

0 comments on commit 0a33b4c

Please sign in to comment.