Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Confluent Cloud ACL(s) API usage, so ACL(s) are finally created properly #444

Merged
merged 9 commits into from
Feb 4, 2022
16 changes: 2 additions & 14 deletions example/descriptor-only-topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,14 @@ context: "oltp"
projects:
- name: "foo"
consumers:
- principal: "User:App0"
- principal: "User:NewApp2"
topics:
- name: "foo"
config:
replication.factor: "1"
num.partitions: "1"
- dataType: "avro"
name: "bar"
config:
replication.factor: "1"
num.partitions: "1"
- dataType: "json"
name: "zet"
config:
replication.factor: "1"
num.partitions: "1"
- name: "bar"
topics:
- dataType: "avro"
name: "bar"
config:
replication.factor: "1"
num.partitions: "1"
name: "bar"
7 changes: 6 additions & 1 deletion example/julieops-confluent-cloud.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,9 @@ ccloud.cloud.api.key=<CLOUD_API_KEY>
ccloud.cloud.api.secret=<CLOUD_API_SECRET>
topology.builder.ccloud.kafka.cluster.id=lkc-jkz1m
ccloud.cluster.url=<CLUSTER_REST_URL>
topology.builder.access.control.class = com.purbon.kafka.topology.roles.CCloudAclsProvider
topology.builder.access.control.class = com.purbon.kafka.topology.roles.CCloudAclsProvider
# julie.enable.principal.management = true
# allow.delete.principals = true
# allow.delete.topics = true
# topology.state.cluster.enabled = false
# topology.state.topics.cluster.enabled = false
4 changes: 4 additions & 0 deletions src/main/java/com/purbon/kafka/topology/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,10 @@ public String getConfluentCloudClusterUrl() {
return config.getString(CCLOUD_CLUSTER_URL);
}

public Boolean isConfluentCloudServiceAccountTranslationEnabled() {
return config.getBoolean(CCLOUD_SERVICE_ACCOUNT_TRANSLATION_ENABLED);
}

