Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ReRun job API supported #542

Merged
merged 4 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api/server/router/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ func (cr *clusterRouter) initRoutes(httpEngine *gin.Engine) {
kubeRoute.GET("/ws", cr.webShell)
// node ws
kubeRoute.GET("/nodes/ws", cr.nodeWebShell)
// 重启Job action=rerun
kubeRoute.POST("/clusters/:cluster/namespaces/:namespace/jobs/:name", cr.ReRunJob)
}

// 从 pixiu 缓存中获取 kubernetes 对象
Expand Down
9 changes: 2 additions & 7 deletions api/server/router/cluster/helm_routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,14 @@ import (
"github.com/gin-gonic/gin"

"github.com/caoyingjunz/pixiu/api/server/httputils"
"github.com/caoyingjunz/pixiu/pkg/types"
)

// HelmMeta identifies the target of a helm request. All three fields are
// bound from URI path parameters; cluster and namespace are mandatory,
// while name carries no "required" binding.
type HelmMeta struct {
	// Cluster is bound from the ":cluster" path parameter.
	Cluster string `uri:"cluster" binding:"required"`
	// Namespace is bound from the ":namespace" path parameter.
	Namespace string `uri:"namespace" binding:"required"`
	// Name is bound from the ":name" path parameter when the route has one.
	Name string `uri:"name"`
}

