Skip to content

Commit

Permalink
rkt: rewrote GetPods to use rkt's api service
Browse files Browse the repository at this point in the history
  • Loading branch information
Derek Gonyeo committed Nov 25, 2015
1 parent 2df2dd1 commit 833b035
Showing 1 changed file with 185 additions and 23 deletions.
208 changes: 185 additions & 23 deletions pkg/kubelet/rkt/rkt.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ const (
unitRktID = "RktID"
unitRestartCount = "RestartCount"

kubernetesInfoAnnoName = "k8s.io/pod/information"
k8sRktUIDAnno = "k8s.io/rkt/uid"
k8sRktNameAnno = "k8s.io/rkt/name"
k8sRktNamespaceAnno = "k8s.io/rkt/namespace"
k8sRktCreationTimeAnno = "k8s.io/rkt/created"

This comment has been minimized.

Copy link
@yifan-gu

yifan-gu Nov 25, 2015

Can we add a TODO to indicate that we will remove this later?
rkt/rkt#1789

k8sRktContainerHashAnno = "k8s/rkt/containerhash"

This comment has been minimized.

Copy link
@yifan-gu

yifan-gu Nov 25, 2015

k8s.io for consistency?

This comment has been minimized.

Copy link
@cgonyeo

cgonyeo Nov 25, 2015

Owner

whoops, missed that

This comment has been minimized.

Copy link
@yifan-gu

yifan-gu Nov 25, 2015

Also we want to add an annotation to indicate this is a pod managed by kubelet, so when doing ListPods(), only kubelet managed pods are returned.

This comment has been minimized.

Copy link
@cgonyeo

cgonyeo Nov 25, 2015

Owner

I was planning on relying on the k8s.io/rkt/uid annotation being set to determine if a given pod is managed by the kubelet. Is that ok, or do you want an additional label for that?

This comment has been minimized.

Copy link
@cgonyeo

cgonyeo Nov 25, 2015

Owner

Nevernind, looks like having a separate tag will make it easier to use a PodFilter in the rkt api.

k8sRktRestartCountAnno = "k8s/rkt/restartCount"

dockerPrefix = "docker://"

authDir = "auth.d"
Expand Down Expand Up @@ -426,6 +434,73 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
var globalPortMappings []kubecontainer.PortMapping
manifest := appcschema.BlankPodManifest()

listReq := &rktapi.ListPodsRequest{
Filter: &rktapi.PodFilter{
States: []rktapi.PodState{

This comment has been minimized.

Copy link
@yifan-gu

yifan-gu Nov 25, 2015

Also need another TODO here to remind us that in the future, the pods can be running while some of the apps exit appc/spec#500

rktapi.PodState_POD_STATE_RUNNING,
rktapi.PodState_POD_STATE_EXITED,
rktapi.PodState_POD_STATE_DELETING,
rktapi.PodState_POD_STATE_GARBAGE,
},
},
}
listResp, err := r.apisvc.ListPods(context.Background(), listReq)
if err != nil {
return nil, fmt.Errorf("couldn't list pods: %v", err)
}

restartCount := 0
for _, rktpod := range listResp.Pods {
manifest := appcschema.PodManifest{}
err = json.Unmarshal(rktpod.Manifest, manifest)
if err != nil {
return nil, err
}
if uid, ok := manifest.Annotations.Get(k8sRktUIDAnno); ok && uid == string(pod.UID) {

This comment has been minimized.

Copy link
@yifan-gu

yifan-gu Nov 25, 2015

Can we use the filter to return the pods with the k8sRktUIDAnno we want?

if countString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno); ok {
num, err := strconv.Atoi(countString)
if err != nil {
continue
}
if num+1 > restartCount {
restartCount = num + 1
}
}
}
}

//containerHashes := ""
//for _, c := range pod.Spec.Containers {
// hash := kubecontainer.HashContainer(&c)
// if containerHashes != "" {
// containerHashes += ":"
// }
// containerHashes += strconv.FormatUint(hash, 10)
//}

manifest.Annotations = append(manifest.Annotations, []appctypes.Annotation{
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktUIDAnno),
Value: string(pod.UID),
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktNameAnno),
Value: pod.Name,
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktNamespaceAnno),
Value: pod.Namespace,
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktCreationTimeAnno),
Value: strconv.FormatInt(time.Now().Unix(), 10),
},
appctypes.Annotation{
Name: *appctypes.MustACIdentifier(k8sRktRestartCountAnno),
Value: strconv.Itoa(restartCount),
},
}...)

