Skip to content

Commit

Permalink
Optimize bulk operations within EntityCache #1133
Browse files Browse the repository at this point in the history
* Compilation fixed after merging master
* Make `EntityCache.data` public as it is widely used to re-init EntityCache
* Selectively remove referrers and references
* Restrict access to cache internals
* Rotate referrer cache to create more O(1) ops
* Optimize bulk updates Joel Feinstein
* Ignore jetbrains.db Joel Feinstein
* Optimize inserts using identity hash set
  • Loading branch information
jnfeinstein authored and Tapac committed Aug 4, 2021
1 parent 6fd6d98 commit ad89245
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 87 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@
out/
build/
classes/
**/jetbrains.db
Original file line number Diff line number Diff line change
Expand Up @@ -145,16 +145,20 @@ open class Entity<ID : Comparable<ID>>(val id: EntityID<ID>) {
klass.invalidateEntityInCache(o)
val currentValue = _readValues?.getOrNull(this)
if (writeValues.containsKey(this as Column<out Any?>) || currentValue != value) {
val entityCache = TransactionManager.current().entityCache
if (referee != null) {
val entityCache = TransactionManager.current().entityCache
if (value is EntityID<*> && value.table == referee!!.table) value.value // flush

listOfNotNull<Any>(value, currentValue).forEach {
entityCache.referrers[it]?.remove(this)
entityCache.referrers[this]?.remove(it)
}
entityCache.removeTablesReferrers(listOf(referee!!.table))
}
writeValues[this as Column<Any?>] = value
// TODO: Can this be simplified?
if (o.id._value?.let { entityCache.data[table]?.contains(it) } == true) {
entityCache.scheduleUpdate(klass, o)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,32 +2,34 @@ package org.jetbrains.exposed.dao

import org.jetbrains.exposed.dao.id.EntityID
import org.jetbrains.exposed.dao.id.IdTable
import org.jetbrains.exposed.sql.Column
import org.jetbrains.exposed.sql.LazySizedCollection
import org.jetbrains.exposed.sql.SchemaUtils
import org.jetbrains.exposed.sql.SizedIterable
import org.jetbrains.exposed.sql.Table
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.batchInsert
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.transactions.transactionScope
import java.util.*
import kotlin.collections.HashMap

val Transaction.entityCache: EntityCache by transactionScope { EntityCache(this) }

@Suppress("UNCHECKED_CAST")
class EntityCache(private val transaction: Transaction) {
private var flushingEntities = false
val data = LinkedHashMap<IdTable<*>, MutableMap<Any, Entity<*>>>()
val inserts = LinkedHashMap<IdTable<*>, MutableList<Entity<*>>>()
val referrers = HashMap<EntityID<*>, MutableMap<Column<*>, SizedIterable<*>>>()
internal val inserts = LinkedHashMap<IdTable<*>, MutableSet<Entity<*>>>()
private val updates = LinkedHashMap<IdTable<*>, MutableSet<Entity<*>>>()
internal val referrers = HashMap<Column<*>, MutableMap<EntityID<*>, SizedIterable<*>>>()

/** Returns the id-to-entity map kept for the table of entity class [f]; delegates to the table-based overload. */
private fun getMap(f: EntityClass<*, *>): MutableMap<Any, Entity<*>> = getMap(f.table)

/**
 * Returns the map of cached entities stored in [data] for [table].
 * When [data] has no entry for the table yet, an empty [LinkedHashMap] is registered and returned.
 */
private fun getMap(table: IdTable<*>): MutableMap<Any, Entity<*>> = data.getOrPut(table) {
LinkedHashMap()
}

fun <ID : Any, R : Entity<ID>> getOrPutReferrers(sourceId: EntityID<*>, key: Column<*>, refs: () -> SizedIterable<R>): SizedIterable<R> =
referrers.getOrPut(sourceId) { HashMap() }.getOrPut(key) { LazySizedCollection(refs()) } as SizedIterable<R>
/**
 * Looks up the referrers cached for [sourceId] under the reference column [key].
 *
 * @return the cached collection, or `null` when [referrers] holds no entry for that column/id pair.
 */
fun <R : Entity<*>> getReferrers(sourceId: EntityID<*>, key: Column<*>): SizedIterable<R>? {
    val cachedForColumn = referrers[key] ?: return null
    return cachedForColumn[sourceId] as? SizedIterable<R>
}

/**
 * Returns the referrers cached for [sourceId] under column [key], computing and caching them on a miss.
 *
 * @param refs invoked only when no entry is cached yet; its result is wrapped in a [LazySizedCollection]
 * before being stored.
 */
fun <ID : Any, R : Entity<ID>> getOrPutReferrers(sourceId: EntityID<*>, key: Column<*>, refs: () -> SizedIterable<R>): SizedIterable<R> {
    val cachedForColumn = referrers.getOrPut(key) { HashMap() }
    val cached = cachedForColumn.getOrPut(sourceId) { LazySizedCollection(refs()) }
    return cached as SizedIterable<R>
}

/**
 * Finds the entity of class [f] with the given [id].
 *
 * The per-table entity map is consulted first (keyed by `id.value`); on a miss the entities scheduled
 * for insert into `f.table` are scanned for one whose id equals [id].
 *
 * @return the matching entity, or `null` when neither place contains it.
 */
fun <ID : Comparable<ID>, T : Entity<ID>> find(f: EntityClass<ID, T>, id: EntityID<ID>): T? {
    val cached = getMap(f)[id.value] as T?
    if (cached != null) return cached
    return inserts[f.table]?.firstOrNull { it.id == id } as? T
}
Expand All @@ -47,33 +49,33 @@ class EntityCache(private val transaction: Transaction) {
}

fun <ID : Comparable<ID>, T : Entity<ID>> scheduleInsert(f: EntityClass<ID, T>, o: T) {
inserts.getOrPut(f.table) { arrayListOf() }.add(o as Entity<*>)
inserts.getOrPut(f.table) { LinkedIdentityHashSet() }.add(o as Entity<*>)
}

/**
 * Registers [o] in the set of entities pending update for `f.table`, creating that set on first use.
 * The set is a [LinkedIdentityHashSet], so scheduling the same instance again adds nothing.
 */
fun <ID : Comparable<ID>, T : Entity<ID>> scheduleUpdate(f: EntityClass<ID, T>, o: T) {
    val pendingForTable = updates.getOrPut(f.table) { LinkedIdentityHashSet() }
    pendingForTable.add(o as Entity<*>)
}

fun flush() {
flush(inserts.keys + data.keys)
flush(inserts.keys + updates.keys)
}

private fun updateEntities(idTable: IdTable<*>) {
data[idTable]?.let { map ->
if (map.isNotEmpty()) {
val updatedEntities = HashSet<Entity<*>>()
val batch = EntityBatchUpdate(map.values.first().klass)
for ((_, entity) in map) {
if (entity.flush(batch)) {
check(entity.klass !is ImmutableEntityClass<*, *>) {
"Update on immutable entity ${entity.javaClass.simpleName} ${entity.id}"
}
updatedEntities.add(entity)
}
}
executeAsPartOfEntityLifecycle {
batch.execute(transaction)
}
updatedEntities.forEach {
transaction.registerChange(it.klass, it.id, EntityChangeType.Updated)
updates.remove(idTable)?.takeIf { it.isNotEmpty() }?.let {
val updatedEntities = HashSet<Entity<*>>()
val batch = EntityBatchUpdate(it.first().klass)
for (entity in it) {
if (entity.flush(batch)) {
check(entity.klass !is ImmutableEntityClass<*, *>) { "Update on immutable entity ${entity.javaClass.simpleName} ${entity.id}" }
updatedEntities.add(entity)
}
}
executeAsPartOfEntityLifecycle {
batch.execute(transaction)
}
updatedEntities.forEach {
transaction.registerChange(it.klass, it.id, EntityChangeType.Updated)
}
}
}

Expand Down Expand Up @@ -117,7 +119,7 @@ class EntityCache(private val transaction: Transaction) {

internal fun flushInserts(table: IdTable<*>) {
inserts.remove(table)?.let {
var toFlush: List<Entity<*>> = it
var toFlush: List<Entity<*>> = it.toList()
do {
val partition = toFlush.partition {
it.writeValues.none {
Expand Down Expand Up @@ -154,6 +156,14 @@ class EntityCache(private val transaction: Transaction) {
transaction.alertSubscribers()
}

/**
 * Empties the cache: stored entities, scheduled inserts, scheduled updates and the referrers cache.
 *
 * @param flush when `true` (the default), [flush] is called first, before anything is discarded.
 */
fun clear(flush: Boolean = true) {
    if (flush) {
        flush()
    }
    for (store in listOf<MutableMap<*, *>>(data, inserts, updates)) {
        store.clear()
    }
    clearReferrersCache()
}

/** Drops every cached referrers entry; cached entities and scheduled inserts/updates are left untouched. */
fun clearReferrersCache() = referrers.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,15 @@ abstract class EntityClass<ID : Comparable<ID>, out T : Entity<ID>>(val table: I
fun removeFromCache(entity: Entity<ID>) {
val cache = warmCache()
cache.remove(table, entity)
cache.referrers.remove(entity.id)
cache.removeTablesReferrers(listOf(table))
cache.referrers.forEach { (col, referrers) ->
// Remove references from entity to other entities
referrers.remove(entity.id)

// Remove references from other entities to this entity
if (col.table == table) {
with(entity) { col.lookup() }?.let { referrers.remove(it as EntityID<*>) }
}
}
}

open fun forEntityIds(ids: List<EntityID<ID>>): SizedIterable<T> {
Expand Down Expand Up @@ -337,7 +344,7 @@ abstract class EntityClass<ID : Comparable<ID>, out T : Entity<ID>>(val table: I
refColumn as Column<EntityID<*>>
distinctRefIds as List<EntityID<ID>>
val toLoad = distinctRefIds.filter {
cache.referrers[it]?.containsKey(refColumn)?.not() ?: true
cache.referrers[refColumn]?.containsKey(it)?.not() ?: true
}
if (toLoad.isNotEmpty()) {
val findQuery = find { refColumn inList toLoad }
Expand All @@ -350,11 +357,11 @@ abstract class EntityClass<ID : Comparable<ID>, out T : Entity<ID>>(val table: I
val result = entities.groupBy { it.readValues[refColumn] }

distinctRefIds.forEach { id ->
cache.getOrPutReferrers(id, refColumn) { result[id]?.let { SizedCollection(it) } ?: emptySized<T>() }
cache.getOrPutReferrers(id, refColumn) { result[id]?.let { SizedCollection(it) } ?: emptySized() }
}
}

return distinctRefIds.flatMap { cache.referrers[it]?.get(refColumn)?.toList().orEmpty() } as List<T>
return distinctRefIds.flatMap { cache.getReferrers<T>(it, refColumn)?.toList().orEmpty() }
} else {
val baseQuery = searchQuery(Op.build { refColumn inList distinctRefIds })
val finalQuery = if (parentTable.id in baseQuery.set.fields) {
Expand Down Expand Up @@ -388,8 +395,7 @@ abstract class EntityClass<ID : Comparable<ID>, out T : Entity<ID>>(val table: I

val transaction = TransactionManager.current()

val inCache = transaction.entityCache.referrers.filter { it.key in distinctRefIds && sourceRefColumn in it.value }
.mapValues { it.value[sourceRefColumn]!! }
val inCache = transaction.entityCache.referrers[sourceRefColumn] ?: emptyMap()
val loaded = (distinctRefIds - inCache.keys).takeIf { it.isNotEmpty() }?.let { idsToLoad ->
val alreadyInJoin = (dependsOnTables as? Join)?.alreadyInJoin(linkTable) ?: false
val entityTables = if (alreadyInJoin) dependsOnTables else dependsOnTables.join(linkTable, JoinType.INNER, targetRefColumn, table.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class InnerTableLink<SID : Comparable<SID>, Source : Entity<SID>, ID : Comparabl
entityCache.flush()
val oldValue = getValue(o, unused)
val existingIds = oldValue.map { it.id }.toSet()
entityCache.referrers[o.id]?.remove(sourceRefColumn)
entityCache.referrers[sourceRefColumn]?.remove(o.id)

val targetIds = value.map { it.id }
executeAsPartOfEntityLifecycle {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.jetbrains.exposed.dao

import java.util.*

/**
 * A [MutableSet] that compares elements by reference (`===`) and iterates in insertion order.
 *
 * Membership is tracked by an [IdentityHashMap]-backed set and insertion order by a [LinkedList];
 * every operation keeps both structures holding exactly the same instances. All matching — including
 * [remove], [removeAll] and [retainAll] — is done by identity, never by `equals`.
 *
 * No synchronization is performed by this class.
 */
internal class LinkedIdentityHashSet<T> : MutableSet<T> {
    private val set: MutableSet<T> = newIdentitySet()
    private val list: MutableList<T> = LinkedList()

    override val size: Int
        get() = set.size

    /** Adds [element] at the end of the iteration order unless this exact instance is already present. */
    override fun add(element: T): Boolean {
        return set.add(element).also { added -> if (added) list.add(element) }
    }

    /**
     * Adds every instance of [elements] that is not yet present, in encounter order.
     * An instance occurring several times in [elements] is added only once.
     *
     * @return `true` if at least one instance was added.
     */
    override fun addAll(elements: Collection<T>): Boolean {
        var changed = false
        for (element in elements) {
            // Going through add() keeps `set` and `list` in lock-step even for repeated instances.
            if (add(element)) changed = true
        }
        return changed
    }

    override fun clear() {
        set.clear()
        list.clear()
    }

    /**
     * Iterates in insertion order. [MutableIterator.remove] removes the element last returned by `next()`
     * and throws [IllegalStateException] if there is no such element.
     */
    override fun iterator(): MutableIterator<T> {
        return object : MutableIterator<T> {
            private val delegate = list.iterator()
            private var current: T? = null
            private var hasCurrent = false

            override fun hasNext(): Boolean = delegate.hasNext()

            override fun next(): T = delegate.next().also {
                current = it
                hasCurrent = true
            }

            @Suppress("UNCHECKED_CAST")
            override fun remove() {
                check(hasCurrent) { "next() must be called before remove()" }
                // Remove through the delegate so the list iterator stays valid for further iteration.
                delegate.remove()
                set.remove(current as T)
                current = null
                hasCurrent = false
            }
        }
    }

    /** Removes exactly the instance [element] (not merely an equal one); returns `true` if it was present. */
    override fun remove(element: T): Boolean {
        if (!set.remove(element)) return false
        // LinkedList.remove(Object) matches by equals(), so locate the instance by reference instead.
        val cursor = list.iterator()
        while (cursor.hasNext()) {
            if (cursor.next() === element) {
                cursor.remove()
                break
            }
        }
        return true
    }

    /** Removes every instance contained in [elements]; returns `true` if anything was removed. */
    override fun removeAll(elements: Collection<T>): Boolean {
        val doomed = newIdentitySet<T>()
        elements.filterTo(doomed) { it in set }
        if (doomed.isEmpty()) return false
        list.removeAll { it in doomed }
        doomed.forEach { set.remove(it) }
        return true
    }

    /** Keeps only the instances that are also contained in [elements]; returns `true` if anything was removed. */
    override fun retainAll(elements: Collection<T>): Boolean {
        val keep = newIdentitySet<T>().apply { addAll(elements) }
        return removeAll(list.filter { it !in keep })
    }

    override fun contains(element: T): Boolean {
        return set.contains(element)
    }

    override fun containsAll(elements: Collection<T>): Boolean {
        return set.containsAll(elements)
    }

    override fun isEmpty(): Boolean {
        return set.isEmpty()
    }

    /** Creates an empty set that compares its elements by reference. */
    private fun <E> newIdentitySet(): MutableSet<E> = Collections.newSetFromMap(IdentityHashMap())
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class AliasesTests : DatabaseTestsBase() {
}

flushCache()
entityCache.data.clear()
entityCache.clear()

val alias = EntityTestsData.XTable.alias("xAlias")
val entityFromAlias = alias.selectAll().map { EntityTestsData.XEntity.wrapRow(it, alias) }.singleOrNull()
Expand All @@ -101,7 +101,7 @@ class AliasesTests : DatabaseTestsBase() {
}

flushCache()
entityCache.data.clear()
entityCache.clear()

val alias = EntityTestsData.XTable.selectAll().alias("xAlias")
val entityFromAlias = alias.selectAll().map { EntityTestsData.XEntity.wrapRow(it, alias) }.singleOrNull()
Expand Down
Loading

0 comments on commit ad89245

Please sign in to comment.