Skip to content

Commit

Permalink
Token based authentication integration with core extension (#3063)
Browse files Browse the repository at this point in the history
* Support for StreamingCredentials

     This enables use cases like credential rotation and token based auth without client disconnect. Especially with Pub/Sub clients will reduce the chnance of missing events.

* Tests & publish ReauthEvent

* Clean up & Format & Add ReauthenticateEvent test

* Conditionally enable connection reauthentication based on client setting
DEFAULT_REAUTHENTICATE_BEHAVIOUR

* Client setting for enabling reauthentication
  - Moved Authentication handler to DefaultEndpoint
  - updated since 6.6.0

* formating

* resolve conflict with main

* format

* dispath using connection handler

* Support multi with re-auth
Defer the re-auth operation in case there is on-going multi
Tx in lettuce need to be externally synchronised when used in multithreaded env. Since re-auth happens from different thread we need to make sure it does not happen while there is ongoing transaction.

* Fix EndpointId missing in events

* format

* Add unit tests for setCredenatials

* Skip preProcessing of auth command to avoid replacing the credential provider with static one provider

Add unit tests for setCredentials

* clean up - remove dead code

* Moved almost all code inside the new handler

* fix inTransaction lock with dispatch command batch

* Remove StreamingCredentialsProvider interface.

move credentials() method to RedisCredentialsProvider.

Resolve issue with unsafe cast after extending RedisCredentialsProvider with supportsStreaming() method

* Add authentication handler to ClusterPubSub connections

* Token based auth integration with core extension

Provide a way for lettuce clients to use token-based authentication.
TOKENs come with a TTL. After a Redis client authenticates with a TOKEN, if they didn't renew their authentication we need to evict (close) them. The suggested approach is to leverage the existing CredentialsProvider and add support for streaming credentials to handle token refresh scenarios. Each time a new token is received connection is reauthenticated.

* rebase to address "oid" core-autx lib change
formating

* Add EntraId integration tests
   Verify authentication using Azure AD with service principals

* StreamingCredentialsProvider replaced with RedisCredentialsProvider.supportsStreaming()

* pub/sub test basic functionality with entraid auth

* Update src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvider.java

Co-authored-by: Tihomir Krasimirov Mateev <tihomir.mateev@redis.com>

* Addressing review  comments from @tishun

* Bump redis-authx-core & redis-authx-entraid from 0.1.0-SNAPSHOT to 0.1.1-beta1

* add java doc for
   TokenBasedRedisCredentialsProvider

---------

Co-authored-by: Tihomir Mateev <tihomir.mateev@gmail.com>
Co-authored-by: Tihomir Krasimirov Mateev <tihomir.mateev@redis.com>
  • Loading branch information
3 people authored Dec 20, 2024
1 parent 14de5b8 commit eff243d
Show file tree
Hide file tree
Showing 27 changed files with 2,245 additions and 22 deletions.
18 changes: 17 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,23 @@
</dependencyManagement>

<dependencies>

<dependency>
<groupId>redis.clients.authentication</groupId>
<artifactId>redis-authx-core</artifactId>
<version>0.1.1-beta1</version>
</dependency>
<dependency>
<groupId>redis.clients.authentication</groupId>
<artifactId>redis-authx-entraid</artifactId>
<version>0.1.1-beta1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.github.cdimascio</groupId>
<artifactId>dotenv-java</artifactId>
<version>2.2.0</version>
<scope>test</scope>
</dependency>
<!-- Start of core dependencies -->

<dependency>
Expand Down
137 changes: 137 additions & 0 deletions src/main/java/io/lettuce/authx/TokenBasedRedisCredentialsProvider.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
* Copyright 2024, Redis Ltd. and Contributors
* All rights reserved.
*
* Licensed under the MIT License.
*/
package io.lettuce.authx;

import io.lettuce.core.RedisCredentials;
import io.lettuce.core.RedisCredentialsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import redis.clients.authentication.core.Token;
import redis.clients.authentication.core.TokenAuthConfig;
import redis.clients.authentication.core.TokenListener;
import redis.clients.authentication.core.TokenManager;

/**
* A {@link RedisCredentialsProvider} implementation that supports token-based authentication for Redis.
* <p>
* This provider uses a {@link TokenManager} to manage and renew tokens, ensuring that the Redis client can authenticate with
* Redis using a dynamically updated token. This is particularly useful in scenarios where Redis access is controlled via
* token-based authentication, such as when Redis is integrated with an identity provider like EntraID.
* </p>
* <p>
* The provider supports streaming of credentials and automatically emits new credentials whenever a token is renewed. It must
* be used with {@link io.lettuce.core.ClientOptions.ReauthenticateBehavior#ON_NEW_CREDENTIALS} to automatically re-authenticate
* connections whenever new tokens are emitted by the provider.
* </p>
* <p>
* The lifecycle of this provider is externally managed. It should be closed when there are no longer any connections using it,
* to stop the token management process and release resources.
* </p>
*
* @since 6.6
*/
public class TokenBasedRedisCredentialsProvider implements RedisCredentialsProvider, AutoCloseable {

private static final Logger log = LoggerFactory.getLogger(TokenBasedRedisCredentialsProvider.class);

private final TokenManager tokenManager;

private final Sinks.Many<RedisCredentials> credentialsSink = Sinks.many().replay().latest();

private TokenBasedRedisCredentialsProvider(TokenManager tokenManager) {
this.tokenManager = tokenManager;
}

private void init() {

TokenListener listener = new TokenListener() {

@Override
public void onTokenRenewed(Token token) {
String username = token.getUser();
char[] pass = token.getValue().toCharArray();
RedisCredentials credentials = RedisCredentials.just(username, pass);
credentialsSink.tryEmitNext(credentials);
}

@Override
public void onError(Exception exception) {
log.error("Token renew failed!", exception);
}

};

try {
tokenManager.start(listener, false);
} catch (Exception e) {
credentialsSink.tryEmitError(e);
tokenManager.stop();
throw new RuntimeException("Failed to start TokenManager", e);
}
}

/**
* Resolve the latest available credentials as a Mono.
* <p>
* This method returns a Mono that emits the most recent set of Redis credentials. The Mono will complete once the
* credentials are emitted. If no credentials are available at the time of subscription, the Mono will wait until
* credentials are available.
*
* @return a Mono that emits the latest Redis credentials
*/
@Override
public Mono<RedisCredentials> resolveCredentials() {

return credentialsSink.asFlux().next();
}

/**
* Expose the Flux for all credential updates.
* <p>
* This method returns a Flux that emits all updates to the Redis credentials. Subscribers will receive the latest
* credentials whenever they are updated. The Flux will continue to emit updates until the provider is shut down.
*
* @return a Flux that emits all updates to the Redis credentials
*/
@Override
public Flux<RedisCredentials> credentials() {

return credentialsSink.asFlux().onBackpressureLatest(); // Provide a continuous stream of credentials
}

@Override
public boolean supportsStreaming() {
return true;
}

/**
* Stop the credentials provider and clean up resources.
* <p>
* This method stops the TokenManager and completes the credentials sink, ensuring that all resources are properly released.
* It should be called when the credentials provider is no longer needed.
*/
@Override
public void close() {
credentialsSink.tryEmitComplete();
tokenManager.stop();
}

public static TokenBasedRedisCredentialsProvider create(TokenAuthConfig tokenAuthConfig) {
return create(new TokenManager(tokenAuthConfig.getIdentityProviderConfig().getProvider(),
tokenAuthConfig.getTokenManagerConfig()));
}

public static TokenBasedRedisCredentialsProvider create(TokenManager tokenManager) {
TokenBasedRedisCredentialsProvider credentialManager = new TokenBasedRedisCredentialsProvider(tokenManager);
credentialManager.init();
return credentialManager;
}

}
83 changes: 78 additions & 5 deletions src/main/java/io/lettuce/core/ClientOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class ClientOptions implements Serializable {

public static final DisconnectedBehavior DEFAULT_DISCONNECTED_BEHAVIOR = DisconnectedBehavior.DEFAULT;

public static final ReauthenticateBehavior DEFAULT_REAUTHENTICATE_BEHAVIOUR = ReauthenticateBehavior.DEFAULT;

public static final boolean DEFAULT_PUBLISH_ON_SCHEDULER = false;

public static final boolean DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION = true;
Expand Down Expand Up @@ -95,6 +97,8 @@ public class ClientOptions implements Serializable {

private final DisconnectedBehavior disconnectedBehavior;

private final ReauthenticateBehavior reauthenticateBehavior;

private final boolean publishOnScheduler;

private final boolean pingBeforeActivateConnection;
Expand Down Expand Up @@ -124,6 +128,7 @@ protected ClientOptions(Builder builder) {
this.cancelCommandsOnReconnectFailure = builder.cancelCommandsOnReconnectFailure;
this.decodeBufferPolicy = builder.decodeBufferPolicy;
this.disconnectedBehavior = builder.disconnectedBehavior;
this.reauthenticateBehavior = builder.reauthenticateBehavior;
this.publishOnScheduler = builder.publishOnScheduler;
this.pingBeforeActivateConnection = builder.pingBeforeActivateConnection;
this.protocolVersion = builder.protocolVersion;
Expand All @@ -143,6 +148,7 @@ protected ClientOptions(ClientOptions original) {
this.cancelCommandsOnReconnectFailure = original.isCancelCommandsOnReconnectFailure();
this.decodeBufferPolicy = original.getDecodeBufferPolicy();
this.disconnectedBehavior = original.getDisconnectedBehavior();
this.reauthenticateBehavior = original.getReauthenticateBehaviour();
this.publishOnScheduler = original.isPublishOnScheduler();
this.pingBeforeActivateConnection = original.isPingBeforeActivateConnection();
this.protocolVersion = original.getConfiguredProtocolVersion();
Expand Down Expand Up @@ -220,6 +226,8 @@ public static class Builder {

private TimeoutOptions timeoutOptions = DEFAULT_TIMEOUT_OPTIONS;

private ReauthenticateBehavior reauthenticateBehavior = DEFAULT_REAUTHENTICATE_BEHAVIOUR;

private boolean useHashIndexedQueue = DEFAULT_USE_HASH_INDEX_QUEUE;

protected Builder() {
Expand Down Expand Up @@ -301,6 +309,20 @@ public Builder disconnectedBehavior(DisconnectedBehavior disconnectedBehavior) {
return this;
}

/**
* Configure the {@link ReauthenticateBehavior} of the Lettuce driver. Defaults to
* {@link ReauthenticateBehavior#DEFAULT}.
*
* @param reauthenticateBehavior the {@link ReauthenticateBehavior} to use. Must not be {@code null}.
* @return {@code this}
*/
public Builder reauthenticateBehavior(ReauthenticateBehavior reauthenticateBehavior) {

LettuceAssert.notNull(reauthenticateBehavior, "ReuthenticatBehavior must not be null");
this.reauthenticateBehavior = reauthenticateBehavior;
return this;
}

/**
* Perform a lightweight {@literal PING} connection handshake when establishing a Redis connection. If {@code true}
* (default is {@code true}, {@link #DEFAULT_PING_BEFORE_ACTIVATE_CONNECTION}), every connection and reconnect will
Expand Down Expand Up @@ -505,11 +527,12 @@ public ClientOptions.Builder mutate() {

builder.autoReconnect(isAutoReconnect()).cancelCommandsOnReconnectFailure(isCancelCommandsOnReconnectFailure())
.decodeBufferPolicy(getDecodeBufferPolicy()).disconnectedBehavior(getDisconnectedBehavior())
.readOnlyCommands(getReadOnlyCommands()).publishOnScheduler(isPublishOnScheduler())
.pingBeforeActivateConnection(isPingBeforeActivateConnection()).protocolVersion(getConfiguredProtocolVersion())
.requestQueueSize(getRequestQueueSize()).scriptCharset(getScriptCharset()).jsonParser(getJsonParser())
.socketOptions(getSocketOptions()).sslOptions(getSslOptions())
.suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure()).timeoutOptions(getTimeoutOptions());
.reauthenticateBehavior(getReauthenticateBehaviour()).readOnlyCommands(getReadOnlyCommands())
.publishOnScheduler(isPublishOnScheduler()).pingBeforeActivateConnection(isPingBeforeActivateConnection())
.protocolVersion(getConfiguredProtocolVersion()).requestQueueSize(getRequestQueueSize())
.scriptCharset(getScriptCharset()).jsonParser(getJsonParser()).socketOptions(getSocketOptions())
.sslOptions(getSslOptions()).suspendReconnectOnProtocolFailure(isSuspendReconnectOnProtocolFailure())
.timeoutOptions(getTimeoutOptions());

return builder;
}
Expand Down Expand Up @@ -573,6 +596,16 @@ public DisconnectedBehavior getDisconnectedBehavior() {
return disconnectedBehavior;
}

/**
* Behavior for re-authentication when the {@link RedisCredentialsProvider} emits new credentials. Defaults to
* {@link ReauthenticateBehavior#DEFAULT}.
*
* @return the currently set {@link ReauthenticateBehavior}.
*/
public ReauthenticateBehavior getReauthenticateBehaviour() {
return reauthenticateBehavior;
}

/**
* Predicate to identify commands as read-only. Defaults to {@link #DEFAULT_READ_ONLY_COMMANDS}.
*
Expand Down Expand Up @@ -704,6 +737,46 @@ public TimeoutOptions getTimeoutOptions() {
return timeoutOptions;
}

/**
* Defines the re-authentication behavior of the Redis client.
* <p/>
* Certain implementations of the {@link RedisCredentialsProvider} could emit new credentials at runtime. This setting
* controls how the driver reacts to these newly emitted credentials.
*/
public enum ReauthenticateBehavior {

/**
* This is the default behavior. The client will fetch current credentials from the underlying
* {@link RedisCredentialsProvider} only when the driver needs to, e.g. when the connection is first established or when
* it is re-established after a disconnect.
* <p/>
* <p>
* No re-authentication is performed when new credentials are emitted by a {@link RedisCredentialsProvider} that
* supports streaming. The client does not subscribe to or react to any updates in the credential stream provided by
* {@link RedisCredentialsProvider#credentials()}.
* </p>
*/
DEFAULT,

/**
* Automatically triggers re-authentication whenever new credentials are emitted by a {@link RedisCredentialsProvider}
* that supports streaming, as indicated by {@link RedisCredentialsProvider#supportsStreaming()}.
*
* <p>
* When this behavior is enabled, the client subscribes to the credential stream provided by
* {@link RedisCredentialsProvider#credentials()} and issues an {@code AUTH} command to the Redis server each time new
* credentials are received. This behavior supports dynamic credential scenarios, such as token-based authentication, or
* credential rotation where credentials are refreshed periodically to maintain access.
* </p>
*
* <p>
* Note: {@code AUTH} commands issued as part of this behavior may interleave with user-submitted commands, as the
* client performs re-authentication independently of user command flow.
* </p>
*/
ON_NEW_CREDENTIALS
}

/**
* Whether we should use hash indexed queue, which provides O(1) remove(Object)
*
Expand Down
Loading

0 comments on commit eff243d

Please sign in to comment.