diff --git a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/service/FileOperationService.kt b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/service/FileOperationService.kt index 087918687b..7e919c92f3 100644 --- a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/service/FileOperationService.kt +++ b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/service/FileOperationService.kt @@ -29,10 +29,10 @@ package com.tencent.bkrepo.fs.server.service import com.tencent.bkrepo.common.artifact.stream.ArtifactInputStream import com.tencent.bkrepo.common.artifact.stream.Range +import com.tencent.bkrepo.common.metadata.client.RRepositoryClient import com.tencent.bkrepo.common.metadata.constant.FAKE_MD5 import com.tencent.bkrepo.common.metadata.constant.FAKE_SHA256 import com.tencent.bkrepo.common.metadata.model.TBlockNode -import com.tencent.bkrepo.common.metadata.client.RRepositoryClient import com.tencent.bkrepo.common.metadata.service.metadata.RMetadataService import com.tencent.bkrepo.fs.server.config.properties.StreamProperties import com.tencent.bkrepo.fs.server.constant.FS_ATTR_KEY diff --git a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/storage/CoArtifactDataReceiver.kt b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/storage/CoArtifactDataReceiver.kt index 38f8546a90..4434d5a0d8 100644 --- a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/storage/CoArtifactDataReceiver.kt +++ b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/storage/CoArtifactDataReceiver.kt @@ -30,11 +30,15 @@ package com.tencent.bkrepo.fs.server.storage import com.tencent.bkrepo.common.api.constant.StringPool import com.tencent.bkrepo.common.artifact.stream.DigestCalculateListener import com.tencent.bkrepo.common.artifact.stream.closeQuietly +import com.tencent.bkrepo.common.storage.config.MonitorProperties import com.tencent.bkrepo.common.storage.config.ReceiveProperties import com.tencent.bkrepo.common.storage.monitor.StorageHealthMonitor import com.tencent.bkrepo.common.storage.monitor.Throughput +import com.tencent.bkrepo.common.storage.util.createFile +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.reactive.awaitSingle import kotlinx.coroutines.reactor.awaitSingleOrNull +import kotlinx.coroutines.withContext import org.slf4j.LoggerFactory import org.springframework.core.io.buffer.DataBuffer import org.springframework.core.io.buffer.DataBufferUtils @@ -45,6 +49,7 @@ import reactor.core.publisher.Sinks import reactor.core.scheduler.Schedulers import java.io.ByteArrayInputStream import java.io.File +import java.io.IOException import java.io.InputStream import java.nio.channels.AsynchronousFileChannel import java.nio.file.Files @@ -56,10 +61,22 @@ import java.nio.file.StandardOpenOption * */ class CoArtifactDataReceiver( receiveProperties: ReceiveProperties, + monitorProperties: MonitorProperties, private var path: Path, private val fileName: String = generateRandomName(), ) : StorageHealthMonitor.Observer { + /** + * 传输过程中发生存储降级时,是否将数据转移到本地磁盘 + */ + private val enableTransfer = monitorProperties.enableTransfer + + /** + * 数据传输buffer大小 + */ + private val bufferSize = receiveProperties.bufferSize.toBytes().toInt() + + /** * 文件内存接收阈值 * */ @@ -93,13 +110,7 @@ class CoArtifactDataReceiver( /** * 文件异步接收通道 * */ - private val channel: AsynchronousFileChannel by lazy { - AsynchronousFileChannel.open( - filePath, - StandardOpenOption.WRITE, - StandardOpenOption.CREATE_NEW, - ) - } + private var channel: AsynchronousFileChannel? = null /** * 是否降级 @@ -111,6 +122,11 @@ class CoArtifactDataReceiver( * */ var finished: Boolean = false + /** + * 数据是否转移到本地磁盘 + */ + private var hasTransferred: Boolean = false + /** * 降级路径 * */ @@ -157,12 +173,14 @@ class CoArtifactDataReceiver( checkFallback() val len = buffer.readableByteCount() if (pos + len > fileSizeThreshold && inMemory) { + initChannel() flushToFile() - DataBufferUtils.write(Mono.just(buffer), channel, pos).awaitSingle() + DataBufferUtils.write(Mono.just(buffer), channel!!, pos).awaitSingle() } else if (inMemory) { buffer.read(cacheData!!, pos.toInt(), len) } else { - DataBufferUtils.write(Mono.just(buffer), channel, pos).awaitSingle() + initChannel() + DataBufferUtils.write(Mono.just(buffer), channel!!, pos).awaitSingle() } buffer.readPosition(0) digest(buffer) @@ -173,13 +191,29 @@ class CoArtifactDataReceiver( if (inMemory) { val cacheData = cacheData!!.copyOfRange(0, pos.toInt()) val buf = DefaultDataBufferFactory.sharedInstance.wrap(cacheData) - DataBufferUtils.write(Mono.just(buf), channel).awaitSingle() + val filePath = this.filePath.apply { this.createFile() } + channel = withContext(Dispatchers.IO) { + AsynchronousFileChannel.open(filePath, StandardOpenOption.WRITE, StandardOpenOption.CREATE) + } + DataBufferUtils.write(Mono.just(buf), channel!!).awaitSingle() inMemory = false // help gc this.cacheData = null } } + private suspend fun initChannel() { + if (channel == null) { + channel = withContext(Dispatchers.IO) { + AsynchronousFileChannel.open( + filePath, + StandardOpenOption.WRITE, + StandardOpenOption.CREATE_NEW, + ) + } + } + } + private fun digest(buffer: DataBuffer) { val len = buffer.readableByteCount() val digestArray = ByteArray(len) @@ -188,23 +222,71 @@ class CoArtifactDataReceiver( } override fun unhealthy(fallbackPath: Path?, reason: String?) { - if (!finished && !fallback) { + if (!finished && !fallback && !hasTransferred) { fallBackPath = fallbackPath fallback = true logger.warn("Path[$path] is unhealthy, fallback to use [$fallBackPath], reason: $reason") } } - private fun checkFallback() { - if (!fallback) { + /** + * 检查是否需要fall back操作 + */ + private suspend fun checkFallback() { + if (!fallback || hasTransferred) { return } if (fallBackPath == null || fallBackPath == path) { - logger.info("Fallback path is null or equals to primary path,skip") + logger.info("Fallback path is null or equals to primary path, skip transfer data") + hasTransferred = true return } - if (inMemory) { - path = fallBackPath!! + // originalPath表示NFS位置, fallBackPath表示本地磁盘位置 + val originalPath = path + // 更新当前path为本地磁盘 + path = fallBackPath!! + // transfer date + if (!inMemory) { + // 当文件已经落到NFS + if (enableTransfer) { + // 开Transfer功能时,从NFS转移到本地盘 + cleanOriginalChannel() + val originalFile = originalPath.resolve(fileName) + val filePath = this.filePath.apply { this.createFile() } + val dataBuffer = DataBufferUtils.read(originalPath, DefaultDataBufferFactory(), bufferSize) + channel = withContext(Dispatchers.IO) { + AsynchronousFileChannel.open( + filePath, + StandardOpenOption.WRITE, + StandardOpenOption.CREATE_NEW, + ) + } + DataBufferUtils.write(dataBuffer, channel!!, 0).awaitSingle() + withContext(Dispatchers.IO) { + Files.deleteIfExists(originalFile) + } + logger.info("Success to transfer data from [$originalPath] to [$path]") + } else { + // 禁用Transfer功能时,忽略操作,继续使用NFS + path = originalPath + fallback = false + } + } + hasTransferred = true + } + + /** + * 关闭原始输出流 + */ + private fun cleanOriginalChannel() { + try { + channel?.force(true) + } catch (ignored: IOException) { + } + + try { + channel?.close() + } catch (ignored: IOException) { } } @@ -217,14 +299,14 @@ class CoArtifactDataReceiver( return Throughput(pos, endTime - startTime) } finally { if (!inMemory) { - channel.closeQuietly() + channel?.closeQuietly() } } } fun close() { if (!inMemory) { - channel.closeQuietly() + channel?.closeQuietly() Files.deleteIfExists(filePath) logger.info("Delete path $filePath") } diff --git a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/storage/CoArtifactFile.kt b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/storage/CoArtifactFile.kt index 9c3c346cc2..48d64980e9 100644 --- a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/storage/CoArtifactFile.kt +++ b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/storage/CoArtifactFile.kt @@ -66,6 +66,7 @@ class CoArtifactFile( val path = storageCredentials.upload.location.toPath() receiver = CoArtifactDataReceiver( storageProperties.receive, + storageProperties.monitor, path ) monitor.add(receiver) @@ -108,7 +109,8 @@ class CoArtifactFile( } override fun isFallback(): Boolean { - return false + runBlocking { finish() } + return receiver.fallback } override fun isInLocalDisk(): Boolean { diff --git a/src/backend/fs/boot-fs-server/src/test/kotlin/com/tencent/com/bkrepo/fs/storage/CoArtifactDataReceiverTest.kt b/src/backend/fs/boot-fs-server/src/test/kotlin/com/tencent/com/bkrepo/fs/storage/CoArtifactDataReceiverTest.kt index 5087db8537..38bc0a8683 100644 --- a/src/backend/fs/boot-fs-server/src/test/kotlin/com/tencent/com/bkrepo/fs/storage/CoArtifactDataReceiverTest.kt +++ b/src/backend/fs/boot-fs-server/src/test/kotlin/com/tencent/com/bkrepo/fs/storage/CoArtifactDataReceiverTest.kt @@ -28,6 +28,7 @@ package com.tencent.com.bkrepo.fs.storage import com.tencent.bkrepo.common.api.constant.StringPool +import com.tencent.bkrepo.common.storage.config.MonitorProperties import com.tencent.bkrepo.common.storage.config.ReceiveProperties import com.tencent.bkrepo.common.storage.util.toPath import com.tencent.bkrepo.fs.server.storage.CoArtifactDataReceiver @@ -140,6 +141,7 @@ class CoArtifactDataReceiverTest { fileSizeThreshold = DataSize.ofBytes(fileSizeThreshold), rateLimit = DataSize.ofBytes(-1) ) - return CoArtifactDataReceiver(receive, primaryPath, filename) + val monitorProperties = MonitorProperties(enabled = true, enableTransfer = true) + return CoArtifactDataReceiver(receive, monitorProperties, primaryPath, filename) } }