Skip to content

Commit

Permalink
fix: 增加Milvus Http客户端 TencentBlueKing#2573
Browse files Browse the repository at this point in the history
  • Loading branch information
cnlkl committed Sep 13, 2024
1 parent dc94719 commit a8941a2
Show file tree
Hide file tree
Showing 19 changed files with 295 additions and 34 deletions.
1 change: 0 additions & 1 deletion src/backend/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ allprojects {
dependency("commons-io:commons-io:${Versions.CommonsIO}")
dependency("com.squareup.okhttp3:okhttp:${Versions.OKhttp}")
dependency("com.google.guava:guava:${Versions.Guava}")
dependency("com.google.protobuf:protobuf-java:${Versions.ProtobufJava}")
dependency("com.google.protobuf:protobuf-java-util:${Versions.ProtobufJava}")
dependency("com.tencent.polaris:polaris-discovery-factory:${Versions.Polaris}")
dependency("org.apache.commons:commons-text:${Versions.CommonsText}")
Expand Down
3 changes: 1 addition & 2 deletions src/backend/buildSrc/src/main/kotlin/Versions.kt
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object Versions {
const val Redline = "1.2.10"
const val SkyWalkingApmToolkit = "8.10.0"
const val Gson = "2.9.0"
const val ProtobufJava = "3.24.0"
const val ProtobufJava = "3.19.4"
const val Guava = "31.1-jre"
const val Shedlock = "4.12.0"
const val JGit = "5.11.0.202103091610-r"
Expand Down Expand Up @@ -70,5 +70,4 @@ object Versions {
const val JavaCpp = "1.5.9"
const val Notice = "1.0.0"
const val SpringCloudFunction = "3.2.11"
const val Milvus = "2.4.1"
}
12 changes: 0 additions & 12 deletions src/backend/job/biz-job/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,18 +45,6 @@ dependencies {
implementation(project(":common:common-operate:operate-service"))
implementation("org.springframework.boot:spring-boot-starter-data-mongodb")
implementation("io.micrometer:micrometer-registry-prometheus")
implementation("io.milvus:milvus-sdk-java:${Versions.Milvus}") {
exclude(group = "org.slf4j")
exclude(group = "org.apache.logging.log4j")
exclude(group = "org.testcontainers")
exclude(group = "com.azure")
exclude(group = "com.amazonaws")
exclude(group = "io.minio")
exclude(group = "org.apache.hadoop")
exclude(group = "org.apache.parquet")
exclude(group = "com.squareup.okhttp3")
exclude(group = "io.grpc")
}
implementation("com.tencent.devops:devops-schedule-common")
testImplementation("org.springframework.boot:spring-boot-starter-test")
testImplementation("de.flapdoodle.embed:de.flapdoodle.embed.mongo")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ import com.tencent.bkrepo.job.batch.base.JobContext
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.AiProperties
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.Document
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.EmbeddingModel
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.MilvusVectorStore
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.MilvusVectorStoreProperties
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusVectorStore
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusVectorStoreProperties
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.VectorStore
import com.tencent.bkrepo.job.config.properties.ArtifactAccessLogEmbeddingJobProperties
import io.milvus.client.MilvusClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import com.tencent.bkrepo.common.artifact.cache.service.ArtifactPreloadPlanGener
import com.tencent.bkrepo.common.mongo.dao.util.sharding.MonthRangeShardingUtils
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.AiProperties
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.EmbeddingModel
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.MilvusVectorStore
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.MilvusVectorStoreProperties
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusVectorStore
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusVectorStoreProperties
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.SearchRequest
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.VectorStore
import io.milvus.client.MilvusClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

package com.tencent.bkrepo.job.batch.task.cache.preload.ai

import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.MilvusClientProperties
import io.milvus.client.MilvusClient
import io.milvus.client.MilvusServiceClient
import io.milvus.param.ConnectParam
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus

import com.tencent.bkrepo.common.api.constant.HttpHeaders
import com.tencent.bkrepo.common.api.constant.MediaTypes
import com.tencent.bkrepo.common.api.util.toJsonString
import com.tencent.bkrepo.common.storage.innercos.http.toRequestBody
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.request.CreateCollectionReq
import okhttp3.MediaType.Companion.toMediaType
import okhttp3.OkHttpClient
import okhttp3.Request

class MilvusClient(
private val clientProperties: MilvusClientProperties
) {
val client = OkHttpClient.Builder().addInterceptor {
val newReq = it.request().newBuilder()

// add token to header
with(clientProperties) {
val token = if (username.isNotEmpty() && password.isNotEmpty()) {
"Bearer ${username}:${password}"
} else if (!clientProperties.token.isNullOrEmpty()) {
"Bearer ${clientProperties.token}"
} else {
null
}
token?.let { newReq.header(HttpHeaders.AUTHORIZATION, it) }
}

it.proceed(newReq.build())
}.build()

fun createCollection(req: CreateCollectionReq) {
val reqBody = req.toJsonString().toRequestBody(MediaTypes.APPLICATION_JSON.toMediaType())
val request = Request.Builder()
.post(reqBody)
.url("${clientProperties.uri}/v2/vectordb/collections/create")
.build()
client.newCall(request).execute().use { response ->
if (!response.isSuccessful) {
throw RuntimeException("create collection[${req.collectionName}] failed")
}
}
}

fun collectionExists(collectionName: String): Boolean {
TODO()
}

fun dropCollection(collectionName: String) {
TODO()
}

fun search() {}

fun describeIndex(collectionName: String) {}

fun createIndex(collectionName: String) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,16 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.job.batch.task.cache.preload.ai
package com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus

import org.springframework.boot.context.properties.ConfigurationProperties
import java.util.concurrent.TimeUnit

@ConfigurationProperties("spring.ai.vectorstore.milvus.client")
data class MilvusClientProperties(
var host: String = "localhost",
var port: Int = 19530,
var uri: String? = null,
var token: String? = null,
var connectTimeoutMs: Long = 10000L,
var keepAliveTimeMs: Long = 55000L,
var keepAliveTimeoutMs: Long = 20000L,
var rpcDeadlineMs: Long = 0L,
var idleTimeoutMs: Long = TimeUnit.MILLISECONDS.convert(24L, TimeUnit.HOURS),
var username: String = "root",
var password: String = "milvus"
)
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.job.batch.task.cache.preload.ai
package com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus

import com.alibaba.fastjson.JSONObject
import io.milvus.client.MilvusClient
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.Document
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.EmbeddingModel
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.SearchRequest
import com.tencent.bkrepo.job.batch.task.cache.preload.ai.VectorStore
import io.milvus.common.clientenum.ConsistencyLevelEnum
import io.milvus.grpc.DataType
import io.milvus.param.MetricType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,13 @@
* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/

package com.tencent.bkrepo.job.batch.task.cache.preload.ai

import io.milvus.param.IndexType
import io.milvus.param.MetricType
package com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus

data class MilvusVectorStoreProperties(
var databaseName: String = "default",
var collectionName: String = "vector_store",
var embeddingDimension: Int = 1536,
var indexType: IndexType = IndexType.IVF_FLAT,
var metricType: MetricType = MetricType.COSINE,
var indexType: String = "IVF_FLAT",
var metricType: String = "COSINE",
var indexParameters: String = "{\"nlist\":1024}",
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.request

import com.fasterxml.jackson.annotation.JsonProperty

data class CollectionSchema(
private val enableDynamicField: Boolean = java.lang.Boolean.TRUE,
private val fields: MutableList<FieldSchema> = ArrayList(),
@JsonProperty("autoID")
private val autoId: Boolean = false,
) {
fun addField(fieldSchema: FieldSchema): CollectionSchema {
if (fieldSchema.dataType == DataType.Array.name) {
if (fieldSchema.elementDataType.isNullOrEmpty() || fieldSchema.elementTypeParams?.maxCapacity == null) {
throw IllegalArgumentException("Element type, maxCapacity are required for array field")
}
}

if (fieldSchema.dataType == DataType.FloatVector.name ||
fieldSchema.dataType == DataType.BinaryVector.name ||
fieldSchema.dataType == DataType.Float16Vector.name ||
fieldSchema.dataType == DataType.BFloat16Vector.name
) {
if (fieldSchema.elementTypeParams?.dim == null) {
throw IllegalArgumentException("Dimension is required for vector field")
}
}

fields.add(fieldSchema)
return this
}

fun getField(fieldName: String): FieldSchema? {
for (field in fields) {
if (field.fieldName == fieldName) {
return field
}
}
return null
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.request

enum class ConsistencyLevel(private val code: Int) {
Strong(0),
Bounded(2),
Eventually(3),
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.request

import com.fasterxml.jackson.annotation.JsonProperty


data class CreateCollectionReq(
val dbName: String,
val collectionName: String,
val dimension: Int,
val metricType: String = MetricType.COSINE.name,
val idType: DataType = DataType.Int64,
@JsonProperty("autoID")
val autoId: Boolean = false,
val primaryFieldName: String = "id",
val vectorFieldName: String = "vector",
val schema: CollectionSchema? = null,
val indexParams: List<IndexParam> = ArrayList(),
val params: Params? = null,
) {
data class Params(
@JsonProperty("max_length")
val maxLength: Int? = null,
val enableDynamicField: Boolean? = null,
val shardsNum: Int? = null,
val consistencyLevel: String? = ConsistencyLevel.Bounded.name,
val partitionsNum: Int? = null,
val ttlSeconds: Int? = null,
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.request

enum class DataType(private val code: Int) {
None(0),
Bool(1),
Int8(2),
Int16(3),
Int32(4),
Int64(5),

Float(10),
Double(11),

String(20),
VarChar(21), // variable-length strings with a specified maximum length
Array(22),
JSON(23),

BinaryVector(100),
FloatVector(101),
Float16Vector(102),
BFloat16Vector(103),
SparseFloatVector(104)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.request

import com.fasterxml.jackson.annotation.JsonProperty

data class FieldSchema (
val fieldName: String,
val dataType: String,
val isPrimary: Boolean = false,
val isPartitionKey: Boolean = false,
/**
* An optional parameter for Array field values
*/
val elementDataType: String? = null,
val elementTypeParams: ElementTypeParams? = null
)

data class ElementTypeParams(
/**
* An optional parameter for VarChar values that determines the maximum length of the value in the current field.
*/
@JsonProperty("max_length")
val maxLength: Int? = 65535,
/**
* An optional parameter for FloatVector or BinaryVector fields that determines the vector dimension.
*/
val dim: Int? = null,
/**
* An optional parameter for Array field values that
* determines the maximum number of elements in the current array field.
*/
@JsonProperty("max_capacity")
val maxCapacity: Int? = null,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.tencent.bkrepo.job.batch.task.cache.preload.ai.milvus.request

import com.fasterxml.jackson.annotation.JsonProperty

data class IndexParam(
var fieldName: String,
val metricType: MetricType = MetricType.COSINE,
val indexName: String? = null,
val params: Map<String, Any>? = null,
)

data class Params(
@JsonProperty("index_type")
val indexType: IndexType = IndexType.AUTOINDEX,
/**
* The maximum degree of the node and applies only when index_type is set to HNSW.
*/
@JsonProperty("M")
val m: Int,
/**
* The search scope. This applies only when index_type is set to HNSW
*/
val efConstruction: Int,
/**
* The number of cluster units. This applies to IVF-related index types.
*/
val nlist: Int,
)
Loading

0 comments on commit a8941a2

Please sign in to comment.