Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): support deployment rolling update #1112

Merged
merged 4 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 33 additions & 43 deletions server/src/instance/instance-task.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ export class InstanceTaskService {

constructor(
private readonly instanceService: InstanceService,
private readonly cronService: CronJobService,
) {}
) { }


@Cron(CronExpression.EVERY_SECOND)
async tick() {
Expand Down Expand Up @@ -47,16 +47,10 @@ export class InstanceTaskService {
this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
})

// Phase `Started` -> `Stopping`
this.handleRestartingStateDown().catch((err) => {
this.logger.error('handleRestartingStateDown error', err)
this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
})

// Phase `Stopped` -> `Starting`
this.handleRestartingStateUp().catch((err) => {
this.logger.error('handleRestartingStateUp error', err)
this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
// Phase `Started` -> `Starting`
this.handleRestartingState().catch((err) => {
this.logger.error('handleRestartingPhase error', err)
err?.response && this.logger.debug(err?.response?.data || err?.response)
})
}

Expand Down Expand Up @@ -128,6 +122,12 @@ export class InstanceTaskService {

const appid = app.appid
const instance = await this.instanceService.get(app)
const unavailable = instance.deployment?.status?.unavailableReplicas || false
if (unavailable) {
await this.relock(appid, waitingTime)
return
}

const available = isConditionTrue(
'Available',
instance.deployment?.status?.conditions || [],
Expand Down Expand Up @@ -247,42 +247,31 @@ export class InstanceTaskService {
this.logger.log(`Application ${app.appid} updated to phase Stopped`)
}

/**
* State `Restarting`:
* - move phase `Started` to `Stopping`
*/
async handleRestartingStateDown() {

async handleRestartingState() {


const db = SystemDatabase.db

await db.collection<Application>('Application').updateMany(
{
state: ApplicationState.Restarting,
phase: ApplicationPhase.Started,
lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) },
},
{
$set: {
phase: ApplicationPhase.Stopping,
lockedAt: TASK_LOCK_INIT_TIME,
updatedAt: new Date(),
const res = await db
.collection<Application>('Application')
.findOneAndUpdate(
{
state: ApplicationState.Restarting,
phase: ApplicationPhase.Started,
lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) },
},
},
)
}
{ $set: { lockedAt: new Date() } },
)

/**
* State `Restarting`:
* - move phase `Stopped` to `Starting`
*/
async handleRestartingStateUp() {
const db = SystemDatabase.db
if (!res.value) return
const app = res.value

await db.collection<Application>('Application').updateMany(
{
state: ApplicationState.Restarting,
phase: ApplicationPhase.Stopped,
lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) },
},
await this.instanceService.restart(app.appid)

// update application phase to `Starting`
await db.collection<Application>('Application').updateOne(
{ appid: app.appid, phase: ApplicationPhase.Started },
{
$set: {
phase: ApplicationPhase.Starting,
Expand All @@ -291,6 +280,7 @@ export class InstanceTaskService {
},
},
)
this.logger.log(`Application ${app.appid} updated to phase Starting`)
}

/**
Expand Down
228 changes: 130 additions & 98 deletions server/src/instance/instance.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { V1Deployment } from '@kubernetes/client-node'
import { V1Deployment, V1DeploymentSpec } from '@kubernetes/client-node'
import { Injectable, Logger } from '@nestjs/common'
import { GetApplicationNamespaceByAppId } from '../utils/getter'
import {
Expand Down Expand Up @@ -26,7 +26,7 @@ export class InstanceService {
private readonly storageService: StorageService,
private readonly databaseService: DatabaseService,
private readonly prisma: PrismaService,
) {}
) { }

async create(app: Application) {
const appid = app.appid
Expand Down Expand Up @@ -67,6 +67,127 @@ export class InstanceService {
// add bundle label
labels[LABEL_KEY_BUNDLE] = app.bundle.name

// create deployment
const data = new V1Deployment()
data.metadata = { name: app.appid, labels }
data.spec = await this.makeDeploymentSpec(app, labels)

const appsV1Api = this.clusterService.makeAppsV1Api(app.region)
const res = await appsV1Api.createNamespacedDeployment(namespace, data)

this.logger.log(`create k8s deployment ${res.body?.metadata?.name}`)

return res.body
}

async createService(app: ApplicationWithRegion, labels: any) {
const namespace = GetApplicationNamespaceByAppId(app.appid)
const serviceName = app.appid
const coreV1Api = this.clusterService.makeCoreV1Api(app.region)
const res = await coreV1Api.createNamespacedService(namespace, {
metadata: { name: serviceName, labels },
spec: {
selector: labels,
type: 'ClusterIP',
ports: [{ port: 8000, targetPort: 8000, protocol: 'TCP' }],
},
})
this.logger.log(`create k8s service ${res.body?.metadata?.name}`)
return res.body
}

async remove(app: Application) {
const appid = app.appid
const region = await this.regionService.findByAppId(appid)
const { deployment, service } = await this.get(app)

const namespace = await this.clusterService.getAppNamespace(
region,
app.appid,
)
if (!namespace) return

const appsV1Api = this.clusterService.makeAppsV1Api(region)
const coreV1Api = this.clusterService.makeCoreV1Api(region)

if (deployment) {
await appsV1Api.deleteNamespacedDeployment(appid, namespace.metadata.name)
}
if (service) {
const name = appid
await coreV1Api.deleteNamespacedService(name, namespace.metadata.name)
}
this.logger.log(`remove k8s deployment ${deployment?.metadata?.name}`)
}

async get(app: Application) {
const region = await this.regionService.findByAppId(app.appid)
const namespace = await this.clusterService.getAppNamespace(
region,
app.appid,
)
if (!namespace) {
return { deployment: null, service: null }
}

const appWithRegion = { ...app, region }
const deployment = await this.getDeployment(appWithRegion)
const service = await this.getService(appWithRegion)
return { deployment, service }
}

async getDeployment(app: ApplicationWithRegion) {
const appid = app.appid
const appsV1Api = this.clusterService.makeAppsV1Api(app.region)
try {
const namespace = GetApplicationNamespaceByAppId(appid)
const res = await appsV1Api.readNamespacedDeployment(appid, namespace)
return res.body
} catch (error) {
if (error?.response?.body?.reason === 'NotFound') return null
throw error
}
}

async getService(app: ApplicationWithRegion) {
const appid = app.appid
const coreV1Api = this.clusterService.makeCoreV1Api(app.region)

try {
const serviceName = appid
const namespace = GetApplicationNamespaceByAppId(appid)
const res = await coreV1Api.readNamespacedService(serviceName, namespace)
return res.body
} catch (error) {
if (error?.response?.body?.reason === 'NotFound') return null
throw error
}
}

// 修改整个spec
async restart(appid: string) {
const app = await this.prisma.application.findUnique({
where: { appid },
include: {
configuration: true,
bundle: true,
runtime: true,
region: true,
},
})
const { deployment } = await this.get(app)
deployment.spec = await this.makeDeploymentSpec(app, deployment.spec.template.metadata.labels)
const region = await this.regionService.findByAppId(app.appid)
const appsV1Api = this.clusterService.makeAppsV1Api(region)
const namespace = GetApplicationNamespaceByAppId(app.appid)
const res = await appsV1Api.replaceNamespacedDeployment(app.appid, namespace, deployment)

this.logger.log(`restart k8s deployment ${res.body?.metadata?.name}`)

}

async makeDeploymentSpec(app: any, labels: any): Promise<V1DeploymentSpec> {

// prepare params
const limitMemory = app.bundle.resource.limitMemory
const limitCpu = app.bundle.resource.limitCPU
Expand All @@ -78,13 +199,13 @@ export class InstanceService {
const dependencies_string = dependencies.join(' ')

// db connection uri
const database = await this.databaseService.findOne(appid)
const database = await this.databaseService.findOne(app.appid)
const dbConnectionUri = this.databaseService.getInternalConnectionUri(
app.region,
database,
)

const storage = await this.storageService.findOne(appid)
const storage = await this.storageService.findOne(app.appid)

const env = [
{ name: 'DB_URI', value: dbConnectionUri },
Expand All @@ -105,6 +226,9 @@ export class InstanceService {
value: `--max_old_space_size=${max_old_space_size} --max-http-header-size=${max_http_header_size}`,
},
{ name: 'DEPENDENCIES', value: dependencies_string },
{
name: 'RESTART_AT', value: new Date().getTime().toString(),
}
]

// merge env from app configuration, override if exists
Expand All @@ -118,10 +242,7 @@ export class InstanceService {
}
})

// create deployment
const data = new V1Deployment()
data.metadata = { name: app.appid, labels }
data.spec = {
const spec = {
replicas: 1,
selector: { matchLabels: labels },
template: {
Expand Down Expand Up @@ -234,95 +355,6 @@ export class InstanceService {
}, // end of spec {}
}, // end of template {}
}

const appsV1Api = this.clusterService.makeAppsV1Api(app.region)
const res = await appsV1Api.createNamespacedDeployment(namespace, data)

this.logger.log(`create k8s deployment ${res.body?.metadata?.name}`)

return res.body
}

async createService(app: ApplicationWithRegion, labels: any) {
const namespace = GetApplicationNamespaceByAppId(app.appid)
const serviceName = app.appid
const coreV1Api = this.clusterService.makeCoreV1Api(app.region)
const res = await coreV1Api.createNamespacedService(namespace, {
metadata: { name: serviceName, labels },
spec: {
selector: labels,
type: 'ClusterIP',
ports: [{ port: 8000, targetPort: 8000, protocol: 'TCP' }],
},
})
this.logger.log(`create k8s service ${res.body?.metadata?.name}`)
return res.body
}

async remove(app: Application) {
const appid = app.appid
const region = await this.regionService.findByAppId(appid)
const { deployment, service } = await this.get(app)

const namespace = await this.clusterService.getAppNamespace(
region,
app.appid,
)
if (!namespace) return

const appsV1Api = this.clusterService.makeAppsV1Api(region)
const coreV1Api = this.clusterService.makeCoreV1Api(region)

if (deployment) {
await appsV1Api.deleteNamespacedDeployment(appid, namespace.metadata.name)
}
if (service) {
const name = appid
await coreV1Api.deleteNamespacedService(name, namespace.metadata.name)
}
}

async get(app: Application) {
const region = await this.regionService.findByAppId(app.appid)
const namespace = await this.clusterService.getAppNamespace(
region,
app.appid,
)
if (!namespace) {
return { deployment: null, service: null }
}

const appWithRegion = { ...app, region }
const deployment = await this.getDeployment(appWithRegion)
const service = await this.getService(appWithRegion)
return { deployment, service }
}

async getDeployment(app: ApplicationWithRegion) {
const appid = app.appid
const appsV1Api = this.clusterService.makeAppsV1Api(app.region)
try {
const namespace = GetApplicationNamespaceByAppId(appid)
const res = await appsV1Api.readNamespacedDeployment(appid, namespace)
return res.body
} catch (error) {
if (error?.response?.body?.reason === 'NotFound') return null
throw error
}
}

async getService(app: ApplicationWithRegion) {
const appid = app.appid
const coreV1Api = this.clusterService.makeCoreV1Api(app.region)

try {
const serviceName = appid
const namespace = GetApplicationNamespaceByAppId(appid)
const res = await coreV1Api.readNamespacedService(serviceName, namespace)
return res.body
} catch (error) {
if (error?.response?.body?.reason === 'NotFound') return null
throw error
}
return spec
}
}