Skip to content

Commit

Permalink
Fix Confluent Cloud ACL(s) API usage, so ACL(s) are finally created p…
Browse files Browse the repository at this point in the history
…roperly (#444)

* add few calls to translate String ServiceAccountLabels into proper AccountIDs as expected by the internal system

* add configuration variable to control the confluent cloud translation future for service account id(s)

* simplify topic only descriptor

* add log information for when the translation process happends

* update the julieops example for confluent cloud with more interesting configs

* code simplification

* more simplification

* ammend typo

* ammend formatting
  • Loading branch information
purbon authored Feb 4, 2022
1 parent e7cac6a commit ac53aea
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 19 deletions.
16 changes: 2 additions & 14 deletions example/descriptor-only-topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,14 @@ context: "oltp"
projects:
- name: "foo"
consumers:
- principal: "User:App0"
- principal: "User:NewApp2"
topics:
- name: "foo"
config:
replication.factor: "1"
num.partitions: "1"
- dataType: "avro"
name: "bar"
config:
replication.factor: "1"
num.partitions: "1"
- dataType: "json"
name: "zet"
config:
replication.factor: "1"
num.partitions: "1"
- name: "bar"
topics:
- dataType: "avro"
name: "bar"
config:
replication.factor: "1"
num.partitions: "1"
name: "bar"
7 changes: 6 additions & 1 deletion example/julieops-confluent-cloud.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ ccloud.cloud.api.key=<CLOUD_API_KEY>
ccloud.cloud.api.secret=<CLOUD_API_SECRET>
topology.builder.ccloud.kafka.cluster.id=lkc-jkz1m
ccloud.cluster.url=<CLUSTER_REST_URL>
topology.builder.access.control.class = com.purbon.kafka.topology.roles.CCloudAclsProvider
topology.builder.access.control.class = com.purbon.kafka.topology.roles.CCloudAclsProvider
# julie.enable.principal.management = true
# allow.delete.principals = true
# allow.delete.topics = true
# topology.state.cluster.enabled = false
# topology.state.topics.cluster.enabled = false
4 changes: 4 additions & 0 deletions src/main/java/com/purbon/kafka/topology/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,10 @@ public String getConfluentCloudClusterUrl() {
return config.getString(CCLOUD_CLUSTER_URL);
}

public Boolean isConfluentCloudServiceAccountTranslationEnabled() {
return config.getBoolean(CCLOUD_SERVICE_ACCOUNT_TRANSLATION_ENABLED);
}

public Boolean enabledPrincipalManagement() {
return config.getBoolean(JULIE_ENABLE_PRINCIPAL_MANAGEMENT);
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/purbon/kafka/topology/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public class Constants {
"topology.builder.ccloud.kafka.cluster.id";
public static final String CCLOUD_CLUSTER_URL = "ccloud.cluster.url";

public static final String CCLOUD_SERVICE_ACCOUNT_TRANSLATION_ENABLED =
"ccloud.service_account.translation.enabled";

public static final String TOPOLOGY_EXPERIMENTAL_ENABLED_CONFIG =
"topology.features.experimental";
static final String TOPOLOGY_PRINCIPAL_TRANSLATION_ENABLED_CONFIG =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class PrincipalUpdateManager extends AbstractPrincipalManager {

private static final Logger LOGGER = LogManager.getLogger(PrincipalUpdateManager.class);

public PrincipalUpdateManager(PrincipalProvider provider, Configuration config) {
super(provider, config);
}
Expand All @@ -22,6 +26,13 @@ protected void doUpdatePlan(
Topology topology,
List<String> principals,
Map<String, ServiceAccount> accounts) {
LOGGER.debug(
"Updating accounts for principals = "
+ principals.stream().collect(Collectors.joining(","))
+ " accounts = "
+ accounts.values().stream()
.map(ServiceAccount::toString)
.collect(Collectors.joining(", ")));
// build set of principals to be created.
Set<ServiceAccount> principalsToBeCreated =
principals.stream()
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import com.purbon.kafka.topology.api.ccloud.response.KafkaAclListResponse;
import com.purbon.kafka.topology.api.ccloud.response.ListServiceAccountResponse;
import com.purbon.kafka.topology.api.ccloud.response.ServiceAccountResponse;
import com.purbon.kafka.topology.api.ccloud.response.ServiceAccountV1Response;
import com.purbon.kafka.topology.api.mds.Response;
import com.purbon.kafka.topology.clients.JulieHttpClient;
import com.purbon.kafka.topology.model.cluster.ServiceAccount;
import com.purbon.kafka.topology.model.cluster.ServiceAccountV1;
import com.purbon.kafka.topology.roles.TopologyAclBinding;
import com.purbon.kafka.topology.utils.JSON;
import java.io.IOException;
Expand All @@ -27,6 +29,7 @@ public class CCloudApi {

private static final Logger LOGGER = LogManager.getLogger(CCloudApi.class);

private static final String V1_IAM_SERVICE_ACCOUNTS_URL = "/service_accounts";
private static final String V2_IAM_SERVICE_ACCOUNTS_URL = "/iam/v2/service-accounts";
private static final String V3_KAFKA_CLUSTER_URL = "/kafka/v3/clusters/";

Expand Down Expand Up @@ -132,6 +135,21 @@ public Set<ServiceAccount> listServiceAccounts() throws IOException {
return accounts;
}

public Set<ServiceAccountV1> listServiceAccountsV1() throws IOException {
Set<ServiceAccountV1> accounts = new HashSet<>();
var response = getServiceAccountsV1(V1_IAM_SERVICE_ACCOUNTS_URL);
if (response.getError() == null) {
accounts = new HashSet<>(response.getUsers());
}
return accounts;
}

private ServiceAccountV1Response getServiceAccountsV1(String url) throws IOException {
Response r = ccloudApiHttpClient.doGet(url);
return (ServiceAccountV1Response)
JSON.toObject(r.getResponseAsString(), ServiceAccountV1Response.class);
}

private ListServiceAccountResponse getListServiceAccounts(String url) throws IOException {
Response r = ccloudApiHttpClient.doGet(url);
return (ListServiceAccountResponse)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.purbon.kafka.topology.api.ccloud.response;

import com.purbon.kafka.topology.model.cluster.ServiceAccountV1;
import java.util.List;
import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class ServiceAccountV1Response {

private List<ServiceAccountV1> users;
private Map<String, Object> page_info;
private String error;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.purbon.kafka.topology.model.cluster;

import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ServiceAccountV1 {

private long id;
private String email;
private String first_name;
private String last_name;
private int organization_id;
private boolean deactivated;
private String verified;
private String created;
private String modified;
private String service_name;
private String service_description;
private boolean service_account;
private Map<String, Object> sso;
private Map<String, Object> preferences;
private boolean internal;
private String resource_id;
private String deactivated_at;
private String social_connection;
private String auth_type;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public String getOwner() {
return owner;
}

public String commandTopic() { return String.format("_confluent-ksql-%s_command_topic", ksqlDbId); }
public String commandTopic() {
return String.format("_confluent-ksql-%s_command_topic", ksqlDbId);
}

public String internalTopics() {
return String.format("_confluent-ksql-%s", ksqlDbId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.purbon.kafka.topology.roles;

import static java.util.Arrays.asList;

import com.purbon.kafka.topology.AccessControlProvider;
import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient;
Expand All @@ -10,6 +12,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -20,25 +23,30 @@ public class CCloudAclsProvider extends SimpleAclsProvider implements AccessCont

private final CCloudApi cli;
private final String clusterId;
private final Configuration config;
private Map<String, Long> lookupServiceAccountId;

public CCloudAclsProvider(
final TopologyBuilderAdminClient adminClient, final Configuration config) throws IOException {
final TopologyBuilderAdminClient adminClient, final Configuration config) {
super(adminClient);
this.cli = new CCloudApi(config.getConfluentCloudClusterUrl(), config);
this.clusterId = config.getConfluentCloudClusterId();
this.config = config;
}

@Override
public void createBindings(Set<TopologyAclBinding> bindings) throws IOException {
this.lookupServiceAccountId = initializeLookupTable(this.cli);
for (TopologyAclBinding binding : bindings) {
cli.createAcl(clusterId, binding);
cli.createAcl(clusterId, translateIfNecessary(binding));
}
}

@Override
public void clearBindings(Set<TopologyAclBinding> bindings) throws IOException {
for (TopologyAclBinding binding : bindings) {
cli.deleteAcls(clusterId, binding);
this.lookupServiceAccountId = initializeLookupTable(this.cli);
cli.deleteAcls(clusterId, translateIfNecessary(binding));
}
}

Expand All @@ -59,4 +67,70 @@ public Map<String, List<TopologyAclBinding>> listAcls() {
return Collections.emptyMap();
}
}

private TopologyAclBinding translateIfNecessary(TopologyAclBinding binding) throws IOException {

if (!config.isConfluentCloudServiceAccountTranslationEnabled()) {
LOGGER.debug("Confluent Cloud Principal translation is currently disabled");
return binding;
}

LOGGER.info(
"At the time of this PR, 4 Feb the Confluent Cloud ACL(s) api require to translate "
+ "Service Account names into ID(s). At some point in time this will not be required anymore, "
+ "so you can configure this out by using ccloud.service_account.translation.enabled=false (true by default)");

String principal = binding.getPrincipal();
Long translatedPrincipalId = lookupServiceAccountId.get(principal);
if (translatedPrincipalId == null) { // Translation failed, so we can't continue
throw new IOException(
"Translation of principal "
+ principal
+ " failed, please review your system configuration");
}
String[] fields = principal.split(":");

if (!asList("group", "user").contains(fields[0].toLowerCase())) {
throw new IOException("Unknown principalType: " + fields[0]);
}

TopologyAclBinding translatedBinding =
TopologyAclBinding.build(
binding.getResourceType(),
binding.getResourceName(),
binding.getHost(),
binding.getOperation(),
mappedPrincipal(fields[0], principal, translatedPrincipalId),
binding.getPattern());
return translatedBinding;
}

private String mappedPrincipal(String type, String principal, Long translatedPrincipalId) {
LOGGER.debug(
"Translating Confluent Cloud principal "
+ principal
+ " to "
+ type
+ ":"
+ translatedPrincipalId);
return type + ":" + translatedPrincipalId;
}

private Map<String, Long> initializeLookupTable(CCloudApi cli) throws IOException {
Map<String, Long> lookupServiceAccountTable = new HashMap<>();

Map<String, String> lookupSaName = new HashMap<>();
var v2ServiceAccounts = cli.listServiceAccounts();
for (var serviceAccount : v2ServiceAccounts) {
lookupSaName.put(serviceAccount.getId(), serviceAccount.getName());
}
var v1ServiceAccounts = cli.listServiceAccountsV1();
for (var serviceAccount : v1ServiceAccounts) {
var serviceAccountNameOptional =
Optional.ofNullable(lookupSaName.get(serviceAccount.getResource_id()));
serviceAccountNameOptional.ifPresent(
name -> lookupServiceAccountTable.put(name, serviceAccount.getId()));
}
return lookupServiceAccountTable;
}
}
6 changes: 6 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ confluent {
metrics.topic = "_confluent-metrics"
}

ccloud {
service_account {
translation.enabled = true
}
}

kafka {
internal {
topic.prefixes = [ "_" ]
Expand Down

0 comments on commit ac53aea

Please sign in to comment.