diff --git a/api/src/main/java/io/kafbat/ui/service/audit/AuditService.java b/api/src/main/java/io/kafbat/ui/service/audit/AuditService.java index e78b40b55..66004b004 100644 --- a/api/src/main/java/io/kafbat/ui/service/audit/AuditService.java +++ b/api/src/main/java/io/kafbat/ui/service/audit/AuditService.java @@ -12,13 +12,13 @@ import java.io.Closeable; import java.io.IOException; import java.time.Duration; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import java.util.stream.Collectors; import javax.annotation.Nullable; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.producer.KafkaProducer; @@ -26,7 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.security.core.GrantedAuthority; +import org.springframework.security.core.AuthenticatedPrincipal; import org.springframework.security.core.context.SecurityContext; import org.springframework.security.core.userdetails.UserDetails; import org.springframework.stereotype.Service; @@ -38,7 +38,7 @@ @Service public class AuditService implements Closeable { - private static final Mono NO_AUTH_USER = Mono.just(new AuthenticatedUser("Unknown", Set.of())); + private static final AuthenticatedUser UNKNOWN_USER = new AuthenticatedUser("Unknown", Set.of()); private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5); private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log"; @@ -172,6 +172,33 @@ private static void printAuditInitError(KafkaCluster cluster, String errorMsg, E log.error("-----------------------------------------------------------------"); } + private Mono extractUser(Signal sig) { + //see ReactiveSecurityContextHolder for impl details + Object key = SecurityContext.class; + + if (!sig.getContextView().hasKey(key)) { + return Mono.just(UNKNOWN_USER); + } + + return sig.getContextView().>get(key) + .map(context -> context.getAuthentication().getPrincipal()) + .map(AuditService::extractUser) + .switchIfEmpty(Mono.just(UNKNOWN_USER)); + } + + private static AuthenticatedUser extractUser(Object principal) { + if (principal instanceof UserDetails u) { + return new AuthenticatedUser(u.getUsername(), Collections.emptySet()); + } else if (principal instanceof AuthenticatedPrincipal p) { + return new AuthenticatedUser(p.getName(), Collections.emptySet()); + } else { + if (principal != null) { + log.trace("Principal type: [{}]", principal.getClass().getName()); + } + return UNKNOWN_USER; + } + } + public boolean isAuditTopic(KafkaCluster cluster, String topic) { var writer = auditWriters.get(cluster.getName()); return writer != null @@ -191,23 +218,6 @@ public void audit(AccessContext acxt, Signal sig) { } } - private Mono extractUser(Signal sig) { - //see ReactiveSecurityContextHolder for impl details - Object key = SecurityContext.class; - if (sig.getContextView().hasKey(key)) { - return sig.getContextView().>get(key) - .map(context -> context.getAuthentication().getPrincipal()) - .cast(UserDetails.class) - .map(user -> { - var roles = user.getAuthorities().stream().map(GrantedAuthority::getAuthority).collect(Collectors.toSet()); - return new AuthenticatedUser(user.getUsername(), roles); - }) - .switchIfEmpty(NO_AUTH_USER); - } else { - return NO_AUTH_USER; - } - } - private void sendAuditRecord(AccessContext ctx, AuthenticatedUser user) { sendAuditRecord(ctx, user, null); }