From b8866611b3edd5caf35afa0aec1888da61489def Mon Sep 17 00:00:00 2001 From: Ryan Fairfax Date: Thu, 1 Aug 2024 12:36:56 -0700 Subject: [PATCH 1/5] feat(core): Add ingestion support for Tantivy This implements enough of the interface to add documents to the Tantivy index. This is part 2 of the series and is not usable end to end until additional PRs complete. Document addition and removal is covered in this PR. Documents are created by calling a single native method that has many of the built-in fields, such as start and end time, passed as method parameters. Dynamic fields are passed to the Rust code via a simple binary encoding to reduce JNI overhead. --- .../memstore/PartKeyTantivyIndex.scala | 140 ++++++++- core/src/rust/Cargo.lock | 243 ++++++++++++++-- core/src/rust/Cargo.toml | 15 + core/src/rust/src/errors.rs | 1 + core/src/rust/src/ingestion.rs | 266 ++++++++++++++++++ core/src/rust/src/ingestion/fields.rs | 202 +++++++++++++ core/src/rust/src/jnienv.rs | 34 ++- core/src/rust/src/lib.rs | 5 + core/src/rust/src/parser.rs | 144 ++++++++++ core/src/rust/src/state.rs | 18 +- core/src/rust/src/test_utils.rs | 83 ++++++ 11 files changed, 1107 insertions(+), 44 deletions(-) create mode 100644 core/src/rust/src/ingestion.rs create mode 100644 core/src/rust/src/ingestion/fields.rs create mode 100644 core/src/rust/src/parser.rs create mode 100644 core/src/rust/src/test_utils.rs diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala index 63e8c1361..2ee9bdf80 100644 --- a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala +++ b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala @@ -2,15 +2,21 @@ package filodb.core.memstore import java.io.File import java.nio.file.Files +import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit} + +import scala.collection.mutable.ArrayBuffer import debox.Buffer import 
org.apache.commons.lang3.SystemUtils import org.apache.lucene.util.BytesRef +import spire.implicits.cforRange import filodb.core.DatasetRef +import filodb.core.memstore.PartKeyIndexRaw.bytesRefToUnsafeOffset import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn} import filodb.core.metadata.PartitionSchema import filodb.core.query.ColumnFilter +import filodb.memory.format.UnsafeUtils class PartKeyTantivyIndex(ref: DatasetRef, schema: PartitionSchema, @@ -45,8 +51,19 @@ class PartKeyTantivyIndex(ref: DatasetRef, TantivyNativeMethods.reset(indexHandle) } + private var flushThreadPool: ScheduledThreadPoolExecutor = _ + override def startFlushThread(flushDelayMinSeconds: Int, flushDelayMaxSeconds: Int): Unit = { - ??? + if (flushThreadPool != UnsafeUtils.ZeroPointer) { + // Already running + logger.warn("startFlushThread called when already running, ignoring") + return + } + + flushThreadPool = new ScheduledThreadPoolExecutor(1) + + flushThreadPool.scheduleAtFixedRate(() => refreshReadersBlocking(), flushDelayMinSeconds, + flushDelayMinSeconds, TimeUnit.SECONDS) } override def partIdsEndedBefore(endedBefore: Long): Buffer[Int] = { @@ -54,11 +71,13 @@ class PartKeyTantivyIndex(ref: DatasetRef, } override def removePartitionsEndedBefore(endedBefore: Long, returnApproxDeletedCount: Boolean): Int = { - ??? + TantivyNativeMethods.removePartitionsEndedBefore(indexHandle, endedBefore, returnApproxDeletedCount) } override def removePartKeys(partIds: Buffer[Int]): Unit = { - ??? 
+ if (!partIds.isEmpty) { + TantivyNativeMethods.removePartKeys(indexHandle, partIds.toArray) + } } override def indexRamBytes: Long = { @@ -72,6 +91,11 @@ class PartKeyTantivyIndex(ref: DatasetRef, override def closeIndex(): Unit = { logger.info(s"Closing index on dataset=$ref shard=$shardNum") + if (flushThreadPool != UnsafeUtils.ZeroPointer) { + flushThreadPool.shutdown() + flushThreadPool.awaitTermination(60, TimeUnit.SECONDS) + } + commit() TantivyNativeMethods.freeIndexHandle(indexHandle) indexHandle = 0 @@ -96,18 +120,26 @@ class PartKeyTantivyIndex(ref: DatasetRef, override def addPartKey(partKeyOnHeapBytes: Array[Byte], partId: Int, startTime: Long, endTime: Long, partKeyBytesRefOffset: Int)(partKeyNumBytes: Int, documentId: String): Unit = { - ??? + logger.debug(s"Adding document ${partKeyString(documentId, partKeyOnHeapBytes, partKeyBytesRefOffset)} " + + s"with startTime=$startTime endTime=$endTime into dataset=$ref shard=$shardNum") + makeDocument(partKeyOnHeapBytes, partKeyBytesRefOffset, partKeyNumBytes, partId, documentId, startTime, endTime, + upsert = false) } override def upsertPartKey(partKeyOnHeapBytes: Array[Byte], partId: Int, startTime: Long, endTime: Long, partKeyBytesRefOffset: Int)(partKeyNumBytes: Int, documentId: String): Unit = { - ??? + logger.debug(s"Upserting document ${partKeyString(documentId, partKeyOnHeapBytes, partKeyBytesRefOffset)} " + + s"with startTime=$startTime endTime=$endTime into dataset=$ref shard=$shardNum") + makeDocument(partKeyOnHeapBytes, partKeyBytesRefOffset, partKeyNumBytes, partId, documentId, startTime, endTime, + upsert = true) } override def partKeyFromPartId(partId: Int): Option[BytesRef] = { ??? } + private val NOT_FOUND = -1 + override def startTimeFromPartId(partId: Int): Long = { ??? 
} @@ -126,7 +158,17 @@ class PartKeyTantivyIndex(ref: DatasetRef, override def updatePartKeyWithEndTime(partKeyOnHeapBytes: Array[Byte], partId: Int, endTime: Long, partKeyBytesRefOffset: Int)(partKeyNumBytes: Int, documentId: String): Unit = { - ??? + var startTime = startTimeFromPartId(partId) // look up index for old start time + if (startTime == NOT_FOUND) { + startTime = System.currentTimeMillis() - retentionMillis + logger.warn(s"Could not find in Lucene startTime for partId=$partId in dataset=$ref. Using " + + s"$startTime instead.", new IllegalStateException()) // assume this time series started retention period ago + } + logger.debug(s"Updating document ${partKeyString(documentId, partKeyOnHeapBytes, partKeyBytesRefOffset)} " + + s"with startTime=$startTime endTime=$endTime into dataset=$ref shard=$shardNum") + + makeDocument(partKeyOnHeapBytes, partKeyBytesRefOffset, partKeyNumBytes, + partId, documentId, startTime, endTime, upsert = true) } override def refreshReadersBlocking(): Unit = { @@ -153,19 +195,78 @@ class PartKeyTantivyIndex(ref: DatasetRef, } override protected def addIndexedField(key: String, value: String): Unit = { - ??? + val buffer = docBufferLocal.get() + + // 1 - indexed field + buffer += 1 + TantivyQueryBuilder.writeStringToBuffer(key, buffer) + TantivyQueryBuilder.writeStringToBuffer(value, buffer) } protected def addIndexedMapField(mapColumn: String, key: String, value: String): Unit = { - ??? + val buffer = docBufferLocal.get() + + // 2 - map field + buffer += 2 + TantivyQueryBuilder.writeStringToBuffer(mapColumn, buffer) + TantivyQueryBuilder.writeStringToBuffer(key, buffer) + TantivyQueryBuilder.writeStringToBuffer(value, buffer) } protected override def addMultiColumnFacet(key: String, value: String): Unit = { - ??? 
+ val buffer = docBufferLocal.get() + + // 3 - mc field + buffer += 3 + TantivyQueryBuilder.writeStringToBuffer(key, buffer) + TantivyQueryBuilder.writeStringToBuffer(value, buffer) + } + + private val docBufferLocal = new ThreadLocal[ArrayBuffer[Byte]]() { + override def initialValue(): ArrayBuffer[Byte] = new ArrayBuffer[Byte](4096) + } + + private def makeDocument(partKeyOnHeapBytes: Array[Byte], + partKeyBytesRefOffset: Int, + partKeyNumBytes: Int, + partId: Int, + documentId: String, + startTime: Long, + endTime: Long, + upsert: Boolean): Unit = { + docBufferLocal.get().clear() + + // If configured and enabled, Multi-column facets will be created on "partition-schema" columns + createMultiColumnFacets(partKeyOnHeapBytes, partKeyBytesRefOffset) + + cforRange { + 0 until numPartColumns + } { i => + indexers(i).fromPartKey(partKeyOnHeapBytes, bytesRefToUnsafeOffset(partKeyBytesRefOffset), partId) + } + + TantivyNativeMethods.ingestDocument(indexHandle, partKeyOnHeapBytes, partKeyBytesRefOffset, partKeyNumBytes, + partId, documentId, startTime, endTime, docBufferLocal.get().toArray, upsert) + } +} + +object TantivyQueryBuilder { + def writeStringToBuffer(s: String, buffer: ArrayBuffer[Byte]): Unit = { + val bytes = s.getBytes + writeLengthToBuffer(bytes.length, buffer) + buffer ++= bytes + } + + private def writeLengthToBuffer(len: Int, buffer: ArrayBuffer[Byte]): Unit = { + buffer += len.toByte + buffer += (len >> 8).toByte } } // JNI methods +// Thread safety - +// * Index handle creation / cleanup is not thread safe. 
+// * Other operations are thread safe and may involve an internal mutex protected object TantivyNativeMethods { // Load native library from jar private def loadLibrary(): Unit = { @@ -203,6 +304,10 @@ protected object TantivyNativeMethods { @native def freeIndexHandle(handle: Long): Unit + // Force refresh any readers to be up to date (primarily used by tests) + @native + def refreshReaders(handle: Long): Unit + // Reset index data (delete all docs) @native def reset(handle: Long): Unit @@ -210,4 +315,21 @@ protected object TantivyNativeMethods { // Commit changes to the index @native def commit(handle: Long): Unit + + // Ingest a new document + // scalastyle:off parameter.number + @native + def ingestDocument(handle: Long, partKeyData: Array[Byte], partKeyOffset: Int, + partKeyNumBytes: Int, partId: Int, documentId: String, + startTime: Long, endTime: Long, fields: Array[Byte], + upsert: Boolean): Unit + // scalastyle:on parameter.number + + // Remove docs with given part keys + @native + def removePartKeys(handle: Long, keys: Array[Int]): Unit + + // Remove partition IDs and return approximate deleted count + @native + def removePartitionsEndedBefore(handle: Long, endedBefore: Long, returnApproxDeletedCount: Boolean): Int } \ No newline at end of file diff --git a/core/src/rust/Cargo.lock b/core/src/rust/Cargo.lock index d00932f87..802cdee2d 100644 --- a/core/src/rust/Cargo.lock +++ b/core/src/rust/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. 
version = 3 +[[package]] +name = "addr2line" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e4503c46a5c0c7844e948c9a4d6acd9f50cccb4de1c48eb9e291ea17470c678" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "ahash" version = "0.8.11" @@ -9,9 +24,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", + "getrandom", "once_cell", "version_check", - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -52,6 +68,21 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" +[[package]] +name = "backtrace" +version = "0.3.73" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cc23269a4f8976d0a4d2e7109211a419fe30e8d88d677cd60b6bc79c5732e0a" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "base64" version = "0.22.1" @@ -87,19 +118,18 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.6.1" +version = "1.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a12916984aab3fa6e39d655a33e09c0071eb36d6ab3aea5c2d78551f1df6d952" +checksum = "fca2be1d5c43812bae364ee3f30b3afcb7877cf59f4aeb94c66f313a41d2fac9" [[package]] name = "cc" -version = "1.1.0" +version = "1.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eaff6f8ce506b9773fa786672d63fc7a191ffea1be33f72bbd4aeacefca9ffc8" +checksum = "26a5c3fd7bfa1ce3897a3a3501d362b2d87b7f2583ebcb4a949ec25911025cbc" dependencies = [ 
"jobserver", "libc", - "once_cell", ] [[package]] @@ -189,6 +219,22 @@ dependencies = [ "serde", ] +[[package]] +name = "dhat" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98cd11d84628e233de0ce467de10b8633f4ddaecafadefc86e13b84b8739b827" +dependencies = [ + "backtrace", + "lazy_static", + "mintex", + "parking_lot", + "rustc-hash", + "serde", + "serde_json", + "thousands", +] + [[package]] name = "downcast-rs" version = "1.2.1" @@ -201,6 +247,12 @@ version = "1.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" +[[package]] +name = "equivalent" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" + [[package]] name = "errno" version = "0.3.9" @@ -228,12 +280,18 @@ name = "filodb_core" version = "0.1.0" dependencies = [ "bytes", + "dhat", + "hashbrown", "jni", + "nohash-hasher", "nom", "num-derive", "num-traits", + "quick_cache", "regex", "tantivy", + "tantivy-common", + "tantivy-fst", "thiserror", ] @@ -264,6 +322,12 @@ dependencies = [ "wasi", ] +[[package]] +name = "gimli" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" + [[package]] name = "hashbrown" version = "0.14.5" @@ -337,9 +401,9 @@ checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" [[package]] name = "jobserver" -version = "0.1.31" +version = "0.1.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2b099aaa34a9751c5bf0878add70444e1ed2dd73f347be99003d4577277de6e" +checksum = "48d1dbcbbeb6a7fec7e059840aa538bd62aaccf972c7346c4d9d2059312853d0" dependencies = [ "libc", ] @@ -353,6 +417,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "lazy_static" +version = "1.5.0" 
+source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "levenshtein_automata" version = "0.2.1" @@ -377,6 +447,16 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + [[package]] name = "log" version = "0.4.22" @@ -385,9 +465,9 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "lru" -version = "0.12.3" +version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3262e75e648fce39813cb56ac41f3c3e3f65217ebf3844d818d1f9398cfb0dc" +checksum = "37ee39891760e7d94734f6f63fedc29a2e4a152f836120753a72503f09fcf904" dependencies = [ "hashbrown", ] @@ -429,12 +509,33 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" +[[package]] +name = "miniz_oxide" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8a240ddb74feaf34a79a7add65a741f3167852fba007066dcac1ca548d89c08" +dependencies = [ + "adler", +] + +[[package]] +name = "mintex" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bec4598fddb13cc7b528819e697852653252b760f1228b7642679bf2ff2cd07" + [[package]] name = "murmurhash32" version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2195bf6aa996a481483b29d62a7663eed3fe39600c460e323f8ff41e90bdd89b" +[[package]] +name = "nohash-hasher" +version = "0.2.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" + [[package]] name = "nom" version = "7.1.3" @@ -482,6 +583,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f203fa8daa7bb185f760ae12bd8e097f63d17041dcdcaf675ac54cdf863170e" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.19.0" @@ -503,6 +613,29 @@ dependencies = [ "stable_deref_trait", ] +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets 0.52.6", +] + [[package]] name = "pkg-config" version = "0.3.30" @@ -517,9 +650,12 @@ checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" [[package]] name = "ppv-lite86" -version = "0.2.17" +version = "0.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +checksum = "dee4364d9f3b902ef14fab8a1ddffb783a1cb6b4bba3bfc1fa3922732c7de97f" +dependencies = [ + "zerocopy 0.6.6", +] [[package]] name = "proc-macro2" @@ -530,6 +666,18 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "quick_cache" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec932c60e6faf77dc6601ea149a23d821598b019b450bb1d98fe89c0301c0b61" +dependencies = [ + "ahash", + "equivalent", + "hashbrown", + "parking_lot", +] + 
[[package]] name = "quote" version = "1.0.36" @@ -599,6 +747,15 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "redox_syscall" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a908a6e00f1fdd0dfd9c0eb08ce85126f6d8bbda50017e74bc4a4b7d4a926a4" +dependencies = [ + "bitflags", +] + [[package]] name = "regex" version = "1.10.5" @@ -638,6 +795,12 @@ dependencies = [ "serde_derive", ] +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + [[package]] name = "rustc-hash" version = "1.1.0" @@ -672,6 +835,12 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "serde" version = "1.0.204" @@ -694,11 +863,12 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.120" +version = "1.0.121" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e0d21c9a8cae1235ad58a00c11cb40d4b1e5c784f1ef2c537876ed6ffd8b7c5" +checksum = "4ab380d7d9f22ef3f21ad3e6c1ebe8e4fc7a2000ccba2e4d71fc96f15b2cb609" dependencies = [ "itoa", + "memchr", "ryu", "serde", ] @@ -726,9 +896,9 @@ checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" [[package]] name = "syn" -version = "2.0.70" +version = "2.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f0209b68b3613b093e0ec905354eccaedcfe83b8cb37cbdeae64026c3064c16" +checksum = "dc4b9b9bf2add8093d3f2c0204471e951b2285580335de42f9d2534f3ae7a8af" dependencies = [ "proc-macro2", "quote", @@ -890,24 +1060,30 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.62" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "f2675633b1499176c2dff06b0856a27976a8f9d436737b4cf4f312d4d91d8bbb" +checksum = "c0342370b38b6a11b6cc11d6a805569958d54cfa061a29969c3b5ce2ea405724" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.62" +version = "1.0.63" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d20468752b09f49e909e55a5d338caa8bedf615594e9d80bc4c565d30faf798c" +checksum = "a4558b58466b9ad7ca0f102865eccc95938dca1a74a856f2b57b6629050da261" dependencies = [ "proc-macro2", "quote", "syn", ] +[[package]] +name = "thousands" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3bf63baf9f5039dadc247375c29eb13706706cfde997d0330d05aa63a77d8820" + [[package]] name = "time" version = "0.3.36" @@ -963,9 +1139,9 @@ dependencies = [ [[package]] name = "version_check" -version = "0.9.4" +version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] name = "walkdir" @@ -1217,13 +1393,34 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "zerocopy" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "854e949ac82d619ee9a14c66a1b674ac730422372ccb759ce0c39cabcf2bf8e6" +dependencies = [ + "byteorder", + "zerocopy-derive 0.6.6", +] + [[package]] name = "zerocopy" version = "0.7.35" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy-derive" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"125139de3f6b9d625c39e2efdd73d41bdac468ccd556556440e322be0e1bbd91" +dependencies = [ + "proc-macro2", + "quote", + "syn", ] [[package]] diff --git a/core/src/rust/Cargo.toml b/core/src/rust/Cargo.toml index edc187292..1d2e61f6d 100644 --- a/core/src/rust/Cargo.toml +++ b/core/src/rust/Cargo.toml @@ -7,13 +7,28 @@ edition = "2021" crate-type = ["cdylib"] [dependencies] +dhat = "0.3.3" +hashbrown = "0.14.5" jni = "0.21.1" +nohash-hasher = "0.2.0" nom = "7.1.3" num-derive = "0.4.2" num-traits = "0.2.19" +quick_cache = { version = "0.6.2", features = ["stats"] } regex = "1.10.5" tantivy = "0.22.0" +tantivy-common = "0.7.0" +tantivy-fst = "0.5.0" thiserror = "1.0.62" [dev-dependencies] bytes = "1.6.1" + +# Keep debug symbols in the final binary +# This makes the binary slightly larger (~20MB), but makes profiling much more useful +# and has no runtime impact +[profile.release] +debug = true + +[features] +dhat-heap = [] diff --git a/core/src/rust/src/errors.rs b/core/src/rust/src/errors.rs index f52539b62..87b2de147 100644 --- a/core/src/rust/src/errors.rs +++ b/core/src/rust/src/errors.rs @@ -10,6 +10,7 @@ const RUNTIME_EXCEPTION_CLASS: &str = "java/lang/RuntimeException"; pub type JavaResult = Result; /// Error type that can be thrown as an exception +#[derive(Debug)] pub struct JavaException { class: &'static str, message: Cow<'static, str>, diff --git a/core/src/rust/src/ingestion.rs b/core/src/rust/src/ingestion.rs new file mode 100644 index 000000000..2c1dd3548 --- /dev/null +++ b/core/src/rust/src/ingestion.rs @@ -0,0 +1,266 @@ +//! 
Methods that modify the index / do data ingestion + +use std::{ops::Bound, sync::atomic::Ordering}; + +use fields::add_fields; +use jni::{ + objects::{JByteArray, JClass, JIntArray, JString}, + sys::{jboolean, jint, jlong, JNI_TRUE}, + JNIEnv, +}; +use tantivy::{ + collector::Count, + indexer::UserOperation, + query::{RangeQuery, TermSetQuery}, + schema::Facet, + TantivyDocument, Term, +}; + +use crate::{ + errors::JavaResult, + exec::jni_exec, + jnienv::JNIEnvExt, + state::{ + field_constants::{self, facet_field_name}, + IndexHandle, IngestingDocument, + }, +}; + +mod fields; + +#[no_mangle] +pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_reset( + mut env: JNIEnv, + _class: JClass, + handle: jlong, +) { + jni_exec(&mut env, |_| { + let handle = IndexHandle::get_ref_from_handle(handle); + + handle.changes_pending.store(false, Ordering::SeqCst); + + let mut writer = handle.writer.write()?; + writer.delete_all_documents()?; + writer.commit()?; + + handle.changes_pending.store(false, Ordering::SeqCst); + + Ok(()) + }); +} + +#[no_mangle] +pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_commit( + mut env: JNIEnv, + _class: JClass, + handle: jlong, +) { + jni_exec(&mut env, |_| { + let handle = IndexHandle::get_ref_from_handle(handle); + + handle.changes_pending.store(false, Ordering::SeqCst); + + let mut writer = handle.writer.write()?; + writer.commit()?; + + Ok(()) + }); +} + +#[no_mangle] +pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_ingestDocument( + mut env: JNIEnv, + _class: JClass, + handle: jlong, + part_key_data: JByteArray, + part_key_offset: jint, + part_key_num_bytes: jint, + part_id: jint, + document_id: JString, + start_time: jlong, + end_time: jlong, + fields: JByteArray, + upsert: jboolean, +) { + jni_exec(&mut env, |env| { + let handle = IndexHandle::get_ref_from_handle(handle); + + let mut ingesting_doc = IngestingDocument::default(); + + if part_id > -1 { + 
ingesting_doc.doc.add_i64( + handle.schema.get_field(field_constants::PART_ID)?, + part_id.into(), + ); + } + + let document_id = env.get_rust_string(&document_id)?; + ingesting_doc.doc.add_text( + handle.schema.get_field(field_constants::DOCUMENT_ID)?, + document_id.clone(), + ); + + ingesting_doc.doc.add_i64( + handle.schema.get_field(field_constants::START_TIME)?, + start_time, + ); + + ingesting_doc.doc.add_i64( + handle.schema.get_field(field_constants::END_TIME)?, + end_time, + ); + + let bytes = env.get_byte_array_offset_len( + &part_key_data, + part_key_offset as usize, + part_key_num_bytes as usize, + )?; + + ingesting_doc + .doc + .add_bytes(handle.schema.get_field(field_constants::PART_KEY)?, bytes); + + // Add dynamic fields + let fields = env.get_byte_array(&fields)?; + add_fields(&fields, &mut ingesting_doc, &handle.schema)?; + + let doc = prepare_tantivy_doc(handle, &mut ingesting_doc)?; + + // Save it + let writer = handle.writer.read()?; + + if upsert == JNI_TRUE { + let delete_term = Term::from_field_text( + handle.schema.get_field(field_constants::DOCUMENT_ID)?, + &document_id, + ); + + let writer = handle.writer.read()?; + writer.run([UserOperation::Delete(delete_term), UserOperation::Add(doc)])?; + + handle.changes_pending.store(true, Ordering::SeqCst); + } else { + writer.add_document(doc)?; + } + + handle.changes_pending.store(true, Ordering::SeqCst); + + Ok(()) + }); +} + +fn prepare_tantivy_doc( + handle: &IndexHandle, + ingesting_doc: &mut IngestingDocument, +) -> JavaResult { + let mut map_values = std::mem::take(&mut ingesting_doc.map_values); + + // Insert map columns we've built up + for (key, value) in map_values.drain() { + ingesting_doc + .doc + .add_object(handle.schema.get_field(&key)?, value); + } + + // Build final facet for field list + let mut field_names = std::mem::take(&mut ingesting_doc.field_names); + field_names.sort(); + + for field in field_names { + add_facet( + handle, + ingesting_doc, + field_constants::LABEL_LIST, 
+ &[field.as_str()], + )?; + } + + let doc = std::mem::take(&mut ingesting_doc.doc); + + Ok(doc) +} + +fn add_facet( + handle: &IndexHandle, + ingesting_doc: &mut IngestingDocument, + name: &str, + value: &[&str], +) -> JavaResult<()> { + if !name.is_empty() && !value.is_empty() { + ingesting_doc.doc.add_facet( + handle.schema.get_field(&facet_field_name(name))?, + Facet::from_path(value), + ); + } + + Ok(()) +} + +#[no_mangle] +pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_removePartKeys( + mut env: JNIEnv, + _class: JClass, + handle: jlong, + keys: JIntArray, +) { + jni_exec(&mut env, |env| { + let handle = IndexHandle::get_ref_from_handle(handle); + let mut terms = vec![]; + + let field = handle.schema.get_field(field_constants::PART_ID)?; + + let len = env.get_array_length(&keys)?; + let mut part_ids = vec![0i32; len as usize]; + + env.get_int_array_region(&keys, 0, &mut part_ids)?; + + for part_id in part_ids { + terms.push(Term::from_field_i64(field, part_id as i64)); + } + + let query = Box::new(TermSetQuery::new(terms)); + + let writer = handle.writer.read()?; + writer.delete_query(query)?; + + handle.changes_pending.store(true, Ordering::SeqCst); + + Ok(()) + }) +} + +#[no_mangle] +pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_removePartitionsEndedBefore( + mut env: JNIEnv, + _class: JClass, + handle: jlong, + ended_before: jlong, + return_deleted_count: jboolean, +) -> jint { + jni_exec(&mut env, |_| { + let handle = IndexHandle::get_ref_from_handle(handle); + + let query = RangeQuery::new_i64_bounds( + field_constants::END_TIME.to_string(), + Bound::Included(0), + Bound::Included(ended_before), + ); + + let java_ret = if return_deleted_count == JNI_TRUE { + let searcher = handle.reader.searcher(); + + let collector = Count; + + searcher.search(&query, &collector)? 
+ } else { + 0 + }; + + let writer = handle.writer.read()?; + writer.delete_query(Box::new(query))?; + + handle.changes_pending.store(true, Ordering::SeqCst); + + Ok(java_ret as i32) + }) +} diff --git a/core/src/rust/src/ingestion/fields.rs b/core/src/rust/src/ingestion/fields.rs new file mode 100644 index 000000000..448ccd4c2 --- /dev/null +++ b/core/src/rust/src/ingestion/fields.rs @@ -0,0 +1,202 @@ +//! Working with field data + +use std::collections::BTreeMap; + +use nom::{Err, IResult}; +use num_derive::FromPrimitive; +use tantivy::schema::Schema; + +use crate::{ + parser::{parse_string, parse_type_id, AsNomError, ParserError, TypeParseResult}, + state::IngestingDocument, +}; + +#[derive(FromPrimitive)] +#[repr(u8)] +enum FieldTypeId { + Indexed = 1, + Map = 2, + Multicolumn = 3, +} + +pub fn add_fields<'a>( + input: &'a [u8], + doc: &mut IngestingDocument, + schema: &Schema, +) -> IResult<&'a [u8], (), ParserError> { + let mut next_input = input; + + while !next_input.is_empty() { + let (input, type_id) = parse_type_id(next_input)?; + + let (input, _) = match type_id { + TypeParseResult::Success(FieldTypeId::Indexed) => { + parse_indexed_field(input, doc, schema)? + } + TypeParseResult::Success(FieldTypeId::Map) => parse_map_field(input, doc)?, + TypeParseResult::Success(FieldTypeId::Multicolumn) => { + parse_multicolumn_field(input, doc, schema)? 
+ } + TypeParseResult::Failure(type_id) => { + return Err(Err::Failure(ParserError::UnknownType(type_id))) + } + }; + + next_input = input; + } + + Ok((next_input, ())) +} + +fn parse_indexed_field<'a>( + input: &'a [u8], + doc: &mut IngestingDocument, + schema: &Schema, +) -> IResult<&'a [u8], (), ParserError> { + let (input, field_name) = parse_string(input)?; + let (input, value) = parse_string(input)?; + + let field = schema.get_field(&field_name).to_nom_err()?; + + doc.doc.add_text(field, value); + doc.field_names.push(field_name.to_string()); + + Ok((input, ())) +} + +fn parse_map_field<'a>( + input: &'a [u8], + doc: &mut IngestingDocument, +) -> IResult<&'a [u8], (), ParserError> { + let (input, map_name) = parse_string(input)?; + let (input, field_name) = parse_string(input)?; + let (input, value) = parse_string(input)?; + + // Create new map for this map column if needed + if !doc.map_values.contains_key(map_name.as_ref()) { + doc.map_values.insert(map_name.to_string(), BTreeMap::new()); + } + + // Capture value + doc.map_values + .get_mut(map_name.as_ref()) + .ok_or_else(|| Err::Failure(ParserError::InternalMapError))? 
+ .insert(field_name.to_string(), value.to_string().into()); + doc.field_names.push(field_name.to_string()); + + Ok((input, ())) +} + +fn parse_multicolumn_field<'a>( + input: &'a [u8], + doc: &mut IngestingDocument, + schema: &Schema, +) -> IResult<&'a [u8], (), ParserError> { + let (input, field_name) = parse_string(input)?; + let (input, value) = parse_string(input)?; + + let field = schema.get_field(&field_name).to_nom_err()?; + + doc.doc.add_text(field, value); + doc.field_names.push(field_name.to_string()); + + Ok((input, ())) +} + +#[cfg(test)] +mod tests { + use bytes::BufMut; + use tantivy::{schema::OwnedValue, Document}; + + use crate::test_utils::{build_test_schema, COL1_NAME, JSON_ATTRIBUTE1_NAME, JSON_COL_NAME}; + + use super::*; + + #[test] + fn test_parse_indexed_field() { + let mut doc = IngestingDocument::default(); + let index = build_test_schema(); + + let mut buf = vec![]; + + let expected = "abcd"; + + buf.put_u16_le(COL1_NAME.len() as u16); + buf.put_slice(COL1_NAME.as_bytes()); + buf.put_u16_le(expected.len() as u16); + buf.put_slice(expected.as_bytes()); + + let _ = parse_indexed_field(&buf, &mut doc, &index.schema).expect("Should succeed"); + + assert!(doc.field_names.contains(&COL1_NAME.to_string())); + assert_eq!( + **doc + .doc + .get_sorted_field_values() + .first() + .unwrap() + .1 + .first() + .unwrap(), + OwnedValue::Str(expected.into()) + ); + } + + #[test] + fn test_parse_map_field() { + let mut doc = IngestingDocument::default(); + + let mut buf = vec![]; + + let expected = "abcd"; + + buf.put_u16_le(JSON_COL_NAME.len() as u16); + buf.put_slice(JSON_COL_NAME.as_bytes()); + buf.put_u16_le(JSON_ATTRIBUTE1_NAME.len() as u16); + buf.put_slice(JSON_ATTRIBUTE1_NAME.as_bytes()); + buf.put_u16_le(expected.len() as u16); + buf.put_slice(expected.as_bytes()); + + let _ = parse_map_field(&buf, &mut doc).expect("Should succeed"); + + assert!(doc.field_names.contains(&JSON_ATTRIBUTE1_NAME.to_string())); + assert_eq!( + *doc.map_values + 
.get(JSON_COL_NAME) + .unwrap() + .get(JSON_ATTRIBUTE1_NAME) + .unwrap(), + OwnedValue::Str(expected.into()) + ); + } + + #[test] + fn test_parse_multicolumn_field() { + let mut doc = IngestingDocument::default(); + let index = build_test_schema(); + + let mut buf = vec![]; + + let expected = "abcd"; + + buf.put_u16_le(COL1_NAME.len() as u16); + buf.put_slice(COL1_NAME.as_bytes()); + buf.put_u16_le(expected.len() as u16); + buf.put_slice(expected.as_bytes()); + + let _ = parse_multicolumn_field(&buf, &mut doc, &index.schema).expect("Should succeed"); + + assert!(doc.field_names.contains(&COL1_NAME.to_string())); + assert_eq!( + **doc + .doc + .get_sorted_field_values() + .first() + .unwrap() + .1 + .first() + .unwrap(), + OwnedValue::Str(expected.into()) + ); + } +} diff --git a/core/src/rust/src/jnienv.rs b/core/src/rust/src/jnienv.rs index c014a45fa..3adff82c8 100644 --- a/core/src/rust/src/jnienv.rs +++ b/core/src/rust/src/jnienv.rs @@ -1,7 +1,7 @@ //! Extensions to JNIEnv use jni::{ - objects::{JObject, JObjectArray, JString}, + objects::{JByteArray, JObject, JObjectArray, JString}, JNIEnv, }; @@ -21,6 +21,17 @@ pub trait JNIEnvExt<'a> { fn foreach_string_in_array(&mut self, array: &JObjectArray, func: F) -> JavaResult<()> where F: FnMut(String) -> JavaResult<()>; + + /// Get a byte array from the JVM + fn get_byte_array_offset_len( + &mut self, + array: &JByteArray, + offset: usize, + len: usize, + ) -> JavaResult>; + + /// Get a byte array from the JVM + fn get_byte_array(&mut self, array: &JByteArray) -> JavaResult>; } impl<'a> JNIEnvExt<'a> for JNIEnv<'a> { @@ -53,4 +64,25 @@ impl<'a> JNIEnvExt<'a> for JNIEnv<'a> { Ok(()) } + + fn get_byte_array_offset_len( + &mut self, + array: &JByteArray, + offset: usize, + len: usize, + ) -> JavaResult> { + let mut bytes = vec![0u8; len]; + let bytes_ptr = bytes.as_mut_ptr() as *mut i8; + let bytes_ptr = unsafe { std::slice::from_raw_parts_mut(bytes_ptr, len) }; + + self.get_byte_array_region(array, offset as i32, 
bytes_ptr)?; + + Ok(bytes) + } + + fn get_byte_array(&mut self, array: &JByteArray) -> JavaResult> { + let len = self.get_array_length(array)?; + + self.get_byte_array_offset_len(array, 0, len as usize) + } } diff --git a/core/src/rust/src/lib.rs b/core/src/rust/src/lib.rs index 483adaf29..775fcc8ae 100644 --- a/core/src/rust/src/lib.rs +++ b/core/src/rust/src/lib.rs @@ -21,5 +21,10 @@ mod errors; mod exec; mod index; +mod ingestion; mod jnienv; +mod parser; mod state; + +#[cfg(test)] +mod test_utils; diff --git a/core/src/rust/src/parser.rs b/core/src/rust/src/parser.rs new file mode 100644 index 000000000..32b3979dc --- /dev/null +++ b/core/src/rust/src/parser.rs @@ -0,0 +1,144 @@ +//! Binary parser helpers + +use std::borrow::Cow; + +use nom::{ + bytes::streaming::take, + error::{ErrorKind, ParseError}, + number::streaming::{le_u16, u8}, + IResult, +}; +use num_traits::FromPrimitive; +use tantivy::TantivyError; +use thiserror::Error; + +/// Error type for query parsing issues +#[derive(Error, Debug)] +pub enum ParserError { + #[error("Core parsing error: {0:?}")] + Nom(ErrorKind), + #[error("Index error: {0}")] + IndexError(#[from] TantivyError), + #[error("Unknown type byte: {0}")] + UnknownType(u8), + #[error("Internal map error")] + InternalMapError, +} + +pub trait AsNomError { + fn to_nom_err(self) -> Result>; +} + +impl AsNomError for Result { + fn to_nom_err(self) -> Result> { + match self { + Err(e) => Err(nom::Err::Failure(e.into())), + Ok(x) => Ok(x), + } + } +} + +impl<'a> ParseError<&'a [u8]> for ParserError { + fn from_error_kind(_input: &'a [u8], kind: ErrorKind) -> Self { + ParserError::Nom(kind) + } + + fn append(_input: &'a [u8], _kind: ErrorKind, other: Self) -> Self { + other + } +} + +pub fn parse_string(input: &[u8]) -> IResult<&[u8], Cow<'_, str>, ParserError> { + let (input, length) = le_u16(input)?; + let (input, string_data) = take(length)(input)?; + + Ok((input, String::from_utf8_lossy(string_data))) +} + +#[derive(PartialEq, Debug)] 
+pub enum TypeParseResult { + Success(T), + Failure(u8), +} + +impl From for TypeParseResult +where + T: FromPrimitive, +{ + fn from(value: u8) -> Self { + match T::from_u8(value) { + Some(val) => Self::Success(val), + None => Self::Failure(value), + } + } +} + +pub fn parse_type_id(input: &[u8]) -> IResult<&[u8], TypeParseResult, ParserError> +where + T: FromPrimitive, +{ + let (input, type_id) = u8(input)?; + Ok((input, type_id.into())) +} + +#[cfg(test)] +mod tests { + use bytes::BufMut; + use num_derive::FromPrimitive; + + use super::*; + + #[test] + fn test_parse_string() { + let mut buf = vec![]; + + let expected = "abcd"; + + buf.put_u16_le(expected.len() as u16); + buf.put_slice(expected.as_bytes()); + + let (_, result) = parse_string(&buf).expect("Should succeed"); + + assert_eq!(result, expected); + } + + #[test] + fn test_parse_empty_string() { + let mut buf = vec![]; + + buf.put_u16_le(0); + + let (_, result) = parse_string(&buf).expect("Should succeed"); + + assert_eq!(result, ""); + } + + #[derive(FromPrimitive, Debug, PartialEq)] + #[repr(u8)] + pub enum TestTypeId { + Val1 = 1, + Val2 = 2, + } + + #[test] + fn test_parse_type_id() { + let mut buf = vec![]; + + buf.put_u8(1); + + let (_, result) = parse_type_id(&buf).expect("Should succeed"); + + assert_eq!(result, TypeParseResult::Success(TestTypeId::Val1)); + } + + #[test] + fn test_parse_type_id_invalid() { + let mut buf = vec![]; + + buf.put_u8(3); + + let (_, result) = parse_type_id::(&buf).expect("Should succeed"); + + assert_eq!(result, TypeParseResult::Failure(3)); + } +} diff --git a/core/src/rust/src/state.rs b/core/src/rust/src/state.rs index 29eed08c4..121bc35e1 100644 --- a/core/src/rust/src/state.rs +++ b/core/src/rust/src/state.rs @@ -5,7 +5,7 @@ use std::{ collections::{BTreeMap, HashMap}, - sync::{atomic::AtomicBool, Mutex, RwLock}, + sync::{atomic::AtomicBool, RwLock}, }; use jni::sys::jlong; @@ -15,24 +15,23 @@ use tantivy::{ }; pub struct IndexHandle { - // Immutable fields that 
don't need synchronization + // Fields that don't need explicit synchronization // // // Schema for this nidex pub schema: Schema, // Default field for JSON searches pub default_field: Option, + // Active reader + pub reader: IndexReader, + // Are there changes pending to commit + pub changes_pending: AtomicBool, // Fields that need synchronization // // - pub changes_pending: AtomicBool, // Active writer pub writer: RwLock, - // Active reader - pub reader: Mutex, - // Actively ingesting doc - pub ingesting_doc: Mutex, } impl IndexHandle { @@ -46,9 +45,8 @@ impl IndexHandle { schema, default_field, writer: RwLock::new(writer), - reader: Mutex::new(reader), + reader, changes_pending: AtomicBool::new(false), - ingesting_doc: Mutex::new(IngestingDocument::default()), }); Box::into_raw(obj) as jlong @@ -73,8 +71,6 @@ pub struct IngestingDocument { pub doc: TantivyDocument, } -const PART_KEY_INDEX_RAW_CLASS: &str = "filodb/core/memstore/PartKeyIndexRaw"; - pub mod field_constants { pub fn facet_field_name(name: &str) -> String { format!("{}{}", FACET_FIELD_PREFIX, name) diff --git a/core/src/rust/src/test_utils.rs b/core/src/rust/src/test_utils.rs new file mode 100644 index 000000000..a019f8674 --- /dev/null +++ b/core/src/rust/src/test_utils.rs @@ -0,0 +1,83 @@ +//! 
Utilites for testing + +use tantivy::{ + schema::{JsonObjectOptions, Schema, SchemaBuilder, TextFieldIndexing, FAST, INDEXED, STRING}, + Index, TantivyDocument, +}; + +use crate::state::field_constants; +pub const COL1_NAME: &str = "col1"; +pub const COL2_NAME: &str = "col2"; +pub const JSON_COL_NAME: &str = "json_col"; +pub const JSON_ATTRIBUTE1_NAME: &str = "f1"; + +pub struct TestIndex { + pub schema: Schema, +} + +pub fn build_test_schema() -> TestIndex { + let mut builder = SchemaBuilder::new(); + + builder.add_text_field(COL1_NAME, STRING | FAST); + builder.add_text_field(COL2_NAME, STRING | FAST); + builder.add_i64_field(field_constants::PART_ID, INDEXED | FAST); + builder.add_i64_field(field_constants::START_TIME, INDEXED | FAST); + builder.add_i64_field(field_constants::END_TIME, INDEXED | FAST); + builder.add_bytes_field(field_constants::PART_KEY, INDEXED | FAST); + builder.add_json_field( + JSON_COL_NAME, + JsonObjectOptions::default() + .set_indexing_options(TextFieldIndexing::default().set_tokenizer("raw")) + .set_fast(Some("raw")), + ); + + let schema = builder.build(); + + let index = Index::create_in_ram(schema.clone()); + + { + let mut writer = index.writer::(50_000_000).unwrap(); + + let doc = TantivyDocument::parse_json( + &schema, + r#"{ + "col1": "ABC", + "col2": "def", + "__partIdDv__": 1, + "__startTime__": 1234, + "__endTime__": 1235, + "__partKey__": "QUE=", + "json_col": { + "f1": "value", + "f2": "value2" + } + }"#, + ) + .unwrap(); + + writer.add_document(doc).unwrap(); + + let doc = TantivyDocument::parse_json( + &schema, + r#"{ + "col1": "DEF", + "col2": "abc", + "__partIdDv__": 10, + "__startTime__": 4321, + "__endTime__": 10000, + "__partKey__": "QkI=", + "json_col": { + "f1": "othervalue", + "f2": "othervalue2" + } + }"#, + ) + .unwrap(); + + writer.add_document(doc).unwrap(); + + writer.commit().unwrap(); + } + + TestIndex { schema } +} From bf797b5f20d82d5c59a39dffab7a2644f1b65822 Mon Sep 17 00:00:00 2001 From: Ryan Fairfax Date: 
Thu, 1 Aug 2024 13:09:51 -0700 Subject: [PATCH 2/5] Fix clippy config file --- core/src/rust/{Clippy.toml => clippy.toml} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename core/src/rust/{Clippy.toml => clippy.toml} (100%) diff --git a/core/src/rust/Clippy.toml b/core/src/rust/clippy.toml similarity index 100% rename from core/src/rust/Clippy.toml rename to core/src/rust/clippy.toml From 572ac5339c4dcafe7293fd38bc452b79614966cf Mon Sep 17 00:00:00 2001 From: Ryan Fairfax Date: Fri, 2 Aug 2024 14:22:07 -0700 Subject: [PATCH 3/5] Add _type_ support --- .../scala/filodb.core/memstore/PartKeyTantivyIndex.scala | 6 +++++- core/src/rust/src/index.rs | 1 + core/src/rust/src/state.rs | 1 + 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala index 2ee9bdf80..624762c2e 100644 --- a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala +++ b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala @@ -12,9 +12,10 @@ import org.apache.lucene.util.BytesRef import spire.implicits.cforRange import filodb.core.DatasetRef +import filodb.core.binaryrecord2.RecordSchema import filodb.core.memstore.PartKeyIndexRaw.bytesRefToUnsafeOffset +import filodb.core.metadata.{PartitionSchema, Schemas} import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn} -import filodb.core.metadata.PartitionSchema import filodb.core.query.ColumnFilter import filodb.memory.format.UnsafeUtils @@ -239,6 +240,9 @@ class PartKeyTantivyIndex(ref: DatasetRef, // If configured and enabled, Multi-column facets will be created on "partition-schema" columns createMultiColumnFacets(partKeyOnHeapBytes, partKeyBytesRefOffset) + val schemaName = Schemas.global.schemaName(RecordSchema.schemaID(partKeyOnHeapBytes, UnsafeUtils.arayOffset)) + addIndexedField(Schemas.TypeLabel, schemaName) + cforRange { 0 until numPartColumns } { i => diff 
--git a/core/src/rust/src/index.rs b/core/src/rust/src/index.rs index 72122f062..6254923ac 100644 --- a/core/src/rust/src/index.rs +++ b/core/src/rust/src/index.rs @@ -102,6 +102,7 @@ fn build_schema( builder.add_bytes_field(field_constants::PART_KEY, byte_options); builder.add_i64_field(field_constants::START_TIME, numeric_options.clone()); builder.add_i64_field(field_constants::END_TIME, numeric_options.clone()); + builder.add_text_field(field_constants::TYPE, text_options.clone()); // Fields from input schema env.foreach_string_in_array(schema_fields, |name| { diff --git a/core/src/rust/src/state.rs b/core/src/rust/src/state.rs index 121bc35e1..a0ca1b530 100644 --- a/core/src/rust/src/state.rs +++ b/core/src/rust/src/state.rs @@ -86,4 +86,5 @@ pub mod field_constants { pub const FACET_FIELD_PREFIX: &str = "$facet_"; pub const START_TIME: &str = "__startTime__"; pub const END_TIME: &str = "__endTime__"; + pub const TYPE: &str = "_type_"; } From 2b90c68398eea850a3349d550992db39fc0c5aba Mon Sep 17 00:00:00 2001 From: Ryan Fairfax Date: Tue, 13 Aug 2024 14:53:08 -0700 Subject: [PATCH 4/5] Address feedback --- .../memstore/PartKeyTantivyIndex.scala | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala index 624762c2e..fb2e5a28e 100644 --- a/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala +++ b/core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala @@ -200,8 +200,8 @@ class PartKeyTantivyIndex(ref: DatasetRef, // 1 - indexed field buffer += 1 - TantivyQueryBuilder.writeStringToBuffer(key, buffer) - TantivyQueryBuilder.writeStringToBuffer(value, buffer) + ByteBufferEncodingUtils.writeStringToBuffer(key, buffer) + ByteBufferEncodingUtils.writeStringToBuffer(value, buffer) } protected def addIndexedMapField(mapColumn: String, key: String, value: String): Unit = { @@ -209,9 
+209,9 @@ class PartKeyTantivyIndex(ref: DatasetRef, // 2 - map field buffer += 2 - TantivyQueryBuilder.writeStringToBuffer(mapColumn, buffer) - TantivyQueryBuilder.writeStringToBuffer(key, buffer) - TantivyQueryBuilder.writeStringToBuffer(value, buffer) + ByteBufferEncodingUtils.writeStringToBuffer(mapColumn, buffer) + ByteBufferEncodingUtils.writeStringToBuffer(key, buffer) + ByteBufferEncodingUtils.writeStringToBuffer(value, buffer) } protected override def addMultiColumnFacet(key: String, value: String): Unit = { @@ -219,10 +219,17 @@ class PartKeyTantivyIndex(ref: DatasetRef, // 3 - mc field buffer += 3 - TantivyQueryBuilder.writeStringToBuffer(key, buffer) - TantivyQueryBuilder.writeStringToBuffer(value, buffer) + ByteBufferEncodingUtils.writeStringToBuffer(key, buffer) + ByteBufferEncodingUtils.writeStringToBuffer(value, buffer) } + // Ideally this would be a map of field -> value or something similar. + // However, passing a Map to the Rust code generates a much more expensive + // back and forth between JVM code and Rust code to get data. + // + // To solve this efficiency problem we pack into a byte buffer with a simple + // serialization format that the Rust side can decode quickly without JVM + // callbacks. 
private val docBufferLocal = new ThreadLocal[ArrayBuffer[Byte]]() { override def initialValue(): ArrayBuffer[Byte] = new ArrayBuffer[Byte](4096) } @@ -254,7 +261,7 @@ class PartKeyTantivyIndex(ref: DatasetRef, } } -object TantivyQueryBuilder { +object ByteBufferEncodingUtils { def writeStringToBuffer(s: String, buffer: ArrayBuffer[Byte]): Unit = { val bytes = s.getBytes writeLengthToBuffer(bytes.length, buffer) From 4cc260635b0d3e91a4ab0cfc2579fd571f38194e Mon Sep 17 00:00:00 2001 From: Ryan Fairfax Date: Tue, 13 Aug 2024 15:06:22 -0700 Subject: [PATCH 5/5] Address feedback --- core/src/rust/src/ingestion.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/rust/src/ingestion.rs b/core/src/rust/src/ingestion.rs index 2c1dd3548..67eefcd0a 100644 --- a/core/src/rust/src/ingestion.rs +++ b/core/src/rust/src/ingestion.rs @@ -243,6 +243,8 @@ pub extern "system" fn Java_filodb_core_memstore_TantivyNativeMethods_00024_remo let query = RangeQuery::new_i64_bounds( field_constants::END_TIME.to_string(), Bound::Included(0), + // To match existing Lucene index behavior, make this inclusive even though it's named + // "ended before" in the API Bound::Included(ended_before), );