Skip to content

Commit

Permalink
feat(client): clean up resource leaks when the resource becomes phant…
Browse files Browse the repository at this point in the history
…om reachable

chore: unknown commit message
  • Loading branch information
stainless-bot authored and Stainless Bot committed Oct 28, 2024
1 parent 685553f commit 3ab123c
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.json.JsonMapper
import com.google.common.collect.ArrayListMultimap
import com.google.common.collect.ListMultimap
import com.openai.core.http.HttpClient
import com.openai.core.http.PhantomReachableClosingHttpClient
import com.openai.core.http.RetryingHttpClient
import java.time.Clock

Expand Down Expand Up @@ -162,11 +163,13 @@ private constructor(

return ClientOptions(
httpClient!!,
RetryingHttpClient.builder()
.httpClient(httpClient!!)
.clock(clock)
.maxRetries(maxRetries)
.build(),
PhantomReachableClosingHttpClient(
RetryingHttpClient.builder()
.httpClient(httpClient!!)
.clock(clock)
.maxRetries(maxRetries)
.build()
),
jsonMapper ?: jsonMapper(),
clock,
baseUrl,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
@file:JvmName("PhantomReachable")

package com.openai.core

import com.openai.errors.OpenAIException
import java.lang.reflect.InvocationTargetException

/**
* Closes [closeable] when [observed] becomes only phantom reachable.
*
* This is a wrapper around a Java 9+ [java.lang.ref.Cleaner], or a no-op in older Java versions.
*/
@JvmSynthetic
internal fun closeWhenPhantomReachable(observed: Any, closeable: AutoCloseable) {
check(observed !== closeable) {
"`observed` cannot be the same object as `closeable` because it would never become phantom reachable"
}
closeWhenPhantomReachable?.let { it(observed, closeable::close) }
}

private val closeWhenPhantomReachable: ((Any, AutoCloseable) -> Unit)? by lazy {
try {
val cleanerClass = Class.forName("java.lang.ref.Cleaner")
val cleanerCreate = cleanerClass.getMethod("create")
val cleanerRegister =
cleanerClass.getMethod("register", Any::class.java, Runnable::class.java)
val cleanerObject = cleanerCreate.invoke(null);

{ observed, closeable ->
try {
cleanerRegister.invoke(cleanerObject, observed, Runnable { closeable.close() })
} catch (e: ReflectiveOperationException) {
if (e is InvocationTargetException) {
when (val cause = e.cause) {
is RuntimeException,
is Error -> throw cause
}
}
throw OpenAIException("Unexpected reflective invocation failure", e)
}
}
} catch (e: ReflectiveOperationException) {
// We're running Java 8, which has no Cleaner.
null
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.json.JsonMapper
import com.openai.core.JsonValue
import com.openai.core.http.HttpResponse
import com.openai.core.http.HttpResponse.Handler
import com.openai.core.http.PhantomReachableClosingStreamResponse
import com.openai.core.http.SseMessage
import com.openai.core.http.StreamResponse
import com.openai.errors.OpenAIException
Expand Down Expand Up @@ -58,14 +59,16 @@ internal fun sseHandler(jsonMapper: JsonMapper): Handler<StreamResponse<SseMessa
}
}

return object : StreamResponse<SseMessage> {
override fun stream(): Stream<SseMessage> = sequence.asStream()
return PhantomReachableClosingStreamResponse(
object : StreamResponse<SseMessage> {
override fun stream(): Stream<SseMessage> = sequence.asStream()

override fun close() {
reader.close()
response.close()
override fun close() {
reader.close()
response.close()
}
}
}
)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.openai.core.http

import com.openai.core.RequestOptions
import com.openai.core.closeWhenPhantomReachable
import java.util.concurrent.CompletableFuture

internal class PhantomReachableClosingHttpClient(private val httpClient: HttpClient) : HttpClient {
init {
closeWhenPhantomReachable(this, httpClient)
}

override fun execute(request: HttpRequest, requestOptions: RequestOptions): HttpResponse =
httpClient.execute(request, requestOptions)

override fun executeAsync(
request: HttpRequest,
requestOptions: RequestOptions
): CompletableFuture<HttpResponse> = httpClient.executeAsync(request, requestOptions)

override fun close() = httpClient.close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.openai.core.http

import com.openai.core.closeWhenPhantomReachable
import java.util.stream.Stream

internal class PhantomReachableClosingStreamResponse<T>(
private val streamResponse: StreamResponse<T>
) : StreamResponse<T> {
init {
closeWhenPhantomReachable(this, streamResponse)
}

override fun stream(): Stream<T> = streamResponse.stream()

override fun close() = streamResponse.close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.openai.core

import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test

internal class PhantomReachableTest {

@Test
fun closeWhenPhantomReachable_whenObservedIsGarbageCollected_closesCloseable() {
var closed = false
val closeable = AutoCloseable { closed = true }

closeWhenPhantomReachable(
// Pass an inline object for the object to observe so that it becomes immediately
// unreachable.
Any(),
closeable
)

assertThat(closed).isFalse()

System.gc()
Thread.sleep(3000)

assertThat(closed).isTrue()
}
}

0 comments on commit 3ab123c

Please sign in to comment.