Skip to content

Commit

Permalink
Add pull token approach to realtime (#807)
Browse files Browse the repository at this point in the history
* Add pull token approach to realtime

* Update docs
  • Loading branch information
jan-tennert authored Dec 9, 2024
1 parent a302e9d commit 123da08
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import io.github.jan.supabase.SupabaseClient
import io.github.jan.supabase.SupabaseClientBuilder
import io.github.jan.supabase.SupabaseSerializer
import io.github.jan.supabase.annotations.SupabaseInternal
import io.github.jan.supabase.auth.resolveAccessToken
import io.github.jan.supabase.logging.SupabaseLogger
import io.github.jan.supabase.logging.w
import io.github.jan.supabase.plugins.CustomSerializationConfig
import io.github.jan.supabase.plugins.CustomSerializationPlugin
import io.github.jan.supabase.plugins.MainConfig
Expand Down Expand Up @@ -92,6 +94,15 @@ sealed interface Realtime : MainPlugin<Realtime.Config>, CustomSerializationPlug
@SupabaseInternal
suspend fun send(message: RealtimeMessage)

/**
* Sets the JWT access token used for channel subscription authorization and Realtime RLS.
*
* If [token] is null, the token will be resolved using the [Realtime.Config.accessToken] provider.
*
* @param token The JWT access token
*/
suspend fun setAuth(token: String? = null)

/**
* @property websocketConfig Custom configuration for the Ktor Websocket Client. This only applies if [Realtime.Config.websocketFactory] is null.
* @property secure Whether to use wss or ws. Defaults to [SupabaseClient.useHTTPS] when null
Expand All @@ -114,6 +125,14 @@ sealed interface Realtime : MainPlugin<Realtime.Config>, CustomSerializationPlug
var disconnectOnNoSubscriptions: Boolean = true,
): MainConfig(), CustomSerializationConfig {

/**
* A custom access token provider. If this is set, the [SupabaseClient] will not be used to resolve the access token.
*/
var accessToken: suspend SupabaseClient.() -> String? = { resolveAccessToken(realtime, keyAsFallback = false) }
set(value) {
logger.w { "You are setting a custom access token provider. This can lead to unexpected behavior." }
field = value
}
override var serializer: SupabaseSerializer? = null

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ sealed interface RealtimeChannel {
suspend fun subscribe(blockUntilSubscribed: Boolean = false)

/**
* Updates the JWT token for this client
* Updates the JWT token for this channel
*/
suspend fun updateAuth(jwt: String)
suspend fun updateAuth(jwt: String?)

/**
* Unsubscribes from the channel
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.github.jan.supabase.realtime

import io.github.jan.supabase.annotations.SupabaseInternal
import io.github.jan.supabase.auth.resolveAccessToken
import io.github.jan.supabase.collections.AtomicMutableList
import io.github.jan.supabase.logging.d
import io.github.jan.supabase.logging.e
Expand Down Expand Up @@ -41,7 +40,9 @@ internal class RealtimeChannelImpl(
private val _status = MutableStateFlow(RealtimeChannel.Status.UNSUBSCRIBED)
override val status = _status.asStateFlow()
override val realtime: Realtime = realtimeImpl

private val accessToken = suspend {
realtimeImpl.config.accessToken(supabaseClient) ?: realtimeImpl.accessToken
}
override val supabaseClient = realtimeImpl.supabaseClient

private val broadcastUrl = realtimeImpl.broadcastUrl()
Expand All @@ -59,7 +60,7 @@ internal class RealtimeChannelImpl(
}
_status.value = RealtimeChannel.Status.SUBSCRIBING
Realtime.logger.d { "Subscribing to channel $topic" }
val currentJwt = supabaseClient.resolveAccessToken(realtimeImpl, keyAsFallback = false)
val currentJwt = accessToken()
val postgrestChanges = clientChanges.toList()
val joinConfig = RealtimeJoinPayload(RealtimeJoinConfig(broadcastJoinConfig, presenceJoinConfig, postgrestChanges, isPrivate))
val joinConfigObject = buildJsonObject {
Expand Down Expand Up @@ -93,7 +94,7 @@ internal class RealtimeChannelImpl(
realtimeImpl.send(RealtimeMessage(topic, RealtimeChannel.CHANNEL_EVENT_LEAVE, buildJsonObject {}, null))
}

override suspend fun updateAuth(jwt: String) {
override suspend fun updateAuth(jwt: String?) {
Realtime.logger.d { "Updating auth token for channel $topic" }
realtimeImpl.send(RealtimeMessage(topic, RealtimeChannel.CHANNEL_EVENT_ACCESS_TOKEN, buildJsonObject {
put("access_token", jwt)
Expand All @@ -102,12 +103,16 @@ internal class RealtimeChannelImpl(

override suspend fun broadcast(event: String, message: JsonObject) {
if(status.value != RealtimeChannel.Status.SUBSCRIBED) {
val token = accessToken()
val response = httpClient.postJson(
url = broadcastUrl,
body = BroadcastApiBody(listOf(BroadcastApiMessage(subTopic, event, message, isPrivate)))
) {
headers {
append("apikey", realtimeImpl.supabaseClient.supabaseKey)
token?.let {
set("Authorization", "Bearer $it")
}
}
}
@Suppress("MagicNumber")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import kotlinx.serialization.json.buildJsonObject
override val subscriptions: Map<String, RealtimeChannel> = _subscriptions
private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())
private val mutex = Mutex()
internal var accessToken by atomic<String?>(null)
var heartbeatJob: Job? = null
var messageJob: Job? = null
var ref by atomic(0)
Expand Down Expand Up @@ -92,7 +93,7 @@ import kotlinx.serialization.json.buildJsonObject
supabaseClient.pluginManager.getPluginOrNull(Auth)?.sessionStatus?.collect {
if(status.value == Realtime.Status.CONNECTED) {
when(it) {
is SessionStatus.Authenticated -> updateJwt(it.session.accessToken)
is SessionStatus.Authenticated -> setAuth(it.session.accessToken)
is SessionStatus.NotAuthenticated -> {
if(config.disconnectOnSessionLoss) {
Realtime.logger.w { "No auth session found, disconnecting from realtime websocket"}
Expand Down Expand Up @@ -166,9 +167,11 @@ import kotlinx.serialization.json.buildJsonObject
}
}

private fun updateJwt(jwt: String) {
override suspend fun setAuth(token: String?) {
val newToken = token ?: config.accessToken(supabaseClient)
this.accessToken = newToken
scope.launch {
subscriptions.values.filter { it.status.value == RealtimeChannel.Status.SUBSCRIBED }.forEach { it.updateAuth(jwt) }
subscriptions.values.filter { it.status.value == RealtimeChannel.Status.SUBSCRIBED }.forEach { it.updateAuth(accessToken) }
}
}

Expand Down

0 comments on commit 123da08

Please sign in to comment.