for _, c := range pod.Spec.Containers {
if err, _ := r.imagePuller.PullImage(pod, &c, pullSecrets); err != nil {
return nil, err
Expand All @@ -435,6 +510,9 @@ func (r *Runtime) makePodManifest(pod *api.Pod, pullSecrets []api.Secret) (*appc
return nil, err
}

kubehash := kubecontainer.HashContainer(&c)
imgManifest.Annotations.Set(*appctypes.MustACIdentifier(k8sRktContainerHashAnno), strconv.FormatUint(kubehash, 10))

This comment has been minimized.

Copy link
@yifan-gu

yifan-gu Nov 25, 2015

Good idea, but it's not gonna working because the image manifest here is just used to get the default configuration of the app. If there's nothing in the kubelet's pod to override it, then we just use what's in the image manifest(exec, port mappings, etc).
So instead we need to write the hash in the appcschema.RuntimeApp.Annotations (haha, so many annotations)

This comment has been minimized.

Copy link
@yifan-gu

yifan-gu Nov 25, 2015

^ see Line 546 below

if imgManifest.App == nil {
imgManifest.App = new(appctypes.App)
}
Expand Down Expand Up @@ -730,6 +808,98 @@ func (r *Runtime) RunPod(pod *api.Pod, pullSecrets []api.Secret) error {
return nil
}

// convertRktPod will convert a rktapi.Pod to a kubecontainer.Pod
func (r *Runtime) convertRktPod(rktpod rktapi.Pod) (*kubecontainer.Pod, *rktInfo, error) {
manifest := &appcschema.ImageManifest{}
err := json.Unmarshal(rktpod.Manifest, manifest)
if err != nil {
return nil, nil, err
}

podUID, ok := manifest.Annotations.Get(k8sRktUIDAnno)
if !ok {
return nil, nil, fmt.Errorf("pod is missing annotation %s", k8sRktUIDAnno)
}
podName, ok := manifest.Annotations.Get(k8sRktNameAnno)
if !ok {
return nil, nil, fmt.Errorf("pod is missing annotation %s", k8sRktNameAnno)
}
podNamespace, ok := manifest.Annotations.Get(k8sRktNamespaceAnno)
if !ok {
return nil, nil, fmt.Errorf("pod is missing annotation %s", k8sRktNamespaceAnno)
}
podCreatedString, ok := manifest.Annotations.Get(k8sRktCreationTimeAnno)
if !ok {
return nil, nil, fmt.Errorf("pod is missing annotation %s", k8sRktCreationTimeAnno)
}
podCreated, err := strconv.ParseInt(podCreatedString, 10, 64)
if err != nil {
return nil, nil, fmt.Errorf("couldn't parse pod creation timestamp: %v", err)
}

restartCountString, ok := manifest.Annotations.Get(k8sRktRestartCountAnno)
if !ok {
return nil, nil, fmt.Errorf("pod is missing annotation: %s", k8sRktRestartCountAnno)
}
restartCount, err := strconv.Atoi(restartCountString)
if err != nil {
return nil, nil, fmt.Errorf("couldn't parse pod restart count: %v", err)
}

var containerHashes []uint64
for _, app := range rktpod.Apps {
manifest := appcschema.ImageManifest{}
err := json.Unmarshal(app.Image.Manifest, manifest)
if err != nil {
return nil, nil, err
}
containerHashString, ok := manifest.Annotations.Get(k8sRktContainerHashAnno)
if !ok {
return nil, nil, fmt.Errorf("app is missing annotation %s", k8sRktContainerHashAnno)
}
containerHash, err := strconv.ParseUint(containerHashString, 10, 64)
if err != nil {
return nil, nil, fmt.Errorf("couldn't parse container's hash: %v", err)
}
containerHashes = append(containerHashes, containerHash)
}

var status kubecontainer.ContainerStatus
switch rktpod.State {
case rktapi.PodState_POD_STATE_RUNNING:
status = kubecontainer.ContainerStatusRunning
case rktapi.PodState_POD_STATE_ABORTED_PREPARE, rktapi.PodState_POD_STATE_EXITED,
rktapi.PodState_POD_STATE_DELETING, rktapi.PodState_POD_STATE_GARBAGE:
status = kubecontainer.ContainerStatusExited
default:
status = kubecontainer.ContainerStatusUnknown
}

kubepod := &kubecontainer.Pod{
ID: types.UID(podUID),
Name: podName,
Namespace: podNamespace,
}
for i, app := range rktpod.Apps {
kubepod.Containers = append(kubepod.Containers, &kubecontainer.Container{
ID: buildContainerID(&containerID{podUID, app.Name}),
Name: app.Name,
Image: app.Image.Name,
Hash: containerHashes[i],
Created: podCreated,
Status: status,
})
}

info := &rktInfo{

This comment has been minimized.

Copy link
@yifan-gu

yifan-gu Nov 25, 2015

I believe we won't need rktInfo anymore :)

uuid: string(podUID),
//TODO: figure out how to get the restartCount
restartCount: restartCount,
}

return kubepod, info, nil
}

// readServiceFile reads the service file and constructs the runtime pod and the rkt info.
func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktInfo, error) {
f, err := os.Open(serviceFilePath(serviceName))
Expand Down Expand Up @@ -781,34 +951,26 @@ func (r *Runtime) readServiceFile(serviceName string) (*kubecontainer.Pod, *rktI
func (r *Runtime) GetPods(all bool) ([]*kubecontainer.Pod, error) {
glog.V(4).Infof("Rkt getting pods")

units, err := r.systemd.ListUnits()
ctx := context.Background()
listReq := &rktapi.ListPodsRequest{}
if !all {
listReq.Filter = &rktapi.PodFilter{
States: []rktapi.PodState{rktapi.PodState_POD_STATE_RUNNING},
}
}
listResp, err := r.apisvc.ListPods(ctx, listReq)
if err != nil {
return nil, err
return nil, fmt.Errorf("couldn't list pods: %v", err)
}

var pods []*kubecontainer.Pod
for _, u := range units {
if strings.HasPrefix(u.Name, kubernetesUnitPrefix) {
var status kubecontainer.ContainerStatus
switch {
case u.SubState == "running":
status = kubecontainer.ContainerStatusRunning
default:
status = kubecontainer.ContainerStatusExited
}
if !all && status != kubecontainer.ContainerStatusRunning {
continue
}
pod, _, err := r.readServiceFile(u.Name)
if err != nil {
glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
continue
}
for _, c := range pod.Containers {
c.Status = status
}
pods = append(pods, pod)
for _, rktpod := range listResp.Pods {
pod, _, err := r.convertRktPod(*rktpod)
if err != nil {
glog.Warningf("rkt: Cannot construct pod from unit file: %v.", err)
continue
}
pods = append(pods, pod)
}
return pods, nil
}
Expand Down

0 comments on commit 833b035

Please sign in to comment.