Skip to content

Commit

Permalink
fix: fs服务缓存存储降级时异常 TencentBlueKing#2663
Browse files Browse the repository at this point in the history
  • Loading branch information
yaoxuwan committed Oct 25, 2024
1 parent d9cd99e commit 876b358
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ class StorageHealthMonitor(
}
}

private fun changeToUnhealthy() {
private fun changeToUnhealthy() {
// 修改状态
if (healthy.compareAndSet(true, false)) {
logger.error("Path[${getPrimaryPath()}] change to unhealthy, reason: $fallBackReason")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ package com.tencent.bkrepo.fs.server.service

import com.tencent.bkrepo.common.artifact.stream.ArtifactInputStream
import com.tencent.bkrepo.common.artifact.stream.Range
import com.tencent.bkrepo.common.metadata.client.RRepositoryClient
import com.tencent.bkrepo.common.metadata.constant.FAKE_MD5
import com.tencent.bkrepo.common.metadata.constant.FAKE_SHA256
import com.tencent.bkrepo.common.metadata.model.TBlockNode
import com.tencent.bkrepo.common.metadata.client.RRepositoryClient
import com.tencent.bkrepo.common.metadata.service.metadata.RMetadataService
import com.tencent.bkrepo.fs.server.config.properties.StreamProperties
import com.tencent.bkrepo.fs.server.constant.FS_ATTR_KEY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ package com.tencent.bkrepo.fs.server.storage
import com.tencent.bkrepo.common.api.constant.StringPool
import com.tencent.bkrepo.common.artifact.stream.DigestCalculateListener
import com.tencent.bkrepo.common.artifact.stream.closeQuietly
import com.tencent.bkrepo.common.storage.config.MonitorProperties
import com.tencent.bkrepo.common.storage.config.ReceiveProperties
import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitor
import com.tencent.bkrepo.common.storage.monitor.Throughput
import com.tencent.bkrepo.common.storage.util.createFile
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.withContext
import org.slf4j.LoggerFactory
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.io.buffer.DataBufferUtils
Expand All @@ -45,6 +49,7 @@ import reactor.core.publisher.Sinks
import reactor.core.scheduler.Schedulers
import java.io.ByteArrayInputStream
import java.io.File
import java.io.IOException
import java.io.InputStream
import java.nio.channels.AsynchronousFileChannel
import java.nio.file.Files
Expand All @@ -56,10 +61,22 @@ import java.nio.file.StandardOpenOption
* */
class CoArtifactDataReceiver(
receiveProperties: ReceiveProperties,
monitorProperties: MonitorProperties,
private var path: Path,
private val fileName: String = generateRandomName(),
) : StorageHealthMonitor.Observer {

/**
* 传输过程中发生存储降级时,是否将数据转移到本地磁盘
*/
private val enableTransfer = monitorProperties.enableTransfer

/**
* 数据传输buffer大小
*/
private val bufferSize = receiveProperties.bufferSize.toBytes().toInt()


/**
* 文件内存接收阈值
* */
Expand Down Expand Up @@ -93,13 +110,11 @@ class CoArtifactDataReceiver(
/**
* 文件异步接收通道
* */
private val channel: AsynchronousFileChannel by lazy {
AsynchronousFileChannel.open(
filePath,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
)
}
private var channel: AsynchronousFileChannel = AsynchronousFileChannel.open(
filePath,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
)

/**
* 是否降级
Expand All @@ -111,6 +126,11 @@ class CoArtifactDataReceiver(
* */
var finished: Boolean = false

/**
* 数据是否转移到本地磁盘
*/
private var hasTransferred: Boolean = false

/**
* 降级路径
* */
Expand Down Expand Up @@ -173,6 +193,10 @@ class CoArtifactDataReceiver(
if (inMemory) {
val cacheData = cacheData!!.copyOfRange(0, pos.toInt())
val buf = DefaultDataBufferFactory.sharedInstance.wrap(cacheData)
val filePath = this.filePath.apply { this.createFile() }
channel = withContext(Dispatchers.IO) {
AsynchronousFileChannel.open(filePath, StandardOpenOption.WRITE, StandardOpenOption.CREATE)
}
DataBufferUtils.write(Mono.just(buf), channel).awaitSingle()
inMemory = false
// help gc
Expand All @@ -188,23 +212,71 @@ class CoArtifactDataReceiver(
}

override fun unhealthy(fallbackPath: Path?, reason: String?) {
if (!finished && !fallback) {
if (!finished && !fallback && !hasTransferred) {
fallBackPath = fallbackPath
fallback = true
logger.warn("Path[$path] is unhealthy, fallback to use [$fallBackPath], reason: $reason")
}
}

private fun checkFallback() {
if (!fallback) {
/**
* 检查是否需要fall back操作
*/
private suspend fun checkFallback() {
if (!fallback || hasTransferred) {
return
}
if (fallBackPath == null || fallBackPath == path) {
logger.info("Fallback path is null or equals to primary path,skip")
logger.info("Fallback path is null or equals to primary path, skip transfer data")
hasTransferred = true
return
}
if (inMemory) {
path = fallBackPath!!
// originalPath表示NFS位置, fallBackPath表示本地磁盘位置
val originalPath = path
// 更新当前path为本地磁盘
path = fallBackPath!!
// transfer date
if (!inMemory) {
// 当文件已经落到NFS
if (enableTransfer) {
// 开Transfer功能时,从NFS转移到本地盘
cleanOriginalChannel()
val originalFile = originalPath.resolve(fileName)
val filePath = this.filePath.apply { this.createFile() }
val dataBuffer = DataBufferUtils.read(originalPath, DefaultDataBufferFactory(), bufferSize)
channel = withContext(Dispatchers.IO) {
AsynchronousFileChannel.open(
filePath,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
)
}
DataBufferUtils.write(dataBuffer, channel, 0).awaitSingle()
withContext(Dispatchers.IO) {
Files.deleteIfExists(originalFile)
}
logger.info("Success to transfer data from [$originalPath] to [$path]")
} else {
// 禁用Transfer功能时,忽略操作,继续使用NFS
path = originalPath
fallback = false
}
}
hasTransferred = true
}

/**
* 关闭原始输出流
*/
private fun cleanOriginalChannel() {
try {
channel.force(true)
} catch (ignored: IOException) {
}

try {
channel.close()
} catch (ignored: IOException) {
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class CoArtifactFile(
val path = storageCredentials.upload.location.toPath()
receiver = CoArtifactDataReceiver(
storageProperties.receive,
storageProperties.monitor,
path
)
monitor.add(receiver)
Expand Down Expand Up @@ -108,7 +109,8 @@ class CoArtifactFile(
}

override fun isFallback(): Boolean {
return false
runBlocking { finish() }
return receiver.fallback
}

override fun isInLocalDisk(): Boolean {
Expand Down

0 comments on commit 876b358

Please sign in to comment.