diff --git a/pkg/cmd/server/kubernetes/network/network.go b/pkg/cmd/server/kubernetes/network/network.go new file mode 100644 index 000000000000..ca3a6880a737 --- /dev/null +++ b/pkg/cmd/server/kubernetes/network/network.go @@ -0,0 +1,216 @@ +package network + +import ( + "net" + + "github.com/golang/glog" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilnet "k8s.io/apimachinery/pkg/util/net" + utilwait "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes/scheme" + kv1core "k8s.io/client-go/kubernetes/typed/core/v1" + kclientv1 "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/apis/componentconfig" + kclientsetcorev1 "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1" + proxy "k8s.io/kubernetes/pkg/proxy" + pconfig "k8s.io/kubernetes/pkg/proxy/config" + "k8s.io/kubernetes/pkg/proxy/healthcheck" + "k8s.io/kubernetes/pkg/proxy/iptables" + "k8s.io/kubernetes/pkg/proxy/userspace" + utildbus "k8s.io/kubernetes/pkg/util/dbus" + kexec "k8s.io/kubernetes/pkg/util/exec" + utilexec "k8s.io/kubernetes/pkg/util/exec" + utiliptables "k8s.io/kubernetes/pkg/util/iptables" + utilnode "k8s.io/kubernetes/pkg/util/node" + utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" + + "github.com/openshift/origin/pkg/proxy/hybrid" + "github.com/openshift/origin/pkg/proxy/unidler" +) + +// RunSDN starts the SDN, if the OpenShift SDN network plugin is enabled in configuration. +func (c *NetworkConfig) RunSDN() { + if c.SDNNode == nil { + return + } + if err := c.SDNNode.Start(); err != nil { + glog.Fatalf("error: SDN node startup failed: %v", err) + } +} + +// RunDNS starts the DNS server as soon as services are loaded. 
+func (c *NetworkConfig) RunDNS() { + go func() { + glog.Infof("Starting DNS on %s", c.DNSServer.Config.DnsAddr) + err := c.DNSServer.ListenAndServe() + glog.Fatalf("DNS server failed to start: %v", err) + }() +} + +// RunProxy starts the proxy +func (c *NetworkConfig) RunProxy() { + protocol := utiliptables.ProtocolIpv4 + bindAddr := net.ParseIP(c.ProxyConfig.BindAddress) + if bindAddr.To4() == nil { + protocol = utiliptables.ProtocolIpv6 + } + + portRange := utilnet.ParsePortRangeOrDie(c.ProxyConfig.PortRange) + + hostname := utilnode.GetHostname(c.ProxyConfig.HostnameOverride) + + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: c.KubeClientset.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, kclientv1.EventSource{Component: "kube-proxy", Host: hostname}) + + execer := kexec.New() + dbus := utildbus.New() + iptInterface := utiliptables.New(execer, dbus, protocol) + + var proxier proxy.ProxyProvider + var servicesHandler pconfig.ServiceHandler + var endpointsHandler pconfig.EndpointsHandler + + switch c.ProxyConfig.Mode { + case componentconfig.ProxyModeIPTables: + glog.V(0).Info("Using iptables Proxier.") + if bindAddr.Equal(net.IPv4zero) { + bindAddr = getNodeIP(c.ExternalKubeClientset.CoreV1(), hostname) + } + var healthzServer *healthcheck.HealthzServer + if len(c.ProxyConfig.HealthzBindAddress) > 0 { + healthzServer = healthcheck.NewDefaultHealthzServer(c.ProxyConfig.HealthzBindAddress, 2*c.ProxyConfig.IPTables.SyncPeriod.Duration) + } + if c.ProxyConfig.IPTables.MasqueradeBit == nil { + // IPTablesMasqueradeBit must be specified or defaulted. 
+ glog.Fatalf("Unable to read IPTablesMasqueradeBit from config") + } + proxierIptables, err := iptables.NewProxier( + iptInterface, + utilsysctl.New(), + execer, + c.ProxyConfig.IPTables.SyncPeriod.Duration, + c.ProxyConfig.IPTables.MinSyncPeriod.Duration, + c.ProxyConfig.IPTables.MasqueradeAll, + int(*c.ProxyConfig.IPTables.MasqueradeBit), + c.ProxyConfig.ClusterCIDR, + hostname, + bindAddr, + recorder, + healthzServer, + ) + + if err != nil { + glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err) + } + proxier = proxierIptables + endpointsHandler = proxierIptables + servicesHandler = proxierIptables + // No turning back. Remove artifacts that might still exist from the userspace Proxier. + glog.V(0).Info("Tearing down userspace rules.") + userspace.CleanupLeftovers(iptInterface) + case componentconfig.ProxyModeUserspace: + glog.V(0).Info("Using userspace Proxier.") + // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for + // our config.EndpointsHandler. + loadBalancer := userspace.NewLoadBalancerRR() + // set EndpointsHandler to our loadBalancer + endpointsHandler = loadBalancer + + execer := utilexec.New() + proxierUserspace, err := userspace.NewProxier( + loadBalancer, + bindAddr, + iptInterface, + execer, + *portRange, + c.ProxyConfig.IPTables.SyncPeriod.Duration, + c.ProxyConfig.IPTables.MinSyncPeriod.Duration, + c.ProxyConfig.UDPIdleTimeout.Duration, + ) + if err != nil { + glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err) + } + proxier = proxierUserspace + servicesHandler = proxierUserspace + // Remove artifacts from the pure-iptables Proxier. 
+ glog.V(0).Info("Tearing down pure-iptables proxy rules.") + iptables.CleanupLeftovers(iptInterface) + default: + glog.Fatalf("Unknown proxy mode %q", c.ProxyConfig.Mode) + } + + // Create configs (i.e. Watches for Services and Endpoints) + // Note: RegisterHandler() calls need to happen before creation of Sources because sources + // only notify on changes, and the initial update (on process start) may be lost if no handlers + // are registered yet. + serviceConfig := pconfig.NewServiceConfig( + c.InternalKubeInformers.Core().InternalVersion().Services(), + c.ProxyConfig.ConfigSyncPeriod.Duration, + ) + + if c.EnableUnidling { + unidlingLoadBalancer := userspace.NewLoadBalancerRR() + signaler := unidler.NewEventSignaler(recorder) + unidlingUserspaceProxy, err := unidler.NewUnidlerProxier(unidlingLoadBalancer, bindAddr, iptInterface, execer, *portRange, c.ProxyConfig.IPTables.SyncPeriod.Duration, c.ProxyConfig.IPTables.MinSyncPeriod.Duration, c.ProxyConfig.UDPIdleTimeout.Duration, signaler) + if err != nil { + glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err) + } + hybridProxier, err := hybrid.NewHybridProxier( + unidlingLoadBalancer, + unidlingUserspaceProxy, + endpointsHandler, + servicesHandler, + proxier, + unidlingUserspaceProxy, + c.ProxyConfig.IPTables.SyncPeriod.Duration, + c.InternalKubeInformers.Core().InternalVersion().Services().Lister(), + ) + if err != nil { + glog.Fatalf("error: Could not initialize Kubernetes Proxy. 
You must run this process as root (and if containerized, in the host network namespace as privileged) to use the service proxy: %v", err) + } + endpointsHandler = hybridProxier + servicesHandler = hybridProxier + proxier = hybridProxier + } + + iptInterface.AddReloadFunc(proxier.Sync) + serviceConfig.RegisterEventHandler(servicesHandler) + go serviceConfig.Run(utilwait.NeverStop) + + endpointsConfig := pconfig.NewEndpointsConfig( + c.InternalKubeInformers.Core().InternalVersion().Endpoints(), + c.ProxyConfig.ConfigSyncPeriod.Duration, + ) + // customized handling registration that inserts a filter if needed + if c.SDNProxy != nil { + if err := c.SDNProxy.Start(endpointsHandler); err != nil { + glog.Fatalf("error: node proxy plugin startup failed: %v", err) + } + endpointsHandler = c.SDNProxy + } + endpointsConfig.RegisterEventHandler(endpointsHandler) + go endpointsConfig.Run(utilwait.NeverStop) + + // periodically sync k8s iptables rules + go utilwait.Forever(proxier.SyncLoop, 0) + glog.Infof("Started Kubernetes Proxy on %s", c.ProxyConfig.BindAddress) +} + +// getNodeIP is copied from the upstream proxy config to retrieve the IP of a node. 
+func getNodeIP(client kclientsetcorev1.CoreV1Interface, hostname string) net.IP { + var nodeIP net.IP + node, err := client.Nodes().Get(hostname, metav1.GetOptions{}) + if err != nil { + glog.Warningf("Failed to retrieve node info: %v", err) + return nil + } + nodeIP, err = utilnode.GetNodeHostIP(node) + if err != nil { + glog.Warningf("Failed to retrieve node IP: %v", err) + return nil + } + return nodeIP +} diff --git a/pkg/cmd/server/kubernetes/network/network_config.go b/pkg/cmd/server/kubernetes/network/network_config.go new file mode 100644 index 000000000000..9c5c2efe3a14 --- /dev/null +++ b/pkg/cmd/server/kubernetes/network/network_config.go @@ -0,0 +1,170 @@ +package network + +import ( + "fmt" + "net" + "strings" + + "github.com/golang/glog" + + miekgdns "github.com/miekg/dns" + + kerrs "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kclientset "k8s.io/client-go/kubernetes" + "k8s.io/kubernetes/pkg/apis/componentconfig" + kclientsetexternal "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" + kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" + + osclient "github.com/openshift/origin/pkg/client" + configapi "github.com/openshift/origin/pkg/cmd/server/api" + "github.com/openshift/origin/pkg/dns" + "github.com/openshift/origin/pkg/network" + networkapi "github.com/openshift/origin/pkg/network/apis/network" +) + +// NetworkConfig represents the required parameters to start OpenShift networking +// through Kubernetes. All fields are required. +type NetworkConfig struct { + // External kube client + KubeClientset kclientset.Interface + // External kube client + ExternalKubeClientset kclientsetexternal.Interface + // Internal kubernetes shared informer factory. 
+ InternalKubeInformers kinternalinformers.SharedInformerFactory + + // ProxyConfig is the configuration for the kube-proxy, fully initialized + ProxyConfig *componentconfig.KubeProxyConfiguration + // EnableUnidling indicates whether or not the unidling hybrid proxy should be used + EnableUnidling bool + + // DNSConfig controls the DNS configuration. + DNSServer *dns.Server + + // SDNNode is an optional SDN node interface + SDNNode network.NodeInterface + // SDNProxy is an optional service endpoints filterer + SDNProxy network.ProxyInterface +} + +// New creates a new network config object for running the networking components of the OpenShift node. +func New(options configapi.NodeConfig, clusterDomain string, proxyConfig *componentconfig.KubeProxyConfiguration, enableProxy, enableDNS bool) (*NetworkConfig, error) { + originClient, _, err := configapi.GetOpenShiftClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides) + if err != nil { + return nil, err + } + internalKubeClient, kubeConfig, err := configapi.GetInternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides) + if err != nil { + return nil, err + } + externalKubeClient, _, err := configapi.GetExternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides) + if err != nil { + return nil, err + } + kubeClient, err := kclientset.NewForConfig(kubeConfig) + if err != nil { + return nil, err + } + + if err = validateNetworkPluginName(originClient, options.NetworkConfig.NetworkPluginName); err != nil { + return nil, err + } + + internalKubeInformers := kinternalinformers.NewSharedInformerFactory(internalKubeClient, proxyConfig.ConfigSyncPeriod.Duration) + + var sdnNode network.NodeInterface + var sdnProxy network.ProxyInterface + if network.IsOpenShiftNetworkPlugin(options.NetworkConfig.NetworkPluginName) { + sdnNode, sdnProxy, err = NewSDNInterfaces(options, originClient, internalKubeClient, internalKubeInformers, proxyConfig) + if err != nil { 
+ return nil, fmt.Errorf("SDN initialization failed: %v", err) + } + } + + config := &NetworkConfig{ + KubeClientset: kubeClient, + ExternalKubeClientset: externalKubeClient, + InternalKubeInformers: internalKubeInformers, + + ProxyConfig: proxyConfig, + EnableUnidling: options.EnableUnidling, + + SDNNode: sdnNode, + SDNProxy: sdnProxy, + } + + if enableDNS { + dnsConfig, err := dns.NewServerDefaults() + if err != nil { + return nil, fmt.Errorf("DNS configuration was not possible: %v", err) + } + if len(options.DNSBindAddress) > 0 { + dnsConfig.DnsAddr = options.DNSBindAddress + } + dnsConfig.Domain = clusterDomain + "." + dnsConfig.Local = "openshift.default.svc." + dnsConfig.Domain + + // identify override nameservers + var nameservers []string + for _, s := range options.DNSNameservers { + nameservers = append(nameservers, s) + } + if len(options.DNSRecursiveResolvConf) > 0 { + c, err := miekgdns.ClientConfigFromFile(options.DNSRecursiveResolvConf) + if err != nil { + return nil, fmt.Errorf("could not start DNS, unable to read config file: %v", err) + } + for _, s := range c.Servers { + nameservers = append(nameservers, net.JoinHostPort(s, c.Port)) + } + } + + if len(nameservers) > 0 { + dnsConfig.Nameservers = nameservers + } + + services, err := dns.NewCachedServiceAccessor(internalKubeInformers.Core().InternalVersion().Services()) + if err != nil { + return nil, fmt.Errorf("could not start DNS: failed to add ClusterIP index: %v", err) + } + + endpoints, err := dns.NewCachedEndpointsAccessor(internalKubeInformers.Core().InternalVersion().Endpoints()) + if err != nil { + return nil, fmt.Errorf("could not start DNS: failed to add HostnameIP index: %v", err) + } + + // TODO: use kubeletConfig.ResolverConfig as an argument to etcd in the event the + // user sets it, instead of passing it to the kubelet. 
+ glog.Infof("DNS Bind to %s", options.DNSBindAddress) + config.DNSServer = dns.NewServer( + dnsConfig, + services, + endpoints, + "node", + ) + } + + return config, nil +} + +func validateNetworkPluginName(originClient *osclient.Client, pluginName string) error { + if network.IsOpenShiftNetworkPlugin(pluginName) { + // Detect any plugin mismatches between node and master + clusterNetwork, err := originClient.ClusterNetwork().Get(networkapi.ClusterNetworkDefault, metav1.GetOptions{}) + if kerrs.IsNotFound(err) { + return fmt.Errorf("master has not created a default cluster network, network plugin %q can not start", pluginName) + } else if err != nil { + return fmt.Errorf("cannot fetch %q cluster network: %v", networkapi.ClusterNetworkDefault, err) + } + + if clusterNetwork.PluginName != strings.ToLower(pluginName) { + if len(clusterNetwork.PluginName) != 0 { + return fmt.Errorf("detected network plugin mismatch between OpenShift node(%q) and master(%q)", pluginName, clusterNetwork.PluginName) + } else { + // Do not return error in this case + glog.Warningf(`either there is network plugin mismatch between OpenShift node(%q) and master or OpenShift master is running an older version where we did not persist plugin name`, pluginName) + } + } + } + return nil +} diff --git a/pkg/cmd/server/kubernetes/node/sdn_linux.go b/pkg/cmd/server/kubernetes/network/sdn_linux.go similarity index 99% rename from pkg/cmd/server/kubernetes/node/sdn_linux.go rename to pkg/cmd/server/kubernetes/network/sdn_linux.go index 5920999cce59..155dda9297d9 100644 --- a/pkg/cmd/server/kubernetes/node/sdn_linux.go +++ b/pkg/cmd/server/kubernetes/network/sdn_linux.go @@ -1,4 +1,4 @@ -package node +package network import ( "k8s.io/kubernetes/pkg/apis/componentconfig" diff --git a/pkg/cmd/server/kubernetes/node/sdn_unsupported.go b/pkg/cmd/server/kubernetes/network/sdn_unsupported.go similarity index 97% rename from pkg/cmd/server/kubernetes/node/sdn_unsupported.go rename to 
pkg/cmd/server/kubernetes/network/sdn_unsupported.go index c3d454c75645..72382c3eaa26 100644 --- a/pkg/cmd/server/kubernetes/node/sdn_unsupported.go +++ b/pkg/cmd/server/kubernetes/network/sdn_unsupported.go @@ -1,6 +1,6 @@ // +build !linux -package node +package network import ( "fmt" diff --git a/pkg/cmd/server/kubernetes/node/node.go b/pkg/cmd/server/kubernetes/node/node.go index e9343859aa09..9781bd049771 100644 --- a/pkg/cmd/server/kubernetes/node/node.go +++ b/pkg/cmd/server/kubernetes/node/node.go @@ -13,34 +13,13 @@ import ( dockerclient "github.com/fsouza/go-dockerclient" "github.com/golang/glog" - "github.com/openshift/origin/pkg/proxy/hybrid" - "github.com/openshift/origin/pkg/proxy/unidler" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilnet "k8s.io/apimachinery/pkg/util/net" - utilwait "k8s.io/apimachinery/pkg/util/wait" - kv1core "k8s.io/client-go/kubernetes/typed/core/v1" - kclientv1 "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/record" kubeletapp "k8s.io/kubernetes/cmd/kubelet/app" - kapi "k8s.io/kubernetes/pkg/api" kapiv1 "k8s.io/kubernetes/pkg/api/v1" - "k8s.io/kubernetes/pkg/apis/componentconfig" - kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/kubelet/cadvisor" cadvisortesting "k8s.io/kubernetes/pkg/kubelet/cadvisor/testing" "k8s.io/kubernetes/pkg/kubelet/cm" dockertools "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" - proxy "k8s.io/kubernetes/pkg/proxy" - pconfig "k8s.io/kubernetes/pkg/proxy/config" - "k8s.io/kubernetes/pkg/proxy/healthcheck" - "k8s.io/kubernetes/pkg/proxy/iptables" - "k8s.io/kubernetes/pkg/proxy/userspace" - utildbus "k8s.io/kubernetes/pkg/util/dbus" - kexec "k8s.io/kubernetes/pkg/util/exec" - utilexec "k8s.io/kubernetes/pkg/util/exec" - utiliptables "k8s.io/kubernetes/pkg/util/iptables" - utilnode "k8s.io/kubernetes/pkg/util/node" - utilsysctl "k8s.io/kubernetes/pkg/util/sysctl" "k8s.io/kubernetes/pkg/volume" configapi 
"github.com/openshift/origin/pkg/cmd/server/api" @@ -244,7 +223,7 @@ func (c *NodeConfig) EnsureLocalQuota(nodeConfig configapi.NodeConfig) { func (c *NodeConfig) RunKubelet() { var clusterDNS net.IP if len(c.KubeletServer.ClusterDNS) == 0 { - if service, err := c.Client.Core().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}); err == nil { + if service, err := c.KubeletDeps.KubeClient.Core().Services(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}); err == nil { if includesServicePort(service.Spec.Ports, 53, "dns") { // Use master service if service includes "dns" port 53. clusterDNS = net.ParseIP(service.Spec.ClusterIP) @@ -252,7 +231,7 @@ func (c *NodeConfig) RunKubelet() { } } if clusterDNS == nil { - if endpoint, err := c.Client.Core().Endpoints(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}); err == nil { + if endpoint, err := c.KubeletDeps.KubeClient.Core().Endpoints(metav1.NamespaceDefault).Get("kubernetes", metav1.GetOptions{}); err == nil { if endpointIP, ok := firstEndpointIPWithNamedPort(endpoint, 53, "dns"); ok { // Use first endpoint if endpoint includes "dns" port 53. clusterDNS = net.ParseIP(endpointIP) @@ -301,209 +280,8 @@ func SetFakeContainerManagerInterfaceForIntegrationTest() { defaultContainerManagerInterface = cm.NewStubContainerManager() } -// RunSDN starts the SDN, if the OpenShift SDN network plugin is enabled in configuration. -func (c *NodeConfig) RunSDN() { - if c.SDNNode == nil { - return - } - if err := c.SDNNode.Start(); err != nil { - glog.Fatalf("error: SDN node startup failed: %v", err) - } -} - -// RunDNS starts the DNS server as soon as services are loaded. 
-func (c *NodeConfig) RunDNS() { - go func() { - glog.Infof("Starting DNS on %s", c.DNSServer.Config.DnsAddr) - err := c.DNSServer.ListenAndServe() - glog.Fatalf("DNS server failed to start: %v", err) - }() -} - -// RunProxy starts the proxy -func (c *NodeConfig) RunProxy() { - protocol := utiliptables.ProtocolIpv4 - bindAddr := net.ParseIP(c.ProxyConfig.BindAddress) - if bindAddr.To4() == nil { - protocol = utiliptables.ProtocolIpv6 - } - - portRange := utilnet.ParsePortRangeOrDie(c.ProxyConfig.PortRange) - - hostname := utilnode.GetHostname(c.KubeletServer.HostnameOverride) - - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: kv1core.New(c.ExternalKubeClientset.CoreV1().RESTClient()).Events("")}) - recorder := eventBroadcaster.NewRecorder(kapi.Scheme, kclientv1.EventSource{Component: "kube-proxy", Host: hostname}) - - execer := kexec.New() - dbus := utildbus.New() - iptInterface := utiliptables.New(execer, dbus, protocol) - - var proxier proxy.ProxyProvider - var servicesHandler pconfig.ServiceHandler - var endpointsHandler pconfig.EndpointsHandler - - switch c.ProxyConfig.Mode { - case componentconfig.ProxyModeIPTables: - glog.V(0).Info("Using iptables Proxier.") - if bindAddr.Equal(net.IPv4zero) { - bindAddr = getNodeIP(c.Client, hostname) - } - var healthzServer *healthcheck.HealthzServer - if len(c.ProxyConfig.HealthzBindAddress) > 0 { - healthzServer = healthcheck.NewDefaultHealthzServer(c.ProxyConfig.HealthzBindAddress, 2*c.ProxyConfig.IPTables.SyncPeriod.Duration) - } - if c.ProxyConfig.IPTables.MasqueradeBit == nil { - // IPTablesMasqueradeBit must be specified or defaulted. 
- glog.Fatalf("Unable to read IPTablesMasqueradeBit from config") - } - proxierIptables, err := iptables.NewProxier( - iptInterface, - utilsysctl.New(), - execer, - c.ProxyConfig.IPTables.SyncPeriod.Duration, - c.ProxyConfig.IPTables.MinSyncPeriod.Duration, - c.ProxyConfig.IPTables.MasqueradeAll, - int(*c.ProxyConfig.IPTables.MasqueradeBit), - c.ProxyConfig.ClusterCIDR, - hostname, - bindAddr, - recorder, - healthzServer, - ) - - if err != nil { - if c.Containerized { - glog.Fatalf("error: Could not initialize Kubernetes Proxy: %v\n When running in a container, you must run the container in the host network namespace with --net=host and with --privileged", err) - } else { - glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root to use the service proxy: %v", err) - } - } - proxier = proxierIptables - endpointsHandler = proxierIptables - servicesHandler = proxierIptables - // No turning back. Remove artifacts that might still exist from the userspace Proxier. - glog.V(0).Info("Tearing down userspace rules.") - userspace.CleanupLeftovers(iptInterface) - case componentconfig.ProxyModeUserspace: - glog.V(0).Info("Using userspace Proxier.") - // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for - // our config.EndpointsHandler. 
- loadBalancer := userspace.NewLoadBalancerRR() - // set EndpointsHandler to our loadBalancer - endpointsHandler = loadBalancer - - execer := utilexec.New() - proxierUserspace, err := userspace.NewProxier( - loadBalancer, - bindAddr, - iptInterface, - execer, - *portRange, - c.ProxyConfig.IPTables.SyncPeriod.Duration, - c.ProxyConfig.IPTables.MinSyncPeriod.Duration, - c.ProxyConfig.UDPIdleTimeout.Duration, - ) - if err != nil { - if c.Containerized { - glog.Fatalf("error: Could not initialize Kubernetes Proxy: %v\n When running in a container, you must run the container in the host network namespace with --net=host and with --privileged", err) - } else { - glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root to use the service proxy: %v", err) - } - } - proxier = proxierUserspace - servicesHandler = proxierUserspace - // Remove artifacts from the pure-iptables Proxier. - glog.V(0).Info("Tearing down pure-iptables proxy rules.") - iptables.CleanupLeftovers(iptInterface) - default: - glog.Fatalf("Unknown proxy mode %q", c.ProxyConfig.Mode) - } - - // Create configs (i.e. Watches for Services and Endpoints) - // Note: RegisterHandler() calls need to happen before creation of Sources because sources - // only notify on changes, and the initial update (on process start) may be lost if no handlers - // are registered yet. 
- serviceConfig := pconfig.NewServiceConfig( - c.InternalKubeInformers.Core().InternalVersion().Services(), - c.ProxyConfig.ConfigSyncPeriod.Duration, - ) - - if c.EnableUnidling { - unidlingLoadBalancer := userspace.NewLoadBalancerRR() - signaler := unidler.NewEventSignaler(recorder) - unidlingUserspaceProxy, err := unidler.NewUnidlerProxier(unidlingLoadBalancer, bindAddr, iptInterface, execer, *portRange, c.ProxyConfig.IPTables.SyncPeriod.Duration, c.ProxyConfig.IPTables.MinSyncPeriod.Duration, c.ProxyConfig.UDPIdleTimeout.Duration, signaler) - if err != nil { - if c.Containerized { - glog.Fatalf("error: Could not initialize Kubernetes Proxy: %v\n When running in a container, you must run the container in the host network namespace with --net=host and with --privileged", err) - } else { - glog.Fatalf("error: Could not initialize Kubernetes Proxy. You must run this process as root to use the service proxy: %v", err) - } - } - hybridProxier, err := hybrid.NewHybridProxier( - unidlingLoadBalancer, - unidlingUserspaceProxy, - endpointsHandler, - servicesHandler, - proxier, - unidlingUserspaceProxy, - c.ProxyConfig.IPTables.SyncPeriod.Duration, - c.InternalKubeInformers.Core().InternalVersion().Services().Lister(), - ) - if err != nil { - if c.Containerized { - glog.Fatalf("error: Could not initialize Kubernetes Proxy: %v\n When running in a container, you must run the container in the host network namespace with --net=host and with --privileged", err) - } else { - glog.Fatalf("error: Could not initialize Kubernetes Proxy. 
You must run this process as root to use the service proxy: %v", err) - } - } - endpointsHandler = hybridProxier - servicesHandler = hybridProxier - proxier = hybridProxier - } - - iptInterface.AddReloadFunc(proxier.Sync) - serviceConfig.RegisterEventHandler(servicesHandler) - go serviceConfig.Run(utilwait.NeverStop) - - endpointsConfig := pconfig.NewEndpointsConfig( - c.InternalKubeInformers.Core().InternalVersion().Endpoints(), - c.ProxyConfig.ConfigSyncPeriod.Duration, - ) - // customized handling registration that inserts a filter if needed - if c.SDNProxy != nil { - if err := c.SDNProxy.Start(endpointsHandler); err != nil { - glog.Fatalf("error: node proxy plugin startup failed: %v", err) - } - endpointsHandler = c.SDNProxy - } - endpointsConfig.RegisterEventHandler(endpointsHandler) - go endpointsConfig.Run(utilwait.NeverStop) - - // periodically sync k8s iptables rules - go utilwait.Forever(proxier.SyncLoop, 0) - glog.Infof("Started Kubernetes Proxy on %s", c.ProxyConfig.BindAddress) -} - -// getNodeIP is copied from the upstream proxy config to retrieve the IP of a node. 
-func getNodeIP(client kclientset.Interface, hostname string) net.IP { - var nodeIP net.IP - node, err := client.Core().Nodes().Get(hostname, metav1.GetOptions{}) - if err != nil { - glog.Warningf("Failed to retrieve node info: %v", err) - return nil - } - nodeIP, err = utilnode.InternalGetNodeHostIP(node) - if err != nil { - glog.Warningf("Failed to retrieve node IP: %v", err) - return nil - } - return nodeIP -} - // TODO: more generic location -func includesServicePort(ports []kapi.ServicePort, port int, portName string) bool { +func includesServicePort(ports []kapiv1.ServicePort, port int, portName string) bool { for _, p := range ports { if p.Port == int32(port) && p.Name == portName { return true @@ -513,7 +291,7 @@ func includesServicePort(ports []kapi.ServicePort, port int, portName string) bo } // TODO: more generic location -func includesEndpointPort(ports []kapi.EndpointPort, port int) bool { +func includesEndpointPort(ports []kapiv1.EndpointPort, port int) bool { for _, p := range ports { if p.Port == int32(port) { return true @@ -523,7 +301,7 @@ func includesEndpointPort(ports []kapi.EndpointPort, port int) bool { } // TODO: more generic location -func firstEndpointIP(endpoints *kapi.Endpoints, port int) (string, bool) { +func firstEndpointIP(endpoints *kapiv1.Endpoints, port int) (string, bool) { for _, s := range endpoints.Subsets { if !includesEndpointPort(s.Ports, port) { continue @@ -536,7 +314,7 @@ func firstEndpointIP(endpoints *kapi.Endpoints, port int) (string, bool) { } // TODO: more generic location -func firstEndpointIPWithNamedPort(endpoints *kapi.Endpoints, port int, portName string) (string, bool) { +func firstEndpointIPWithNamedPort(endpoints *kapiv1.Endpoints, port int, portName string) (string, bool) { for _, s := range endpoints.Subsets { if !includesNamedEndpointPort(s.Ports, port, portName) { continue @@ -549,7 +327,7 @@ func firstEndpointIPWithNamedPort(endpoints *kapi.Endpoints, port int, portName } // TODO: more generic location 
-func includesNamedEndpointPort(ports []kapi.EndpointPort, port int, portName string) bool { +func includesNamedEndpointPort(ports []kapiv1.EndpointPort, port int, portName string) bool { for _, p := range ports { if p.Port == int32(port) && p.Name == portName { return true diff --git a/pkg/cmd/server/kubernetes/node/node_config.go b/pkg/cmd/server/kubernetes/node/node_config.go index 05e66c6f07bd..5ed343f967c7 100644 --- a/pkg/cmd/server/kubernetes/node/node_config.go +++ b/pkg/cmd/server/kubernetes/node/node_config.go @@ -2,47 +2,24 @@ package node import ( "crypto/tls" - "fmt" - "net" - "strconv" - "strings" - "time" "github.com/golang/glog" - miekgdns "github.com/miekg/dns" - - kerrs "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - kerrors "k8s.io/apimachinery/pkg/util/errors" clientgoclientset "k8s.io/client-go/kubernetes" kv1core "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/util/cert" - kubeproxyoptions "k8s.io/kubernetes/cmd/kube-proxy/app" kubeletapp "k8s.io/kubernetes/cmd/kubelet/app" kubeletoptions "k8s.io/kubernetes/cmd/kubelet/app/options" - "k8s.io/kubernetes/pkg/apis/componentconfig" "k8s.io/kubernetes/pkg/apis/componentconfig/v1alpha1" - kclientsetexternal "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - kinternalinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion" "k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/kubelet" dockertools "k8s.io/kubernetes/pkg/kubelet/dockershim/libdocker" - kubeletcni "k8s.io/kubernetes/pkg/kubelet/network/cni" kubeletserver "k8s.io/kubernetes/pkg/kubelet/server" - kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" - osclient "github.com/openshift/origin/pkg/client" configapi "github.com/openshift/origin/pkg/cmd/server/api" "github.com/openshift/origin/pkg/cmd/server/crypto" cmdutil 
"github.com/openshift/origin/pkg/cmd/util" - cmdflags "github.com/openshift/origin/pkg/cmd/util/flags" - "github.com/openshift/origin/pkg/cmd/util/variable" - "github.com/openshift/origin/pkg/dns" - "github.com/openshift/origin/pkg/network" - networkapi "github.com/openshift/origin/pkg/network/apis/network" ) // NodeConfig represents the required parameters to start the OpenShift node @@ -57,57 +34,15 @@ type NodeConfig struct { // Containerized is true if we are expected to be running inside of a container Containerized bool - // Client to connect to the master. - Client kclientset.Interface - // External kube client - ExternalKubeClientset kclientsetexternal.Interface - // Internal kubernetes shared informer factory. - InternalKubeInformers kinternalinformers.SharedInformerFactory // DockerClient is a client to connect to Docker DockerClient dockertools.Interface // KubeletServer contains the KubeletServer configuration KubeletServer *kubeletoptions.KubeletServer // KubeletDeps are the injected code dependencies for the kubelet, fully initialized KubeletDeps *kubelet.KubeletDeps - // ProxyConfig is the configuration for the kube-proxy, fully initialized - ProxyConfig *componentconfig.KubeProxyConfiguration - // IPTablesSyncPeriod is how often iptable rules are refreshed - IPTablesSyncPeriod string - // EnableUnidling indicates whether or not the unidling hybrid proxy should be used - EnableUnidling bool - - // DNSConfig controls the DNS configuration. 
- DNSServer *dns.Server - - // SDNNode is an optional SDN node interface - SDNNode network.NodeInterface - // SDNProxy is an optional service endpoints filterer - SDNProxy network.ProxyInterface } -func BuildKubernetesNodeConfig(options configapi.NodeConfig, enableProxy, enableDNS bool) (*NodeConfig, error) { - originClient, _, err := configapi.GetOpenShiftClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides) - if err != nil { - return nil, err - } - kubeClient, privilegedKubeConfig, err := configapi.GetInternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides) - if err != nil { - return nil, err - } - externalKubeClient, _, err := configapi.GetExternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides) - if err != nil { - return nil, err - } - // Make a separate client for event reporting, to avoid event QPS blocking node calls - eventClient, _, err := configapi.GetExternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides) - if err != nil { - return nil, err - } - clientgoClientSet, err := clientgoclientset.NewForConfig(privilegedKubeConfig) - if err != nil { - return nil, err - } - +func New(options configapi.NodeConfig, server *kubeletoptions.KubeletServer) (*NodeConfig, error) { if options.NodeName == "localhost" { glog.Warningf(`Using "localhost" as node name will not resolve from all locations`) } @@ -117,138 +52,19 @@ func BuildKubernetesNodeConfig(options configapi.NodeConfig, enableProxy, enable return nil, err } - imageTemplate := variable.NewDefaultImageTemplate() - imageTemplate.Format = options.ImageConfig.Format - imageTemplate.Latest = options.ImageConfig.Latest - - var path string - var fileCheckInterval int64 - if options.PodManifestConfig != nil { - path = options.PodManifestConfig.Path - fileCheckInterval = options.PodManifestConfig.FileCheckIntervalSeconds - } - - kubeAddressStr, kubePortStr, err := 
net.SplitHostPort(options.ServingInfo.BindAddress) + externalKubeClient, kubeConfig, err := configapi.GetExternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides) if err != nil { - return nil, fmt.Errorf("cannot parse node address: %v", err) - } - kubePort, err := strconv.Atoi(kubePortStr) - if err != nil { - return nil, fmt.Errorf("cannot parse node port: %v", err) - } - - if err = validateNetworkPluginName(originClient, options.NetworkConfig.NetworkPluginName); err != nil { return nil, err } - - // Defaults are tested in TestKubeletDefaults - server := kubeletoptions.NewKubeletServer() - // Adjust defaults - server.RequireKubeConfig = true - server.KubeConfig.Default(options.MasterKubeConfig) - server.PodManifestPath = path - server.RootDirectory = options.VolumeDirectory - server.NodeIP = options.NodeIP - server.HostnameOverride = options.NodeName - server.AllowPrivileged = true - server.RegisterNode = true - server.Address = kubeAddressStr - server.Port = int32(kubePort) - server.ReadOnlyPort = 0 // no read only access - server.CAdvisorPort = 0 // no unsecured cadvisor access - server.HealthzPort = 0 // no unsecured healthz access - server.HealthzBindAddress = "" // no unsecured healthz access - server.ClusterDNS = []string{options.DNSIP} - server.ClusterDomain = options.DNSDomain - server.NetworkPluginName = options.NetworkConfig.NetworkPluginName - server.HostNetworkSources = []string{kubelettypes.ApiserverSource, kubelettypes.FileSource} - server.HostPIDSources = []string{kubelettypes.ApiserverSource, kubelettypes.FileSource} - server.HostIPCSources = []string{kubelettypes.ApiserverSource, kubelettypes.FileSource} - server.HTTPCheckFrequency = metav1.Duration{Duration: time.Duration(0)} // no remote HTTP pod creation access - server.FileCheckFrequency = metav1.Duration{Duration: time.Duration(fileCheckInterval) * time.Second} - server.KubeletFlags.ContainerRuntimeOptions.PodSandboxImage = imageTemplate.ExpandOrDie("pod") - 
server.LowDiskSpaceThresholdMB = 256 // this the previous default - server.CPUCFSQuota = true // enable cpu cfs quota enforcement by default - server.MaxPods = 250 - server.PodsPerCore = 10 - server.CgroupDriver = "systemd" - server.DockerExecHandlerName = string(options.DockerConfig.ExecHandlerName) - server.RemoteRuntimeEndpoint = options.DockerConfig.DockerShimSocket - server.RemoteImageEndpoint = options.DockerConfig.DockerShimSocket - server.DockershimRootDirectory = options.DockerConfig.DockershimRootDirectory - - if network.IsOpenShiftNetworkPlugin(server.NetworkPluginName) { - // set defaults for openshift-sdn - server.HairpinMode = componentconfig.HairpinNone - } - - // prevents kube from generating certs - server.TLSCertFile = options.ServingInfo.ServerCert.CertFile - server.TLSPrivateKeyFile = options.ServingInfo.ServerCert.KeyFile - - containerized := cmdutil.Env("OPENSHIFT_CONTAINERIZED", "") == "true" - server.Containerized = containerized - - // force the authentication and authorization - // Setup auth - authnTTL, err := time.ParseDuration(options.AuthConfig.AuthenticationCacheTTL) + // Make a separate client for event reporting, to avoid event QPS blocking node calls + eventClient, _, err := configapi.GetExternalKubeClient(options.MasterKubeConfig, options.MasterClientConnectionOverrides) if err != nil { return nil, err } - server.Authentication = componentconfig.KubeletAuthentication{ - X509: componentconfig.KubeletX509Authentication{ - ClientCAFile: options.ServingInfo.ClientCA, - }, - Webhook: componentconfig.KubeletWebhookAuthentication{ - Enabled: true, - CacheTTL: metav1.Duration{Duration: authnTTL}, - }, - Anonymous: componentconfig.KubeletAnonymousAuthentication{ - Enabled: true, - }, - } - authzTTL, err := time.ParseDuration(options.AuthConfig.AuthorizationCacheTTL) + kubeClient, err := clientgoclientset.NewForConfig(kubeConfig) if err != nil { return nil, err } - server.Authorization = componentconfig.KubeletAuthorization{ - Mode: 
componentconfig.KubeletAuthorizationModeWebhook, - Webhook: componentconfig.KubeletWebhookAuthorization{ - CacheAuthorizedTTL: metav1.Duration{Duration: authzTTL}, - CacheUnauthorizedTTL: metav1.Duration{Duration: authzTTL}, - }, - } - - // resolve extended arguments - // TODO: this should be done in config validation (along with the above) so we can provide - // proper errors - if err := cmdflags.Resolve(options.KubeletArguments, server.AddFlags); len(err) > 0 { - return nil, kerrors.NewAggregate(err) - } - - proxyconfig, err := buildKubeProxyConfig(options) - if err != nil { - return nil, err - } - - internalKubeInformers := kinternalinformers.NewSharedInformerFactory(kubeClient, proxyconfig.ConfigSyncPeriod.Duration) - - // Initialize SDN before building kubelet config so it can modify option - var sdnNode network.NodeInterface - var sdnProxy network.ProxyInterface - if network.IsOpenShiftNetworkPlugin(options.NetworkConfig.NetworkPluginName) { - sdnNode, sdnProxy, err = NewSDNInterfaces(options, originClient, kubeClient, internalKubeInformers, proxyconfig) - if err != nil { - return nil, fmt.Errorf("SDN initialization failed: %v", err) - } - - // SDN plugin pod setup/teardown is implemented as a CNI plugin - server.NetworkPluginName = kubeletcni.CNIPluginName - server.NetworkPluginDir = kubeletcni.DefaultNetDir - server.CNIConfDir = kubeletcni.DefaultNetDir - server.CNIBinDir = kubeletcni.DefaultCNIDir - server.HairpinMode = componentconfig.HairpinNone - } deps, err := kubeletapp.UnsecuredKubeletDeps(server) if err != nil { @@ -267,7 +83,7 @@ func BuildKubernetesNodeConfig(options configapi.NodeConfig, enableProxy, enable deps.KubeClient = externalKubeClient deps.EventClient = kv1core.New(eventClient.CoreV1().RESTClient()) - deps.Auth, err = kubeletapp.BuildAuth(types.NodeName(options.NodeName), clientgoClientSet, server.KubeletConfiguration) + deps.Auth, err = kubeletapp.BuildAuth(types.NodeName(options.NodeName), kubeClient, server.KubeletConfiguration) if 
err != nil { return nil, err } @@ -298,163 +114,17 @@ func BuildKubernetesNodeConfig(options configapi.NodeConfig, enableProxy, enable BindAddress: options.ServingInfo.BindAddress, AllowDisabledDocker: options.AllowDisabledDocker, - Containerized: containerized, - - Client: kubeClient, - ExternalKubeClientset: externalKubeClient, - InternalKubeInformers: internalKubeInformers, + Containerized: server.Containerized, VolumeDir: options.VolumeDirectory, KubeletServer: server, KubeletDeps: deps, - - ProxyConfig: proxyconfig, - EnableUnidling: options.EnableUnidling, - - SDNNode: sdnNode, - SDNProxy: sdnProxy, - } - - if enableDNS { - dnsConfig, err := dns.NewServerDefaults() - if err != nil { - return nil, fmt.Errorf("DNS configuration was not possible: %v", err) - } - if len(options.DNSBindAddress) > 0 { - dnsConfig.DnsAddr = options.DNSBindAddress - } - dnsConfig.Domain = server.ClusterDomain + "." - dnsConfig.Local = "openshift.default.svc." + dnsConfig.Domain - - // identify override nameservers - var nameservers []string - for _, s := range options.DNSNameservers { - nameservers = append(nameservers, s) - } - if len(options.DNSRecursiveResolvConf) > 0 { - c, err := miekgdns.ClientConfigFromFile(options.DNSRecursiveResolvConf) - if err != nil { - return nil, fmt.Errorf("could not start DNS, unable to read config file: %v", err) - } - for _, s := range c.Servers { - nameservers = append(nameservers, net.JoinHostPort(s, c.Port)) - } - } - - if len(nameservers) > 0 { - dnsConfig.Nameservers = nameservers - } - - services, err := dns.NewCachedServiceAccessor(internalKubeInformers.Core().InternalVersion().Services()) - if err != nil { - return nil, fmt.Errorf("could not start DNS: failed to add ClusterIP index: %v", err) - } - - endpoints, err := dns.NewCachedEndpointsAccessor(internalKubeInformers.Core().InternalVersion().Endpoints()) - if err != nil { - return nil, fmt.Errorf("could not start DNS: failed to add HostnameIP index: %v", err) - } - - // TODO: use 
kubeletConfig.ResolverConfig as an argument to etcd in the event the - // user sets it, instead of passing it to the kubelet. - glog.Infof("DNS Bind to %s", options.DNSBindAddress) - config.DNSServer = dns.NewServer( - dnsConfig, - services, - endpoints, - "node", - ) } return config, nil } -func buildKubeProxyConfig(options configapi.NodeConfig) (*componentconfig.KubeProxyConfiguration, error) { - proxyOptions, err := kubeproxyoptions.NewOptions() - if err != nil { - return nil, err - } - // get default config - proxyconfig := proxyOptions.GetConfig() - - // BindAddress - Override default bind address from our config - addr := options.ServingInfo.BindAddress - host, _, err := net.SplitHostPort(addr) - if err != nil { - return nil, fmt.Errorf("The provided value to bind to must be an ip:port %q", addr) - } - ip := net.ParseIP(host) - if ip == nil { - return nil, fmt.Errorf("The provided value to bind to must be an ip:port: %q", addr) - } - proxyconfig.BindAddress = ip.String() - - // HealthzPort, HealthzBindAddress - disable - proxyconfig.HealthzBindAddress = "" - proxyconfig.MetricsBindAddress = "" - - // OOMScoreAdj, ResourceContainer - clear, we don't run in a container - oomScoreAdj := int32(0) - proxyconfig.OOMScoreAdj = &oomScoreAdj - proxyconfig.ResourceContainer = "" - - // use the same client as the node - proxyconfig.ClientConnection.KubeConfigFile = options.MasterKubeConfig - - // ProxyMode, set to iptables - proxyconfig.Mode = "iptables" - - // IptablesSyncPeriod, set to our config value - syncPeriod, err := time.ParseDuration(options.IPTablesSyncPeriod) - if err != nil { - return nil, fmt.Errorf("Cannot parse the provided ip-tables sync period (%s) : %v", options.IPTablesSyncPeriod, err) - } - proxyconfig.IPTables.SyncPeriod = metav1.Duration{ - Duration: syncPeriod, - } - masqueradeBit := int32(0) - proxyconfig.IPTables.MasqueradeBit = &masqueradeBit - - // PortRange, use default - // HostnameOverride, use default - // ConfigSyncPeriod, use default - 
// MasqueradeAll, use default - // CleanupAndExit, use default - // KubeAPIQPS, use default, doesn't apply until we build a separate client - // KubeAPIBurst, use default, doesn't apply until we build a separate client - // UDPIdleTimeout, use default - - // Resolve cmd flags to add any user overrides - if err := cmdflags.Resolve(options.ProxyArguments, proxyOptions.AddFlags); len(err) > 0 { - return nil, kerrors.NewAggregate(err) - } - - return proxyconfig, nil -} - -func validateNetworkPluginName(originClient *osclient.Client, pluginName string) error { - if network.IsOpenShiftNetworkPlugin(pluginName) { - // Detect any plugin mismatches between node and master - clusterNetwork, err := originClient.ClusterNetwork().Get(networkapi.ClusterNetworkDefault, metav1.GetOptions{}) - if kerrs.IsNotFound(err) { - return fmt.Errorf("master has not created a default cluster network, network plugin %q can not start", pluginName) - } else if err != nil { - return fmt.Errorf("cannot fetch %q cluster network: %v", networkapi.ClusterNetworkDefault, err) - } - - if clusterNetwork.PluginName != strings.ToLower(pluginName) { - if len(clusterNetwork.PluginName) != 0 { - return fmt.Errorf("detected network plugin mismatch between OpenShift node(%q) and master(%q)", pluginName, clusterNetwork.PluginName) - } else { - // Do not return error in this case - glog.Warningf(`either there is network plugin mismatch between OpenShift node(%q) and master or OpenShift master is running an older version where we did not persist plugin name`, pluginName) - } - } - } - return nil -} - func buildCloudProvider(server *kubeletoptions.KubeletServer) (cloudprovider.Interface, error) { if len(server.CloudProvider) == 0 || server.CloudProvider == v1alpha1.AutoDetectCloudProvider { return nil, nil diff --git a/pkg/cmd/server/kubernetes/node/node_config_test.go b/pkg/cmd/server/kubernetes/node/node_config_test.go index a3296f991ede..fbabf91b577b 100644 --- 
a/pkg/cmd/server/kubernetes/node/node_config_test.go +++ b/pkg/cmd/server/kubernetes/node/node_config_test.go @@ -138,6 +138,10 @@ func TestKubeletDefaults(t *testing.T) { }, } + if goruntime.GOOS == "darwin" { + expectedDefaults.KubeletConfiguration.RemoteRuntimeEndpoint = "" + } + if !reflect.DeepEqual(defaults, expectedDefaults) { t.Logf("expected defaults, actual defaults: \n%s", diff.ObjectReflectDiff(expectedDefaults, defaults)) t.Errorf("Got different defaults than expected, adjust in BuildKubernetesNodeConfig and update expectedDefaults") diff --git a/pkg/cmd/server/kubernetes/node/options/options.go b/pkg/cmd/server/kubernetes/node/options/options.go new file mode 100644 index 000000000000..31122ff84393 --- /dev/null +++ b/pkg/cmd/server/kubernetes/node/options/options.go @@ -0,0 +1,205 @@ +package node + +import ( + "fmt" + "net" + "strconv" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kerrors "k8s.io/apimachinery/pkg/util/errors" + kubeproxyoptions "k8s.io/kubernetes/cmd/kube-proxy/app" + kubeletoptions "k8s.io/kubernetes/cmd/kubelet/app/options" + "k8s.io/kubernetes/pkg/apis/componentconfig" + kubeletcni "k8s.io/kubernetes/pkg/kubelet/network/cni" + kubelettypes "k8s.io/kubernetes/pkg/kubelet/types" + + configapi "github.com/openshift/origin/pkg/cmd/server/api" + cmdutil "github.com/openshift/origin/pkg/cmd/util" + cmdflags "github.com/openshift/origin/pkg/cmd/util/flags" + "github.com/openshift/origin/pkg/cmd/util/variable" + "github.com/openshift/origin/pkg/network" +) + +// Build creates the core Kubernetes component configs for a given NodeConfig, or returns +// an error +func Build(options configapi.NodeConfig) (*kubeletoptions.KubeletServer, *componentconfig.KubeProxyConfiguration, error) { + imageTemplate := variable.NewDefaultImageTemplate() + imageTemplate.Format = options.ImageConfig.Format + imageTemplate.Latest = options.ImageConfig.Latest + + var path string + var fileCheckInterval int64 + if options.PodManifestConfig != nil 
{ + path = options.PodManifestConfig.Path + fileCheckInterval = options.PodManifestConfig.FileCheckIntervalSeconds + } + + kubeAddressStr, kubePortStr, err := net.SplitHostPort(options.ServingInfo.BindAddress) + if err != nil { + return nil, nil, fmt.Errorf("cannot parse node address: %v", err) + } + kubePort, err := strconv.Atoi(kubePortStr) + if err != nil { + return nil, nil, fmt.Errorf("cannot parse node port: %v", err) + } + + // Defaults are tested in TestKubeletDefaults + server := kubeletoptions.NewKubeletServer() + // Adjust defaults + server.RequireKubeConfig = true + server.KubeConfig.Default(options.MasterKubeConfig) + server.PodManifestPath = path + server.RootDirectory = options.VolumeDirectory + server.NodeIP = options.NodeIP + server.HostnameOverride = options.NodeName + server.AllowPrivileged = true + server.RegisterNode = true + server.Address = kubeAddressStr + server.Port = int32(kubePort) + server.ReadOnlyPort = 0 // no read only access + server.CAdvisorPort = 0 // no unsecured cadvisor access + server.HealthzPort = 0 // no unsecured healthz access + server.HealthzBindAddress = "" // no unsecured healthz access + server.ClusterDNS = []string{options.DNSIP} + server.ClusterDomain = options.DNSDomain + server.NetworkPluginName = options.NetworkConfig.NetworkPluginName + server.HostNetworkSources = []string{kubelettypes.ApiserverSource, kubelettypes.FileSource} + server.HostPIDSources = []string{kubelettypes.ApiserverSource, kubelettypes.FileSource} + server.HostIPCSources = []string{kubelettypes.ApiserverSource, kubelettypes.FileSource} + server.HTTPCheckFrequency = metav1.Duration{Duration: time.Duration(0)} // no remote HTTP pod creation access + server.FileCheckFrequency = metav1.Duration{Duration: time.Duration(fileCheckInterval) * time.Second} + server.KubeletFlags.ContainerRuntimeOptions.PodSandboxImage = imageTemplate.ExpandOrDie("pod") + server.LowDiskSpaceThresholdMB = 256 // this is the previous default + server.CPUCFSQuota = true // 
enable cpu cfs quota enforcement by default + server.MaxPods = 250 + server.PodsPerCore = 10 + server.CgroupDriver = "systemd" + server.DockerExecHandlerName = string(options.DockerConfig.ExecHandlerName) + server.RemoteRuntimeEndpoint = options.DockerConfig.DockerShimSocket + server.RemoteImageEndpoint = options.DockerConfig.DockerShimSocket + server.DockershimRootDirectory = options.DockerConfig.DockershimRootDirectory + + // prevents kube from generating certs + server.TLSCertFile = options.ServingInfo.ServerCert.CertFile + server.TLSPrivateKeyFile = options.ServingInfo.ServerCert.KeyFile + + containerized := cmdutil.Env("OPENSHIFT_CONTAINERIZED", "") == "true" + server.Containerized = containerized + + // force the authentication and authorization + // Setup auth + authnTTL, err := time.ParseDuration(options.AuthConfig.AuthenticationCacheTTL) + if err != nil { + return nil, nil, err + } + server.Authentication = componentconfig.KubeletAuthentication{ + X509: componentconfig.KubeletX509Authentication{ + ClientCAFile: options.ServingInfo.ClientCA, + }, + Webhook: componentconfig.KubeletWebhookAuthentication{ + Enabled: true, + CacheTTL: metav1.Duration{Duration: authnTTL}, + }, + Anonymous: componentconfig.KubeletAnonymousAuthentication{ + Enabled: true, + }, + } + authzTTL, err := time.ParseDuration(options.AuthConfig.AuthorizationCacheTTL) + if err != nil { + return nil, nil, err + } + server.Authorization = componentconfig.KubeletAuthorization{ + Mode: componentconfig.KubeletAuthorizationModeWebhook, + Webhook: componentconfig.KubeletWebhookAuthorization{ + CacheAuthorizedTTL: metav1.Duration{Duration: authzTTL}, + CacheUnauthorizedTTL: metav1.Duration{Duration: authzTTL}, + }, + } + + // resolve extended arguments + // TODO: this should be done in config validation (along with the above) so we can provide + // proper errors + if err := cmdflags.Resolve(options.KubeletArguments, server.AddFlags); len(err) > 0 { + return nil, nil, kerrors.NewAggregate(err) + } 
+ + proxyconfig, err := buildKubeProxyConfig(options) + if err != nil { + return nil, nil, err + } + + if network.IsOpenShiftNetworkPlugin(options.NetworkConfig.NetworkPluginName) { + // SDN plugin pod setup/teardown is implemented as a CNI plugin + server.NetworkPluginName = kubeletcni.CNIPluginName + server.NetworkPluginDir = kubeletcni.DefaultNetDir + server.CNIConfDir = kubeletcni.DefaultNetDir + server.CNIBinDir = kubeletcni.DefaultCNIDir + server.HairpinMode = componentconfig.HairpinNone + } + + return server, proxyconfig, nil +} + +func buildKubeProxyConfig(options configapi.NodeConfig) (*componentconfig.KubeProxyConfiguration, error) { + proxyOptions, err := kubeproxyoptions.NewOptions() + if err != nil { + return nil, err + } + // get default config + proxyconfig := proxyOptions.GetConfig() + + // BindAddress - Override default bind address from our config + addr := options.ServingInfo.BindAddress + host, _, err := net.SplitHostPort(addr) + if err != nil { + return nil, fmt.Errorf("The provided value to bind to must be an ip:port %q", addr) + } + ip := net.ParseIP(host) + if ip == nil { + return nil, fmt.Errorf("The provided value to bind to must be an ip:port: %q", addr) + } + proxyconfig.BindAddress = ip.String() + + // HealthzPort, HealthzBindAddress - disable + proxyconfig.HealthzBindAddress = "" + proxyconfig.MetricsBindAddress = "" + + // OOMScoreAdj, ResourceContainer - clear, we don't run in a container + oomScoreAdj := int32(0) + proxyconfig.OOMScoreAdj = &oomScoreAdj + proxyconfig.ResourceContainer = "" + + // use the same client as the node + proxyconfig.ClientConnection.KubeConfigFile = options.MasterKubeConfig + + // ProxyMode, set to iptables + proxyconfig.Mode = "iptables" + + // IptablesSyncPeriod, set to our config value + syncPeriod, err := time.ParseDuration(options.IPTablesSyncPeriod) + if err != nil { + return nil, fmt.Errorf("Cannot parse the provided ip-tables sync period (%s) : %v", options.IPTablesSyncPeriod, err) + } + 
proxyconfig.IPTables.SyncPeriod = metav1.Duration{ + Duration: syncPeriod, + } + masqueradeBit := int32(0) + proxyconfig.IPTables.MasqueradeBit = &masqueradeBit + + // PortRange, use default + // HostnameOverride, use default + // ConfigSyncPeriod, use default + // MasqueradeAll, use default + // CleanupAndExit, use default + // KubeAPIQPS, use default, doesn't apply until we build a separate client + // KubeAPIBurst, use default, doesn't apply until we build a separate client + // UDPIdleTimeout, use default + + // Resolve cmd flags to add any user overrides + if err := cmdflags.Resolve(options.ProxyArguments, proxyOptions.AddFlags); len(err) > 0 { + return nil, kerrors.NewAggregate(err) + } + + return proxyconfig, nil +} diff --git a/pkg/cmd/server/kubernetes/node/proxy.go b/pkg/cmd/server/kubernetes/node/proxy.go deleted file mode 100644 index 21285f65cab1..000000000000 --- a/pkg/cmd/server/kubernetes/node/proxy.go +++ /dev/null @@ -1,34 +0,0 @@ -package node - -import ( - "fmt" - "net/url" - - restclient "k8s.io/client-go/rest" - - restful "github.com/emicklei/go-restful" - - "github.com/openshift/origin/pkg/util/httpproxy" -) - -type ProxyConfig struct { - ClientConfig *restclient.Config -} - -func (c *ProxyConfig) InstallAPI(container *restful.Container) ([]string, error) { - kubeAddr, err := url.Parse(c.ClientConfig.Host) - if err != nil { - return nil, err - } - - proxy, err := httpproxy.NewUpgradeAwareSingleHostReverseProxy(c.ClientConfig, kubeAddr) - if err != nil { - return nil, fmt.Errorf("Unable to initialize the Kubernetes proxy: %v", err) - } - - container.Handle("/api/", proxy) - - return []string{ - "Started Kubernetes proxy at %s/api/", - }, nil -} diff --git a/pkg/cmd/server/start/start_node.go b/pkg/cmd/server/start/start_node.go index 079712ae6198..dc3cf8ec3b8e 100644 --- a/pkg/cmd/server/start/start_node.go +++ b/pkg/cmd/server/start/start_node.go @@ -22,11 +22,12 @@ import ( configapilatest 
"github.com/openshift/origin/pkg/cmd/server/api/latest" "github.com/openshift/origin/pkg/cmd/server/api/validation" "github.com/openshift/origin/pkg/cmd/server/crypto" - kubernetes "github.com/openshift/origin/pkg/cmd/server/kubernetes/node" + "github.com/openshift/origin/pkg/cmd/server/kubernetes/network" + "github.com/openshift/origin/pkg/cmd/server/kubernetes/node" + nodeoptions "github.com/openshift/origin/pkg/cmd/server/kubernetes/node/options" cmdutil "github.com/openshift/origin/pkg/cmd/util" "github.com/openshift/origin/pkg/cmd/util/docker" utilflags "github.com/openshift/origin/pkg/cmd/util/flags" - "github.com/openshift/origin/pkg/network" "github.com/openshift/origin/pkg/version" ) @@ -328,17 +329,19 @@ func (o NodeOptions) IsRunFromConfig() bool { } func StartNode(nodeConfig configapi.NodeConfig, components *utilflags.ComponentFlag) error { - config, err := kubernetes.BuildKubernetesNodeConfig(nodeConfig, components.Enabled(ComponentProxy), components.Enabled(ComponentDNS) && len(nodeConfig.DNSBindAddress) > 0) + server, proxyConfig, err := nodeoptions.Build(nodeConfig) if err != nil { return err } - if network.IsOpenShiftNetworkPlugin(config.KubeletServer.NetworkPluginName) { - // TODO: SDN plugin depends on the Kubelet registering as a Node and doesn't retry cleanly, - // and Kubelet also can't start the PodSync loop until the SDN plugin has loaded. 
- if components.Enabled(ComponentKubelet) != components.Enabled(ComponentPlugins) { - return fmt.Errorf("the SDN plugin must be run in the same process as the kubelet") - } + networkConfig, err := network.New(nodeConfig, server.ClusterDomain, proxyConfig, components.Enabled(ComponentProxy), components.Enabled(ComponentDNS) && len(nodeConfig.DNSBindAddress) > 0) + if err != nil { + return err + } + + config, err := node.New(nodeConfig, server) + if err != nil { + return err } if components.Enabled(ComponentKubelet) { @@ -347,12 +350,6 @@ func StartNode(nodeConfig configapi.NodeConfig, components *utilflags.ComponentF glog.Infof("Starting node networking %s (%s)", config.KubeletServer.HostnameOverride, version.Get().String()) } - _, kubeClientConfig, err := configapi.GetInternalKubeClient(nodeConfig.MasterKubeConfig, nodeConfig.MasterClientConnectionOverrides) - if err != nil { - return err - } - glog.Infof("Connecting to API server %s", kubeClientConfig.Host) - // preconditions if components.Enabled(ComponentKubelet) { config.EnsureKubeletAccess() @@ -365,16 +362,16 @@ func StartNode(nodeConfig configapi.NodeConfig, components *utilflags.ComponentF config.RunKubelet() } if components.Enabled(ComponentPlugins) { - config.RunSDN() + networkConfig.RunSDN() } if components.Enabled(ComponentProxy) { - config.RunProxy() + networkConfig.RunProxy() } - if components.Enabled(ComponentDNS) && config.DNSServer != nil { - config.RunDNS() + if components.Enabled(ComponentDNS) && networkConfig.DNSServer != nil { + networkConfig.RunDNS() } - config.InternalKubeInformers.Start(wait.NeverStop) + networkConfig.InternalKubeInformers.Start(wait.NeverStop) return nil }