diff --git a/pkg/apis/compute/container.go b/pkg/apis/compute/container.go index d0dc141bb08..4111765cc78 100644 --- a/pkg/apis/compute/container.go +++ b/pkg/apis/compute/container.go @@ -16,6 +16,7 @@ package compute import ( "reflect" + "time" "yunion.io/x/jsonutils" "yunion.io/x/pkg/gotypes" @@ -55,27 +56,28 @@ const ( ) const ( - CONTAINER_STATUS_PULLING_IMAGE = "pulling_image" - CONTAINER_STATUS_PULL_IMAGE_FAILED = "pull_image_failed" - CONTAINER_STATUS_PULLED_IMAGE = "pulled_image" - CONTAINER_STATUS_CREATING = "creating" - CONTAINER_STATUS_CREATE_FAILED = "create_failed" - CONTAINER_STATUS_SAVING_IMAGE = "saving_image" - CONTAINER_STATUS_SAVE_IMAGE_FAILED = "save_image_failed" - CONTAINER_STATUS_STARTING = "starting" - CONTAINER_STATUS_START_FAILED = "start_failed" - CONTAINER_STATUS_STOPPING = "stopping" - CONTAINER_STATUS_STOP_FAILED = "stop_failed" - CONTAINER_STATUS_SYNC_STATUS = "sync_status" - CONTAINER_STATUS_SYNC_STATUS_FAILED = "sync_status_failed" - CONTAINER_STATUS_UNKNOWN = "unknown" - CONTAINER_STATUS_CREATED = "created" - CONTAINER_STATUS_EXITED = "exited" - CONTAINER_STATUS_RUNNING = "running" - CONTAINER_STATUS_DELETING = "deleting" - CONTAINER_STATUS_DELETE_FAILED = "delete_failed" - CONTAINER_STATUS_COMMITTING = "committing" - CONTAINER_STATUS_COMMIT_FAILED = "commit_failed" + CONTAINER_STATUS_PULLING_IMAGE = "pulling_image" + CONTAINER_STATUS_PULL_IMAGE_FAILED = "pull_image_failed" + CONTAINER_STATUS_PULLED_IMAGE = "pulled_image" + CONTAINER_STATUS_CREATING = "creating" + CONTAINER_STATUS_CREATE_FAILED = "create_failed" + CONTAINER_STATUS_SAVING_IMAGE = "saving_image" + CONTAINER_STATUS_SAVE_IMAGE_FAILED = "save_image_failed" + CONTAINER_STATUS_STARTING = "starting" + CONTAINER_STATUS_START_FAILED = "start_failed" + CONTAINER_STATUS_STOPPING = "stopping" + CONTAINER_STATUS_STOP_FAILED = "stop_failed" + CONTAINER_STATUS_SYNC_STATUS = "sync_status" + CONTAINER_STATUS_SYNC_STATUS_FAILED = "sync_status_failed" + CONTAINER_STATUS_UNKNOWN = "unknown" + CONTAINER_STATUS_CREATED = "created" + CONTAINER_STATUS_EXITED = "exited" + CONTAINER_STATUS_CRASH_LOOP_BACK_OFF = "crash_loop_back_off" + CONTAINER_STATUS_RUNNING = "running" + CONTAINER_STATUS_DELETING = "deleting" + CONTAINER_STATUS_DELETE_FAILED = "delete_failed" + CONTAINER_STATUS_COMMITTING = "committing" + CONTAINER_STATUS_COMMIT_FAILED = "commit_failed" // for health check CONTAINER_STATUS_PROBING = "probing" CONTAINER_STATUS_PROBE_FAILED = "probe_failed" @@ -83,6 +85,7 @@ const ( var ( ContainerRunningStatus = sets.NewString(CONTAINER_STATUS_RUNNING, CONTAINER_STATUS_PROBING) + ContainerExitedStatus = sets.NewString(CONTAINER_STATUS_EXITED, CONTAINER_STATUS_CRASH_LOOP_BACK_OFF) ) const ( @@ -132,7 +135,9 @@ type ContainerStopInput struct { } type ContainerSyncStatusResponse struct { - Status string `json:"status"` + Status string `json:"status"` + StartedAt time.Time `json:"started_at"` + RestartCount int `json:"restart_count"` } type ContainerHostDevice struct { @@ -232,3 +237,10 @@ type KubeServerContainerRegistryDetails struct { Type string `json:"type"` Config *KubeServerContainerRegistryConfig `json:"config"` } + +type ContainerPerformStatusInput struct { + apis.PerformStatusInput + RestartCount int `json:"restart_count"` + StartedAt *time.Time `json:"started_at"` + LastFinishedAt *time.Time `json:"last_finished_at"` +} diff --git a/pkg/apis/compute/pod.go b/pkg/apis/compute/pod.go index 338fa13a471..4d5ee58523d 100644 --- a/pkg/apis/compute/pod.go +++ b/pkg/apis/compute/pod.go @@ -31,6 +31,8 @@ const ( 
POD_STATUS_DELETE_CONTAINER_FAILED = "delete_container_failed" POD_STATUS_SYNCING_CONTAINER_STATUS = "syncing_container_status" POD_STATUS_SYNCING_CONTAINER_STATUS_FAILED = "sync_container_status_failed" + POD_STATUS_CRASH_LOOP_BACK_OFF = "crash_loop_back_off" + POD_STATUS_CONTAINER_EXITED = "container_exited" ) const ( diff --git a/pkg/apis/host/container.go b/pkg/apis/host/container.go index f9073be88dd..dc09f9dff97 100644 --- a/pkg/apis/host/container.go +++ b/pkg/apis/host/container.go @@ -15,6 +15,8 @@ package host import ( + "time" + "yunion.io/x/onecloud/pkg/apis" ) @@ -86,9 +88,10 @@ type ContainerDiskDevice struct { } type ContainerCreateInput struct { - Name string `json:"name"` - GuestId string `json:"guest_id"` - Spec *ContainerSpec `json:"spec"` + Name string `json:"name"` + GuestId string `json:"guest_id"` + Spec *ContainerSpec `json:"spec"` + RestartCount int `json:"restart_count"` } type ContainerPullImageInput struct { @@ -103,9 +106,12 @@ type ContainerPushImageInput struct { } type ContainerDesc struct { - Id string `json:"id"` - Name string `json:"name"` - Spec *ContainerSpec `json:"spec"` + Id string `json:"id"` + Name string `json:"name"` + Spec *ContainerSpec `json:"spec"` + StartedAt time.Time `json:"started_at"` + LastFinishedAt time.Time `json:"last_finished_at"` + RestartCount int `json:"restart_count"` } type ContainerSaveVolumeMountToImageInput struct { diff --git a/pkg/compute/guestdrivers/pod.go b/pkg/compute/guestdrivers/pod.go index 43698178928..9744572e69b 100644 --- a/pkg/compute/guestdrivers/pod.go +++ b/pkg/compute/guestdrivers/pod.go @@ -426,9 +426,10 @@ func (p *SPodDriver) getContainerCreateInput(ctx context.Context, userCred mccli return nil, errors.Wrap(err, "ToHostContainerSpec") } input := &hostapi.ContainerCreateInput{ - Name: ctr.GetName(), - GuestId: ctr.GuestId, - Spec: spec, + Name: ctr.GetName(), + GuestId: ctr.GuestId, + Spec: spec, + RestartCount: ctr.RestartCount, } return input, nil } diff --git a/pkg/compute/models/containers.go b/pkg/compute/models/containers.go index ea73087a868..74b3627035c 100644 --- a/pkg/compute/models/containers.go +++ b/pkg/compute/models/containers.go @@ -72,6 +72,14 @@ type SContainer struct { GuestId string `width:"36" charset:"ascii" create:"required" list:"user" index:"true"` // Spec stores all container running options Spec *api.ContainerSpec `length:"long" create:"required" list:"user" update:"user"` + + // 启动时间 + StartedAt time.Time `nullable:"false" created_at:"false" index:"true" get:"user" list:"user" json:"started_at"` + // 上次退出时间 + LastFinishedAt time.Time `nullable:"true" created_at:"false" index:"true" get:"user" list:"user" json:"last_finished_at"` + + // 重启次数 + RestartCount int `nullable:"true" list:"user"` } func (m *SContainerManager) CreateOnPod( @@ -593,9 +601,12 @@ func (c *SContainer) GetJsonDescAtHost(ctx context.Context, userCred mcclient.To return nil, errors.Wrap(err, "ToHostContainerSpec") } return &hostapi.ContainerDesc{ - Id: c.GetId(), - Name: c.GetName(), - Spec: spec, + Id: c.GetId(), + Name: c.GetName(), + Spec: spec, + StartedAt: c.StartedAt, + LastFinishedAt: c.LastFinishedAt, + RestartCount: c.RestartCount, }, nil } @@ -755,13 +766,27 @@ func (c *SContainer) GetReleasedDevices(ctx context.Context, userCred mcclient.T return out, nil } -func (c *SContainer) PerformStatus(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input apis.PerformStatusInput) (jsonutils.JSONObject, error) { - if c.GetStatus() == api.CONTAINER_STATUS_EXITED { +func (c 
*SContainer) PerformStatus(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input api.ContainerPerformStatusInput) (jsonutils.JSONObject, error) { + if api.ContainerExitedStatus.Has(c.GetStatus()) { if input.Status == api.CONTAINER_STATUS_PROBE_FAILED { return nil, httperrors.NewInputParameterError("can't set container status to %s when %s", input.Status, c.Status) } } - return c.SVirtualResourceBase.PerformStatus(ctx, userCred, query, input) + if _, err := db.Update(c, func() error { + if input.RestartCount > 0 { + c.RestartCount = input.RestartCount + } + if input.StartedAt != nil { + c.StartedAt = *input.StartedAt + } + if input.LastFinishedAt != nil { + c.LastFinishedAt = *input.LastFinishedAt + } + return nil + }); err != nil { + return nil, errors.Wrap(err, "Update container status") + } + return c.SVirtualResourceBase.PerformStatus(ctx, userCred, query, input.PerformStatusInput) } func (c *SContainer) getContainerHostCommitInput(ctx context.Context, userCred mcclient.TokenCredential, input *api.ContainerCommitInput) (*hostapi.ContainerCommitInput, error) { diff --git a/pkg/compute/models/guest_actions.go b/pkg/compute/models/guest_actions.go index fc76afa3bec..454b0b37418 100644 --- a/pkg/compute/models/guest_actions.go +++ b/pkg/compute/models/guest_actions.go @@ -3461,7 +3461,7 @@ func (self *SGuest) PerformStatus(ctx context.Context, userCred mcclient.TokenCr func (self *SGuest) PerformStop(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, input api.ServerStopInput) (jsonutils.JSONObject, error) { // XXX if is force, force stop guest - if input.IsForce || utils.IsInStringArray(self.Status, []string{api.VM_RUNNING, api.VM_STOP_FAILED}) { + if input.IsForce || utils.IsInStringArray(self.Status, []string{api.VM_RUNNING, api.VM_STOP_FAILED, api.POD_STATUS_CRASH_LOOP_BACK_OFF, api.POD_STATUS_CONTAINER_EXITED}) { if err := self.ValidateEncryption(ctx, userCred); err != nil { return nil, errors.Wrap(httperrors.ErrForbidden, "encryption key not accessible") } diff --git a/pkg/compute/tasks/container_sync_status_task.go b/pkg/compute/tasks/container_sync_status_task.go index c56ce51806b..d3a28a74fed 100644 --- a/pkg/compute/tasks/container_sync_status_task.go +++ b/pkg/compute/tasks/container_sync_status_task.go @@ -54,6 +54,15 @@ func (t *ContainerSyncStatusTask) OnSyncStatus(ctx context.Context, container *m resp := new(api.ContainerSyncStatusResponse) data.Unmarshal(resp) container.SetStatus(ctx, t.GetUserCred(), resp.Status, "") + if _, err := db.Update(container, func() error { + if resp.RestartCount > 0 { + container.RestartCount = resp.RestartCount + } + container.StartedAt = resp.StartedAt + return nil + }); err != nil { + log.Errorf("Update container started_at: %s", err) + } t.SetStageComplete(ctx, nil) } diff --git a/pkg/hostman/container/status/status_manager.go b/pkg/hostman/container/status/status_manager.go index 797796fce67..50d55f40d61 100644 --- a/pkg/hostman/container/status/status_manager.go +++ b/pkg/hostman/container/status/status_manager.go @@ -42,9 +42,11 @@ func (m *manager) SetContainerStartup(podId string, containerId string, started if started { status = computeapi.CONTAINER_STATUS_RUNNING } - input := &apis.PerformStatusInput{ - Status: status, - Reason: result.Reason, + input := &computeapi.ContainerPerformStatusInput{ + PerformStatusInput: apis.PerformStatusInput{ + Status: status, + Reason: result.Reason, + }, } if _, err := hostutils.UpdateContainerStatus(context.Background(), 
containerId, input); err != nil { log.Errorf("set container(%s/%s) status failed: %s", podId, containerId, err) diff --git a/pkg/hostman/guestman/guestman.go b/pkg/hostman/guestman/guestman.go index e7e7aa9cc84..549990b8e3e 100644 --- a/pkg/hostman/guestman/guestman.go +++ b/pkg/hostman/guestman/guestman.go @@ -32,6 +32,7 @@ import ( "yunion.io/x/jsonutils" "yunion.io/x/log" "yunion.io/x/pkg/errors" + "yunion.io/x/pkg/util/clock" "yunion.io/x/pkg/util/seclib" "yunion.io/x/pkg/utils" @@ -44,6 +45,8 @@ import ( "yunion.io/x/onecloud/pkg/hostman/guestman/desc" fwd "yunion.io/x/onecloud/pkg/hostman/guestman/forwarder" fwdpb "yunion.io/x/onecloud/pkg/hostman/guestman/forwarder/api" + "yunion.io/x/onecloud/pkg/hostman/guestman/pod/pleg" + "yunion.io/x/onecloud/pkg/hostman/guestman/pod/runtime" "yunion.io/x/onecloud/pkg/hostman/guestman/types" deployapi "yunion.io/x/onecloud/pkg/hostman/hostdeployer/apis" "yunion.io/x/onecloud/pkg/hostman/hostinfo/hostconsts" @@ -111,6 +114,9 @@ type SGuestManager struct { // container related members containerProbeManager prober.Manager enableDirtyRecoveryFeature bool + containerRuntimeManager runtime.Runtime + pleg pleg.PodLifecycleEventGenerator + podCache runtime.Cache } func NewGuestManager(host hostutils.IHost, serversPath string, workerCnt int) (*SGuestManager, error) { @@ -138,6 +144,20 @@ func NewGuestManager(host hostutils.IHost, serversPath string, workerCnt int) (* } if manager.host.IsContainerHost() { manager.startContainerProbeManager() + runtimeMan, err := runtime.NewRuntimeManager(manager.GetCRI()) + if err != nil { + return nil, errors.Wrap(err, "new container runtime manager") + } + manager.podCache = runtime.NewCache() + manager.containerRuntimeManager = runtimeMan + manager.pleg = pleg.NewGenericPLEG(runtimeMan, pleg.ChannelCapacity, pleg.RelistPeriod, manager.podCache, clock.RealClock{}) + manager.pleg.Start() + go func() { + manager.syncContainerLoop(manager.pleg.Watch()) + }() + go func() { + manager.reconcileContainerLoop(manager.podCache) + }() } return manager, nil } diff --git a/pkg/hostman/guestman/pod.go b/pkg/hostman/guestman/pod.go index cf16f2b1b0f..db4da9393f3 100644 --- a/pkg/hostman/guestman/pod.go +++ b/pkg/hostman/guestman/pod.go @@ -44,6 +44,7 @@ import ( "yunion.io/x/onecloud/pkg/hostman/container/status" "yunion.io/x/onecloud/pkg/hostman/container/volume_mount" "yunion.io/x/onecloud/pkg/hostman/guestman/desc" + "yunion.io/x/onecloud/pkg/hostman/guestman/pod/runtime" deployapi "yunion.io/x/onecloud/pkg/hostman/hostdeployer/apis" "yunion.io/x/onecloud/pkg/hostman/hostinfo" "yunion.io/x/onecloud/pkg/hostman/hostutils" @@ -111,7 +112,9 @@ type PodInstance interface { GetContainerById(ctrId string) *hostapi.ContainerDesc CreateContainer(ctx context.Context, userCred mcclient.TokenCredential, id string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error) StartContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerCreateInput) (jsonutils.JSONObject, error) + StartLocalContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error) DeleteContainer(ctx context.Context, cred mcclient.TokenCredential, id string) (jsonutils.JSONObject, error) + SyncStatus(reason string) SyncContainerStatus(ctx context.Context, cred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error) StopContainer(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, body jsonutils.JSONObject) (jsonutils.JSONObject, error) 
PullImage(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerPullImageInput) (jsonutils.JSONObject, error) @@ -125,6 +128,9 @@ type PodInstance interface { // for monitoring GetVolumeMountUsages() (map[ContainerVolumeKey]*volume_mount.ContainerVolumeMountUsage, error) + + IsInternalStopped(ctrCriId string) (*ContainerExpectedStatus, bool) + IsInternalRemoved(ctrCriId string) bool } type sContainer struct { @@ -205,8 +211,9 @@ func (h startStatHelper) RemoveContainerFile(ctrId string) error { type sPodGuestInstance struct { *sBaseGuestInstance - containers map[string]*sContainer - startStat *startStatHelper + containers map[string]*sContainer + startStat *startStatHelper + expectedStatus *PodExpectedStatus } func newPodGuestInstance(id string, man *SGuestManager) PodInstance { @@ -214,6 +221,11 @@ func newPodGuestInstance(id string, man *SGuestManager) PodInstance { sBaseGuestInstance: newBaseGuestInstance(id, man, computeapi.HYPERVISOR_POD), containers: make(map[string]*sContainer), } + es, err := NewPodExpectedStatus(p.HomeDir(), computeapi.VM_UNKNOWN) + if err != nil { + log.Fatalf("NewPodExpectedStatus failed of %s: %s", p.GetId(), err) + } + p.expectedStatus = es p.startStat = newStartStatHelper(id, p.HomeDir()) return p } @@ -281,12 +293,41 @@ func (s *sPodGuestInstance) IsDirtyShutdown() bool { return false } +func (s *sPodGuestInstance) getStatus(ctx context.Context, defaultStatus string) string { + status := defaultStatus + if status == "" { + status = computeapi.VM_READY + } + if s.IsRunning() { + status = computeapi.VM_RUNNING + } + for _, c := range s.containers { + cStatus, cs, err := s.getContainerStatus(ctx, c.Id) + if err != nil { + log.Errorf("get container %s status of pod %s", c.Id, s.Id) + continue + } + if cs != nil { + if cStatus == computeapi.CONTAINER_STATUS_CRASH_LOOP_BACK_OFF { + status = computeapi.POD_STATUS_CRASH_LOOP_BACK_OFF + } + if cStatus == computeapi.CONTAINER_STATUS_EXITED { + status = computeapi.POD_STATUS_CONTAINER_EXITED + } + } + } + return status +} + func (s *sPodGuestInstance) SyncStatus(reason string) { // sync pod status var status = computeapi.VM_READY if s.IsRunning() { status = computeapi.VM_RUNNING } + if err := s.expectedStatus.SetStatus(status); err != nil { + log.Warningf("set expected status to %s, reason: %s, err: %s", status, reason, err.Error()) + } ctx := context.Background() if status == computeapi.VM_READY { // remove pod @@ -294,31 +335,59 @@ func (s *sPodGuestInstance) SyncStatus(reason string) { log.Warningf("stop cri pod when sync status: %s: %v", s.Id, err) } } - statusInput := &apis.PerformStatusInput{ - Status: status, - Reason: reason, - PowerStates: GetPowerStates(s), - HostId: hostinfo.Instance().HostId, - } - - if _, err := hostutils.UpdateServerStatus(ctx, s.Id, statusInput); err != nil { - log.Errorf("failed update guest status %s", err) - } // sync container's status for _, c := range s.containers { - status, err := s.getContainerStatus(ctx, c.Id) + cStatus, cs, err := s.getContainerStatus(ctx, c.Id) if err != nil { log.Errorf("get container %s status of pod %s", c.Id, s.Id) continue } - statusInput = &apis.PerformStatusInput{ - Status: status, - Reason: reason, - HostId: hostinfo.Instance().HostId, + if err := s.expectedStatus.SetContainerStatus(c.CRIId, c.Id, cStatus); err != nil { + log.Warningf("expectedStatus.SetContainerStatus(%s, %s) to %s, error: %s", s.GetId(), c.Id, cStatus, err.Error()) + } + + ctrStatusInput := &computeapi.ContainerPerformStatusInput{ + 
PerformStatusInput: apis.PerformStatusInput{ + Status: cStatus, + Reason: reason, + HostId: hostinfo.Instance().HostId, + }, } - if _, err := hostutils.UpdateContainerStatus(ctx, c.Id, statusInput); err != nil { + if cs != nil { + ctrStatusInput.RestartCount = cs.RestartCount + ctrStatusInput.StartedAt = &cs.StartedAt + if !cs.FinishedAt.IsZero() { + ctrStatusInput.LastFinishedAt = &cs.FinishedAt + } + if ctr := s.GetContainerById(c.Id); ctr != nil { + ctr.RestartCount = cs.RestartCount + ctr.StartedAt = cs.StartedAt + ctr.LastFinishedAt = cs.FinishedAt + if err := s.SaveContainerDesc(ctr); err != nil { + log.Errorf("save container desc for %s/%s: %v", ctr.Id, ctr.Name, err) + } + } + } + if _, err := hostutils.UpdateContainerStatus(ctx, c.Id, ctrStatusInput); err != nil { log.Errorf("failed update container %s status: %s", c.Id, err) } + if cStatus == computeapi.CONTAINER_STATUS_CRASH_LOOP_BACK_OFF { + status = computeapi.POD_STATUS_CRASH_LOOP_BACK_OFF + } + if cStatus == computeapi.CONTAINER_STATUS_EXITED { + status = computeapi.POD_STATUS_CONTAINER_EXITED + } + } + + statusInput := &apis.PerformStatusInput{ + Status: status, + Reason: reason, + PowerStates: GetPowerStates(s), + HostId: hostinfo.Instance().HostId, + } + + if _, err := hostutils.UpdateServerStatus(ctx, s.Id, statusInput); err != nil { + log.Errorf("failed update guest status %s", err) } } @@ -386,7 +455,7 @@ func (s *sPodGuestInstance) IsRunning() bool { } func (s *sPodGuestInstance) IsContainerRunning(ctx context.Context, ctrId string) bool { - status, err := s.getContainerStatus(ctx, ctrId) + status, _, err := s.getContainerStatus(ctx, ctrId) if err != nil { return false } @@ -397,7 +466,7 @@ func (s *sPodGuestInstance) IsContainerRunning(ctx context.Context, ctrId string } func (s *sPodGuestInstance) HandleGuestStatus(ctx context.Context, status string, body *jsonutils.JSONDict) (jsonutils.JSONObject, error) { - body.Set("status", jsonutils.NewString(status)) + body.Set("status", jsonutils.NewString(s.getStatus(ctx, status))) hostutils.TaskComplete(ctx, body) return nil, nil } @@ -504,6 +573,18 @@ func (s *sPodGuestInstance) GetContainerById(ctrId string) *hostapi.ContainerDes return nil } +func (s *sPodGuestInstance) SaveContainerDesc(ctr *hostapi.ContainerDesc) error { + ctrs := s.GetContainers() + for i := range ctrs { + tmp := ctrs[i] + if tmp.Id == ctr.Id { + ctrs[i] = ctr + } + } + s.GetDesc().Containers = ctrs + return SaveDesc(s, s.GetDesc()) +} + func (s *sPodGuestInstance) getContainerVolumeMounts() map[string][]*hostapi.ContainerVolumeMount { result := make(map[string][]*hostapi.ContainerVolumeMount, 0) for _, ctr := range s.GetDesc().Containers { @@ -883,9 +964,10 @@ func (s *sPodGuestInstance) StartLocalContainer(ctx context.Context, userCred mc return nil, errors.Wrapf(errors.ErrNotFound, "Not found container %s", ctrId) } input := &hostapi.ContainerCreateInput{ - Name: ctr.Name, - GuestId: s.GetId(), - Spec: ctr.Spec, + Name: ctr.Name, + GuestId: s.GetId(), + Spec: ctr.Spec, + RestartCount: ctr.RestartCount + 1, } ret, err := s.StartContainer(ctx, userCred, ctrId, input) if err != nil { @@ -898,7 +980,7 @@ func (s *sPodGuestInstance) StartContainer(ctx context.Context, userCred mcclien _, hasCtr := s.containers[ctrId] needRecreate := false if hasCtr { - status, err := s.getContainerStatus(ctx, ctrId) + status, _, err := s.getContainerStatus(ctx, ctrId) if err != nil { if errors.Cause(err) == errors.ErrNotFound || strings.Contains(err.Error(), "not found") { needRecreate = true @@ -906,10 +988,10 @@ func 
(s *sPodGuestInstance) StartContainer(ctx context.Context, userCred mcclien return nil, errors.Wrap(err, "get container status") } } else { - if status == computeapi.CONTAINER_STATUS_EXITED { + if computeapi.ContainerExitedStatus.Has(status) { needRecreate = true } else if status != computeapi.CONTAINER_STATUS_CREATED { - return nil, errors.Wrapf(err, "can't start container when status is %s", status) + return nil, errors.Errorf("can't start container when status is %s", status) } } } @@ -930,6 +1012,11 @@ func (s *sPodGuestInstance) StartContainer(ctx context.Context, userCred mcclien if err != nil { return nil, errors.Wrap(err, "get container cri id") } + + if err := s.expectedStatus.SetContainerStatus(criId, ctrId, computeapi.CONTAINER_STATUS_RUNNING); err != nil { + log.Warningf("set container %s(%s) expected status to running: %v", criId, ctrId, err) + } + if err := s.getCRI().StartContainer(ctx, criId); err != nil { return nil, errors.Wrap(err, "CRI.StartContainer") } @@ -1014,6 +1101,9 @@ func (s *sPodGuestInstance) StopContainer(ctx context.Context, userCred mcclient return nil, errors.Wrap(err, "get container cri id") } var timeout int64 = 0 + + s.expectedStatus.SetContainerStatus(criId, ctrId, computeapi.CONTAINER_STATUS_EXITED) + if body.Contains("timeout") { timeout, _ = body.Int("timeout") } @@ -1325,7 +1415,17 @@ func (s *sPodGuestInstance) createContainer(ctx context.Context, userCred mcclie ctrCfg := &runtimeapi.ContainerConfig{ Metadata: &runtimeapi.ContainerMetadata{ - Name: input.Name, + Name: input.Name, + Attempt: uint32(input.RestartCount), + }, + Labels: map[string]string{ + runtime.PodNameLabel: s.GetDesc().Name, + runtime.PodUIDLabel: s.GetId(), + runtime.ContainerNameLabel: input.Name, + runtime.ContainerRestartCountLabel: fmt.Sprintf("%d", input.RestartCount), + }, + Annotations: map[string]string{ + runtime.ContainerRestartCountLabel: fmt.Sprintf("%d", input.RestartCount), }, Image: &runtimeapi.ImageSpec{ Image: spec.Image, @@ -1635,6 +1735,8 @@ func (s *sPodGuestInstance) DeleteContainer(ctx context.Context, userCred mcclie return nil, errors.Wrap(err, "getContainerCRIId") } if criId != "" { + s.expectedStatus.RemoveContainer(criId) + if err := s.getCRI().RemoveContainer(ctx, criId); err != nil && !strings.Contains(err.Error(), "not found") { return nil, errors.Wrap(err, "cri.RemoveContainer") } @@ -1650,22 +1752,23 @@ func (s *sPodGuestInstance) DeleteContainer(ctx context.Context, userCred mcclie return nil, nil } -func (s *sPodGuestInstance) getContainerStatus(ctx context.Context, ctrId string) (string, error) { +func (s *sPodGuestInstance) getContainerStatus(ctx context.Context, ctrId string) (string, *runtime.Status, error) { criId, err := s.getContainerCRIId(ctrId) if err != nil { if errors.Cause(err) == errors.ErrNotFound { // not found, already stopped - return computeapi.CONTAINER_STATUS_EXITED, nil + return computeapi.CONTAINER_STATUS_EXITED, nil, nil } - return "", errors.Wrapf(err, "get container cri_id by %s", ctrId) + return "", nil, errors.Wrapf(err, "get container cri_id by %s", ctrId) } resp, err := s.getCRI().ContainerStatus(ctx, criId) if err != nil { if strings.Contains(err.Error(), "NotFound") { - return computeapi.CONTAINER_STATUS_EXITED, nil + return computeapi.CONTAINER_STATUS_EXITED, nil, nil } - return "", errors.Wrap(err, "cri.ContainerStatus") + return "", nil, errors.Wrap(err, "cri.ContainerStatus") } + cs := runtime.ToContainerStatus(resp.Status, "containerd") status := computeapi.CONTAINER_STATUS_UNKNOWN switch resp.Status.State { 
case runtimeapi.ContainerState_CONTAINER_CREATED: @@ -1680,17 +1783,20 @@ func (s *sPodGuestInstance) getContainerStatus(ctx context.Context, ctrId string if status == computeapi.CONTAINER_STATUS_RUNNING { ctr := s.GetContainerById(ctrId) if ctr == nil { - return "", errors.Wrapf(httperrors.ErrNotFound, "not found container by id %s", ctrId) + return "", cs, errors.Wrapf(httperrors.ErrNotFound, "not found container by id %s", ctrId) } if ctr.Spec.NeedProbe() { status = computeapi.CONTAINER_STATUS_PROBING } } - return status, nil + if status == computeapi.CONTAINER_STATUS_EXITED && resp.Status.ExitCode != 0 { + status = computeapi.CONTAINER_STATUS_CRASH_LOOP_BACK_OFF + } + return status, cs, nil } func (s *sPodGuestInstance) SyncContainerStatus(ctx context.Context, userCred mcclient.TokenCredential, ctrId string) (jsonutils.JSONObject, error) { - status, err := s.getContainerStatus(ctx, ctrId) + status, cs, err := s.getContainerStatus(ctx, ctrId) if err != nil { return nil, errors.Wrap(err, "get container status") } @@ -1698,7 +1804,14 @@ func (s *sPodGuestInstance) SyncContainerStatus(ctx context.Context, userCred mc log.Infof("mark container %s to dirty after syncing status", ctrId) s.getProbeManager().SetDirtyContainer(ctrId) } - return jsonutils.Marshal(computeapi.ContainerSyncStatusResponse{Status: status}), nil + resp := computeapi.ContainerSyncStatusResponse{ + Status: status, + } + if cs != nil { + resp.StartedAt = cs.StartedAt + resp.RestartCount = cs.RestartCount + } + return jsonutils.Marshal(resp), nil } func (s *sPodGuestInstance) PullImage(ctx context.Context, userCred mcclient.TokenCredential, ctrId string, input *hostapi.ContainerPullImageInput) (jsonutils.JSONObject, error) { diff --git a/pkg/hostman/guestman/pod/pleg/doc.go b/pkg/hostman/guestman/pod/pleg/doc.go new file mode 100644 index 00000000000..8a07387c516 --- /dev/null +++ b/pkg/hostman/guestman/pod/pleg/doc.go @@ -0,0 +1 @@ +package pleg // import "yunion.io/x/onecloud/pkg/hostman/guestman/pod/pleg" diff --git a/pkg/hostman/guestman/pod/pleg/generic.go b/pkg/hostman/guestman/pod/pleg/generic.go new file mode 100644 index 00000000000..9cd5ead7943 --- /dev/null +++ b/pkg/hostman/guestman/pod/pleg/generic.go @@ -0,0 +1,436 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pleg + +import ( + "fmt" + "sync/atomic" + "time" + + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + + "yunion.io/x/log" + "yunion.io/x/pkg/util/clock" + "yunion.io/x/pkg/util/sets" + "yunion.io/x/pkg/util/wait" + + "yunion.io/x/onecloud/pkg/hostman/guestman/pod/runtime" +) + +// plegContainerState has a one-to-one mapping to the +// kubecontainer.State except for the non-existent state. This state +// is introduced here to complete the state transition scenarios. 
+type plegContainerState string + +const ( + plegContainerRunning plegContainerState = "running" + plegContainerExited plegContainerState = "exited" + plegContainerUnknown plegContainerState = "unknown" + plegContainerNonExistent plegContainerState = "non-existent" + + // The threshold needs to be greater than the relisting period + the + // relisting time, which can vary significantly. Set a conservative + // threshold to avoid flipping between healthy and unhealthy. + relistThreshold = 3 * time.Minute +) + +func convertState(state runtime.State) plegContainerState { + switch state { + case runtime.ContainerStateCreated: + // kubelet doesn't use the "created" state yet, hence convert it to "unknown". + return plegContainerUnknown + case runtime.ContainerStateRunning: + return plegContainerRunning + case runtime.ContainerStateExited: + return plegContainerExited + case runtime.ContainerStateUnknown: + return plegContainerUnknown + default: + panic(fmt.Sprintf("unrecognized container state: %v", state)) + } +} + +type GenericPLEG struct { + // The period for relisting. + relistPeriod time.Duration + // The container runtime. + runtime runtime.Runtime + // The channel from which the subscriber listens events. + eventChannel chan *PodLifecycleEvent + podRecords podRecords + // Time of the last relisting. + relistTime atomic.Value + // Cache for storing the runtime states required for syncing pods. + cache runtime.Cache + clock clock.Clock + // Pods that failed to have their status retrieved during a relist. These pods will be + // retried during the next relisting. + podsToReinspect map[string]*runtime.Pod +} + +const ( + // Capacity of the channel for receiving pod lifecycle events. This number + // is a bit arbitrary and may be adjusted in the future. + ChannelCapacity = 1000 + + // Generic PLEG relies on relisting for discovering container events. + // A longer period means that kubelet will take longer to detect container + // changes and to update pod status. On the other hand, a shorter period + // will cause more frequent relisting (e.g., container runtime operations), + // leading to higher cpu usage. + // Note that even though we set the period to 1s, the relisting itself can + // take more than 1s to finish if the container runtime responds slowly + // and/or when there are many container changes in one cycle. + RelistPeriod = time.Second * 1 +) + +func NewGenericPLEG(runtimeManager runtime.Runtime, channelCapacity int, + relistPeriod time.Duration, cache runtime.Cache, clock clock.Clock) PodLifecycleEventGenerator { + return &GenericPLEG{ + relistPeriod: relistPeriod, + runtime: runtimeManager, + eventChannel: make(chan *PodLifecycleEvent, channelCapacity), + podRecords: make(podRecords), + cache: cache, + clock: clock, + } +} + +func (g *GenericPLEG) Start() { + go wait.Until(g.relist, g.relistPeriod, wait.NeverStop) +} + +func (g *GenericPLEG) Watch() chan *PodLifecycleEvent { + return g.eventChannel +} + +func (g *GenericPLEG) getRelistTime() time.Time { + val := g.relistTime.Load() + if val == nil { + return time.Time{} + } + return val.(time.Time) +} + +func (g *GenericPLEG) updateRelistTime(timestamp time.Time) { + g.relistTime.Store(timestamp) +} + +// relist queries the container runtime for list of pods/containers, compare +// with the internal pods/containers, and generates events accordingly. +func (g *GenericPLEG) relist() { + log.Debugf("GenericPLEG: Relisting") + + timestamp := g.clock.Now() + + // Get all the pods. 
+ podList, err := g.runtime.GetPods(true) + if err != nil { + log.Errorf("Unable to retrieve pods: %v", err) + return + } + + g.updateRelistTime(timestamp) + + pods := runtime.Pods(podList) + g.podRecords.setCurrent(pods) + + // Compare the old and the current pods, and generate events. + eventsByPodID := map[string][]*PodLifecycleEvent{} + for pid := range g.podRecords { + oldPod := g.podRecords.getOld(pid) + pod := g.podRecords.getCurrent(pid) + // Get all containers in the old and the new pod. + allContainers := getContainersFromPods(oldPod, pod) + for _, container := range allContainers { + events := computeEvents(oldPod, pod, &container.ID) + for _, e := range events { + updateEvents(eventsByPodID, e) + } + } + } + + var needsReinspection map[string]*runtime.Pod + if g.cacheEnabled() { + needsReinspection = make(map[string]*runtime.Pod) + } + + // If there are events associated with a pod, we should update the + // podCache. + for pid, events := range eventsByPodID { + pod := g.podRecords.getCurrent(pid) + if g.cacheEnabled() { + // updateCache() will inspect the pod and update the cache. If an + // error occurs during the inspection, we want PLEG to retry again + // in the next relist. To achieve this, we do not update the + // associated podRecord of the pod, so that the change will be + // detect again in the next relist. + // TODO: If many pods changed during the same relist period, + // inspecting the pod and getting the PodStatus to update the cache + // serially may take a while. We should be aware of this and + // parallelize if needed. + if err := g.updateCache(pod, pid); err != nil { + // Rely on updateCache calling GetPodStatus to log the actual error. + log.Infof("PLEG: Ignoring events for pod %s/%s: %v", pod.Name, pod.Namespace, err) + + // make sure we try to reinspect the pod during the next relisting + needsReinspection[pid] = pod + + continue + } else { + // this pod was in the list to reinspect and we did so because it had events, so remove it + // from the list (we don't want the reinspection code below to inspect it a second time in + // this relist execution) + delete(g.podsToReinspect, pid) + } + } + // Update the internal storage and send out the events. + g.podRecords.update(pid) + for i := range events { + // Filter out events that are not reliable and no other components use yet. + if events[i].Type == ContainerChanged { + continue + } + select { + case g.eventChannel <- events[i]: + default: + log.Errorf("event channel is full, discard this relist() cycle event") + } + } + } + + if g.cacheEnabled() { + // reinspect any pods that failed inspection during the previous relist + if len(g.podsToReinspect) > 0 { + log.Infof("GenericPLEG: Reinspecting pods that previously failed inspection") + for pid, pod := range g.podsToReinspect { + if err := g.updateCache(pod, pid); err != nil { + // Rely on updateCache calling GetPodStatus to log the actual error. + log.Infof("PLEG: pod %s/%s failed reinspection: %v", pod.Name, pod.Namespace, err) + needsReinspection[pid] = pod + } + } + } + + // Update the cache timestamp. This needs to happen *after* + // all pods have been properly updated in the cache. 
+ g.cache.UpdateTime(timestamp) + } + + // make sure we retain the list of pods that need reinspecting the next time relist is called + g.podsToReinspect = needsReinspection +} + +func (g *GenericPLEG) cacheEnabled() bool { + return g.cache != nil +} + +func (g *GenericPLEG) updateCache(pod *runtime.Pod, pid string) error { + if pod == nil { + // The pod is missing in the current relist. This means that + // the pod has no visible (active or inactive) containers. + log.Infof("PLEG: Delete status for pod %q", string(pid)) + g.cache.Delete(pid) + return nil + } + timestamp := g.clock.Now() + // TODO: Consider adding a new runtime method + // GetPodStatus(pod *kubecontainer.Pod) so that Docker can avoid listing + // all containers again. + status, err := g.runtime.GetPodStatus(pod.Id, pod.Name, pod.Namespace) + log.Debugf("PLEG: Write status for %s/%s: %#v (err: %v)", pod.Name, pod.Namespace, status, err) + if err == nil { + // Preserve the pod IP across cache updates if the new IP is empty. + // When a pod is torn down, kubelet may race with PLEG and retrieve + // a pod status after network teardown, but the kubernetes API expects + // the completed pod's IP to be available after the pod is dead. + status.IPs = g.getPodIPs(pid, status) + } + + g.cache.Set(pod.Id, status, err, timestamp) + return err +} + +// getPodIP preserves an older cached status' pod IP if the new status has no pod IPs +// and its sandboxes have exited +func (g *GenericPLEG) getPodIPs(pid string, status *runtime.PodStatus) []string { + if len(status.IPs) != 0 { + return status.IPs + } + + oldStatus, err := g.cache.Get(pid) + if err != nil || len(oldStatus.IPs) == 0 { + return nil + } + + for _, sandboxStatus := range status.SandboxStatuses { + // If at least one sandbox is ready, then use this status update's pod IP + if sandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY { + return status.IPs + } + } + + // For pods with no ready containers or sandboxes (like exited pods) + // use the old status' pod IP + return oldStatus.IPs +} + +func updateEvents(eventsByPodID map[string][]*PodLifecycleEvent, e *PodLifecycleEvent) { + if e == nil { + return + } + eventsByPodID[e.Id] = append(eventsByPodID[e.Id], e) +} + +func getContainersFromPods(pods ...*runtime.Pod) []*runtime.Container { + cidSet := sets.NewString() + var containers []*runtime.Container + for _, p := range pods { + if p == nil { + continue + } + for _, c := range p.Containers { + cid := string(c.ID.ID) + if cidSet.Has(cid) { + continue + } + cidSet.Insert(cid) + containers = append(containers, c) + } + // Update sandboxes as containers + // TODO: keep track of sandboxes explicitly. 
+ for _, c := range p.Sandboxes { + cid := string(c.ID.ID) + if cidSet.Has(cid) { + continue + } + cidSet.Insert(cid) + containers = append(containers, c) + } + + } + return containers +} + +func computeEvents(oldPod, newPod *runtime.Pod, cid *runtime.ContainerID) []*PodLifecycleEvent { + var pid string + if oldPod != nil { + pid = oldPod.Id + } else if newPod != nil { + pid = newPod.Id + } + oldState := getContainerState(oldPod, cid) + newState := getContainerState(newPod, cid) + return generateEvents(pid, cid.ID, oldState, newState) +} + +func generateEvents(podID string, cid string, oldState, newState plegContainerState) []*PodLifecycleEvent { + if newState == oldState { + return nil + } + + log.Infof("GenericPLEG: %v/%v: %v -> %v", podID, cid, oldState, newState) + switch newState { + case plegContainerRunning: + return []*PodLifecycleEvent{{Id: podID, Type: ContainerStarted, Data: cid}} + case plegContainerExited: + return []*PodLifecycleEvent{{Id: podID, Type: ContainerDied, Data: cid}} + case plegContainerUnknown: + return []*PodLifecycleEvent{{Id: podID, Type: ContainerChanged, Data: cid}} + case plegContainerNonExistent: + switch oldState { + case plegContainerExited: + // We already reported that the container died before. + return []*PodLifecycleEvent{{Id: podID, Type: ContainerRemoved, Data: cid}} + default: + return []*PodLifecycleEvent{{Id: podID, Type: ContainerDied, Data: cid}, {Id: podID, Type: ContainerRemoved, Data: cid}} + } + default: + panic(fmt.Sprintf("unrecognized container state: %v", newState)) + } +} + +func getContainerState(pod *runtime.Pod, cid *runtime.ContainerID) plegContainerState { + // Default to the non-existent state. + state := plegContainerNonExistent + if pod == nil { + return state + } + c := pod.FindContainerByID(*cid) + if c != nil { + return convertState(c.State) + } + // Search through sandboxes too. + c = pod.FindSandboxByID(*cid) + if c != nil { + return convertState(c.State) + } + + return state +} + +type podRecord struct { + old *runtime.Pod + current *runtime.Pod +} + +type podRecords map[string]*podRecord + +func (pr podRecords) getOld(id string) *runtime.Pod { + r, ok := pr[id] + if !ok { + return nil + } + return r.old +} + +func (pr podRecords) getCurrent(id string) *runtime.Pod { + r, ok := pr[id] + if !ok { + return nil + } + return r.current +} + +func (pr podRecords) setCurrent(pods []*runtime.Pod) { + for i := range pr { + pr[i].current = nil + } + for _, pod := range pods { + if r, ok := pr[pod.Id]; ok { + r.current = pod + } else { + pr[pod.Id] = &podRecord{current: pod} + } + } +} + +func (pr podRecords) update(id string) { + r, ok := pr[id] + if !ok { + return + } + pr.updateInternal(id, r) +} + +func (pr podRecords) updateInternal(id string, r *podRecord) { + if r.current == nil { + // Pod no longer exists; delete the entry. + delete(pr, id) + return + } + r.old = r.current + r.current = nil +} diff --git a/pkg/hostman/guestman/pod/pleg/pleg.go b/pkg/hostman/guestman/pod/pleg/pleg.go new file mode 100644 index 00000000000..a8f9db2fbd7 --- /dev/null +++ b/pkg/hostman/guestman/pod/pleg/pleg.go @@ -0,0 +1,50 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pleg + +// PodLifecycleEventType define the event type of pod life cycle events. +type PodLifecycleEventType string + +const ( + // ContainerStarted - event type when the new state of container is running + ContainerStarted PodLifecycleEventType = "ContainerStarted" + // ContainerDied - event type when the new state of container is exited. + ContainerDied PodLifecycleEventType = "ContainerDied" + // ContainerRemoved - event type when the old state of container is exited. + ContainerRemoved PodLifecycleEventType = "ContainerRemoved" + // PodSync is used to trigger syncing of a pod when the observed change of + // the state of the pod cannot be captured by any single event above. + PodSync PodLifecycleEventType = "PodSync" + // ContainerChanged - event type when the new state of container is unknown. + ContainerChanged PodLifecycleEventType = "ContainerChanged" +) + +// PodLifecycleEvent is an event that reflects the change of the pod state. +type PodLifecycleEvent struct { + // The pod ID. + Id string + // The type of the event. + Type PodLifecycleEventType + // The accompanied data which varies based on the event type. + // - ContainerStarted/ContainerStopped: the container name (string). + // - All other event types: unused. + Data interface{} +} + +// PodLifecycleEventGenerator contains functions for generating pod life cycle events. +type PodLifecycleEventGenerator interface { + Start() + Watch() chan *PodLifecycleEvent +} diff --git a/pkg/hostman/guestman/pod/runtime/cache.go b/pkg/hostman/guestman/pod/runtime/cache.go new file mode 100644 index 00000000000..1d38174b412 --- /dev/null +++ b/pkg/hostman/guestman/pod/runtime/cache.go @@ -0,0 +1,198 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runtime + +import ( + "sync" + "time" +) + +// Cache stores the PodStatus for the pods. It represents *all* the visible +// pods/containers in the container runtime. All cache entries are at least as +// new or newer than the global timestamp (set by UpdateTime()), while +// individual entries may be slightly newer than the global timestamp. If a pod +// has no states known by the runtime, Cache returns an empty PodStatus object +// with ID populated. +// +// Cache provides two methods to retrieve the PodStatus: the non-blocking Get() +// and the blocking GetNewerThan() method. The component responsible for +// populating the cache is expected to call Delete() to explicitly free the +// cache entries. 
+type Cache interface { + Get(string) (*PodStatus, error) + Set(string, *PodStatus, error, time.Time) + // GetNewerThan is a blocking call that only returns the status + // when it is newer than the given time. + GetNewerThan(string, time.Time) (*PodStatus, error) + Delete(string) + UpdateTime(time.Time) +} + +type data struct { + // Status of the pod. + status *PodStatus + // Error got when trying to inspect the pod. + err error + // Time when the data was last modified. + modified time.Time +} + +type subRecord struct { + time time.Time + ch chan *data +} + +// cache implements Cache. +type cache struct { + // Lock which guards all internal data structures. + lock sync.RWMutex + // Map that stores the pod statuses. + pods map[string]*data + // A global timestamp represents how fresh the cached data is. All + // cache content is at least as new as this timestamp. Note that the + // timestamp is nil after initialization, and will only become non-nil when + // it is ready to serve the cached statuses. + timestamp *time.Time + // Map that stores the subscriber records. + subscribers map[string][]*subRecord +} + +// NewCache creates a pod cache. +func NewCache() Cache { + return &cache{ + pods: map[string]*data{}, + subscribers: map[string][]*subRecord{}, + } +} + +// Get returns the PodStatus for the pod; callers are expected not to +// modify the objects returned. +func (c *cache) Get(id string) (*PodStatus, error) { + c.lock.RLock() + defer c.lock.RUnlock() + d := c.get(id) + return d.status, d.err +} + +func (c *cache) GetNewerThan(id string, minTime time.Time) (*PodStatus, error) { + ch := c.subscribe(id, minTime) + d := <-ch + return d.status, d.err +} + +// Set sets the PodStatus for the pod. +func (c *cache) Set(id string, status *PodStatus, err error, timestamp time.Time) { + c.lock.Lock() + defer c.lock.Unlock() + defer c.notify(id, timestamp) + c.pods[id] = &data{status: status, err: err, modified: timestamp} +} + +// Delete removes the entry of the pod. +func (c *cache) Delete(id string) { + c.lock.Lock() + defer c.lock.Unlock() + delete(c.pods, id) +} + +// UpdateTime modifies the global timestamp of the cache and notifies +// subscribers if needed. +func (c *cache) UpdateTime(timestamp time.Time) { + c.lock.Lock() + defer c.lock.Unlock() + c.timestamp = &timestamp + // Notify all the subscribers if the condition is met. + for id := range c.subscribers { + c.notify(id, *c.timestamp) + } +} + +func makeDefaultData(id string) *data { + return &data{status: &PodStatus{ID: id}, err: nil} +} + +func (c *cache) get(id string) *data { + d, ok := c.pods[id] + if !ok { + // Cache should store *all* pod/container information known by the + // container runtime. A cache miss indicates that there are no states + // regarding the pod last time we queried the container runtime. + // What this *really* means is that there are no visible pod/containers + // associated with this pod. Simply return a default (mostly empty) + // PodStatus to reflect this. + return makeDefaultData(id) + } + return d +} + +// getIfNewerThan returns the data if it is newer than the given time. +// Otherwise, it returns nil. The caller should acquire the lock. +func (c *cache) getIfNewerThan(id string, minTime time.Time) *data { + d, ok := c.pods[id] + globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime)) + if !ok && globalTimestampIsNewer { + // Status is not cached, but the global timestamp is newer than + // minTime, return the default status.
+ return makeDefaultData(id) + } + if ok && (d.modified.After(minTime) || globalTimestampIsNewer) { + // Status is cached, return status if either of the following is true. + // * status was modified after minTime + // * the global timestamp of the cache is newer than minTime. + return d + } + // The pod status is not ready. + return nil +} + +// notify sends notifications for pod with the given id, if the requirements +// are met. Note that the caller should acquire the lock. +func (c *cache) notify(id string, timestamp time.Time) { + list, ok := c.subscribers[id] + if !ok { + // No one to notify. + return + } + newList := []*subRecord{} + for i, r := range list { + if timestamp.Before(r.time) { + // Doesn't meet the time requirement; keep the record. + newList = append(newList, list[i]) + continue + } + r.ch <- c.get(id) + close(r.ch) + } + if len(newList) == 0 { + delete(c.subscribers, id) + } else { + c.subscribers[id] = newList + } +} + +func (c *cache) subscribe(id string, timestamp time.Time) chan *data { + ch := make(chan *data, 1) + c.lock.Lock() + defer c.lock.Unlock() + d := c.getIfNewerThan(id, timestamp) + if d != nil { + // If the cache entry is ready, send the data and return immediately. + ch <- d + return ch + } + // Add the subscription record. + c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch}) + return ch +} diff --git a/pkg/hostman/guestman/pod/runtime/doc.go b/pkg/hostman/guestman/pod/runtime/doc.go new file mode 100644 index 00000000000..01379173d41 --- /dev/null +++ b/pkg/hostman/guestman/pod/runtime/doc.go @@ -0,0 +1 @@ +package runtime // import "yunion.io/x/onecloud/pkg/hostman/guestman/pod/runtime" diff --git a/pkg/hostman/guestman/pod/runtime/helpers.go b/pkg/hostman/guestman/pod/runtime/helpers.go new file mode 100644 index 00000000000..f8eb4ffbe22 --- /dev/null +++ b/pkg/hostman/guestman/pod/runtime/helpers.go @@ -0,0 +1,161 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package runtime + +import ( + "fmt" + "strconv" + + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + + "yunion.io/x/log" + "yunion.io/x/pkg/errors" +) + +func (m *runtimeManager) sandboxToContainer(s *runtimeapi.PodSandbox) (*Container, error) { + if s == nil || s.Id == "" { + return nil, errors.Errorf("unable to convert a nil pointer to a runtime container") + } + + return &Container{ + ID: ContainerID{Type: m.runtimeName, ID: s.Id}, + State: SandboxToContainerState(s.State), + }, nil +} + +func SandboxToContainerState(state runtimeapi.PodSandboxState) State { + switch state { + case runtimeapi.PodSandboxState_SANDBOX_READY: + return ContainerStateRunning + case runtimeapi.PodSandboxState_SANDBOX_NOTREADY: + return ContainerStateExited + } + return ContainerStateUnknown +} + +func (m *runtimeManager) toContainer(c *runtimeapi.Container) (*Container, error) { + if c == nil || c.Id == "" || c.Image == nil { + return nil, fmt.Errorf("unable to convert a nil pointer to a runtime container") + } + + return &Container{ + ID: ContainerID{Type: m.runtimeName, ID: c.Id}, + Name: c.GetMetadata().GetName(), + Image: c.ImageRef, + ImageID: c.Image.Image, + State: toContainerState(c.State), + }, nil +} + +// toContainerState converts runtime.ContainerState to State. +func toContainerState(state runtimeapi.ContainerState) State { + switch state { + case runtimeapi.ContainerState_CONTAINER_CREATED: + return ContainerStateCreated + case runtimeapi.ContainerState_CONTAINER_RUNNING: + return ContainerStateRunning + case runtimeapi.ContainerState_CONTAINER_EXITED: + return ContainerStateExited + case runtimeapi.ContainerState_CONTAINER_UNKNOWN: + return ContainerStateUnknown + } + return ContainerStateUnknown +} + +type labeledContainerInfo struct { + ContainerName string + PodName string + PodNamespace string + PodUid string +} + +func getStringValueFromLabel(labels map[string]string, label string) string { + if labels == nil { + return "" + } + if value, found := labels[label]; found { + return value + } + // Do not report error, because there should be many old containers without label now. + // Return empty string "" for these containers, the caller will get value by other ways. + return "" +} + +func getIntValueFromLabel(labels map[string]string, label string) (int, error) { + if strValue, found := labels[label]; found { + intValue, err := strconv.Atoi(strValue) + if err != nil { + // This really should not happen. Just set value to 0 to handle this abnormal case + return 0, err + } + return intValue, nil + } + // Do not report error, because there should be many old containers without label now. 
+ log.Infof("Container doesn't have label %s, it may be an old or invalid container", label) + // Just set the value to 0 + return 0, nil +} + +func getContainerInfoFromLabels(labels, annotations map[string]string) *labeledContainerInfo { + podName := getStringValueFromLabel(labels, PodNameLabel) + if podName == "" { + podName = getStringValueFromLabel(annotations, SandboxNameAnnotation) + } + podNamespace := getStringValueFromLabel(labels, PodNamespaceLabel) + if podNamespace == "" { + podNamespace = getStringValueFromLabel(annotations, SandboxNamespaceAnnotation) + } + podUid := getStringValueFromLabel(labels, PodUIDLabel) + if podUid == "" { + podUid = getStringValueFromLabel(annotations, SandboxUidAnnotation) + } + containerName := getStringValueFromLabel(labels, ContainerNameLabel) + if containerName == "" { + containerName = getStringValueFromLabel(annotations, ContainerNameAnnotation) + } + return &labeledContainerInfo{ + PodName: podName, + PodNamespace: podNamespace, + PodUid: podUid, + ContainerName: containerName, + } +} + +type annotatedContainerInfo struct { + Hash uint64 + RestartCount int + PodDeletionGracePeriod *int64 + PodTerminationGracePeriod *int64 + TerminationMessagePath string +} + +// getContainerInfoFromAnnotations gets annotatedContainerInfo from annotations. +func getContainerInfoFromAnnotations(annotations map[string]string) *annotatedContainerInfo { + if annotations == nil { + return nil + } + var err error + containerInfo := &annotatedContainerInfo{} + if containerInfo.RestartCount, err = getIntValueFromLabel(annotations, ContainerRestartCountLabel); err != nil { + log.Errorf("Unable to get %q from annotations %v: %v", ContainerRestartCountLabel, annotations, err) + } + return containerInfo +} + +type containerStatusByCreated []*Status + +func (c containerStatusByCreated) Len() int { return len(c) } +func (c containerStatusByCreated) Swap(i, j int) { c[i], c[j] = c[j], c[i] } +func (c containerStatusByCreated) Less(i, j int) bool { return c[i].CreatedAt.After(c[j].CreatedAt) } diff --git a/pkg/hostman/guestman/pod/runtime/labels.go b/pkg/hostman/guestman/pod/runtime/labels.go new file mode 100644 index 00000000000..02c4b65c6b7 --- /dev/null +++ b/pkg/hostman/guestman/pod/runtime/labels.go @@ -0,0 +1,49 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package runtime + +const ( + PodNameLabel = "io.yunion.pod.name" + PodNamespaceLabel = "io.yunion.pod.namespace" + PodUIDLabel = "io.yunion.pod.uid" + ContainerNameLabel = "io.yunion.container.name" + ContainerRestartCountLabel = "io.yunion.container.restart_count" +) + +const ( + ContainerNameAnnotation = "io.kubernetes.cri.container-name" + ContainerTypeAnnotation = "io.kubernetes.cri.container-type" + ImageNameAnnotation = "io.kubernetes.cri.image-name" + SandboxIdAnnotation = "io.kubernetes.cri.sandbox-id" + SandboxNameAnnotation = "io.kubernetes.cri.sandbox-name" + SandboxNamespaceAnnotation = "io.kubernetes.cri.sandbox-namespace" + SandboxUidAnnotation = "io.kubernetes.cri.sandbox-uid" +) + +func GetContainerName(labels map[string]string) string { + return labels[ContainerNameLabel] +} + +func GetPodName(labels map[string]string) string { + return labels[PodNameLabel] +} + +func GetPodUID(labels map[string]string) string { + return labels[PodUIDLabel] +} + +func GetPodNamespace(labels map[string]string) string { + return labels[PodNamespaceLabel] +} diff --git a/pkg/hostman/guestman/pod/runtime/runtime.go b/pkg/hostman/guestman/pod/runtime/runtime.go new file mode 100644 index 00000000000..2cf23e9a218 --- /dev/null +++ b/pkg/hostman/guestman/pod/runtime/runtime.go @@ -0,0 +1,223 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runtime + +import ( + "time" + + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" +) + +// Runtime interface defines the interfaces that should be implemented +// by a container runtime. +// Thread safety is required from implementations of this interface. +type Runtime interface { + // Type returns the type of the container runtime. + Type() string + + // GetPods returns a list of containers grouped by pods. The boolean parameter + // specifies whether the runtime returns all containers including those already + // exited and dead containers (used for garbage collection). + GetPods(all bool) ([]*Pod, error) + + // GetPodStatus retrieves the status of the pod, including the + // information of all containers in the pod that are visible in Runtime. + GetPodStatus(uid, name, namespace string) (*PodStatus, error) +} + +// Pod is a group of containers. +type Pod struct { + // The ID of the pod, which can be used to retrieve a particular pod + // from the pod list returned by GetPods(). + Id string + CRIId string + // The name and namespace of the pod, which is readable by human. + Name string + Namespace string + // List of containers that belongs to the pod. It may contain only + // running containers, or mixed with dead ones (when GetPods(true)). + Containers []*Container + // List of sandboxes associated with this pod. The sandboxes are converted + // to Container temporarily to avoid substantial changes to other + // components. This is only populated by kuberuntime. + Sandboxes []*Container +} + +// ContainerID is a type that identifies a container. 
+type ContainerID struct { + // The type of the container runtime. e.g. 'docker'. + Type string + // The identification of the container, this is comsumable by + // the underlying container runtime. (Note that the container + // runtime interface still takes the whole struct as input). + ID string +} + +// State represents the state of a container +type State string + +const ( + // ContainerStateCreated indicates a container that has been created (e.g. with docker create) but not started. + ContainerStateCreated State = "created" + // ContainerStateRunning indicates a currently running container. + ContainerStateRunning State = "running" + // ContainerStateExited indicates a container that ran and completed ("stopped" in other contexts, although a created container is technically also "stopped"). + ContainerStateExited State = "exited" + // ContainerStateUnknown encompasses all the states that we currently don't care about (like restarting, paused, dead). + ContainerStateUnknown State = "unknown" +) + +// Container provides the runtime information for a container, such as ID, hash, +// state of the container. +type Container struct { + // The ID of the container, used by the container runtime to identify + // a container. + ID ContainerID + // The name of the container, which should be the same as specified by + // v1.Container. + Name string + // The image name of the container, this also includes the tag of the image, + // the expected form is "NAME:TAG". + Image string + // The id of the image used by the container. + ImageID string + // State is the state of the container. + State State +} + +// Pods represents the list of pods +type Pods []*Pod + +// FindPodByID finds and returns a pod in the pod list by UID. It will return an empty pod +// if not found. +func (p Pods) FindPodByID(podUID string) Pod { + for i := range p { + if p[i].Id == podUID { + return *p[i] + } + } + return Pod{} +} + +// FindPodByFullName finds and returns a pod in the pod list by the full name. +// It will return an empty pod if not found. +func (p Pods) FindPodByFullName(podFullName string) Pod { + for i := range p { + if BuildPodFullName(p[i].Name, p[i].Namespace) == podFullName { + return *p[i] + } + } + return Pod{} +} + +// FindPod combines FindPodByID and FindPodByFullName, it finds and returns a pod in the +// pod list either by the full name or the pod ID. It will return an empty pod +// if not found. +func (p Pods) FindPod(podFullName string, podUID string) Pod { + if len(podFullName) > 0 { + return p.FindPodByFullName(podFullName) + } + return p.FindPodByID(podUID) +} + +// FindContainerByName returns a container in the pod with the given name. +// When there are multiple containers with the same name, the first match will +// be returned. +func (p *Pod) FindContainerByName(containerName string) *Container { + for _, c := range p.Containers { + if c.Name == containerName { + return c + } + } + return nil +} + +// FindContainerByID returns a container in the pod with the given ContainerID. +func (p *Pod) FindContainerByID(id ContainerID) *Container { + for _, c := range p.Containers { + if c.ID == id { + return c + } + } + return nil +} + +// FindSandboxByID returns a sandbox in the pod with the given ContainerID. +func (p *Pod) FindSandboxByID(id ContainerID) *Container { + for _, c := range p.Sandboxes { + if c.ID == id { + return c + } + } + return nil +} + +// BuildPodFullName builds the pod full name from pod name and namespace. 
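+// For example, BuildPodFullName("nginx", "default") returns "nginx_default".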
+func BuildPodFullName(name, namespace string) string { + return name + "_" + namespace +} + +type PodStatus struct { + ID string + Name string + Namespace string + IPs []string + ContainerStatuses []*Status + SandboxStatuses []*runtimeapi.PodSandboxStatus +} + +func (ps PodStatus) GetContainerStatus(ctrId string) *Status { + for i := range ps.ContainerStatuses { + cs := ps.ContainerStatuses[i] + if cs.ID.ID == ctrId { + return cs + } + } + return nil +} + +// Status represents the status of a container. +type Status struct { + // ID of the container. + ID ContainerID + // Name of the container. + Name string + // ID of the sandbox to which this container belongs. + PodSandboxID string + // Status of the container. + State State + // Creation time of the container. + CreatedAt time.Time + // Start time of the container. + StartedAt time.Time + // Finish time of the container. + FinishedAt time.Time + // Exit code of the container. + ExitCode int + // Name of the image, this also includes the tag of the image, + // the expected form is "NAME:TAG". + Image string + // ID of the image. + ImageID string + // Hash of the container, used for comparison. + Hash uint64 + // Number of times that the container has been restarted. + RestartCount int + // A string explains why container is in such a status. + Reason string + // Message written by the container before exiting (stored in + // TerminationMessagePath). + Message string +} diff --git a/pkg/hostman/guestman/pod/runtime/runtime_manager.go b/pkg/hostman/guestman/pod/runtime/runtime_manager.go new file mode 100644 index 00000000000..9727b77f3c0 --- /dev/null +++ b/pkg/hostman/guestman/pod/runtime/runtime_manager.go @@ -0,0 +1,411 @@ +// Copyright 2019 Yunion +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package runtime + +import ( + "context" + "encoding/json" + "net" + "sort" + "time" + + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + + "yunion.io/x/log" + "yunion.io/x/pkg/errors" + "yunion.io/x/pkg/utils" + + "yunion.io/x/onecloud/pkg/util/pod" +) + +const ( + runtimeAPIVersion = "0.1.0" +) + +type runtimeManager struct { + runtimeName string + // gRPC service clients. + cri pod.CRI +} + +func NewRuntimeManager(cri pod.CRI) (Runtime, error) { + man := &runtimeManager{ + cri: cri, + } + + typedVersion, err := man.getTypedVersion() + if err != nil { + return nil, errors.Wrap(err, "getTypedVersion") + } + man.runtimeName = typedVersion.RuntimeName + log.Infof("Container runtime %s initialized, version: %s, apiVersion: %s", typedVersion.RuntimeName, typedVersion.RuntimeVersion, typedVersion.RuntimeApiVersion) + + return man, nil +} + +func (m *runtimeManager) getTypedVersion() (*runtimeapi.VersionResponse, error) { + resp, err := m.cri.GetRuntimeClient().Version(context.Background(), &runtimeapi.VersionRequest{Version: runtimeAPIVersion}) + if err != nil { + return nil, errors.Wrap(err, "get remote runtime typed version") + } + return resp, nil +} + +// Type returns the type of the container runtime. 
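+// The value is the RuntimeName obtained from the CRI Version call in
+// NewRuntimeManager, e.g. "containerd".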
+func (m *runtimeManager) Type() string { + return m.runtimeName +} + +// getSandboxes lists all (or just the running) sandboxes. +func (m *runtimeManager) getSandboxes(all bool) ([]*runtimeapi.PodSandbox, error) { + var filter *runtimeapi.PodSandboxFilter + if !all { + readyState := runtimeapi.PodSandboxState_SANDBOX_READY + filter = &runtimeapi.PodSandboxFilter{ + State: &runtimeapi.PodSandboxStateValue{ + State: readyState, + }, + } + } + + resp, err := m.cri.GetRuntimeClient().ListPodSandbox(context.Background(), &runtimeapi.ListPodSandboxRequest{ + Filter: filter, + }) + if err != nil { + log.Errorf("ListPodSandbox failed: %v", err) + return nil, err + } + + return resp.Items, nil +} + +type ContainerRuntimeSpec struct { + Annotations map[string]string `json:"annotations"` +} + +type ContainerExtraInfo struct { + SandboxID string `json:"sandbox_id"` + Pid int `json:"pid"` + RuntimeSpec ContainerRuntimeSpec `json:"runtimeSpec"` +} + +// GetPods returns a list of containers grouped by pods. The boolean parameter +// specifies whether the runtime returns all containers including those already +// exited and dead containers (used for garbage collection). +func (m *runtimeManager) GetPods(all bool) ([]*Pod, error) { + pods := make(map[string]*Pod) + sandboxes, err := m.getSandboxes(all) + if err != nil { + return nil, err + } + for i := range sandboxes { + s := sandboxes[i] + if s.Metadata == nil { + log.Infof("Sandbox does not have metadata: %#v", s) + continue + } + podUid := s.Metadata.Uid + if _, ok := pods[podUid]; !ok { + pods[podUid] = &Pod{ + Id: podUid, + CRIId: s.Id, + Name: s.Metadata.Name, + Namespace: s.Metadata.Namespace, + } + } + p := pods[podUid] + converted, err := m.sandboxToContainer(s) + if err != nil { + log.Infof("Convert %q sandbox %v of pod %q failed: %v", m.runtimeName, s, podUid, err) + continue + } + p.Sandboxes = append(p.Sandboxes, converted) + } + + containers, err := m.getContainers(all) + if err != nil { + return nil, err + } + for i := range containers { + c := containers[i] + if c.Metadata == nil { + log.Infof("Container does not have metadata: %+v", c) + continue + } + + labelledInfo := getContainerInfoFromLabels(c.Labels, c.Annotations) + if labelledInfo.PodUid == "" { + // 旧的容器没设置 labels 标签,需要从 status.info.runtimeSpec.annotations 里面找 pod 关联信息 + resp, err := m.cri.GetRuntimeClient().ContainerStatus(context.Background(), &runtimeapi.ContainerStatusRequest{ + ContainerId: c.Id, + Verbose: true, + }) + if err != nil { + log.Infof("get container %s status failed: %v", c.GetId(), err) + continue + } + infoStr, ok := resp.GetInfo()["info"] + if !ok { + log.Infof("not found container %s info", c.GetId()) + continue + } + info := new(ContainerExtraInfo) + if err := json.Unmarshal([]byte(infoStr), info); err != nil { + log.Infof("unmarshal container %s info failed: %v", c.GetId(), err) + continue + } + labelledInfo = getContainerInfoFromLabels(nil, info.RuntimeSpec.Annotations) + } + pod, found := pods[labelledInfo.PodUid] + if !found { + pod = &Pod{ + Id: labelledInfo.PodUid, + Name: labelledInfo.PodName, + Namespace: labelledInfo.PodNamespace, + } + pods[labelledInfo.PodUid] = pod + } + + converted, err := m.toContainer(c) + if err != nil { + log.Warningf("Convert %s container %v of pod %q failed: %v", m.runtimeName, c, labelledInfo.PodUid, err) + continue + } + pod.Containers = append(pod.Containers, converted) + } + + // convert map to list. 
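+ // Go map iteration order is randomized, so the order of the returned pods is
+ // not deterministic between calls.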
+ var result []*Pod + for i := range pods { + result = append(result, pods[i]) + } + return result, nil +} + +func (m *runtimeManager) getContainers(allContainers bool) ([]*runtimeapi.Container, error) { + filter := &runtimeapi.ContainerFilter{} + if !allContainers { + filter.State = &runtimeapi.ContainerStateValue{ + State: runtimeapi.ContainerState_CONTAINER_RUNNING, + } + } + + containers, err := m.cri.GetRuntimeClient().ListContainers(context.Background(), &runtimeapi.ListContainersRequest{Filter: filter}) + if err != nil { + return nil, errors.Wrap(err, "ListContainers failed") + } + return containers.Containers, nil +} + +func (m *runtimeManager) GetPodStatus(uid, name, namespace string) (*PodStatus, error) { + // Now we retain restart count of container as a container label. Each time a container + // restarts, pod will read the restart count from the registered dead container, increment + // it to get the new restart count, and then add a label with the new restart count on + // the newly started container. + // However, there are some limitations of this method: + // 1. When all dead containers were garbage collected, the container status could + // not get the historical value and would be *inaccurate*. Fortunately, the chance + // is really slim. + // 2. When working with old version containers which have no restart count label, + // we can only assume their restart count is 0. + // Anyhow, we only promised "best-effort" restart count reporting, we can just ignore + // these limitations now. + podSandboxIDs, err := m.getSandboxIDByPodUID(uid, nil) + if err != nil { + return nil, err + } + + podFullName := BuildPodFullName(name, namespace) + + log.Debugf("getSandboxIDByPodUID got sandbox IDs %q for pod %q", podSandboxIDs, podFullName) + + sandboxStatuses := make([]*runtimeapi.PodSandboxStatus, len(podSandboxIDs)) + podIPs := []string{} + for idx, podSandboxID := range podSandboxIDs { + req := &runtimeapi.PodSandboxStatusRequest{ + PodSandboxId: podSandboxID, + } + resp, err := m.cri.GetRuntimeClient().PodSandboxStatus(context.Background(), req) + if err != nil { + log.Errorf("PodSandboxStatus of sandbox %q for pod %q error: %v", podSandboxID, podFullName, err) + return nil, err + } + podSandboxStatus := resp.Status + sandboxStatuses[idx] = podSandboxStatus + + // Only get pod IP from latest sandbox + if idx == 0 && podSandboxStatus.State == runtimeapi.PodSandboxState_SANDBOX_READY { + podIPs = m.determinePodSandboxIPs(podSandboxStatus) + } + } + + // Get statuses of all containers visible in the pod. 
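+ // getPodContainerStatuses lists containers by the pod UID label without a state
+ // filter, so exited containers are included alongside running ones.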
+ containerStatuses, err := m.getPodContainerStatuses(uid, podSandboxIDs) + if err != nil { + log.Errorf("getPodContainerStatuses for pod %q failed: %v", podFullName, err) + return nil, err + } + + return &PodStatus{ + ID: uid, + Name: name, + Namespace: namespace, + IPs: podIPs, + SandboxStatuses: sandboxStatuses, + ContainerStatuses: containerStatuses, + }, nil +} + +func (m *runtimeManager) getSandboxIDByPodUID(podUID string, state *runtimeapi.PodSandboxState) ([]string, error) { + filter := &runtimeapi.PodSandboxFilter{ + LabelSelector: map[string]string{ + PodUIDLabel: podUID, + }, + } + if state != nil { + filter.State = &runtimeapi.PodSandboxStateValue{ + State: *state, + } + } + resp, err := m.cri.GetRuntimeClient().ListPodSandbox(context.Background(), &runtimeapi.ListPodSandboxRequest{Filter: filter}) + if err != nil { + return nil, errors.Wrap(err, "ListPodSandbox failed") + } + sandboxes := resp.Items + if len(sandboxes) == 0 { + // 兼容旧版没有打标签的 pods + pods, err := m.cri.ListPods(context.Background(), pod.ListPodOptions{}) + if err != nil { + return nil, errors.Wrap(err, "List all pods failed") + } + for i := range pods { + item := pods[i] + if item.Metadata.Uid == podUID { + sandboxes = append(sandboxes, item) + } + } + } + + // Sort with newest first. + sandboxIDs := make([]string, len(sandboxes)) + for i, s := range sandboxes { + sandboxIDs[i] = s.Id + } + + return sandboxIDs, nil +} + +// determinePodSandboxIP determines the IP addresses of the given pod sandbox. +func (m *runtimeManager) determinePodSandboxIPs(podSandbox *runtimeapi.PodSandboxStatus) []string { + podIPs := make([]string, 0) + if podSandbox.Network == nil { + log.Warningf("Pod Sandbox status doesn't have network information, cannot report IPs") + return podIPs + } + + // ip could be an empty string if runtime is not responsible for the + // IP (e.g., host networking). 
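+ // An address that fails net.ParseIP below is treated as an inconsistent sandbox
+ // status and the helper returns nil instead of a partial list.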
+ + // pick primary IP + if len(podSandbox.Network.Ip) != 0 { + if net.ParseIP(podSandbox.Network.Ip) == nil { + log.Warningf("Pod Sandbox reported an unparseable IP (Primary) %v", podSandbox.Network.Ip) + return nil + } + podIPs = append(podIPs, podSandbox.Network.Ip) + } + + // pick additional ips, if cri reported them + for _, podIP := range podSandbox.Network.AdditionalIps { + if nil == net.ParseIP(podIP.Ip) { + log.Warningf("Pod Sandbox reported an unparseable IP (additional) %v", podIP.Ip) + return nil + } + podIPs = append(podIPs, podIP.Ip) + } + + return podIPs +} + +func (m *runtimeManager) getPodContainerStatuses(uid string, criId []string) ([]*Status, error) { + resp, err := m.cri.GetRuntimeClient().ListContainers(context.Background(), &runtimeapi.ListContainersRequest{Filter: &runtimeapi.ContainerFilter{ + LabelSelector: map[string]string{PodUIDLabel: uid}, + }}) + if err != nil { + return nil, errors.Wrap(err, "ListContainers with label selector failed") + } + containers := resp.Containers + if len(containers) == 0 { + // 兼容旧版没有打标签的容器 + allContainers, err := m.cri.ListContainers(context.Background(), pod.ListContainerOptions{}) + if err != nil { + return nil, errors.Wrapf(err, "ListContainers by pod uid: %s", uid) + } + for i := range allContainers { + container := allContainers[i] + if utils.IsInStringArray(container.PodSandboxId, criId) { + containers = append(containers, container) + } + } + } + + statuses := make([]*Status, len(containers)) + for i, c := range containers { + sResp, err := m.cri.ContainerStatus(context.Background(), c.Id) + if err != nil { + return nil, errors.Wrapf(err, "ContainerStatus by container id: %s", c.Id) + } + status := sResp.Status + cStatus := ToContainerStatus(status, m.runtimeName) + cStatus.PodSandboxID = c.PodSandboxId + statuses[i] = cStatus + } + + sort.Sort(containerStatusByCreated(statuses)) + return statuses, nil +} + +func ToContainerStatus(status *runtimeapi.ContainerStatus, runtimeName string) *Status { + annotatedInfo := getContainerInfoFromAnnotations(status.Annotations) + labeledInfo := getContainerInfoFromLabels(status.Labels, status.Annotations) + cStatus := &Status{ + ID: ContainerID{ + Type: runtimeName, + ID: status.Id, + }, + Name: labeledInfo.ContainerName, + Image: status.Image.Image, + ImageID: status.ImageRef, + State: toContainerState(status.State), + CreatedAt: time.Unix(0, status.CreatedAt), + } + if annotatedInfo != nil { + // cStatus.Hash = annotatedInfo.Hash + cStatus.RestartCount = annotatedInfo.RestartCount + } + + if status.State != runtimeapi.ContainerState_CONTAINER_CREATED { + // If container is not in the created state, we have tried and + // started the container. Set the StartedAt time. 
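+ // The CRI reports CreatedAt/StartedAt/FinishedAt as Unix timestamps in
+ // nanoseconds, hence the time.Unix(0, ...) conversions.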
+ cStatus.StartedAt = time.Unix(0, status.StartedAt) + } + if status.State == runtimeapi.ContainerState_CONTAINER_EXITED { + cStatus.Reason = status.Reason + cStatus.Message = status.Message + cStatus.ExitCode = int(status.ExitCode) + cStatus.FinishedAt = time.Unix(0, status.FinishedAt) + } + return cStatus +} diff --git a/pkg/hostman/guestman/pod_sync_loop.go b/pkg/hostman/guestman/pod_sync_loop.go new file mode 100644 index 00000000000..d21ad9e9a46 --- /dev/null +++ b/pkg/hostman/guestman/pod_sync_loop.go @@ -0,0 +1,269 @@ +package guestman + +import ( + "context" + "fmt" + "path/filepath" + "sync" + "time" + + "yunion.io/x/jsonutils" + "yunion.io/x/log" + "yunion.io/x/pkg/errors" + + computeapi "yunion.io/x/onecloud/pkg/apis/compute" + hostapi "yunion.io/x/onecloud/pkg/apis/host" + "yunion.io/x/onecloud/pkg/hostman/guestman/pod/pleg" + "yunion.io/x/onecloud/pkg/hostman/guestman/pod/runtime" + "yunion.io/x/onecloud/pkg/hostman/hostutils" + "yunion.io/x/onecloud/pkg/util/fileutils2" +) + +func (m *SGuestManager) reconcileContainerLoop(cache runtime.Cache) { + for { + m.Servers.Range(func(id, obj interface{}) bool { + podObj, ok := obj.(*sPodGuestInstance) + if !ok { + return true + } + if err := m.reconcileContainer(podObj, cache); err != nil { + log.Warningf("reconcile pod %s: %v", podObj.GetId(), err) + } + return true + }) + time.Sleep(10 * time.Second) + } +} + +func (m *SGuestManager) reconcileContainer(obj *sPodGuestInstance, cache runtime.Cache) error { + ps, err := cache.Get(obj.GetId()) + if err != nil { + return errors.Wrapf(err, "get pod status") + } + getContainerStatus := func(name string) *runtime.Status { + for i := range ps.ContainerStatuses { + cs := ps.ContainerStatuses[i] + if cs.Name == name { + return cs + } + } + return nil + } + ctrs := obj.GetContainers() + var errs []error + for i := range ctrs { + ctr := ctrs[i] + cs := getContainerStatus(ctr.Name) + if cs == nil { + // container is deleted + continue + } + if cs.State == runtime.ContainerStateExited && cs.ExitCode != 0 { + if err := m.startContainer(obj, ctr, cs); err != nil { + errs = append(errs, errors.Wrapf(err, "start container %s", ctr.Name)) + } + } + } + return errors.NewAggregate(errs) +} + +func (m *SGuestManager) startContainer(obj *sPodGuestInstance, ctr *hostapi.ContainerDesc, cs *runtime.Status) error { + _, isInternalStopped := obj.IsInternalStopped(cs.ID.ID) + if isInternalStopped { + return nil + } + finishedAt := ctr.StartedAt + if !ctr.LastFinishedAt.IsZero() { + finishedAt = ctr.LastFinishedAt + } + attempt := ctr.RestartCount + step := 5 * time.Second + internal := time.Duration(int(step) * (attempt * attempt)) + curInternal := time.Now().Sub(finishedAt) + if curInternal < internal { + log.Infof("current internal time (%s) < crash_back_off time (%s), skipping restart container(%s/%s)", curInternal, internal, obj.GetId(), ctr.Name) + return nil + } + + reason := fmt.Sprintf("start died container %s when exit code is %d", ctr.Id, cs.ExitCode) + ctx := context.Background() + userCred := hostutils.GetComputeSession(ctx).GetToken() + _, err := obj.StartLocalContainer(ctx, userCred, ctr.Id) + if err != nil { + return errors.Wrap(err, reason) + } else { + log.Infof("%s: start local container (%s/%s) success", reason, obj.GetId(), ctr.Name) + } + return nil +} + +func (m *SGuestManager) syncContainerLoop(plegCh chan *pleg.PodLifecycleEvent) { + for { + m.syncContainerLoopIteration(plegCh) + } +} + +func (m *SGuestManager) syncContainerLoopIteration(plegCh chan *pleg.PodLifecycleEvent) { + select { + 
case e := <-plegCh: + podMan := m.getPodByEvent(e) + if podMan == nil { + log.Warningf("can not find pod manager by %s", jsonutils.Marshal(e)) + return + } + if e.Type == pleg.ContainerStarted { + log.Infof("pod container started: %s", jsonutils.Marshal(e)) + podMan.SyncStatus("pod container started") + } + if e.Type == pleg.ContainerRemoved { + /*isInternalRemoved := podMan.IsInternalRemoved(e) + if !isInternalRemoved { + log.Infof("pod container removed: %s, try recreated", jsonutils.Marshal(e)) + } else { + log.Infof("pod container removed: %s", jsonutils.Marshal(e)) + }*/ + log.Infof("pod container removed: %s", jsonutils.Marshal(e)) + } + if e.Type == pleg.ContainerDied { + ctrId := e.Data.(string) + ctr, isInternalStopped := podMan.IsInternalStopped(ctrId) + if !isInternalStopped { + podStatus, err := m.podCache.Get(e.Id) + if err != nil { + log.Errorf("get pod %s status error: %v", e.Id, err) + return + } + log.Infof("pod container exited: %s", jsonutils.Marshal(e)) + // start container again + ctrStatus := podStatus.GetContainerStatus(ctrId) + var reason string + if ctrStatus == nil { + log.Errorf("can't get container %s status", ctrId) + reason = "container not exist" + } else { + if ctrStatus.ExitCode == 0 { + log.Infof("container %s exited", ctrId) + reason = fmt.Sprintf("container %s exited", ctrId) + } else { + reason = fmt.Sprintf("exit code of died container %s is %d", ctr.Id, ctrStatus.ExitCode) + } + } + log.Infof("sync pod %s container %s status: %s", e.Id, ctrId, reason) + podMan.SyncStatus(reason) + } else { + log.Infof("pod container exited: %s", jsonutils.Marshal(e)) + } + } + } +} + +func (m *SGuestManager) getPodByEvent(event *pleg.PodLifecycleEvent) PodInstance { + obj, ok := m.GetServer(event.Id) + if !ok { + return nil + } + return obj.(PodInstance) +} + +func (s *sPodGuestInstance) IsInternalStopped(ctrCriId string) (*ContainerExpectedStatus, bool) { + ctr, ok := s.expectedStatus.Containers[ctrCriId] + if !ok { + return nil, true + } + if ctr.Status == computeapi.CONTAINER_STATUS_EXITED { + return ctr, true + } + return ctr, false +} + +func (s *sPodGuestInstance) IsInternalRemoved(ctrCriId string) bool { + _, ok := s.expectedStatus.Containers[ctrCriId] + if !ok { + return true + } + return false +} + +type ContainerExpectedStatus struct { + Id string `json:"id"` + Name string `json:"name"` + Status string `json:"status"` +} + +type PodExpectedStatus struct { + lock sync.RWMutex + homeDir string + Status string `json:"status"` + Containers map[string]*ContainerExpectedStatus `json:"containers"` +} + +func NewPodExpectedStatus(homeDir string, status string) (*PodExpectedStatus, error) { + ps := &PodExpectedStatus{ + homeDir: homeDir, + Status: status, + Containers: make(map[string]*ContainerExpectedStatus), + } + if fileutils2.Exists(ps.getFilePath()) { + content, err := fileutils2.FileGetContents(ps.getFilePath()) + if err != nil { + return nil, errors.Wrapf(err, "get %s content", ps.getFilePath()) + } + obj, err := jsonutils.ParseString(content) + if err != nil { + return nil, errors.Wrapf(err, "parse %s content: %s", ps.getFilePath(), content) + } + if err := obj.Unmarshal(ps); err != nil { + return nil, errors.Wrapf(err, "unmarshal to expected status %s", ps.getFilePath()) + } + } + return ps, nil +} + +func (s *PodExpectedStatus) getFilePath() string { + return filepath.Join(s.homeDir, "expected_status.json") +} + +func (s *PodExpectedStatus) updateFile() error { + content := jsonutils.Marshal(s).PrettyString() + if err := 
fileutils2.FilePutContents(s.getFilePath(), content, false); err != nil { + return errors.Wrapf(err, "put %s content: %s", s.getFilePath(), content) + } + return nil +} + +func (s *PodExpectedStatus) SetStatus(status string) error { + s.lock.Lock() + defer s.lock.Unlock() + + s.Status = status + if err := s.updateFile(); err != nil { + return errors.Wrapf(err, "update file") + } + return nil +} + +func (s *PodExpectedStatus) SetContainerStatus(criId string, id string, status string) error { + s.lock.Lock() + defer s.lock.Unlock() + + s.Containers[criId] = &ContainerExpectedStatus{ + Id: id, + Status: status, + } + if err := s.updateFile(); err != nil { + return errors.Wrapf(err, "update file") + } + return nil +} + +func (s *PodExpectedStatus) RemoveContainer(id string) error { + s.lock.Lock() + defer s.lock.Unlock() + + delete(s.Containers, id) + + if err := s.updateFile(); err != nil { + return errors.Wrapf(err, "update file") + } + return nil +} diff --git a/pkg/hostman/hostutils/hostutils.go b/pkg/hostman/hostutils/hostutils.go index 1d4b8dd5261..fdb557cd20d 100644 --- a/pkg/hostman/hostutils/hostutils.go +++ b/pkg/hostman/hostutils/hostutils.go @@ -27,6 +27,7 @@ import ( "yunion.io/x/pkg/util/regutils" "yunion.io/x/onecloud/pkg/apis" + computeapi "yunion.io/x/onecloud/pkg/apis/compute" hostapi "yunion.io/x/onecloud/pkg/apis/host" "yunion.io/x/onecloud/pkg/appsrv" "yunion.io/x/onecloud/pkg/cloudcommon/consts" @@ -183,8 +184,8 @@ func UpdateResourceStatus(ctx context.Context, man modulebase.IResourceManager, return man.PerformAction(GetComputeSession(ctx), id, "status", jsonutils.Marshal(statusInput)) } -func UpdateContainerStatus(ctx context.Context, cid string, statusInput *apis.PerformStatusInput) (jsonutils.JSONObject, error) { - return UpdateResourceStatus(ctx, &modules.Containers, cid, statusInput) +func UpdateContainerStatus(ctx context.Context, cid string, statusInput *computeapi.ContainerPerformStatusInput) (jsonutils.JSONObject, error) { + return modules.Containers.PerformAction(GetComputeSession(ctx), cid, "status", jsonutils.Marshal(statusInput)) } func UpdateServerStatus(ctx context.Context, sid string, statusInput *apis.PerformStatusInput) (jsonutils.JSONObject, error) { diff --git a/pkg/mcclient/modules/compute/mod_containers.go b/pkg/mcclient/modules/compute/mod_containers.go index c694066b3d1..0a2a13701d5 100644 --- a/pkg/mcclient/modules/compute/mod_containers.go +++ b/pkg/mcclient/modules/compute/mod_containers.go @@ -149,7 +149,7 @@ var ( func init() { Containers = ContainerManager{ modules.NewComputeManager("container", "containers", - []string{"ID", "Name", "Guest_ID", "Spec", "Status"}, + []string{"ID", "Name", "Guest_ID", "Status", "Started_At", "Last_Finished_At", "Restart_Count", "Spec"}, []string{}), } modules.RegisterCompute(&Containers) diff --git a/pkg/scheduler/cache/candidate/common.go b/pkg/scheduler/cache/candidate/common.go index dc94b1603a5..505f8ed474d 100644 --- a/pkg/scheduler/cache/candidate/common.go +++ b/pkg/scheduler/cache/candidate/common.go @@ -33,6 +33,8 @@ var ( computeapi.VM_BLOCK_STREAM, computeapi.VM_UNKNOWN, computeapi.VM_BACKUP_STARTING, + computeapi.POD_STATUS_CONTAINER_EXITED, + computeapi.POD_STATUS_CRASH_LOOP_BACK_OFF, ) VMCreatingStatus = sets.NewString(