diff --git a/server/src/instance/instance-task.service.ts b/server/src/instance/instance-task.service.ts
index c5997ec6f3..1d49c36a63 100644
--- a/server/src/instance/instance-task.service.ts
+++ b/server/src/instance/instance-task.service.ts
@@ -14,7 +14,6 @@ export class InstanceTaskService {
 
   constructor(
     private readonly instanceService: InstanceService,
-    private readonly cronService: CronJobService,
   ) {}
 
   @Cron(CronExpression.EVERY_SECOND)
@@ -47,16 +46,10 @@ export class InstanceTaskService {
       this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
     })
 
-    // Phase `Started` -> `Stopping`
-    this.handleRestartingStateDown().catch((err) => {
-      this.logger.error('handleRestartingStateDown error', err)
-      this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
-    })
-
-    // Phase `Stopped` -> `Starting`
-    this.handleRestartingStateUp().catch((err) => {
-      this.logger.error('handleRestartingStateUp error', err)
-      this.logger.debug(err?.response?.toJSON() || JSON.stringify(err))
+    // Phase `Started` -> `Starting`
+    this.handleRestartingState().catch((err) => {
+      this.logger.error('handleRestartingState error', err)
+      err?.response && this.logger.debug(err?.response?.data || err?.response)
     })
   }
 
@@ -128,6 +121,14 @@ export class InstanceTaskService {
     const appid = app.appid
     const instance = await this.instanceService.get(app)
 
+    // replicas are still unavailable: relock and return without checking conditions
+    const unavailable =
+      (instance.deployment?.status?.unavailableReplicas ?? 0) > 0
+    if (unavailable) {
+      await this.relock(appid, waitingTime)
+      return
+    }
+
     const available = isConditionTrue(
       'Available',
       instance.deployment?.status?.conditions || [],
@@ -247,42 +248,34 @@ export class InstanceTaskService {
     this.logger.log(`Application ${app.appid} updated to phase Stopped`)
   }
 
-  /**
-   * State `Restarting`:
-   * - move phase `Started` to `Stopping`
-   */
-  async handleRestartingStateDown() {
+  /**
+   * State `Restarting`:
+   * - lock one application in phase `Started`
+   * - restart its instance through `InstanceService.restart()`
+   * - move phase `Started` to `Starting`
+   */
+  async handleRestartingState() {
     const db = SystemDatabase.db
 
-    await db.collection('Application').updateMany(
-      {
-        state: ApplicationState.Restarting,
-        phase: ApplicationPhase.Started,
-        lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) },
-      },
-      {
-        $set: {
-          phase: ApplicationPhase.Stopping,
-          lockedAt: TASK_LOCK_INIT_TIME,
-          updatedAt: new Date(),
+    const res = await db
+      .collection('Application')
+      .findOneAndUpdate(
+        {
+          state: ApplicationState.Restarting,
+          phase: ApplicationPhase.Started,
+          lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) },
         },
-      },
-    )
-  }
+        { $set: { lockedAt: new Date() } },
+      )
 
-  /**
-   * State `Restarting`:
-   * - move phase `Stopped` to `Starting`
-   */
-  async handleRestartingStateUp() {
-    const db = SystemDatabase.db
+    if (!res.value) return
+    const app = res.value
 
-    await db.collection('Application').updateMany(
-      {
-        state: ApplicationState.Restarting,
-        phase: ApplicationPhase.Stopped,
-        lockedAt: { $lt: new Date(Date.now() - 1000 * this.lockTimeout) },
-      },
+    await this.instanceService.restart(app.appid)
+
+    // update application phase to `Starting`
+    await db.collection('Application').updateOne(
+      { appid: app.appid, phase: ApplicationPhase.Started },
       {
         $set: {
           phase: ApplicationPhase.Starting,
@@ -291,6 +284,7 @@ export class InstanceTaskService {
         },
       },
     )
+    this.logger.log(`Application ${app.appid} updated to phase Starting`)
   }
 
   /**
diff --git a/server/src/instance/instance.service.ts b/server/src/instance/instance.service.ts
index e4b62be7e5..19ed43a70c 100644
--- a/server/src/instance/instance.service.ts
+++ b/server/src/instance/instance.service.ts
@@ -1,4 +1,4 @@
-import { V1Deployment } from '@kubernetes/client-node'
+import { V1Deployment, V1DeploymentSpec } from '@kubernetes/client-node'
 import { Injectable, Logger } from '@nestjs/common'
 import { GetApplicationNamespaceByAppId } from '../utils/getter'
 import {
@@ -67,6 +67,151 @@ export class InstanceService {
     // add bundle label
     labels[LABEL_KEY_BUNDLE] = app.bundle.name
 
+    // create deployment
+    const data = new V1Deployment()
+    data.metadata = { name: app.appid, labels }
+    data.spec = await this.makeDeploymentSpec(app, labels)
+
+    const appsV1Api = this.clusterService.makeAppsV1Api(app.region)
+    const res = await appsV1Api.createNamespacedDeployment(namespace, data)
+
+    this.logger.log(`create k8s deployment ${res.body?.metadata?.name}`)
+
+    return res.body
+  }
+
+  async createService(app: ApplicationWithRegion, labels: any) {
+    const namespace = GetApplicationNamespaceByAppId(app.appid)
+    const serviceName = app.appid
+    const coreV1Api = this.clusterService.makeCoreV1Api(app.region)
+    const res = await coreV1Api.createNamespacedService(namespace, {
+      metadata: { name: serviceName, labels },
+      spec: {
+        selector: labels,
+        type: 'ClusterIP',
+        ports: [{ port: 8000, targetPort: 8000, protocol: 'TCP' }],
+      },
+    })
+    this.logger.log(`create k8s service ${res.body?.metadata?.name}`)
+    return res.body
+  }
+
+  async remove(app: Application) {
+    const appid = app.appid
+    const region = await this.regionService.findByAppId(appid)
+    const { deployment, service } = await this.get(app)
+
+    const namespace = await this.clusterService.getAppNamespace(
+      region,
+      app.appid,
+    )
+    if (!namespace) return
+
+    const appsV1Api = this.clusterService.makeAppsV1Api(region)
+    const coreV1Api = this.clusterService.makeCoreV1Api(region)
+
+    if (deployment) {
+      await appsV1Api.deleteNamespacedDeployment(appid, namespace.metadata.name)
+      this.logger.log(`remove k8s deployment ${deployment?.metadata?.name}`)
+    }
+    if (service) {
+      const name = appid
+      await coreV1Api.deleteNamespacedService(name, namespace.metadata.name)
+    }
+  }
+
+  async get(app: Application) {
+    const region = await this.regionService.findByAppId(app.appid)
+    const namespace = await this.clusterService.getAppNamespace(
+      region,
+      app.appid,
+    )
+    if (!namespace) {
+      return { deployment: null, service: null }
+    }
+
+    const appWithRegion = { ...app, region }
+    const deployment = await this.getDeployment(appWithRegion)
+    const service = await this.getService(appWithRegion)
+    return { deployment, service }
+  }
+
+  async getDeployment(app: ApplicationWithRegion) {
+    const appid = app.appid
+    const appsV1Api = this.clusterService.makeAppsV1Api(app.region)
+    try {
+      const namespace = GetApplicationNamespaceByAppId(appid)
+      const res = await appsV1Api.readNamespacedDeployment(appid, namespace)
+      return res.body
+    } catch (error) {
+      if (error?.response?.body?.reason === 'NotFound') return null
+      throw error
+    }
+  }
+
+  async getService(app: ApplicationWithRegion) {
+    const appid = app.appid
+    const coreV1Api = this.clusterService.makeCoreV1Api(app.region)
+
+    try {
+      const serviceName = appid
+      const namespace = GetApplicationNamespaceByAppId(appid)
+      const res = await coreV1Api.readNamespacedService(serviceName, namespace)
+      return res.body
+    } catch (error) {
+      if (error?.response?.body?.reason === 'NotFound') return null
+      throw error
+    }
+  }
+
+  /**
+   * Restart an application by replacing the whole spec of its deployment.
+   *
+   * @throws Error if the application or its deployment does not exist
+   */
+  async restart(appid: string) {
+    const app = await this.prisma.application.findUnique({
+      where: { appid },
+      include: {
+        configuration: true,
+        bundle: true,
+        runtime: true,
+        region: true,
+      },
+    })
+    if (!app) {
+      throw new Error(`restart failed: application ${appid} not found`)
+    }
+
+    const { deployment } = await this.get(app)
+    if (!deployment) {
+      throw new Error(`restart failed: deployment of ${appid} not found`)
+    }
+
+    // regenerate the spec but keep the existing pod labels, which the
+    // deployment selector is built from
+    deployment.spec = await this.makeDeploymentSpec(
+      app,
+      deployment.spec.template.metadata.labels,
+    )
+
+    const region = await this.regionService.findByAppId(appid)
+    const appsV1Api = this.clusterService.makeAppsV1Api(region)
+    const namespace = GetApplicationNamespaceByAppId(appid)
+    const res = await appsV1Api.replaceNamespacedDeployment(
+      appid,
+      namespace,
+      deployment,
+    )
+
+    this.logger.log(`restart k8s deployment ${res.body?.metadata?.name}`)
+  }
+
+  /**
+   * Build the deployment spec of an application.
+   * Shared by `create()` and `restart()` so that both produce the same spec.
+   */
+  async makeDeploymentSpec(app: any, labels: any): Promise<V1DeploymentSpec> {
     // prepare params
     const limitMemory = app.bundle.resource.limitMemory
     const limitCpu = app.bundle.resource.limitCPU
@@ -78,13 +223,13 @@ export class InstanceService {
     const dependencies_string = dependencies.join(' ')
 
     // db connection uri
-    const database = await this.databaseService.findOne(appid)
+    const database = await this.databaseService.findOne(app.appid)
     const dbConnectionUri = this.databaseService.getInternalConnectionUri(
       app.region,
       database,
     )
 
-    const storage = await this.storageService.findOne(appid)
+    const storage = await this.storageService.findOne(app.appid)
 
     const env = [
       { name: 'DB_URI', value: dbConnectionUri },
@@ -105,6 +250,9 @@ export class InstanceService {
         value: `--max_old_space_size=${max_old_space_size} --max-http-header-size=${max_http_header_size}`,
       },
       { name: 'DEPENDENCIES', value: dependencies_string },
+      // current timestamp, so the env differs on every call; presumably what
+      // makes restart() produce a changed spec and trigger a new rollout
+      { name: 'RESTART_AT', value: Date.now().toString() },
     ]
 
     // merge env from app configuration, override if exists
@@ -118,10 +266,7 @@ export class InstanceService {
       }
     })
 
-    // create deployment
-    const data = new V1Deployment()
-    data.metadata = { name: app.appid, labels }
-    data.spec = {
+    const spec = {
       replicas: 1,
       selector: { matchLabels: labels },
       template: {
@@ -234,95 +379,6 @@ export class InstanceService {
         }, // end of spec {}
       }, // end of template {}
     }
-
-    const appsV1Api = this.clusterService.makeAppsV1Api(app.region)
-    const res = await appsV1Api.createNamespacedDeployment(namespace, data)
-
-    this.logger.log(`create k8s deployment ${res.body?.metadata?.name}`)
-
-    return res.body
-  }
-
-  async createService(app: ApplicationWithRegion, labels: any) {
-    const namespace = GetApplicationNamespaceByAppId(app.appid)
-    const serviceName = app.appid
-    const coreV1Api = this.clusterService.makeCoreV1Api(app.region)
-    const res = await coreV1Api.createNamespacedService(namespace, {
-      metadata: { name: serviceName, labels },
-      spec: {
-        selector: labels,
-        type: 'ClusterIP',
-        ports: [{ port: 8000, targetPort: 8000, protocol: 'TCP' }],
-      },
-    })
-    this.logger.log(`create k8s service ${res.body?.metadata?.name}`)
-    return res.body
-  }
-
-  async remove(app: Application) {
-    const appid = app.appid
-    const region = await this.regionService.findByAppId(appid)
-    const { deployment, service } = await this.get(app)
-
-    const namespace = await this.clusterService.getAppNamespace(
-      region,
-      app.appid,
-    )
-    if (!namespace) return
-
-    const appsV1Api = this.clusterService.makeAppsV1Api(region)
-    const coreV1Api = this.clusterService.makeCoreV1Api(region)
-
-    if (deployment) {
-      await appsV1Api.deleteNamespacedDeployment(appid, namespace.metadata.name)
-    }
-    if (service) {
-      const name = appid
-      await coreV1Api.deleteNamespacedService(name, namespace.metadata.name)
-    }
-  }
-
-  async get(app: Application) {
-    const region = await this.regionService.findByAppId(app.appid)
-    const namespace = await this.clusterService.getAppNamespace(
-      region,
-      app.appid,
-    )
-    if (!namespace) {
-      return { deployment: null, service: null }
-    }
-
-    const appWithRegion = { ...app, region }
-    const deployment = await this.getDeployment(appWithRegion)
-    const service = await this.getService(appWithRegion)
-    return { deployment, service }
-  }
-
-  async getDeployment(app: ApplicationWithRegion) {
-    const appid = app.appid
-    const appsV1Api = this.clusterService.makeAppsV1Api(app.region)
-    try {
-      const namespace = GetApplicationNamespaceByAppId(appid)
-      const res = await appsV1Api.readNamespacedDeployment(appid, namespace)
-      return res.body
-    } catch (error) {
-      if (error?.response?.body?.reason === 'NotFound') return null
-      throw error
-    }
-  }
-
-  async getService(app: ApplicationWithRegion) {
-    const appid = app.appid
-    const coreV1Api = this.clusterService.makeCoreV1Api(app.region)
-
-    try {
-      const serviceName = appid
-      const namespace = GetApplicationNamespaceByAppId(appid)
-      const res = await coreV1Api.readNamespacedService(serviceName, namespace)
-      return res.body
-    } catch (error) {
-      if (error?.response?.body?.reason === 'NotFound') return null
-      throw error
-    }
+    return spec
   }
 }