Skip to content

Commit

Permalink
fix: fs服务缓存存储降级时异常 TencentBlueKing#2663
Browse files Browse the repository at this point in the history
  • Loading branch information
yaoxuwan committed Oct 25, 2024
1 parent 70b009c commit a56de7a
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,12 @@ class CoArtifactDataReceiver(
if (inMemory) {
val cacheData = cacheData!!.copyOfRange(0, pos.toInt())
val buf = DefaultDataBufferFactory.sharedInstance.wrap(cacheData)
val filePath = this.filePath.apply { this.createFile() }
channel = withContext(Dispatchers.IO) {
AsynchronousFileChannel.open(filePath, StandardOpenOption.WRITE, StandardOpenOption.CREATE)
}
DataBufferUtils.write(Mono.just(buf), channel).awaitSingle()
logger.info("flush to file: $filePath")
inMemory = false
// help gc
this.cacheData = null
Expand All @@ -208,7 +213,7 @@ class CoArtifactDataReceiver(
}

override fun unhealthy(fallbackPath: Path?, reason: String?) {
if (!finished && !fallback) {
if (!finished && !fallback && !hasTransferred) {
fallBackPath = fallbackPath
fallback = true
logger.warn("Path[$path] is unhealthy, fallback to use [$fallBackPath], reason: $reason")
Expand All @@ -224,44 +229,43 @@ class CoArtifactDataReceiver(
}
if (fallBackPath == null || fallBackPath == path) {
logger.info("Fallback path is null or equals to primary path, skip transfer data")
// hasTransferred = true
hasTransferred = true
return
}
if (inMemory) {
path = fallBackPath!!
// originalPath表示NFS位置, fallBackPath表示本地磁盘位置
val originalPath = path
// 更新当前path为本地磁盘
path = fallBackPath!!
logger.info("transfer: $inMemory")
// transfer date
if (!inMemory) {
// 当文件已经落到NFS
if (enableTransfer) {
// 开Transfer功能时,从NFS转移到本地盘
cleanOriginalChannel()
val originalFile = originalPath.resolve(fileName)
val filePath = this.filePath.apply { this.createFile() }
logger.info("fallback filepath: $filePath")
val dataBuffer = DataBufferUtils.read(originalPath, DefaultDataBufferFactory(), bufferSize)
channel = withContext(Dispatchers.IO) {
AsynchronousFileChannel.open(
filePath,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE_NEW,
)
}
DataBufferUtils.write(dataBuffer, channel, 0).awaitSingle()
withContext(Dispatchers.IO) {
Files.deleteIfExists(originalFile)
}
logger.info("Success to transfer data from [$originalPath] to [$path]")
} else {
// 禁用Transfer功能时,忽略操作,继续使用NFS
path = originalPath
fallback = false
}
}
// // originalPath表示NFS位置, fallBackPath表示本地磁盘位置
// val originalPath = path
// // 更新当前path为本地磁盘
// path = fallBackPath!!
// // transfer date
// if (!inMemory) {
// // 当文件已经落到NFS
// if (enableTransfer) {
// // 开Transfer功能时,从NFS转移到本地盘
// cleanOriginalChannel()
// val originalFile = originalPath.resolve(fileName)
// val filePath = this.filePath.apply { this.createFile() }
// val dataBuffer = DataBufferUtils.read(originalPath, DefaultDataBufferFactory(), bufferSize)
// channel = withContext(Dispatchers.IO) {
// AsynchronousFileChannel.open(
// filePath,
// StandardOpenOption.WRITE,
// StandardOpenOption.CREATE_NEW,
// )
// }
// DataBufferUtils.write(dataBuffer, channel, 0).awaitSingle()
// withContext(Dispatchers.IO) {
// Files.deleteIfExists(originalFile)
// }
// logger.info("Success to transfer data from [$originalPath] to [$path]")
// } else {
// // 禁用Transfer功能时,忽略操作,继续使用NFS
// path = originalPath
// fallback = false
// }
// }
// hasTransferred = true
hasTransferred = true
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ class CoArtifactFile(
}

override fun isFallback(): Boolean {
return false
runBlocking { finish() }
return receiver.fallback
}

override fun isInLocalDisk(): Boolean {
Expand Down

0 comments on commit a56de7a

Please sign in to comment.