Skip to content

Commit

Permalink
Merge pull request #1023 from simonferquel/k8s-stack-services-filters
Browse files Browse the repository at this point in the history
[Kubernetes] stack services filters
  • Loading branch information
thaJeztah committed May 24, 2018
2 parents 57ce5aa + 297866e commit 0089f17
Show file tree
Hide file tree
Showing 25 changed files with 362 additions and 11,972 deletions.
5 changes: 5 additions & 0 deletions cli/command/stack/kubernetes/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ func (s *Factory) ReplicaSets() typesappsv1beta2.ReplicaSetInterface {
return s.appsClientSet.ReplicaSets(s.namespace)
}

// DaemonSets returns a client for kubernetes daemon sets
func (s *Factory) DaemonSets() typesappsv1beta2.DaemonSetInterface {
return s.appsClientSet.DaemonSets(s.namespace)
}

// Stacks returns a client for Docker's Stack on Kubernetes
func (s *Factory) Stacks(allNamespaces bool) (StackClient, error) {
version, err := kubernetes.GetStackAPIVersion(s.clientSet)
Expand Down
133 changes: 93 additions & 40 deletions cli/command/stack/kubernetes/conversion.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package kubernetes

import (
"fmt"
"sort"
"strings"
"time"

"github.com/docker/cli/cli/command/formatter"
"github.com/docker/cli/kubernetes/labels"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
appsv1beta2 "k8s.io/api/apps/v1beta2"
apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -65,13 +68,43 @@ func toSwarmProtocol(protocol apiv1.Protocol) swarm.PortConfigProtocol {
return swarm.PortConfigProtocol("unknown")
}

func fetchPods(namespace string, pods corev1.PodInterface) ([]apiv1.Pod, error) {
labelSelector := labels.SelectorForStack(namespace)
podsList, err := pods.List(metav1.ListOptions{LabelSelector: labelSelector})
func fetchPods(stackName string, pods corev1.PodInterface, f filters.Args) ([]apiv1.Pod, error) {
services := f.Get("service")
// for existing script compatibility, support either <servicename> or <stackname>_<servicename> format
stackNamePrefix := stackName + "_"
for _, s := range services {
if strings.HasPrefix(s, stackNamePrefix) {
services = append(services, strings.TrimPrefix(s, stackNamePrefix))
}
}
listOpts := metav1.ListOptions{LabelSelector: labels.SelectorForStack(stackName, services...)}
var result []apiv1.Pod
podsList, err := pods.List(listOpts)
if err != nil {
return nil, err
}
return podsList.Items, nil
nodes := f.Get("node")
for _, pod := range podsList.Items {
if filterPod(pod, nodes) &&
// name filter is done client side for matching partials
f.FuzzyMatch("name", stackNamePrefix+pod.Name) {

result = append(result, pod)
}
}
return result, nil
}

func filterPod(pod apiv1.Pod, nodes []string) bool {
if len(nodes) == 0 {
return true
}
for _, name := range nodes {
if pod.Spec.NodeName == name {
return true
}
}
return false
}

func getContainerImage(containers []apiv1.Container) string {
Expand Down Expand Up @@ -121,56 +154,76 @@ const (
publishedOnRandomPortSuffix = "-random-ports"
)

// Replicas conversion
func replicasToServices(replicas *appsv1beta2.ReplicaSetList, services *apiv1.ServiceList) ([]swarm.Service, map[string]formatter.ServiceListInfo, error) {
func convertToServices(replicas *appsv1beta2.ReplicaSetList, daemons *appsv1beta2.DaemonSetList, services *apiv1.ServiceList) ([]swarm.Service, map[string]formatter.ServiceListInfo, error) {
result := make([]swarm.Service, len(replicas.Items))
infos := make(map[string]formatter.ServiceListInfo, len(replicas.Items))
infos := make(map[string]formatter.ServiceListInfo, len(replicas.Items)+len(daemons.Items))
for i, r := range replicas.Items {
serviceName := r.Labels[labels.ForServiceName]
serviceHeadless, ok := findService(services, serviceName)
if !ok {
return nil, nil, fmt.Errorf("could not find service '%s'", serviceName)
}
stack, ok := serviceHeadless.Labels[labels.ForStackName]
if ok {
stack += "_"
}
uid := string(serviceHeadless.UID)
s := swarm.Service{
ID: uid,
Spec: swarm.ServiceSpec{
Annotations: swarm.Annotations{
Name: stack + serviceHeadless.Name,
},
TaskTemplate: swarm.TaskSpec{
ContainerSpec: &swarm.ContainerSpec{
Image: getContainerImage(r.Spec.Template.Spec.Containers),
},
},
},
}
if serviceNodePort, ok := findService(services, serviceName+publishedOnRandomPortSuffix); ok && serviceNodePort.Spec.Type == apiv1.ServiceTypeNodePort {
s.Endpoint = serviceEndpoint(serviceNodePort, swarm.PortConfigPublishModeHost)
}
if serviceLoadBalancer, ok := findService(services, serviceName+publishedServiceSuffix); ok && serviceLoadBalancer.Spec.Type == apiv1.ServiceTypeLoadBalancer {
s.Endpoint = serviceEndpoint(serviceLoadBalancer, swarm.PortConfigPublishModeIngress)
s, err := convertToService(r.Labels[labels.ForServiceName], services, r.Spec.Template.Spec.Containers)
if err != nil {
return nil, nil, err
}
result[i] = s
infos[uid] = formatter.ServiceListInfo{
result[i] = *s
infos[s.ID] = formatter.ServiceListInfo{
Mode: "replicated",
Replicas: fmt.Sprintf("%d/%d", r.Status.AvailableReplicas, r.Status.Replicas),
}
}
for _, d := range daemons.Items {
s, err := convertToService(d.Labels[labels.ForServiceName], services, d.Spec.Template.Spec.Containers)
if err != nil {
return nil, nil, err
}
result = append(result, *s)
infos[s.ID] = formatter.ServiceListInfo{
Mode: "global",
Replicas: fmt.Sprintf("%d/%d", d.Status.NumberReady, d.Status.DesiredNumberScheduled),
}
}
sort.Slice(result, func(i, j int) bool {
return result[i].ID < result[j].ID
})
return result, infos, nil
}

func findService(services *apiv1.ServiceList, name string) (apiv1.Service, bool) {
func convertToService(serviceName string, services *apiv1.ServiceList, containers []apiv1.Container) (*swarm.Service, error) {
serviceHeadless, err := findService(services, serviceName)
if err != nil {
return nil, err
}
stack, ok := serviceHeadless.Labels[labels.ForStackName]
if ok {
stack += "_"
}
uid := string(serviceHeadless.UID)
s := &swarm.Service{
ID: uid,
Spec: swarm.ServiceSpec{
Annotations: swarm.Annotations{
Name: stack + serviceHeadless.Name,
},
TaskTemplate: swarm.TaskSpec{
ContainerSpec: &swarm.ContainerSpec{
Image: getContainerImage(containers),
},
},
},
}
if serviceNodePort, err := findService(services, serviceName+publishedOnRandomPortSuffix); err == nil && serviceNodePort.Spec.Type == apiv1.ServiceTypeNodePort {
s.Endpoint = serviceEndpoint(serviceNodePort, swarm.PortConfigPublishModeHost)
}
if serviceLoadBalancer, err := findService(services, serviceName+publishedServiceSuffix); err == nil && serviceLoadBalancer.Spec.Type == apiv1.ServiceTypeLoadBalancer {
s.Endpoint = serviceEndpoint(serviceLoadBalancer, swarm.PortConfigPublishModeIngress)
}
return s, nil
}

func findService(services *apiv1.ServiceList, name string) (apiv1.Service, error) {
for _, s := range services.Items {
if s.Name == name {
return s, true
return s, nil
}
}
return apiv1.Service{}, false
return apiv1.Service{}, fmt.Errorf("could not find service '%s'", name)
}

func serviceEndpoint(service apiv1.Service, publishMode swarm.PortConfigPublishMode) swarm.Endpoint {
Expand Down
4 changes: 2 additions & 2 deletions cli/command/stack/kubernetes/conversion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestReplicasConversionNeedsAService(t *testing.T) {
Items: []appsv1beta2.ReplicaSet{makeReplicaSet("unknown", 0, 0)},
}
services := apiv1.ServiceList{}
_, _, err := replicasToServices(&replicas, &services)
_, _, err := convertToServices(&replicas, &appsv1beta2.DaemonSetList{}, &services)
assert.ErrorContains(t, err, "could not find service")
}

Expand Down Expand Up @@ -124,7 +124,7 @@ func TestKubernetesServiceToSwarmServiceConversion(t *testing.T) {
}

for _, tc := range testCases {
swarmServices, listInfo, err := replicasToServices(tc.replicas, tc.services)
swarmServices, listInfo, err := convertToServices(tc.replicas, &appsv1beta2.DaemonSetList{}, tc.services)
assert.NilError(t, err)
assert.DeepEqual(t, tc.expectedServices, swarmServices)
assert.DeepEqual(t, tc.expectedListInfo, listInfo)
Expand Down
83 changes: 41 additions & 42 deletions cli/command/stack/kubernetes/ps.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,23 @@ import (
"github.com/docker/cli/cli/command/task"
"github.com/docker/docker/api/types/swarm"
apiv1 "k8s.io/api/core/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/kubernetes/pkg/api"
)

var supportedPSFilters = map[string]bool{
"name": true,
"service": true,
"node": true,
}

// RunPS is the kubernetes implementation of docker stack ps
func RunPS(dockerCli *KubeCli, options options.PS) error {
namespace := options.Namespace

// Initialize clients
filters := options.Filter.Value()
if err := filters.Validate(supportedPSFilters); err != nil {
return err
}
client, err := dockerCli.composeClient()
if err != nil {
return err
Expand All @@ -29,78 +35,71 @@ func RunPS(dockerCli *KubeCli, options options.PS) error {
if err != nil {
return err
}
podsClient := client.Pods()

// Fetch pods
if _, err := stacks.Get(namespace); err != nil {
return fmt.Errorf("nothing found in stack: %s", namespace)
stackName := options.Namespace
_, err = stacks.Get(stackName)
if apierrs.IsNotFound(err) {
return fmt.Errorf("nothing found in stack: %s", stackName)
}

pods, err := fetchPods(namespace, podsClient)
if err != nil {
return err
}

pods, err := fetchPods(stackName, client.Pods(), filters)
if err != nil {
return err
}
if len(pods) == 0 {
return fmt.Errorf("nothing found in stack: %s", namespace)
return fmt.Errorf("nothing found in stack: %s", stackName)
}
return printTasks(dockerCli, options, stackName, client, pods)
}

func printTasks(dockerCli command.Cli, options options.PS, namespace string, client corev1.NodesGetter, pods []apiv1.Pod) error {
format := options.Format
if len(format) == 0 {
if format == "" {
format = task.DefaultFormat(dockerCli.ConfigFile(), options.Quiet)
}
nodeResolver := makeNodeResolver(options.NoResolve, client.Nodes())

tasks := make([]swarm.Task, len(pods))
for i, pod := range pods {
tasks[i] = podToTask(pod)
}
return print(dockerCli, namespace, tasks, pods, nodeResolver, !options.NoTrunc, options.Quiet, format)
}

type idResolver func(name string) (string, error)

func print(dockerCli command.Cli, namespace string, tasks []swarm.Task, pods []apiv1.Pod, nodeResolver idResolver, trunc, quiet bool, format string) error {
sort.Stable(tasksBySlot(tasks))

names := map[string]string{}
nodes := map[string]string{}

tasksCtx := formatter.Context{
Output: dockerCli.Out(),
Format: formatter.NewTaskFormat(format, quiet),
Trunc: trunc,
n, err := client.Nodes().List(metav1.ListOptions{})
if err != nil {
return err
}

for i, task := range tasks {
nodeValue, err := nodeResolver(pods[i].Spec.NodeName)
nodeValue, err := resolveNode(pods[i].Spec.NodeName, n, options.NoResolve)
if err != nil {
return err
}

names[task.ID] = fmt.Sprintf("%s_%s", namespace, pods[i].Name)
nodes[task.ID] = nodeValue
}

tasksCtx := formatter.Context{
Output: dockerCli.Out(),
Format: formatter.NewTaskFormat(format, options.Quiet),
Trunc: !options.NoTrunc,
}

return formatter.TaskWrite(tasksCtx, tasks, names, nodes)
}

func makeNodeResolver(noResolve bool, nodes corev1.NodeInterface) func(string) (string, error) {
func resolveNode(name string, nodes *apiv1.NodeList, noResolve bool) (string, error) {
// Here we have a name and we need to resolve its identifier. To mimic swarm behavior
// we need to resolve the id when noresolve is set, otherwise we return the name.
// we need to resolve to the id when noResolve is set, otherwise we return the name.
if noResolve {
return func(name string) (string, error) {
n, err := nodes.List(metav1.ListOptions{
FieldSelector: fields.OneTermEqualSelector(api.ObjectNameField, name).String(),
})
if err != nil {
return "", err
}
if len(n.Items) != 1 {
return "", fmt.Errorf("could not find node '%s'", name)
for _, node := range nodes.Items {
if node.Name == name {
return string(node.UID), nil
}
return string(n.Items[0].UID), nil
}
return "", fmt.Errorf("could not find node '%s'", name)
}
return func(name string) (string, error) { return name, nil }
return name, nil
}
Loading

0 comments on commit 0089f17

Please sign in to comment.