Skip to content

Commit

Permalink
feat: 接入devops schedule框架,新增媒体处理任务 #2346
Browse files Browse the repository at this point in the history
  • Loading branch information
felixncheng authored Jul 30, 2024
1 parent ae038ca commit 6a37309
Show file tree
Hide file tree
Showing 27 changed files with 459 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,7 @@ data class TemporaryTokenCreateRequest(
@ApiModelProperty("允许访问次数,为空表示无限制")
val permits: Int? = null,
@ApiModelProperty("token类型")
val type: TokenType
val type: TokenType,
@ApiModelProperty("创建人")
val createdBy: String? = null,
)
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ class TemporaryTokenServiceImpl(
token = generateToken(),
permits = permits,
type = type,
createdBy = SecurityUtils.getUserId(),
createdBy = createdBy ?: SecurityUtils.getUserId(),
createdDate = LocalDateTime.now(),
lastModifiedBy = SecurityUtils.getUserId(),
lastModifiedBy = createdBy ?: SecurityUtils.getUserId(),
lastModifiedDate = LocalDateTime.now()
)
temporaryTokenRepository.save(temporaryToken)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const val ANALYSIS_EXECUTOR_SERVICE_NAME = "\${service.prefix:}analysis-executor
const val HELM_SERVICE_NAME = "\${service.prefix:}helm\${service.suffix:}"
const val OCI_SERVICE_NAME = "\${service.prefix:}docker\${service.suffix:}"
const val JOB_SERVICE_NAME = "\${service.prefix:}job\${service.suffix:}"
const val SCHEDULE_SERVICE_NAME = "\${service.prefix:}job-schedule\${service.suffix:}"
const val FS_SERVER_SERVICE_NAME = "\${service.prefix:}fs-server\${service.suffix:}"
const val MAVEN_SERVICE_NAME = "\${service.prefix:}maven\${service.suffix:}"
const val ARCHIVE_SERVICE_NAME = "\${service.prefix:}archive\${service.suffix:}"
Expand Down
4 changes: 4 additions & 0 deletions src/backend/job/api-schedule/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dependencies {
implementation(project(":common:common-api"))
compileOnly("org.springframework.cloud:spring-cloud-openfeign-core")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.tencent.bkrepo.job.schedule.api

import com.tencent.bkrepo.common.api.constant.SCHEDULE_SERVICE_NAME
import org.springframework.cloud.openfeign.FeignClient
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping

@FeignClient(SCHEDULE_SERVICE_NAME, contextId = "JobScheduleClient")
@RequestMapping("/service/job")
interface JobScheduleClient {
/**
* 触发任务执行
* @param id 任务id
* @param executorParam 执行参数
* */
@PostMapping("/trigger/{id}")
fun triggerJob(@PathVariable id: String, @RequestBody executorParam: String)
}
6 changes: 6 additions & 0 deletions src/backend/job/boot-job-schedule/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
dependencies {
implementation(project(":common:common-service"))
implementation(project(":common:common-security"))
implementation(project(":job:api-schedule"))
implementation("com.tencent.devops:devops-boot-starter-schedule-server")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.tencent.bkrepo.job.schedule

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class JobScheduleApplication

fun main(args: Array<String>) {
runApplication<JobScheduleApplication>(*args)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.tencent.bkrepo.job.schedule.config

import com.tencent.bkrepo.common.security.http.core.HttpAuthSecurity
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration
class JobScheduleConfiguration {
@Bean
fun httpAuthSecurity(): HttpAuthSecurity {
return HttpAuthSecurity()
.withPrefix("/schedule")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.tencent.bkrepo.job.schedule.controller

import com.tencent.bkrepo.job.schedule.api.JobScheduleClient
import com.tencent.devops.schedule.manager.JobManager
import org.springframework.web.bind.annotation.RestController

@RestController
class JobScheduleController(
private val jobManager: JobManager,
) : JobScheduleClient {
override fun triggerJob(id: String, executorParam: String) {
jobManager.triggerJob(id, executorParam)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
server.port: 25913
spring.application.name: job-schedule

devops:
schedule:
server:
auth:
access-token: xxx
4 changes: 4 additions & 0 deletions src/backend/job/boot-job-worker/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
dependencies {
implementation(project(":common:common-service"))
implementation("com.tencent.devops:devops-boot-starter-schedule-worker")
}
44 changes: 44 additions & 0 deletions src/backend/job/boot-job-worker/shell/media-process.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
#!/bin/bash
get_field() {
local field_name=$1
field_value=$(echo $DEVOPS_SCHEDULE_JOB_PARAMETERS| jq -r ".$field_name")
if [ -z "$field_value" ]; then
echo "Error: Field '$field_name' not found in input file"
exit 1
fi
echo "$field_value"
}

echo "1. 获取参数"
inputUrl=$(get_field inputUrl)
callbackUrl=$(get_field callbackUrl)
inputFileName=$(get_field inputFileName)
scale=$(get_field scale)
videoCodec=$(get_field videoCodec)
outputFileName=$(get_field outputFileName)

echo "2. 下载待转码文件 - $inputFileName"
# 使用 -s 参数静默执行,-w "%{http_code}" 输出HTTP状态码
http_status=$(curl -s -w "%{http_code}" -o $inputFileName $inputUrl)
if [ $http_status -ne 200 ];then
echo "文件下载失败[$http_status],Url: $inputUrl"
exit 1
fi
ls -l

echo "3. 开始转码 - $inputFileName > $outputFileName"
echo "ffmpeg -i $inputFileName -vf scale=$scale -c:a copy -c:v $videoCodec $outputFileName"
ffmpeg -i $inputFileName -vf scale=$scale -c:a copy -c:v $videoCodec $outputFileName
if [ $? -ne 0 ];then
echo "转码失败"
exit 1
fi

echo "4. 上传转码后的文件 - $outputFileName"
http_status=$(curl -s -w "%{http_code}" -X PUT -T $outputFileName "$callbackUrl")
if [ $http_status -ne 200 ];then
echo 文件上传失败[$http_status],Url: $callbackUrl
exit 1
fi

echo "转码完成 - $inputFileName"
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.tencent.bkrepo.job.worker

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class JobWorkerApplication

fun main(args: Array<String>) {
runApplication<JobWorkerApplication>(*args)
}
10 changes: 10 additions & 0 deletions src/backend/job/boot-job-worker/src/main/resources/bootstrap.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
server.port: 25821
spring.application.name: job-worker

devops:
schedule:
worker:
mode: DISCOVERY
server:
address: http://${service.prefix}job-schedule
access-token: xxx
1 change: 1 addition & 0 deletions src/backend/media/biz-media/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
dependencies {
api(project(":common:common-artifact:artifact-service"))
api(project(":job:api-schedule"))
implementation("org.bytedeco:javacpp:${Versions.JavaCpp}")
implementation("org.bytedeco:ffmpeg:${Versions.FFmpegPlatform}")
implementation("org.bytedeco:javacpp:${Versions.JavaCpp}:windows-x86_64")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,16 @@
package com.tencent.bkrepo.media.artifact.repository

import com.tencent.bkrepo.common.artifact.repository.context.ArtifactRemoveContext
import com.tencent.bkrepo.common.artifact.repository.local.LocalRepository
import com.tencent.bkrepo.repository.pojo.node.service.NodeDeleteRequest
import org.springframework.stereotype.Component

@Component
class MediaLocalRepository : LocalRepository()
class MediaLocalRepository : LocalRepository() {
override fun remove(context: ArtifactRemoveContext) {
with(context.artifactInfo) {
val nodeDeleteRequest = NodeDeleteRequest(projectId, repoName, getArtifactFullPath(), context.userId)
nodeClient.deleteNode(nodeDeleteRequest)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package com.tencent.bkrepo.media.config

import cn.hutool.core.io.unit.DataSize
import com.tencent.bkrepo.media.stream.TranscodeConfig
import org.springframework.boot.context.properties.ConfigurationProperties

@ConfigurationProperties(prefix = "media")
data class MediaProperties(
var maxRecordFileSize: DataSize = DataSize.ofGigabytes(100),
var serverAddress: String = "",
var fileExpireDays: Int = 180,
var transcodeConfig: Map<String, TranscodeConfig> = mutableMapOf(),
var repoHost: String = "",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package com.tencent.bkrepo.media.controller

import com.tencent.bkrepo.auth.pojo.token.TokenType
import com.tencent.bkrepo.common.artifact.api.ArtifactFile
import com.tencent.bkrepo.media.artifact.MediaArtifactInfo
import com.tencent.bkrepo.media.artifact.MediaArtifactInfo.Companion.DEFAULT_STREAM_MAPPING_URI
import com.tencent.bkrepo.media.service.TokenService
import com.tencent.bkrepo.media.service.TranscodeService
import com.tencent.bkrepo.media.stream.TranscodeConfig
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PutMapping
import org.springframework.web.bind.annotation.RequestAttribute
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.RestController

/**
* 转码资源控制器
* 用于临时下载文件,和上传转码后的文件
* */
@RestController
@RequestMapping("/user/transcode")
class TranscodeController(
private val tokenService: TokenService,
private val transcodeService: TranscodeService,
) {
/**
* 下载源文件
* @param artifactInfo 要下载的源文件
* @param token 下载token
* */
@GetMapping("/download/$DEFAULT_STREAM_MAPPING_URI")
fun download(
artifactInfo: MediaArtifactInfo,
@RequestParam token: String,
) {
val tokenInfo = tokenService.validateToken(token, artifactInfo, TokenType.DOWNLOAD)
transcodeService.download(artifactInfo)
tokenService.decrementPermits(tokenInfo)
}

/**
* 回传转码后文件
* @param artifactInfo 转码后的文件信息
* @param file 转码后的文件
* @param token 上传token
* @param origin 转码源文件完整路径
* */
@PutMapping("/upload/$DEFAULT_STREAM_MAPPING_URI")
fun callback(
artifactInfo: MediaArtifactInfo,
file: ArtifactFile,
@RequestParam token: String,
@RequestParam origin: String,
@RequestParam originToken: String,
) {
val tokenInfo = tokenService.validateToken(token, artifactInfo, TokenType.UPLOAD)
val originArtifactInfo = MediaArtifactInfo(artifactInfo.projectId, artifactInfo.repoName, origin)
val originTokenInfo = tokenService.validateToken(originToken, originArtifactInfo, TokenType.UPLOAD)
transcodeService.transcodeCallback(artifactInfo, file, originArtifactInfo)
tokenService.decrementPermits(tokenInfo)
tokenService.decrementPermits(originTokenInfo)
}

/**
* 视频转码
* @param artifactInfo 待转码文件
* @param transcodeConfig 转码配置
* */
@PutMapping(DEFAULT_STREAM_MAPPING_URI)
fun transcode(
artifactInfo: MediaArtifactInfo,
@RequestAttribute userId: String,
@RequestBody transcodeConfig: TranscodeConfig,
) {
transcodeService.transcode(artifactInfo, transcodeConfig, userId)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@ import com.tencent.bkrepo.common.storage.core.StorageProperties
import com.tencent.bkrepo.media.STREAM_PATH
import com.tencent.bkrepo.media.artifact.MediaArtifactInfo
import com.tencent.bkrepo.media.config.MediaProperties
import com.tencent.bkrepo.media.stream.ArtifactFileConsumer
import com.tencent.bkrepo.media.stream.MediaArtifactFileConsumer
import com.tencent.bkrepo.media.stream.ArtifactFileRecordingListener
import com.tencent.bkrepo.media.stream.ClientStream
import com.tencent.bkrepo.media.stream.MediaType
import com.tencent.bkrepo.media.stream.RemuxRecordingListener
import com.tencent.bkrepo.media.stream.StreamManger
import com.tencent.bkrepo.media.stream.StreamMode
import com.tencent.bkrepo.media.stream.TranscodeConfig
import com.tencent.bkrepo.repository.api.NodeClient
import com.tencent.bkrepo.repository.api.RepositoryClient
import com.tencent.bkrepo.repository.pojo.node.service.NodeCreateRequest
Expand All @@ -38,6 +39,7 @@ class StreamService(
private val storageManager: StorageManager,
private val storageProperties: StorageProperties,
private val streamManger: StreamManger,
private val transcodeService: TranscodeService,
) : ArtifactService() {

/**
Expand Down Expand Up @@ -74,7 +76,7 @@ class StreamService(
fullPathSet = setOf(STREAM_PATH),
type = TokenType.UPLOAD,
)
val token = tokenService.createToken(temporaryTokenRequest)
val token = tokenService.createToken(temporaryTokenRequest).firstOrNull()
return "${mediaProperties.serverAddress}/$projectId/$repoName$STREAM_PATH?token=$token"
}

Expand All @@ -100,12 +102,13 @@ class StreamService(
val repoId = ArtifactContextHolder.RepositoryId(projectId, repoName)
val repo = ArtifactContextHolder.getRepoDetail(repoId)
val credentials = repo.storageCredentials ?: storageProperties.defaultStorageCredentials()
val fileConsumer = ArtifactFileConsumer(
val fileConsumer = MediaArtifactFileConsumer(
storageManager,
transcodeService,
repo,
userId,
STREAM_PATH,
mediaProperties.fileExpireDays,
getTranscodeConfig(projectId),
)
val recordingListener = if (remux) {
RemuxRecordingListener(credentials.upload.location, scheduler, saveType, fileConsumer)
Expand All @@ -130,7 +133,13 @@ class StreamService(
repository.download(context)
}

private fun getTranscodeConfig(projectId: String): TranscodeConfig? {
val transcodeConfig = mediaProperties.transcodeConfig
return transcodeConfig[projectId] ?: transcodeConfig[DEFAULT]
}

companion object {
private val logger = LoggerFactory.getLogger(StreamService::class.java)
private const val DEFAULT = "default"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ class TokenService(
private val temporaryTokenClient: ServiceTemporaryTokenClient,
) {

fun createToken(tokenCreateRequest: TemporaryTokenCreateRequest): String {
return temporaryTokenClient.createToken(tokenCreateRequest).data?.firstOrNull()?.token.orEmpty()
fun createToken(tokenCreateRequest: TemporaryTokenCreateRequest): List<String> {
return temporaryTokenClient.createToken(tokenCreateRequest).data?.map { it.token }.orEmpty()
}

/**
Expand Down
Loading

0 comments on commit 6a37309

Please sign in to comment.