Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fs服务缓存存储降级时异常 #2663 #2707

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,15 @@ package com.tencent.bkrepo.fs.server.storage
import com.tencent.bkrepo.common.api.constant.StringPool
import com.tencent.bkrepo.common.artifact.stream.DigestCalculateListener
import com.tencent.bkrepo.common.artifact.stream.closeQuietly
import com.tencent.bkrepo.common.storage.config.MonitorProperties
import com.tencent.bkrepo.common.storage.config.ReceiveProperties
import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitor
import com.tencent.bkrepo.common.storage.monitor.Throughput
import com.tencent.bkrepo.common.storage.util.createFile
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.reactive.awaitSingle
import kotlinx.coroutines.reactor.awaitSingleOrNull
import kotlinx.coroutines.withContext
import org.slf4j.LoggerFactory
import org.springframework.core.io.buffer.DataBuffer
import org.springframework.core.io.buffer.DataBufferUtils
Expand All @@ -45,6 +49,7 @@ import reactor.core.publisher.Sinks
import reactor.core.scheduler.Schedulers
import java.io.ByteArrayInputStream
import java.io.File
import java.io.IOException
import java.io.InputStream
import java.nio.channels.AsynchronousFileChannel
import java.nio.file.Files
Expand All @@ -56,10 +61,22 @@ import java.nio.file.StandardOpenOption
* */
class CoArtifactDataReceiver(
receiveProperties: ReceiveProperties,
monitorProperties: MonitorProperties,
private var path: Path,
private val fileName: String = generateRandomName(),
) : StorageHealthMonitor.Observer {

/**
* 传输过程中发生存储降级时,是否将数据转移到本地磁盘
*/
private val enableTransfer = monitorProperties.enableTransfer

/**
* 数据传输buffer大小
*/
private val bufferSize = receiveProperties.bufferSize.toBytes().toInt()


/**
* 文件内存接收阈值
* */
Expand Down Expand Up @@ -93,13 +110,7 @@ class CoArtifactDataReceiver(
/**
* 文件异步接收通道
* */
private val channel: AsynchronousFileChannel by lazy {
AsynchronousFileChannel.open(
filePath,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
)
}
private var channel: AsynchronousFileChannel? = null

/**
* 是否降级
Expand All @@ -111,6 +122,11 @@ class CoArtifactDataReceiver(
* */
var finished: Boolean = false

/**
* 数据是否转移到本地磁盘
*/
private var hasTransferred: Boolean = false

/**
* 降级路径
* */
Expand Down Expand Up @@ -157,12 +173,14 @@ class CoArtifactDataReceiver(
checkFallback()
val len = buffer.readableByteCount()
if (pos + len > fileSizeThreshold && inMemory) {
initChannel()
flushToFile()
DataBufferUtils.write(Mono.just(buffer), channel, pos).awaitSingle()
DataBufferUtils.write(Mono.just(buffer), channel!!, pos).awaitSingle()
} else if (inMemory) {
buffer.read(cacheData!!, pos.toInt(), len)
} else {
DataBufferUtils.write(Mono.just(buffer), channel, pos).awaitSingle()
initChannel()
DataBufferUtils.write(Mono.just(buffer), channel!!, pos).awaitSingle()
}
buffer.readPosition(0)
digest(buffer)
Expand All @@ -173,13 +191,30 @@ class CoArtifactDataReceiver(
if (inMemory) {
val cacheData = cacheData!!.copyOfRange(0, pos.toInt())
val buf = DefaultDataBufferFactory.sharedInstance.wrap(cacheData)
DataBufferUtils.write(Mono.just(buf), channel).awaitSingle()
val filePath = this.filePath.apply { this.createFile() }
channel = withContext(Dispatchers.IO) {
AsynchronousFileChannel.open(filePath, StandardOpenOption.WRITE, StandardOpenOption.CREATE)
}
DataBufferUtils.write(Mono.just(buf), channel!!).awaitSingle()
inMemory = false
// help gc
this.cacheData = null
}
}

private suspend fun initChannel() {
if (channel == null) {
channel = withContext(Dispatchers.IO) {
Files.createDirectories(filePath.parent)
AsynchronousFileChannel.open(
filePath,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
)
}
}
}

private fun digest(buffer: DataBuffer) {
val len = buffer.readableByteCount()
val digestArray = ByteArray(len)
Expand All @@ -188,23 +223,71 @@ class CoArtifactDataReceiver(
}

override fun unhealthy(fallbackPath: Path?, reason: String?) {
if (!finished && !fallback) {
if (!finished && !fallback && !hasTransferred) {
fallBackPath = fallbackPath
fallback = true
logger.warn("Path[$path] is unhealthy, fallback to use [$fallBackPath], reason: $reason")
}
}

private fun checkFallback() {
if (!fallback) {
/**
* 检查是否需要fall back操作
*/
private suspend fun checkFallback() {
if (!fallback || hasTransferred) {
return
}
if (fallBackPath == null || fallBackPath == path) {
logger.info("Fallback path is null or equals to primary path,skip")
logger.info("Fallback path is null or equals to primary path, skip transfer data")
hasTransferred = true
return
}
if (inMemory) {
path = fallBackPath!!
// originalPath表示NFS位置, fallBackPath表示本地磁盘位置
val originalPath = path
// 更新当前path为本地磁盘
path = fallBackPath!!
// transfer date
if (!inMemory) {
// 当文件已经落到NFS
if (enableTransfer) {
// 开Transfer功能时,从NFS转移到本地盘
cleanOriginalChannel()
val originalFile = originalPath.resolve(fileName)
val filePath = this.filePath.apply { this.createFile() }
val dataBuffer = DataBufferUtils.read(originalPath, DefaultDataBufferFactory(), bufferSize)
channel = withContext(Dispatchers.IO) {
AsynchronousFileChannel.open(
filePath,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
)
}
DataBufferUtils.write(dataBuffer, channel!!, 0).awaitSingle()
withContext(Dispatchers.IO) {
Files.deleteIfExists(originalFile)
}
logger.info("Success to transfer data from [$originalPath] to [$path]")
} else {
// 禁用Transfer功能时,忽略操作,继续使用NFS
path = originalPath
fallback = false
}
}
hasTransferred = true
}

/**
* 关闭原始输出流
*/
private fun cleanOriginalChannel() {
try {
channel?.force(true)
} catch (ignored: IOException) {
}

try {
channel?.close()
} catch (ignored: IOException) {
}
}

Expand All @@ -217,14 +300,14 @@ class CoArtifactDataReceiver(
return Throughput(pos, endTime - startTime)
} finally {
if (!inMemory) {
channel.closeQuietly()
channel?.closeQuietly()
}
}
}

fun close() {
if (!inMemory) {
channel.closeQuietly()
channel?.closeQuietly()
Files.deleteIfExists(filePath)
logger.info("Delete path $filePath")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class CoArtifactFile(
val path = storageCredentials.upload.location.toPath()
receiver = CoArtifactDataReceiver(
storageProperties.receive,
storageProperties.monitor,
path
)
monitor.add(receiver)
Expand Down Expand Up @@ -108,7 +109,8 @@ class CoArtifactFile(
}

override fun isFallback(): Boolean {
return false
runBlocking { finish() }
return receiver.fallback
}

override fun isInLocalDisk(): Boolean {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
package com.tencent.com.bkrepo.fs.storage

import com.tencent.bkrepo.common.api.constant.StringPool
import com.tencent.bkrepo.common.storage.config.MonitorProperties
import com.tencent.bkrepo.common.storage.config.ReceiveProperties
import com.tencent.bkrepo.common.storage.util.toPath
import com.tencent.bkrepo.fs.server.storage.CoArtifactDataReceiver
Expand Down Expand Up @@ -140,6 +141,7 @@ class CoArtifactDataReceiverTest {
fileSizeThreshold = DataSize.ofBytes(fileSizeThreshold),
rateLimit = DataSize.ofBytes(-1)
)
return CoArtifactDataReceiver(receive, primaryPath, filename)
val monitorProperties = MonitorProperties(enabled = true, enableTransfer = true)
return CoArtifactDataReceiver(receive, monitorProperties, primaryPath, filename)
}
}
Loading