Skip to content

Commit

Permalink
Audit: Fix user mapping (#91)
Browse files Browse the repository at this point in the history
  • Loading branch information
Haarolean authored Feb 17, 2024
1 parent 5c093ee commit 7b44dd3
Showing 1 changed file with 30 additions and 20 deletions.
50 changes: 30 additions & 20 deletions api/src/main/java/io/kafbat/ui/service/audit/AuditService.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,21 @@
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.security.core.AuthenticatedPrincipal;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.userdetails.UserDetails;
import org.springframework.stereotype.Service;
Expand All @@ -38,7 +38,7 @@
@Service
public class AuditService implements Closeable {

private static final Mono<AuthenticatedUser> NO_AUTH_USER = Mono.just(new AuthenticatedUser("Unknown", Set.of()));
private static final AuthenticatedUser UNKNOWN_USER = new AuthenticatedUser("Unknown", Set.of());
private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5);

private static final String DEFAULT_AUDIT_TOPIC_NAME = "__kui-audit-log";
Expand Down Expand Up @@ -172,6 +172,33 @@ private static void printAuditInitError(KafkaCluster cluster, String errorMsg, E
log.error("-----------------------------------------------------------------");
}

private Mono<AuthenticatedUser> extractUser(Signal<?> sig) {
//see ReactiveSecurityContextHolder for impl details
Object key = SecurityContext.class;

if (!sig.getContextView().hasKey(key)) {
return Mono.just(UNKNOWN_USER);
}

return sig.getContextView().<Mono<SecurityContext>>get(key)
.map(context -> context.getAuthentication().getPrincipal())
.map(AuditService::extractUser)
.switchIfEmpty(Mono.just(UNKNOWN_USER));
}

private static AuthenticatedUser extractUser(Object principal) {
if (principal instanceof UserDetails u) {
return new AuthenticatedUser(u.getUsername(), Collections.emptySet());
} else if (principal instanceof AuthenticatedPrincipal p) {
return new AuthenticatedUser(p.getName(), Collections.emptySet());
} else {
if (principal != null) {
log.trace("Principal type: [{}]", principal.getClass().getName());
}
return UNKNOWN_USER;
}
}

public boolean isAuditTopic(KafkaCluster cluster, String topic) {
var writer = auditWriters.get(cluster.getName());
return writer != null
Expand All @@ -191,23 +218,6 @@ public void audit(AccessContext acxt, Signal<?> sig) {
}
}

private Mono<AuthenticatedUser> extractUser(Signal<?> sig) {
//see ReactiveSecurityContextHolder for impl details
Object key = SecurityContext.class;
if (sig.getContextView().hasKey(key)) {
return sig.getContextView().<Mono<SecurityContext>>get(key)
.map(context -> context.getAuthentication().getPrincipal())
.cast(UserDetails.class)
.map(user -> {
var roles = user.getAuthorities().stream().map(GrantedAuthority::getAuthority).collect(Collectors.toSet());
return new AuthenticatedUser(user.getUsername(), roles);
})
.switchIfEmpty(NO_AUTH_USER);
} else {
return NO_AUTH_USER;
}
}

private void sendAuditRecord(AccessContext ctx, AuthenticatedUser user) {
sendAuditRecord(ctx, user, null);
}
Expand Down

0 comments on commit 7b44dd3

Please sign in to comment.