diff --git a/example/descriptor-only-topics.yaml b/example/descriptor-only-topics.yaml index cd46f50b1..0b6567847 100644 --- a/example/descriptor-only-topics.yaml +++ b/example/descriptor-only-topics.yaml @@ -3,26 +3,14 @@ context: "oltp" projects: - name: "foo" consumers: - - principal: "User:App0" + - principal: "User:NewApp2" topics: - name: "foo" - config: - replication.factor: "1" - num.partitions: "1" - dataType: "avro" name: "bar" - config: - replication.factor: "1" - num.partitions: "1" - dataType: "json" name: "zet" - config: - replication.factor: "1" - num.partitions: "1" - name: "bar" topics: - dataType: "avro" - name: "bar" - config: - replication.factor: "1" - num.partitions: "1" \ No newline at end of file + name: "bar" \ No newline at end of file diff --git a/example/julieops-confluent-cloud.properties b/example/julieops-confluent-cloud.properties index 2fb0f0bae..9a73b2e24 100644 --- a/example/julieops-confluent-cloud.properties +++ b/example/julieops-confluent-cloud.properties @@ -14,4 +14,9 @@ ccloud.cloud.api.key= ccloud.cloud.api.secret= topology.builder.ccloud.kafka.cluster.id=lkc-jkz1m ccloud.cluster.url= -topology.builder.access.control.class = com.purbon.kafka.topology.roles.CCloudAclsProvider \ No newline at end of file +topology.builder.access.control.class = com.purbon.kafka.topology.roles.CCloudAclsProvider +# julie.enable.principal.management = true +# allow.delete.principals = true +# allow.delete.topics = true +# topology.state.cluster.enabled = false +# topology.state.topics.cluster.enabled = false \ No newline at end of file diff --git a/src/main/java/com/purbon/kafka/topology/Configuration.java b/src/main/java/com/purbon/kafka/topology/Configuration.java index 9640f4b7c..545e97c95 100644 --- a/src/main/java/com/purbon/kafka/topology/Configuration.java +++ b/src/main/java/com/purbon/kafka/topology/Configuration.java @@ -594,6 +594,10 @@ public String getConfluentCloudClusterUrl() { return config.getString(CCLOUD_CLUSTER_URL); } + public Boolean isConfluentCloudServiceAccountTranslationEnabled() { + return config.getBoolean(CCLOUD_SERVICE_ACCOUNT_TRANSLATION_ENABLED); + } + public Boolean enabledPrincipalManagement() { return config.getBoolean(JULIE_ENABLE_PRINCIPAL_MANAGEMENT); } diff --git a/src/main/java/com/purbon/kafka/topology/Constants.java b/src/main/java/com/purbon/kafka/topology/Constants.java index a4071d58c..474a4e0d3 100644 --- a/src/main/java/com/purbon/kafka/topology/Constants.java +++ b/src/main/java/com/purbon/kafka/topology/Constants.java @@ -86,6 +86,9 @@ public class Constants { "topology.builder.ccloud.kafka.cluster.id"; public static final String CCLOUD_CLUSTER_URL = "ccloud.cluster.url"; + public static final String CCLOUD_SERVICE_ACCOUNT_TRANSLATION_ENABLED = + "ccloud.service_account.translation.enabled"; + public static final String TOPOLOGY_EXPERIMENTAL_ENABLED_CONFIG = "topology.features.experimental"; static final String TOPOLOGY_PRINCIPAL_TRANSLATION_ENABLED_CONFIG = diff --git a/src/main/java/com/purbon/kafka/topology/PrincipalUpdateManager.java b/src/main/java/com/purbon/kafka/topology/PrincipalUpdateManager.java index 0b4b1c3c2..00f50091d 100644 --- a/src/main/java/com/purbon/kafka/topology/PrincipalUpdateManager.java +++ b/src/main/java/com/purbon/kafka/topology/PrincipalUpdateManager.java @@ -9,9 +9,13 @@ import java.util.Map; import java.util.Set; import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; public class PrincipalUpdateManager extends AbstractPrincipalManager { + private static final Logger LOGGER = LogManager.getLogger(PrincipalUpdateManager.class); + public PrincipalUpdateManager(PrincipalProvider provider, Configuration config) { super(provider, config); } @@ -22,6 +26,13 @@ protected void doUpdatePlan( Topology topology, List principals, Map accounts) { + LOGGER.debug( + "Updating accounts for principals = " + + principals.stream().collect(Collectors.joining(",")) + + " accounts = " + + accounts.values().stream() + .map(ServiceAccount::toString) + .collect(Collectors.joining(", "))); // build set of principals to be created. Set principalsToBeCreated = principals.stream() diff --git a/src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java b/src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java index 27d0f579d..d199e749f 100644 --- a/src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java +++ b/src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java @@ -8,9 +8,11 @@ import com.purbon.kafka.topology.api.ccloud.response.KafkaAclListResponse; import com.purbon.kafka.topology.api.ccloud.response.ListServiceAccountResponse; import com.purbon.kafka.topology.api.ccloud.response.ServiceAccountResponse; +import com.purbon.kafka.topology.api.ccloud.response.ServiceAccountV1Response; import com.purbon.kafka.topology.api.mds.Response; import com.purbon.kafka.topology.clients.JulieHttpClient; import com.purbon.kafka.topology.model.cluster.ServiceAccount; +import com.purbon.kafka.topology.model.cluster.ServiceAccountV1; import com.purbon.kafka.topology.roles.TopologyAclBinding; import com.purbon.kafka.topology.utils.JSON; import java.io.IOException; @@ -27,6 +29,7 @@ public class CCloudApi { private static final Logger LOGGER = LogManager.getLogger(CCloudApi.class); + private static final String V1_IAM_SERVICE_ACCOUNTS_URL = "/service_accounts"; private static final String V2_IAM_SERVICE_ACCOUNTS_URL = "/iam/v2/service-accounts"; private static final String V3_KAFKA_CLUSTER_URL = "/kafka/v3/clusters/"; @@ -132,6 +135,21 @@ public Set listServiceAccounts() throws IOException { return accounts; } + public Set listServiceAccountsV1() throws IOException { + Set accounts = new HashSet<>(); + var response = getServiceAccountsV1(V1_IAM_SERVICE_ACCOUNTS_URL); + if (response.getError() == null) { + accounts = new HashSet<>(response.getUsers()); + } + return accounts; + } + + private ServiceAccountV1Response getServiceAccountsV1(String url) throws IOException { + Response r = ccloudApiHttpClient.doGet(url); + return (ServiceAccountV1Response) + JSON.toObject(r.getResponseAsString(), ServiceAccountV1Response.class); + } + private ListServiceAccountResponse getListServiceAccounts(String url) throws IOException { Response r = ccloudApiHttpClient.doGet(url); return (ListServiceAccountResponse) diff --git a/src/main/java/com/purbon/kafka/topology/api/ccloud/response/ServiceAccountV1Response.java b/src/main/java/com/purbon/kafka/topology/api/ccloud/response/ServiceAccountV1Response.java new file mode 100644 index 000000000..5a0a844a0 --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/api/ccloud/response/ServiceAccountV1Response.java @@ -0,0 +1,16 @@ +package com.purbon.kafka.topology.api.ccloud.response; + +import com.purbon.kafka.topology.model.cluster.ServiceAccountV1; +import java.util.List; +import java.util.Map; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class ServiceAccountV1Response { + + private List users; + private Map page_info; + private String error; +} diff --git a/src/main/java/com/purbon/kafka/topology/model/cluster/ServiceAccountV1.java b/src/main/java/com/purbon/kafka/topology/model/cluster/ServiceAccountV1.java new file mode 100644 index 000000000..322dd27aa --- /dev/null +++ b/src/main/java/com/purbon/kafka/topology/model/cluster/ServiceAccountV1.java @@ -0,0 +1,34 @@ +package com.purbon.kafka.topology.model.cluster; + +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.ToString; + +@Data +@NoArgsConstructor +@AllArgsConstructor +@ToString +public class ServiceAccountV1 { + + private long id; + private String email; + private String first_name; + private String last_name; + private int organization_id; + private boolean deactivated; + private String verified; + private String created; + private String modified; + private String service_name; + private String service_description; + private boolean service_account; + private Map sso; + private Map preferences; + private boolean internal; + private String resource_id; + private String deactivated_at; + private String social_connection; + private String auth_type; +} diff --git a/src/main/java/com/purbon/kafka/topology/model/users/platform/KsqlServerInstance.java b/src/main/java/com/purbon/kafka/topology/model/users/platform/KsqlServerInstance.java index 59eb8e62a..1e45a4f7b 100644 --- a/src/main/java/com/purbon/kafka/topology/model/users/platform/KsqlServerInstance.java +++ b/src/main/java/com/purbon/kafka/topology/model/users/platform/KsqlServerInstance.java @@ -19,7 +19,9 @@ public String getOwner() { return owner; } - public String commandTopic() { return String.format("_confluent-ksql-%s_command_topic", ksqlDbId); } + public String commandTopic() { + return String.format("_confluent-ksql-%s_command_topic", ksqlDbId); + } public String internalTopics() { return String.format("_confluent-ksql-%s", ksqlDbId); diff --git a/src/main/java/com/purbon/kafka/topology/roles/CCloudAclsProvider.java b/src/main/java/com/purbon/kafka/topology/roles/CCloudAclsProvider.java index 797c29818..1ce8724fe 100644 --- a/src/main/java/com/purbon/kafka/topology/roles/CCloudAclsProvider.java +++ b/src/main/java/com/purbon/kafka/topology/roles/CCloudAclsProvider.java @@ -1,5 +1,7 @@ package com.purbon.kafka.topology.roles; +import static java.util.Arrays.asList; + import com.purbon.kafka.topology.AccessControlProvider; import com.purbon.kafka.topology.Configuration; import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient; @@ -10,6 +12,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -20,25 +23,30 @@ public class CCloudAclsProvider extends SimpleAclsProvider implements AccessCont private final CCloudApi cli; private final String clusterId; + private final Configuration config; + private Map lookupServiceAccountId; public CCloudAclsProvider( - final TopologyBuilderAdminClient adminClient, final Configuration config) throws IOException { + final TopologyBuilderAdminClient adminClient, final Configuration config) { super(adminClient); this.cli = new CCloudApi(config.getConfluentCloudClusterUrl(), config); this.clusterId = config.getConfluentCloudClusterId(); + this.config = config; } @Override public void createBindings(Set bindings) throws IOException { + this.lookupServiceAccountId = initializeLookupTable(this.cli); for (TopologyAclBinding binding : bindings) { - cli.createAcl(clusterId, binding); + cli.createAcl(clusterId, translateIfNecessary(binding)); } } @Override public void clearBindings(Set bindings) throws IOException { for (TopologyAclBinding binding : bindings) { - cli.deleteAcls(clusterId, binding); + this.lookupServiceAccountId = initializeLookupTable(this.cli); + cli.deleteAcls(clusterId, translateIfNecessary(binding)); } } @@ -59,4 +67,70 @@ public Map> listAcls() { return Collections.emptyMap(); } } + + private TopologyAclBinding translateIfNecessary(TopologyAclBinding binding) throws IOException { + + if (!config.isConfluentCloudServiceAccountTranslationEnabled()) { + LOGGER.debug("Confluent Cloud Principal translation is currently disabled"); + return binding; + } + + LOGGER.info( + "At the time of this PR, 4 Feb the Confluent Cloud ACL(s) api require to translate " + + "Service Account names into ID(s). At some point in time this will not be required anymore, " + + "so you can configure this out by using ccloud.service_account.translation.enabled=false (true by default)"); + + String principal = binding.getPrincipal(); + Long translatedPrincipalId = lookupServiceAccountId.get(principal); + if (translatedPrincipalId == null) { // Translation failed, so we can't continue + throw new IOException( + "Translation of principal " + + principal + + " failed, please review your system configuration"); + } + String[] fields = principal.split(":"); + + if (!asList("group", "user").contains(fields[0].toLowerCase())) { + throw new IOException("Unknown principalType: " + fields[0]); + } + + TopologyAclBinding translatedBinding = + TopologyAclBinding.build( + binding.getResourceType(), + binding.getResourceName(), + binding.getHost(), + binding.getOperation(), + mappedPrincipal(fields[0], principal, translatedPrincipalId), + binding.getPattern()); + return translatedBinding; + } + + private String mappedPrincipal(String type, String principal, Long translatedPrincipalId) { + LOGGER.debug( + "Translating Confluent Cloud principal " + + principal + + " to " + + type + + ":" + + translatedPrincipalId); + return type + ":" + translatedPrincipalId; + } + + private Map initializeLookupTable(CCloudApi cli) throws IOException { + Map lookupServiceAccountTable = new HashMap<>(); + + Map lookupSaName = new HashMap<>(); + var v2ServiceAccounts = cli.listServiceAccounts(); + for (var serviceAccount : v2ServiceAccounts) { + lookupSaName.put(serviceAccount.getId(), serviceAccount.getName()); + } + var v1ServiceAccounts = cli.listServiceAccountsV1(); + for (var serviceAccount : v1ServiceAccounts) { + var serviceAccountNameOptional = + Optional.ofNullable(lookupSaName.get(serviceAccount.getResource_id())); + serviceAccountNameOptional.ifPresent( + name -> lookupServiceAccountTable.put(name, serviceAccount.getId())); + } + return lookupServiceAccountTable; + } } diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index a9d3413c5..9307f065f 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -99,6 +99,12 @@ confluent { metrics.topic = "_confluent-metrics" } +ccloud { + service_account { + translation.enabled = true + } +} + kafka { internal { topic.prefixes = [ "_" ]