Skip to content

Commit

Permalink
feat(connect): filter Connect Configs by regexp (#477)
Browse files Browse the repository at this point in the history
  • Loading branch information
twobeeb authored Oct 30, 2020
1 parent 6ba38f2 commit 4a7053b
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 10 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ Define groups with specific roles for your users
* `- name: group-name` Group identifier
* `roles`: Roles list for the group
* `attributes.topics-filter-regexp`: Regexp to filter topics available for current group
* `attributes.connects-filter-regexp`: Regexp to filter Connect tasks available for current group


3 defaults group are available :
Expand Down Expand Up @@ -420,6 +421,7 @@ akhq:
attributes:
# Regexp to filter topic available for group
topics-filter-regexp: "test\\.reader.*"
connects-filter-regexp: "^test.*$"
- name: topic-writer # Group name
roles:
- topic/read
Expand All @@ -428,6 +430,7 @@ akhq:
- topic/config/update
attributes:
topics-filter-regexp: "test.*"
connects-filter-regexp: "^test.*$"
ldap:
groups:
- name: mathematicians
Expand Down
2 changes: 2 additions & 0 deletions application.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ akhq:
attributes:
# Regexp to filter topic available for group
topics-filter-regexp: "test.*"
# Regexp to filter connect configs visible for group
connects-filter-regexp: "^test.*$"
- name: topic-reader # Other group
roles:
- topic/read
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/akhq/repositories/AbstractRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ public static boolean isSearchMatch(Optional<String> search, String value) {
return count == split.length;
}

public static boolean isTopicMatchRegex(Optional<List<String>> regex, String topic) {
public static boolean isMatchRegex(Optional<List<String>> regex, String item) {
if (regex.isEmpty() || regex.get().isEmpty()) {
return true;
}

for (String strRegex : regex.get()) {
if (topic.matches(strRegex)) {
if (item.matches(strRegex)) {
return true;
}
}
Expand Down
62 changes: 59 additions & 3 deletions src/main/java/org/akhq/repositories/ConnectRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,15 @@
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import io.micronaut.context.ApplicationContext;
import io.micronaut.retry.annotation.Retryable;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.utils.SecurityService;
import org.akhq.configs.SecurityProperties;
import org.akhq.models.ConnectDefinition;
import org.akhq.models.ConnectPlugin;
import org.akhq.modules.KafkaModule;
import org.akhq.utils.UserGroupUtils;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPlugin;
import org.sourcelab.kafka.connect.apiclient.request.dto.ConnectorPluginConfigDefinition;
import org.sourcelab.kafka.connect.apiclient.request.dto.NewConnectorDefinition;
Expand All @@ -20,19 +25,32 @@
import javax.inject.Singleton;
import java.util.*;
import java.util.stream.Collectors;
import java.util.regex.Pattern;

@Singleton
public class ConnectRepository extends AbstractRepository {
@Inject
private KafkaModule kafkaModule;

@Inject
private ApplicationContext applicationContext;

@Inject
private UserGroupUtils userGroupUtils;

@Inject
private SecurityProperties securityProperties;

private static final Gson gson = new GsonBuilder().setPrettyPrinting().create();

@Retryable(includes = {
ConcurrentConfigModificationException.class,
ResourceNotFoundException.class
}, delay = "3s", attempts = "5")
public ConnectDefinition getDefinition(String clusterId, String connectId, String name) {
if (!isMatchRegex(getConnectFilterRegex(), name)) {
throw new IllegalArgumentException(String.format("Not allowed to view Connector %s", name));
}
return new ConnectDefinition(
this.kafkaModule
.getConnectRestClient(clusterId)
Expand All @@ -50,11 +68,19 @@ public ConnectDefinition getDefinition(String clusterId, String connectId, Strin
ResourceNotFoundException.class
}, delay = "3s", attempts = "5")
public List<ConnectDefinition> getDefinitions(String clusterId, String connectId) {
return this.kafkaModule
Collection<String> unfiltered = this.kafkaModule
.getConnectRestClient(clusterId)
.get(connectId)
.getConnectors()
.stream()
.getConnectors();

ArrayList<String> filtered = new ArrayList<String>();
for (String item : unfiltered) {
if (isMatchRegex(getConnectFilterRegex(), item)) {
filtered.add(item);
}
}

return filtered.stream()
.map(s -> getDefinition(clusterId, connectId, s))
.collect(Collectors.toList());
}
Expand Down Expand Up @@ -207,4 +233,34 @@ private String getShortClassName(String className) {

return split[split.length - 1];
}

private Optional<List<String>> getConnectFilterRegex() {

List<String> connectFilterRegex = new ArrayList<>();

if (applicationContext.containsBean(SecurityService.class)) {
SecurityService securityService = applicationContext.getBean(SecurityService.class);
Optional<Authentication> authentication = securityService.getAuthentication();
if (authentication.isPresent()) {
Authentication auth = authentication.get();
connectFilterRegex.addAll(getConnectFilterRegexFromAttributes(auth.getAttributes()));
}
}
// get Connect filter regex for default groups
connectFilterRegex.addAll(getConnectFilterRegexFromAttributes(
userGroupUtils.getUserAttributes(Collections.singletonList(securityProperties.getDefaultGroup()))
));

return Optional.of(connectFilterRegex);
}

@SuppressWarnings("unchecked")
private List<String> getConnectFilterRegexFromAttributes(Map<String, Object> attributes) {
if (attributes.get("connects-filter-regexp") != null) {
if (attributes.get("connects-filter-regexp") instanceof List) {
return (List<String>)attributes.get("connects-filter-regexp");
}
}
return new ArrayList<>();
}
}
6 changes: 3 additions & 3 deletions src/main/java/org/akhq/repositories/TopicRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public List<String> all(String clusterId, TopicListView view, Optional<String> s
Collection<TopicListing> listTopics = kafkaWrapper.listTopics(clusterId);

for (TopicListing item : listTopics) {
if (isSearchMatch(search, item.name()) && isListViewMatch(view, item.name()) && isTopicMatchRegex(
if (isSearchMatch(search, item.name()) && isListViewMatch(view, item.name()) && isMatchRegex(
getTopicFilterRegex(), item.name())) {
list.add(item.name());
}
Expand All @@ -91,7 +91,7 @@ public boolean isListViewMatch(TopicListView view, String value) {

public Topic findByName(String clusterId, String name) throws ExecutionException, InterruptedException {
Optional<Topic> topic = Optional.empty();
if(isTopicMatchRegex(getTopicFilterRegex(),name)) {
if(isMatchRegex(getTopicFilterRegex(),name)) {
topic = this.findByName(clusterId, Collections.singletonList(name)).stream().findFirst();
}

Expand All @@ -107,7 +107,7 @@ public List<Topic> findByName(String clusterId, List<String> topics) throws Exec
Optional<List<String>> topicRegex = getTopicFilterRegex();

for (Map.Entry<String, TopicDescription> description : topicDescriptions) {
if(isTopicMatchRegex(topicRegex, description.getValue().name())){
if(isMatchRegex(topicRegex, description.getValue().name())){
list.add(
new Topic(
description.getValue(),
Expand Down
77 changes: 75 additions & 2 deletions src/test/java/org/akhq/repositories/ConnectRepositoryTest.java
Original file line number Diff line number Diff line change
@@ -1,26 +1,46 @@
package org.akhq.repositories;

import io.micronaut.context.ApplicationContext;
import io.micronaut.security.authentication.Authentication;
import io.micronaut.security.authentication.DefaultAuthentication;
import io.micronaut.security.utils.DefaultSecurityService;
import io.micronaut.security.utils.SecurityService;
import com.google.common.collect.ImmutableMap;
import lombok.extern.slf4j.Slf4j;
import org.akhq.AbstractTest;
import org.akhq.KafkaTestCluster;
import org.akhq.models.ConnectDefinition;
import org.akhq.models.ConnectPlugin;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import java.util.*;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.when;

@Slf4j
public class ConnectRepositoryTest extends AbstractTest {

@Inject
@InjectMocks
private ConnectRepository repository;

@Mock
ApplicationContext applicationContext;

@BeforeEach
public void before(){
MockitoAnnotations.initMocks(this);
}

@Test
public void getPlugins() {
List<ConnectPlugin> all = repository.getPlugins(KafkaTestCluster.CLUSTER_ID, "connect-1");
Expand Down Expand Up @@ -135,4 +155,57 @@ public void create() {
assertEquals(0, repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1").size());
assertEquals(0, repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-2").size());
}

@Test
public void getFilteredList() {

repository.create(
KafkaTestCluster.CLUSTER_ID,
"connect-1",
"prefixed.Matching1",
ImmutableMap.of(
"connector.class", "FileStreamSinkConnector",
"file", "/tmp/test.txt",
"topics", KafkaTestCluster.TOPIC_CONNECT
)
);

repository.create(
KafkaTestCluster.CLUSTER_ID,
"connect-1",
"prefixed.Matching2",
ImmutableMap.of(
"connector.class", "FileStreamSinkConnector",
"file", "/tmp/test.txt",
"topics", KafkaTestCluster.TOPIC_CONNECT
)
);

repository.create(
KafkaTestCluster.CLUSTER_ID,
"connect-1",
"not.Matching3",
ImmutableMap.of(
"connector.class", "FileStreamSinkConnector",
"file", "/tmp/test.txt",
"topics", KafkaTestCluster.TOPIC_CONNECT
)
);

mockApplicationContext();

List<ConnectDefinition> filtered = repository.getDefinitions(KafkaTestCluster.CLUSTER_ID, "connect-1");
assertEquals(2, filtered.size());
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","prefixed.Matching1");
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","prefixed.Matching2");
repository.delete(KafkaTestCluster.CLUSTER_ID, "connect-1","not.Matching3");
}
private void mockApplicationContext() {
Authentication auth = new DefaultAuthentication("test", Collections.singletonMap("connects-filter-regexp", new ArrayList<>(Arrays.asList("^prefixed.*$"))));
DefaultSecurityService securityService = Mockito.mock(DefaultSecurityService.class);
when(securityService.getAuthentication()).thenReturn(Optional.of(auth));
when(applicationContext.containsBean(SecurityService.class)).thenReturn(true);
when(applicationContext.getBean(SecurityService.class)).thenReturn(securityService);
}

}

0 comments on commit 4a7053b

Please sign in to comment.