From b1ffcfe16e887aab9b58bf0e9fc121f0dfd9e5e7 Mon Sep 17 00:00:00 2001 From: sim Date: Thu, 31 Oct 2024 11:05:11 +0000 Subject: [PATCH 1/2] Add ability to timeout to SSE --- .../kotlin/okhttp3/sse/EventSourceListener.kt | 5 ++++ .../okhttp3/sse/internal/RealEventSource.kt | 27 +++++++++++++++++-- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt index 8c595b178f4f..d0f78268efa1 100644 --- a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt +++ b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt @@ -18,6 +18,11 @@ package okhttp3.sse import okhttp3.Response abstract class EventSourceListener { + /** + * Seconds elapsed between 2 events until connection failed. Doesn't timeout if null + */ + open var timeout: Long? = null + /** * Invoked when an event source has been accepted by the remote peer and may begin transmitting * events. diff --git a/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt b/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt index f638d98b860f..f2e99bf9cc1c 100644 --- a/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt +++ b/okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt @@ -16,6 +16,8 @@ package okhttp3.sse.internal import java.io.IOException +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds import okhttp3.Call import okhttp3.Callback import okhttp3.Request @@ -24,6 +26,8 @@ import okhttp3.ResponseBody import okhttp3.internal.stripBody import okhttp3.sse.EventSource import okhttp3.sse.EventSourceListener +import okio.AsyncTimeout +import okio.Timeout.Companion.timeout internal class RealEventSource( private val request: Request, @@ -40,6 +44,18 @@ internal class RealEventSource( } } + private fun updateTimeout(call: Call?, duration: Duration) { + if (call?.timeout() is AsyncTimeout) { + (call.timeout() as AsyncTimeout).apply { + if (this.timeoutNanos() > 0L) { + exit() + } + timeout(duration) + enter() + } + } + } + override fun onResponse( call: Call, response: Response, @@ -65,8 +81,11 @@ internal class RealEventSource( return } - // This is a long-lived response. Cancel full-call timeouts. - call?.timeout()?.cancel() + // This is a long-lived response. Cancel full-call timeouts if no timeout has been set + listener.timeout?.let { + // We spend at most timeout seconds if set + updateTimeout(call, it.seconds) + } ?: call?.timeout()?.cancel() // Replace the body with a stripped one so the callbacks can't see real data. val response = response.stripBody() @@ -76,6 +95,10 @@ internal class RealEventSource( if (!canceled) { listener.onOpen(this, response) while (!canceled && reader.processNextEvent()) { + listener.timeout?.let { + // We spend at most timeout seconds if set + updateTimeout(call, it.seconds) + } } } } catch (e: Exception) { From 215120d6518e12775313ad1704cc8828c0c5e5a9 Mon Sep 17 00:00:00 2001 From: sim Date: Thu, 31 Oct 2024 11:05:23 +0000 Subject: [PATCH 2/2] Add comments for EventSourceListener --- .../src/main/kotlin/okhttp3/sse/EventSourceListener.kt | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt index d0f78268efa1..17eb4eec9519 100644 --- a/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt +++ b/okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt @@ -34,7 +34,12 @@ abstract class EventSourceListener { } /** - * TODO description. + * Invoked when an event is received. + * + * @param eventSource Source of the event + * @param id SSE event's id + * @param type SSE event's type + * @param data SSE event's data */ open fun onEvent( eventSource: EventSource, @@ -45,7 +50,7 @@ abstract class EventSourceListener { } /** - * TODO description. + * Invoked when the connection has been properly closed by the server. * * No further calls to this listener will be made. */