Skip to content
This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

Commit

Permalink
[NSE-481] JVM heap memory leak on memory leak tracker facilities (Arr…
Browse files Browse the repository at this point in the history
…ow Allocator) (#489)

Closes #481
  • Loading branch information
zhztheplayer authored Aug 25, 2021
1 parent 64fc1bf commit d6bc791
Showing 1 changed file with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import com.intel.oap.spark.sql.execution.datasources.v2.arrow._
import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.memory.BufferAllocator
import org.apache.arrow.memory.RootAllocator

import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
Expand All @@ -40,15 +41,12 @@ object SparkMemoryUtils extends Logging {
}

val sharedMetrics = new NativeSQLMemoryMetrics()
val defaultAllocator: BufferAllocator = {

val globalAlloc = globalAllocator()
val defaultAllocator: BufferAllocator = {
val al = new SparkManagedAllocationListener(
new NativeSQLMemoryConsumer(getTaskMemoryManager(), Spiller.NO_OP),
sharedMetrics)
val parent = globalAlloc
parent.newChildAllocator("Spark Managed Allocator - " +
UUID.randomUUID().toString, al, 0, parent.getLimit)
new RootAllocator(al, Long.MaxValue)
}

val defaultMemoryPool: NativeMemoryPoolWrapper = {
Expand Down Expand Up @@ -77,7 +75,7 @@ object SparkMemoryUtils extends Logging {
val al = new SparkManagedAllocationListener(
new NativeSQLMemoryConsumer(getTaskMemoryManager(), spiller),
sharedMetrics)
val parent = globalAllocator()
val parent = defaultAllocator
val alloc = parent.newChildAllocator("Spark Managed Allocator - " +
UUID.randomUUID().toString, al, 0, parent.getLimit).asInstanceOf[BufferAllocator]
allocators.add(alloc)
Expand Down Expand Up @@ -121,7 +119,8 @@ object SparkMemoryUtils extends Logging {
}

def release(): Unit = {
for (allocator <- allocators.asScala) {
for (allocator <- allocators.asScala.reverse) {
// reversed iterating: close children first
val allocated = allocator.getAllocatedMemory
if (allocated == 0L) {
close(allocator)
Expand Down Expand Up @@ -212,9 +211,8 @@ object SparkMemoryUtils extends Logging {
}

def contextAllocator(): BufferAllocator = {
val globalAlloc = globalAllocator()
if (!inSparkTask()) {
return globalAlloc
return globalAllocator()
}
getTaskMemoryResources().defaultAllocator
}
Expand Down

0 comments on commit d6bc791

Please sign in to comment.