public Boolean enabledPrincipalManagement() {
return config.getBoolean(JULIE_ENABLE_PRINCIPAL_MANAGEMENT);
}
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/purbon/kafka/topology/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ public class Constants {
"topology.builder.ccloud.kafka.cluster.id";
public static final String CCLOUD_CLUSTER_URL = "ccloud.cluster.url";

public static final String CCLOUD_SERVICE_ACCOUNT_TRANSLATION_ENABLED =
"ccloud.service_account.translation.enabled";

public static final String TOPOLOGY_EXPERIMENTAL_ENABLED_CONFIG =
"topology.features.experimental";
static final String TOPOLOGY_PRINCIPAL_TRANSLATION_ENABLED_CONFIG =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class PrincipalUpdateManager extends AbstractPrincipalManager {

private static final Logger LOGGER = LogManager.getLogger(PrincipalUpdateManager.class);

public PrincipalUpdateManager(PrincipalProvider provider, Configuration config) {
super(provider, config);
}
Expand All @@ -22,6 +26,13 @@ protected void doUpdatePlan(
Topology topology,
List<String> principals,
Map<String, ServiceAccount> accounts) {
LOGGER.debug(
"Updating accounts for principals = "
+ principals.stream().collect(Collectors.joining(","))
+ " accounts = "
+ accounts.values().stream()
.map(ServiceAccount::toString)
.collect(Collectors.joining(", ")));
// build set of principals to be created.
Set<ServiceAccount> principalsToBeCreated =
principals.stream()
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/purbon/kafka/topology/api/ccloud/CCloudApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
import com.purbon.kafka.topology.api.ccloud.response.KafkaAclListResponse;
import com.purbon.kafka.topology.api.ccloud.response.ListServiceAccountResponse;
import com.purbon.kafka.topology.api.ccloud.response.ServiceAccountResponse;
import com.purbon.kafka.topology.api.ccloud.response.ServiceAccountV1Response;
import com.purbon.kafka.topology.api.mds.Response;
import com.purbon.kafka.topology.clients.JulieHttpClient;
import com.purbon.kafka.topology.model.cluster.ServiceAccount;
import com.purbon.kafka.topology.model.cluster.ServiceAccountV1;
import com.purbon.kafka.topology.roles.TopologyAclBinding;
import com.purbon.kafka.topology.utils.JSON;
import java.io.IOException;
Expand All @@ -27,6 +29,7 @@ public class CCloudApi {

private static final Logger LOGGER = LogManager.getLogger(CCloudApi.class);

private static final String V1_IAM_SERVICE_ACCOUNTS_URL = "/service_accounts";
private static final String V2_IAM_SERVICE_ACCOUNTS_URL = "/iam/v2/service-accounts";
private static final String V3_KAFKA_CLUSTER_URL = "/kafka/v3/clusters/";

Expand Down Expand Up @@ -132,6 +135,21 @@ public Set<ServiceAccount> listServiceAccounts() throws IOException {
return accounts;
}

public Set<ServiceAccountV1> listServiceAccountsV1() throws IOException {
Set<ServiceAccountV1> accounts = new HashSet<>();
var response = getServiceAccountsV1(V1_IAM_SERVICE_ACCOUNTS_URL);
if (response.getError() == null) {
accounts = new HashSet<>(response.getUsers());
}
return accounts;
}

private ServiceAccountV1Response getServiceAccountsV1(String url) throws IOException {
Response r = ccloudApiHttpClient.doGet(url);
return (ServiceAccountV1Response)
JSON.toObject(r.getResponseAsString(), ServiceAccountV1Response.class);
}

private ListServiceAccountResponse getListServiceAccounts(String url) throws IOException {
Response r = ccloudApiHttpClient.doGet(url);
return (ListServiceAccountResponse)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.purbon.kafka.topology.api.ccloud.response;

import com.purbon.kafka.topology.model.cluster.ServiceAccountV1;
import java.util.List;
import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
public class ServiceAccountV1Response {

private List<ServiceAccountV1> users;
private Map<String, Object> page_info;
private String error;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.purbon.kafka.topology.model.cluster;

import java.util.Map;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ServiceAccountV1 {

private long id;
private String email;
private String first_name;
private String last_name;
private int organization_id;
private boolean deactivated;
private String verified;
private String created;
private String modified;
private String service_name;
private String service_description;
private boolean service_account;
private Map<String, Object> sso;
private Map<String, Object> preferences;
private boolean internal;
private String resource_id;
private String deactivated_at;
private String social_connection;
private String auth_type;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public String getOwner() {
return owner;
}

public String commandTopic() { return String.format("_confluent-ksql-%s_command_topic", ksqlDbId); }
public String commandTopic() {
return String.format("_confluent-ksql-%s_command_topic", ksqlDbId);
}

public String internalTopics() {
return String.format("_confluent-ksql-%s", ksqlDbId);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.purbon.kafka.topology.roles;

import static java.util.Arrays.asList;

import com.purbon.kafka.topology.AccessControlProvider;
import com.purbon.kafka.topology.Configuration;
import com.purbon.kafka.topology.api.adminclient.TopologyBuilderAdminClient;
Expand All @@ -10,6 +12,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -20,25 +23,30 @@ public class CCloudAclsProvider extends SimpleAclsProvider implements AccessCont

private final CCloudApi cli;
private final String clusterId;
private final Configuration config;
private Map<String, Long> lookupServiceAccountId;

public CCloudAclsProvider(
final TopologyBuilderAdminClient adminClient, final Configuration config) throws IOException {
final TopologyBuilderAdminClient adminClient, final Configuration config) {
super(adminClient);
this.cli = new CCloudApi(config.getConfluentCloudClusterUrl(), config);
this.clusterId = config.getConfluentCloudClusterId();
this.config = config;
}

@Override
public void createBindings(Set<TopologyAclBinding> bindings) throws IOException {
this.lookupServiceAccountId = initializeLookupTable(this.cli);
for (TopologyAclBinding binding : bindings) {
cli.createAcl(clusterId, binding);
cli.createAcl(clusterId, translateIfNecessary(binding));
}
}

@Override
public void clearBindings(Set<TopologyAclBinding> bindings) throws IOException {
for (TopologyAclBinding binding : bindings) {
cli.deleteAcls(clusterId, binding);
this.lookupServiceAccountId = initializeLookupTable(this.cli);
cli.deleteAcls(clusterId, translateIfNecessary(binding));
}
}

Expand All @@ -59,4 +67,70 @@ public Map<String, List<TopologyAclBinding>> listAcls() {
return Collections.emptyMap();
}
}

private TopologyAclBinding translateIfNecessary(TopologyAclBinding binding) throws IOException {

if (!config.isConfluentCloudServiceAccountTranslationEnabled()) {
LOGGER.debug("Confluent Cloud Principal translation is currently disabled");
return binding;
}

LOGGER.info(
"At the time of this PR, 4 Feb the Confluent Cloud ACL(s) api require to translate "
+ "Service Account names into ID(s). At some point in time this will not be required anymore, "
+ "so you can configure this out by using ccloud.service_account.translation.enabled=false (true by default)");

String principal = binding.getPrincipal();
Long translatedPrincipalId = lookupServiceAccountId.get(principal);
if (translatedPrincipalId == null) { // Translation failed, so we can't continue
throw new IOException(
"Translation of principal "
+ principal
+ " failed, please review your system configuration");
}
String[] fields = principal.split(":");

if (!asList("group", "user").contains(fields[0].toLowerCase())) {
throw new IOException("Unknown principalType: " + fields[0]);
}

TopologyAclBinding translatedBinding =
TopologyAclBinding.build(
binding.getResourceType(),
binding.getResourceName(),
binding.getHost(),
binding.getOperation(),
mappedPrincipal(fields[0], principal, translatedPrincipalId),
binding.getPattern());
return translatedBinding;
}

private String mappedPrincipal(String type, String principal, Long translatedPrincipalId) {
LOGGER.debug(
"Translating Confluent Cloud principal "
+ principal
+ " to "
+ type
+ ":"
+ translatedPrincipalId);
return type + ":" + translatedPrincipalId;
}

private Map<String, Long> initializeLookupTable(CCloudApi cli) throws IOException {
Map<String, Long> lookupServiceAccountTable = new HashMap<>();

Map<String, String> lookupSaName = new HashMap<>();
var v2ServiceAccounts = cli.listServiceAccounts();
for (var serviceAccount : v2ServiceAccounts) {
lookupSaName.put(serviceAccount.getId(), serviceAccount.getName());
}
var v1ServiceAccounts = cli.listServiceAccountsV1();
for (var serviceAccount : v1ServiceAccounts) {
var serviceAccountNameOptional =
Optional.ofNullable(lookupSaName.get(serviceAccount.getResource_id()));
serviceAccountNameOptional.ifPresent(
name -> lookupServiceAccountTable.put(name, serviceAccount.getId()));
}
return lookupServiceAccountTable;
}
}
6 changes: 6 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ confluent {
metrics.topic = "_confluent-metrics"
}

ccloud {
service_account {
translation.enabled = true
}
}

kafka {
internal {
topic.prefixes = [ "_" ]
Expand Down