func (cr *clusterRouter) ListReleases(c *gin.Context) {
r := httputils.NewResponse()
var (
err error
helmMeta HelmMeta
helmMeta types.PixiuObjectMeta
)
if err = c.ShouldBindUri(&helmMeta); err != nil {
httputils.SetFailed(c, r, err)
Expand Down
48 changes: 48 additions & 0 deletions api/server/router/cluster/proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
Copyright 2024 The Pixiu Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cluster

import (
"github.com/gin-gonic/gin"

"github.com/caoyingjunz/pixiu/api/server/httputils"
"github.com/caoyingjunz/pixiu/pkg/types"
)

// Action carries the form/query parameters that accompany an action request
// on a resource. Both fields are mandatory at bind time.
type Action struct {
	// Act is the requested action, bound from the "action" form field.
	Act string `form:"action" binding:"required"`
	// ResourceVersion is the resource version supplied by the caller, bound
	// from the "resourceVersion" form field.
	ResourceVersion string `form:"resourceVersion" binding:"required"`
}

// ReRunJob is the HTTP handler that re-runs a job. It binds the request into
// a PixiuObjectMeta (cluster, namespace, name) and an Action, then delegates
// to the cluster controller's ReRunJob with the caller-supplied resource
// version. Any bind or controller error is written back as a failed response.
//
// NOTE(review): the bound action value (Act) is only checked for presence by
// the binding tag; it is not compared against a specific action name here.
func (cr *clusterRouter) ReRunJob(c *gin.Context) {
	resp := httputils.NewResponse()

	var (
		meta types.PixiuObjectMeta
		opts Action
	)
	if bindErr := httputils.ShouldBindAny(c, nil, &meta, &opts); bindErr != nil {
		httputils.SetFailed(c, resp, bindErr)
		return
	}

	runErr := cr.c.Cluster().ReRunJob(c, meta.Cluster, meta.Namespace, meta.Name, opts.ResourceVersion)
	if runErr != nil {
		httputils.SetFailed(c, resp, runErr)
		return
	}

	httputils.SetSuccess(c, resp)
}
56 changes: 56 additions & 0 deletions pkg/controller/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/casbin/casbin/v2"
"github.com/gorilla/websocket"
"helm.sh/helm/v3/pkg/release"
batchv1 "k8s.io/api/batch/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -81,6 +82,8 @@ type Interface interface {

// WatchPodLog 实时获取 pod 的日志
WatchPodLog(ctx context.Context, cluster string, namespace string, podName string, containerName string, tailLine int64, w http.ResponseWriter, r *http.Request) error
// ReRunJob 重新执行指定任务
ReRunJob(ctx context.Context, cluster string, namespace string, jobName string, resourceVersion string) error

// ListReleases 获取 tenant release 列表
ListReleases(ctx context.Context, cluster string, namespace string) ([]*release.Release, error)
Expand Down Expand Up @@ -385,6 +388,59 @@ func (c *cluster) WatchPodLog(ctx context.Context, cluster string, namespace str
return nil
}

// Retries is the maximum number of attempts ReRunJob makes when creating
// the replacement job.
const Retries = 3

// ReRunJob re-runs (re-creates) the named job. It is implemented as a delete
// followed by a create, so in extreme cases (the delete succeeds but every
// create attempt fails) the job can be lost.
//
// The caller must pass the resourceVersion it last observed; if the job has
// changed since then the call is rejected without touching the job.
//
// It returns an error when the cluster cannot be resolved, the job cannot be
// fetched, the resource version is stale, the delete fails, or the create
// still fails after Retries attempts.
func (c *cluster) ReRunJob(ctx context.Context, cluster string, namespace string, jobName string, resourceVersion string) error {
	cs, err := c.GetClusterSetByName(ctx, cluster)
	if err != nil {
		return err
	}
	jobs := cs.Client.BatchV1().Jobs(namespace)

	job, err := jobs.Get(ctx, jobName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	if job.ResourceVersion != resourceVersion {
		return fmt.Errorf("please apply your changes to the latest and re-run")
	}

	// Work on a deep copy so that pruning labels below does not mutate the
	// maps owned by the fetched object.
	newJob := job.DeepCopy()
	// Reset fields that must not be carried over to a newly created object.
	newJob.ResourceVersion = ""
	newJob.ObjectMeta.UID = ""
	newJob.Status = batchv1.JobStatus{}
	// Drop the uid/job-name labels tied to the old job so they are not
	// carried over to the new one. The selector may be nil, so guard it.
	if newJob.Spec.Selector != nil {
		delete(newJob.Spec.Selector.MatchLabels, "controller-uid")
		delete(newJob.Spec.Selector.MatchLabels, "batch.kubernetes.io/controller-uid")
	}
	delete(newJob.Spec.Template.ObjectMeta.Labels, "controller-uid")
	delete(newJob.Spec.Template.ObjectMeta.Labels, "batch.kubernetes.io/controller-uid")
	delete(newJob.Spec.Template.ObjectMeta.Labels, "batch.kubernetes.io/job-name")
	delete(newJob.Spec.Template.ObjectMeta.Labels, "job-name")

	// TODO: back up the job before deleting it so a failed re-create does not lose it.
	// Delete the existing job.
	// NOTE(review): no propagation policy is set, so the server default
	// applies — confirm whether pods of the old job should be removed too.
	if err = jobs.Delete(ctx, jobName, metav1.DeleteOptions{}); err != nil {
		return fmt.Errorf("failed to rerun job(%s) %w", jobName, err)
	}

	// Create the new job, trying at most Retries times with a one second
	// pause between attempts.
	var jobErr error
	for i := 0; i < Retries; i++ {
		if _, jobErr = jobs.Create(ctx, newJob, metav1.CreateOptions{}); jobErr == nil {
			return nil
		}
		if i == Retries-1 {
			// No point waiting after the final attempt.
			break
		}
		select {
		case <-ctx.Done():
			// The request was cancelled; further create calls would fail too.
			return fmt.Errorf("failed to rerun job(%s) %w", jobName, ctx.Err())
		case <-time.After(time.Second):
		}
	}
	if jobErr != nil {
		// Report the create error (not the already-nil delete error).
		return fmt.Errorf("failed to rerun job(%s) %w", jobName, jobErr)
	}

	return nil
}

// AggregateEvents 聚合 k8s 资源的所有 events,比如 kind 为 deployment 时,则聚合 deployment,所属 rs 以及 pod 的事件
func (c *cluster) AggregateEvents(ctx context.Context, cluster string, namespace string, name string, kind string) (*v1.EventList, error) {
clusterSet, err := c.GetClusterSetByName(ctx, cluster)
Expand Down
6 changes: 6 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ import (
"github.com/caoyingjunz/pixiu/pkg/db/model"
)

// PixiuObjectMeta identifies a namespaced object inside a cluster. All three
// fields are bound from URI path parameters; cluster and namespace are
// mandatory, while name carries no "required" binding.
type PixiuObjectMeta struct {
	// Cluster is bound from the ":cluster" path parameter.
	Cluster string `uri:"cluster" binding:"required"`
	// Namespace is bound from the ":namespace" path parameter.
	Namespace string `uri:"namespace" binding:"required"`
	// Name is bound from the ":name" path parameter when the route has one.
	Name string `uri:"name"`
}

type PixiuMeta struct {
// pixiu 对象 ID
Id int64 `json:"id"`
Expand Down
Loading