feat(core): Add ingestion support for Tantivy #1827

Merged: 5 commits, Aug 14, 2024
Changes from all commits
153 changes: 143 additions & 10 deletions core/src/main/scala/filodb.core/memstore/PartKeyTantivyIndex.scala
@@ -2,15 +2,22 @@ package filodb.core.memstore

import java.io.File
import java.nio.file.Files
import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

import scala.collection.mutable.ArrayBuffer

import debox.Buffer
import org.apache.commons.lang3.SystemUtils
import org.apache.lucene.util.BytesRef
import spire.implicits.cforRange

import filodb.core.DatasetRef
import filodb.core.binaryrecord2.RecordSchema
import filodb.core.memstore.PartKeyIndexRaw.bytesRefToUnsafeOffset
import filodb.core.metadata.{PartitionSchema, Schemas}
import filodb.core.metadata.Column.ColumnType.{MapColumn, StringColumn}
import filodb.core.metadata.PartitionSchema
import filodb.core.query.ColumnFilter
import filodb.memory.format.UnsafeUtils

class PartKeyTantivyIndex(ref: DatasetRef,
schema: PartitionSchema,
@@ -45,20 +52,33 @@ class PartKeyTantivyIndex(ref: DatasetRef,
TantivyNativeMethods.reset(indexHandle)
}

private var flushThreadPool: ScheduledThreadPoolExecutor = _

override def startFlushThread(flushDelayMinSeconds: Int, flushDelayMaxSeconds: Int): Unit = {
???
if (flushThreadPool != UnsafeUtils.ZeroPointer) {
// Already running
logger.warn("startFlushThread called when already running, ignoring")
return
}

flushThreadPool = new ScheduledThreadPoolExecutor(1)

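// Only flushDelayMinSeconds is used here, as both the initial delay and the repeat
// period of the refresh task; flushDelayMaxSeconds is currently not consulted.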
flushThreadPool.scheduleAtFixedRate(() => refreshReadersBlocking(), flushDelayMinSeconds,
flushDelayMinSeconds, TimeUnit.SECONDS)
}

override def partIdsEndedBefore(endedBefore: Long): Buffer[Int] = {
???
}

override def removePartitionsEndedBefore(endedBefore: Long, returnApproxDeletedCount: Boolean): Int = {
???
TantivyNativeMethods.removePartitionsEndedBefore(indexHandle, endedBefore, returnApproxDeletedCount)
}

override def removePartKeys(partIds: Buffer[Int]): Unit = {
???
if (!partIds.isEmpty) {
TantivyNativeMethods.removePartKeys(indexHandle, partIds.toArray)
}
}

override def indexRamBytes: Long = {
@@ -72,6 +92,11 @@ class PartKeyTantivyIndex(ref: DatasetRef,
override def closeIndex(): Unit = {
logger.info(s"Closing index on dataset=$ref shard=$shardNum")

if (flushThreadPool != UnsafeUtils.ZeroPointer) {
flushThreadPool.shutdown()
flushThreadPool.awaitTermination(60, TimeUnit.SECONDS)
}

commit()
TantivyNativeMethods.freeIndexHandle(indexHandle)
indexHandle = 0
@@ -96,18 +121,26 @@ class PartKeyTantivyIndex(ref: DatasetRef,

override def addPartKey(partKeyOnHeapBytes: Array[Byte], partId: Int, startTime: Long, endTime: Long,
partKeyBytesRefOffset: Int)(partKeyNumBytes: Int, documentId: String): Unit = {
???
logger.debug(s"Adding document ${partKeyString(documentId, partKeyOnHeapBytes, partKeyBytesRefOffset)} " +
s"with startTime=$startTime endTime=$endTime into dataset=$ref shard=$shardNum")
makeDocument(partKeyOnHeapBytes, partKeyBytesRefOffset, partKeyNumBytes, partId, documentId, startTime, endTime,
upsert = false)
}

override def upsertPartKey(partKeyOnHeapBytes: Array[Byte], partId: Int, startTime: Long, endTime: Long,
partKeyBytesRefOffset: Int)(partKeyNumBytes: Int, documentId: String): Unit = {
???
logger.debug(s"Upserting document ${partKeyString(documentId, partKeyOnHeapBytes, partKeyBytesRefOffset)} " +
s"with startTime=$startTime endTime=$endTime into dataset=$ref shard=$shardNum")
makeDocument(partKeyOnHeapBytes, partKeyBytesRefOffset, partKeyNumBytes, partId, documentId, startTime, endTime,
upsert = true)
}

override def partKeyFromPartId(partId: Int): Option[BytesRef] = {
???
}

private val NOT_FOUND = -1

override def startTimeFromPartId(partId: Int): Long = {
???
}
@@ -126,7 +159,17 @@ class PartKeyTantivyIndex(ref: DatasetRef,

override def updatePartKeyWithEndTime(partKeyOnHeapBytes: Array[Byte], partId: Int, endTime: Long,
partKeyBytesRefOffset: Int)(partKeyNumBytes: Int, documentId: String): Unit = {
???
var startTime = startTimeFromPartId(partId) // look up index for old start time
if (startTime == NOT_FOUND) {
startTime = System.currentTimeMillis() - retentionMillis
logger.warn(s"Could not find in Lucene startTime for partId=$partId in dataset=$ref. Using " +
s"$startTime instead.", new IllegalStateException()) // assume this time series started retention period ago
}
logger.debug(s"Updating document ${partKeyString(documentId, partKeyOnHeapBytes, partKeyBytesRefOffset)} " +
s"with startTime=$startTime endTime=$endTime into dataset=$ref shard=$shardNum")

makeDocument(partKeyOnHeapBytes, partKeyBytesRefOffset, partKeyNumBytes,
partId, documentId, startTime, endTime, upsert = true)
}

override def refreshReadersBlocking(): Unit = {
@@ -153,19 +196,88 @@ class PartKeyTantivyIndex(ref: DatasetRef,
}

override protected def addIndexedField(key: String, value: String): Unit = {
???
val buffer = docBufferLocal.get()

// 1 - indexed field
buffer += 1
ByteBufferEncodingUtils.writeStringToBuffer(key, buffer)
ByteBufferEncodingUtils.writeStringToBuffer(value, buffer)
}

protected def addIndexedMapField(mapColumn: String, key: String, value: String): Unit = {
???
val buffer = docBufferLocal.get()

// 2 - map field
buffer += 2
ByteBufferEncodingUtils.writeStringToBuffer(mapColumn, buffer)
ByteBufferEncodingUtils.writeStringToBuffer(key, buffer)
ByteBufferEncodingUtils.writeStringToBuffer(value, buffer)
}

protected override def addMultiColumnFacet(key: String, value: String): Unit = {
???
val buffer = docBufferLocal.get()

// 3 - mc field
buffer += 3
ByteBufferEncodingUtils.writeStringToBuffer(key, buffer)
ByteBufferEncodingUtils.writeStringToBuffer(value, buffer)
}

// Ideally this would be a map of field -> value or something similar.
// However, passing a Map to the Rust code generates a much more expensive
// back and forth between JVM code and Rust code to get data.
//
// To solve this efficiency problem we pack into a byte buffer with a simple
// serialization format that the Rust side can decode quickly without JVM
// callbacks.
private val docBufferLocal = new ThreadLocal[ArrayBuffer[Byte]]() {
override def initialValue(): ArrayBuffer[Byte] = new ArrayBuffer[Byte](4096)
}
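// Illustrative example (not part of the diff): after addIndexedField("app", "filodb") the
// buffer holds a one-byte field-type tag followed by two length-prefixed UTF-8 strings:
//   0x01                                 - tag 1: indexed field
//   0x03 0x00 'a' 'p' 'p'                - 2-byte little-endian length, then key bytes
//   0x06 0x00 'f' 'i' 'l' 'o' 'd' 'b'    - 2-byte little-endian length, then value bytes
// Map fields (tag 2) and multi-column facets (tag 3) use the same layout with their own
// string operands.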

private def makeDocument(partKeyOnHeapBytes: Array[Byte],
partKeyBytesRefOffset: Int,
partKeyNumBytes: Int,
partId: Int,
documentId: String,
startTime: Long,
endTime: Long,
upsert: Boolean): Unit = {
docBufferLocal.get().clear()

// If configured and enabled, Multi-column facets will be created on "partition-schema" columns
createMultiColumnFacets(partKeyOnHeapBytes, partKeyBytesRefOffset)

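// Index the schema name under the type label so it can be filtered like any other tag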
val schemaName = Schemas.global.schemaName(RecordSchema.schemaID(partKeyOnHeapBytes, UnsafeUtils.arayOffset))
addIndexedField(Schemas.TypeLabel, schemaName)

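// Each column indexer appends its fields to the thread-local buffer through the
// addIndexedField / addIndexedMapField overrides above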
cforRange {
0 until numPartColumns
} { i =>
indexers(i).fromPartKey(partKeyOnHeapBytes, bytesRefToUnsafeOffset(partKeyBytesRefOffset), partId)
}

TantivyNativeMethods.ingestDocument(indexHandle, partKeyOnHeapBytes, partKeyBytesRefOffset, partKeyNumBytes,
partId, documentId, startTime, endTime, docBufferLocal.get().toArray, upsert)
}
}

object ByteBufferEncodingUtils {
def writeStringToBuffer(s: String, buffer: ArrayBuffer[Byte]): Unit = {
val bytes = s.getBytes
writeLengthToBuffer(bytes.length, buffer)
buffer ++= bytes
}

private def writeLengthToBuffer(len: Int, buffer: ArrayBuffer[Byte]): Unit = {
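// Two-byte little-endian length prefix; the format assumes encoded strings shorter than 64 KB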
buffer += len.toByte
buffer += (len >> 8).toByte
}
}

// JNI methods
// Thread safety -
// * Index handle creation / cleanup is not thread safe.
// * Other operations are thread safe and may involve an internal mutex
protected object TantivyNativeMethods {
// Load native library from jar
private def loadLibrary(): Unit = {
@@ -203,11 +315,32 @@ protected object TantivyNativeMethods {
@native
def freeIndexHandle(handle: Long): Unit

// Force refresh any readers to be up to date (primarily used by tests)
@native
def refreshReaders(handle: Long): Unit

// Reset index data (delete all docs)
@native
def reset(handle: Long): Unit

// Commit changes to the index
@native
def commit(handle: Long): Unit

// Ingest a new document
// scalastyle:off parameter.number
@native
def ingestDocument(handle: Long, partKeyData: Array[Byte], partKeyOffset: Int,
partKeyNumBytes: Int, partId: Int, documentId: String,
startTime: Long, endTime: Long, fields: Array[Byte],
upsert: Boolean): Unit
// scalastyle:on parameter.number

// Remove docs with given part keys
@native
def removePartKeys(handle: Long, keys: Array[Int]): Unit

// Remove partition IDs and return approximate deleted count
@native
def removePartitionsEndedBefore(handle: Long, endedBefore: Long, returnApproxDeletedCount: Boolean): Int
}
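
For illustration only (not part of this diff), a minimal sketch of how these bindings are driven by the index class above, assuming the index handle has already been created by the native call that sits outside this hunk; the helper name and its arguments are hypothetical:

// Hypothetical helper, mirroring what makeDocument and commit() do above.
// Per the thread-safety notes, this may run concurrently with other ingest calls,
// but freeIndexHandle must only be called once all of them have finished.
private def ingestAndCommit(handle: Long, partKeyBytes: Array[Byte], partId: Int,
                            documentId: String, startTime: Long, endTime: Long,
                            encodedFields: Array[Byte]): Unit = {
  TantivyNativeMethods.ingestDocument(handle, partKeyBytes, 0, partKeyBytes.length,
    partId, documentId, startTime, endTime, encodedFields, upsert = true)
  TantivyNativeMethods.commit(handle)
}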