Adding enum for kafka protocols #100

Merged · 7 commits · Oct 20, 2022
Changes from 5 commits

@@ -306,4 +306,13 @@ public ResponseEntity<Map<String, String>> getClusterInfoFromEnv(
envsClustersTenantsControllerService.getClusterInfoFromEnv(envSelected, envType),
HttpStatus.OK);
}

@RequestMapping(
value = "/getKafkaProtocols",
method = RequestMethod.GET,
produces = {MediaType.APPLICATION_JSON_VALUE})
public ResponseEntity<List<Map<String, String>>> getSupportedKafkaProtocols() {
return new ResponseEntity<>(
envsClustersTenantsControllerService.getSupportedKafkaProtocols(), HttpStatus.OK);
}
}
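
For orientation, a minimal sketch (not part of the PR) of what this new GET endpoint returns: getSupportedKafkaProtocols(), shown further down in this diff, emits one name/value pair per enum constant. The class below is a hypothetical, standalone illustration of that payload.

import io.aiven.klaw.model.KafkaSupportedProtocol;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical illustration class; mirrors the service method backing /getKafkaProtocols.
public class SupportedProtocolsSketch {
  public static void main(String[] args) {
    List<Map<String, String>> supportedProtocols =
        Arrays.stream(KafkaSupportedProtocol.values())
            .map(p -> Map.of("name", p.getName(), "value", p.getValue()))
            .collect(Collectors.toList());
    // e.g. PLAINTEXT carries value PLAINTEXT, while SASL_SSL_PLAIN_MECHANISM carries SASL_SSL/PLAIN.
    supportedProtocols.forEach(System.out::println);
  }
}
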
4 changes: 3 additions & 1 deletion src/main/java/io/aiven/klaw/dao/KwClusters.java
@@ -1,5 +1,6 @@
package io.aiven.klaw.dao;

import io.aiven.klaw.model.KafkaSupportedProtocol;
import java.io.Serializable;
import javax.persistence.*;
import lombok.Getter;
@@ -29,7 +30,8 @@ public class KwClusters implements Serializable {
private String bootstrapServers;

@Column(name = "protocol")
private String protocol;
@Enumerated(EnumType.STRING)
private KafkaSupportedProtocol protocol;

@Column(name = "clustertype")
private String clusterType;
25 changes: 25 additions & 0 deletions src/main/java/io/aiven/klaw/model/KafkaSupportedProtocol.java
@@ -0,0 +1,25 @@
package io.aiven.klaw.model;

public enum KafkaSupportedProtocol {
PLAINTEXT("PLAINTEXT"),
SSL("SSL"),
SASL_PLAIN("SASL_PLAIN"),
SASL_SSL_PLAIN_MECHANISM("SASL_SSL/PLAIN"),
SASL_SSL_GSSAPI_MECHANISM("SASL_SSL/GSSAPI"),
SASL_SSL_SCRAM_MECHANISM_256("SASL_SSL/SCRAM-SHA-256"),
SASL_SSL_SCRAM_MECHANISM_512("SASL_SSL/SCRAM-SHA-512");

private final String value;

public String getName() {
return name();
}

public String getValue() {
return value;
}

KafkaSupportedProtocol(String value) {
this.value = value;
}
}
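
A quick aside on why the enum keeps two accessors (an illustration, not part of the PR): getName() returns the constant name, which is what @Enumerated(EnumType.STRING) stores in the protocol column and what valueOf() accepts, while getValue() carries the human-readable label, some of which contain a slash. A minimal sketch:

import io.aiven.klaw.model.KafkaSupportedProtocol;

// Hypothetical illustration class, not part of the PR.
public class ProtocolEnumSketch {
  public static void main(String[] args) {
    KafkaSupportedProtocol p = KafkaSupportedProtocol.SASL_SSL_PLAIN_MECHANISM;
    System.out.println(p.getName());  // SASL_SSL_PLAIN_MECHANISM - the constant name
    System.out.println(p.getValue()); // SASL_SSL/PLAIN - the display label
    // Round-tripping works on the name, not on the display value:
    KafkaSupportedProtocol restored = KafkaSupportedProtocol.valueOf(p.getName());
    System.out.println(restored == p); // true - enum constants are singletons
  }
}
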
4 changes: 3 additions & 1 deletion src/main/java/io/aiven/klaw/model/KwClustersModel.java
@@ -1,5 +1,6 @@
package io.aiven.klaw.model;

import io.aiven.klaw.validation.KafkaClusterValidator;
import java.io.Serializable;
import java.util.List;
import javax.validation.constraints.NotNull;
@@ -12,6 +13,7 @@
@ToString
@Getter
@Setter
@KafkaClusterValidator
public class KwClustersModel implements Serializable {

private Integer clusterId;
@@ -26,7 +28,7 @@ public class KwClustersModel implements Serializable {
private String bootstrapServers;

@NotNull(message = "Protocol cannot be null")
private String protocol;
private KafkaSupportedProtocol protocol;

@NotNull private String clusterType;

src/main/java/io/aiven/klaw/model/cluster/ClusterAclRequest.java
@@ -1,6 +1,7 @@
package io.aiven.klaw.model.cluster;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.aiven.klaw.model.KafkaSupportedProtocol;
import io.aiven.klaw.model.RequestOperationType;
import java.io.Serializable;
import lombok.Builder;
@@ -20,7 +21,7 @@ public class ClusterAclRequest implements Serializable {

@JsonProperty private String env;

@JsonProperty private String protocol;
@JsonProperty private KafkaSupportedProtocol protocol;

@JsonProperty private String clusterName;

src/main/java/io/aiven/klaw/model/cluster/ClusterConnectorRequest.java
@@ -1,6 +1,7 @@
package io.aiven.klaw.model.cluster;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.aiven.klaw.model.KafkaSupportedProtocol;
import java.io.Serializable;
import lombok.Builder;

@@ -9,7 +10,7 @@ public class ClusterConnectorRequest implements Serializable {

@JsonProperty private String env;

@JsonProperty private String protocol;
@JsonProperty private KafkaSupportedProtocol protocol;

@JsonProperty private String connectorConfig;

src/main/java/io/aiven/klaw/model/cluster/ClusterSchemaRequest.java
@@ -1,6 +1,7 @@
package io.aiven.klaw.model.cluster;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.aiven.klaw.model.KafkaSupportedProtocol;
import java.io.Serializable;
import lombok.Builder;

@@ -9,7 +10,7 @@ public class ClusterSchemaRequest implements Serializable {

@JsonProperty private String env;

@JsonProperty private String protocol;
@JsonProperty private KafkaSupportedProtocol protocol;

@JsonProperty private String topicName;

src/main/java/io/aiven/klaw/model/cluster/ClusterTopicRequest.java
@@ -1,6 +1,7 @@
package io.aiven.klaw.model.cluster;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.aiven.klaw.model.KafkaSupportedProtocol;
import java.io.Serializable;
import lombok.Builder;

@@ -13,7 +14,7 @@ public class ClusterTopicRequest implements Serializable {

@JsonProperty private short replicationFactor;

@JsonProperty private String protocol;
@JsonProperty private KafkaSupportedProtocol protocol;

@JsonProperty private String clusterName;

@@ -17,6 +17,7 @@
import io.aiven.klaw.model.ApiResultStatus;
import io.aiven.klaw.model.KafkaClustersType;
import io.aiven.klaw.model.KafkaFlavors;
import io.aiven.klaw.model.KafkaSupportedProtocol;
import io.aiven.klaw.model.PermissionType;
import io.aiven.klaw.model.RequestOperationType;
import io.aiven.klaw.model.SyncAclUpdates;
@@ -221,7 +222,7 @@ private void approveSyncBackAcls(
private List<Map<String, String>> getAclListFromCluster(
String bootstrapHost,
Env envSelected,
String protocol,
KafkaSupportedProtocol protocol,
String clusterName,
String topicNameSearch,
int tenantId)
43 changes: 27 additions & 16 deletions src/main/java/io/aiven/klaw/service/ClusterApiService.java
@@ -17,6 +17,7 @@
import io.aiven.klaw.model.ClusterStatus;
import io.aiven.klaw.model.KafkaClustersType;
import io.aiven.klaw.model.KafkaFlavors;
import io.aiven.klaw.model.KafkaSupportedProtocol;
import io.aiven.klaw.model.RequestOperationType;
import io.aiven.klaw.model.cluster.ClusterAclRequest;
import io.aiven.klaw.model.cluster.ClusterConnectorRequest;
@@ -170,7 +171,7 @@ String getSchemaClusterStatus(String host, int tenantId) {

String getKafkaClusterStatus(
String bootstrapHost,
String protocol,
KafkaSupportedProtocol protocol,
String clusterIdentification,
String clusterType,
int tenantId) {
@@ -184,7 +185,7 @@ String getKafkaClusterStatus(
+ uriEnvStatus
+ bootstrapHost
+ URL_DELIMITER
+ String.join(URL_DELIMITER, protocol, clusterIdentification, clusterType);
+ String.join(URL_DELIMITER, protocol.getName(), clusterIdentification, clusterType);

ResponseEntity<ClusterStatus> resultBody =
getRestTemplate().exchange(uri, HttpMethod.GET, getHttpEntity(), ClusterStatus.class);
@@ -197,7 +198,7 @@

public List<Map<String, String>> getConsumerOffsets(
String bootstrapHost,
String protocol,
KafkaSupportedProtocol protocol,
String clusterIdentification,
String topic,
String consumerGroupId,
@@ -213,7 +214,8 @@ public List<Map<String, String>> getConsumerOffsets(
+ url
+ bootstrapHost
+ URL_DELIMITER
+ String.join(URL_DELIMITER, protocol, clusterIdentification, consumerGroupId, topic);
+ String.join(
URL_DELIMITER, protocol.getName(), clusterIdentification, consumerGroupId, topic);

ResponseEntity<List<Map<String, String>>> resultBody =
getRestTemplate()
@@ -230,7 +232,7 @@

public Map<String, String> getTopicEvents(
String bootstrapHost,
String protocol,
KafkaSupportedProtocol protocol,
String clusterIdentification,
String topic,
String offsetId,
@@ -249,7 +251,12 @@ public Map<String, String> getTopicEvents(
+ bootstrapHost
+ URL_DELIMITER
+ String.join(
URL_DELIMITER, protocol, clusterIdentification, consumerGroupId, topic, offsetId);
URL_DELIMITER,
protocol.getName(),
clusterIdentification,
consumerGroupId,
topic,
offsetId);

ResponseEntity<Map<String, String>> resultBody =
getRestTemplate()
@@ -265,7 +272,8 @@ public Map<String, String> getTopicEvents(
}

public List<Map<String, String>> getAcls(
String bootstrapHost, Env envSelected, String protocol, int tenantId) throws KlawException {
String bootstrapHost, Env envSelected, KafkaSupportedProtocol protocol, int tenantId)
throws KlawException {
log.info("getAcls {} {} {}", bootstrapHost, protocol, tenantId);
getClusterApiProperties(tenantId);

@@ -288,7 +296,7 @@ public List<Map<String, String>> getAcls(
+ String.join(
URL_DELIMITER,
AclsNativeType.AIVEN.name(),
protocol,
protocol.getName(),
kwClusters.getClusterName() + kwClusters.getClusterId(),
kwClusters.getProjectName(),
kwClusters.getServiceName());
@@ -301,7 +309,7 @@ public List<Map<String, String>> getAcls(
+ String.join(
URL_DELIMITER,
AclsNativeType.NATIVE.name(),
protocol,
protocol.getName(),
kwClusters.getClusterName() + kwClusters.getClusterId(),
"na",
"na");
@@ -320,7 +328,10 @@
}

public List<Map<String, String>> getAllTopics(
String bootstrapHost, String protocol, String clusterIdentification, int tenantId)
String bootstrapHost,
KafkaSupportedProtocol protocol,
String clusterIdentification,
int tenantId)
throws Exception {
log.info("getAllTopics {} {}", bootstrapHost, protocol);
getClusterApiProperties(tenantId);
@@ -332,7 +343,7 @@ public List<Map<String, String>> getAllTopics(
+ uriGetTopics
+ bootstrapHost
+ URL_DELIMITER
+ String.join(URL_DELIMITER, protocol, clusterIdentification);
+ String.join(URL_DELIMITER, protocol.getName(), clusterIdentification);

HttpEntity<String> entity = getHttpEntity();
ResponseEntity<Set<Map<String, String>>> s =
@@ -350,7 +361,7 @@

public String approveConnectorRequests(
String connectorName,
String protocol,
KafkaSupportedProtocol protocol,
String connectorType,
String connectorConfig,
String kafkaConnectHost,
@@ -586,7 +597,7 @@ ResponseEntity<ApiResponse> postSchema(

public TreeMap<Integer, Map<String, Object>> getAvroSchema(
String schemaRegistryHost,
String protocol,
KafkaSupportedProtocol protocol,
String clusterName,
String topicName,
int tenantId)
@@ -602,7 +613,7 @@ public TreeMap<Integer, Map<String, Object>> getAvroSchema(
+ uriGetSchema
+ schemaRegistryHost
+ URL_DELIMITER
+ String.join(URL_DELIMITER, protocol, clusterName, topicName);
+ String.join(URL_DELIMITER, protocol.getName(), clusterName, topicName);

ResponseEntity<TreeMap<String, Map<String, Object>>> treeMapResponseEntity =
getRestTemplate()
@@ -626,7 +637,7 @@ public TreeMap<Integer, Map<String, Object>> getAvroSchema(
}

public Map<String, Object> getConnectorDetails(
String connectorName, String kafkaConnectHost, String protocol, int tenantId)
String connectorName, String kafkaConnectHost, KafkaSupportedProtocol protocol, int tenantId)
throws KlawException {
log.info("getConnectorDetails {} {}", connectorName, kafkaConnectHost);
getClusterApiProperties(tenantId);
@@ -637,7 +648,7 @@ public Map<String, Object> getConnectorDetails(
"/topics/getConnectorDetails",
connectorName,
kafkaConnectHost,
protocol);
protocol.getName());
String uriGetConnectorsFull = clusterConnUrl + uriGetTopics;

ResponseEntity<LinkedHashMap<String, Object>> s =
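
One pattern worth noting in the changes above: wherever the protocol is joined into a cluster-API path, the code uses protocol.getName(), never getValue(), which keeps slash-bearing labels such as SASL_SSL/PLAIN from splitting into extra path segments. A minimal sketch of the difference, assuming URL_DELIMITER is "/" (its value is not shown in this diff) and using placeholder cluster arguments:

import io.aiven.klaw.model.KafkaSupportedProtocol;

// Hypothetical illustration class; URL_DELIMITER = "/" and the cluster arguments are assumptions.
public class ProtocolPathSketch {
  private static final String URL_DELIMITER = "/";

  public static void main(String[] args) {
    KafkaSupportedProtocol protocol = KafkaSupportedProtocol.SASL_SSL_PLAIN_MECHANISM;
    // One clean path segment: SASL_SSL_PLAIN_MECHANISM/clusterId1/kafka
    System.out.println(String.join(URL_DELIMITER, protocol.getName(), "clusterId1", "kafka"));
    // The display value would inject an extra segment: SASL_SSL/PLAIN/clusterId1/kafka
    System.out.println(String.join(URL_DELIMITER, protocol.getValue(), "clusterId1", "kafka"));
  }
}
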
src/main/java/io/aiven/klaw/service/EnvsClustersTenantsControllerService.java
@@ -211,7 +211,11 @@ public List<KwClustersModel> getClustersPaginated(
List<KwClustersModel> envListMap3 =
kwClustersModelList.stream()
.filter(
env -> env.getProtocol().toLowerCase().contains(searchClusterParam.toLowerCase()))
env ->
env.getProtocol()
.getName()
.toLowerCase()
.contains(searchClusterParam.toLowerCase()))
.collect(Collectors.toList());
envListMap1.addAll(envListMap2);
envListMap1.addAll(envListMap3);
@@ -570,10 +574,10 @@ public List<EnvModel> getSchemaRegEnvsStatus() {
String status;

if (manageDatabase
.getClusters(KafkaClustersType.SCHEMA_REGISTRY, tenantId)
.get(oneEnv.getClusterId())
.getProtocol()
.equalsIgnoreCase("plaintext"))
.getClusters(KafkaClustersType.SCHEMA_REGISTRY, tenantId)
.get(oneEnv.getClusterId())
.getProtocol()
== KafkaSupportedProtocol.PLAINTEXT)
status =
clusterApiService.getSchemaClusterStatus(
manageDatabase
@@ -688,7 +692,7 @@ public ApiResponse addNewEnv(EnvModel newEnv) throws KlawException {
}

public ApiResponse addNewCluster(KwClustersModel kwClustersModel) {
log.info("addNewCluster {}", kwClustersModel.getClusterName());
log.info("addNewCluster {}", kwClustersModel);
Map<String, String> resultMap = new HashMap<>();

int tenantId = commonUtilsService.getTenantId(getUserName());
@@ -722,7 +726,7 @@ public ApiResponse addNewCluster(KwClustersModel kwClustersModel) {
kwCluster.setClusterName(kwCluster.getClusterName().toUpperCase());

// only for new cluster requests on saas
if ("SSL".equals(kwCluster.getProtocol())
if (KafkaSupportedProtocol.SSL == kwCluster.getProtocol()
&& kwCluster.getClusterId() == null
&& "saas".equals(kwInstallationType)) {
if (!savePublicKey(kwClustersModel, resultMap, tenantId, kwCluster)) {
@@ -1245,4 +1249,16 @@ public Map<String, String> getClusterInfoFromEnv(String envSelected, String clus

return clusterInfo;
}

public List<Map<String, String>> getSupportedKafkaProtocols() {
List<Map<String, String>> supportedProtocols = new ArrayList<>();
for (KafkaSupportedProtocol kafkaSupportedProtocol : KafkaSupportedProtocol.values()) {
Map<String, String> protocolValues = new HashMap<>();
protocolValues.put("name", kafkaSupportedProtocol.getName());
protocolValues.put("value", kafkaSupportedProtocol.getValue());
supportedProtocols.add(protocolValues);

Contributor:
What is the contract for supportedProtocols? Which maps should be added there?
Is it OK to add non-modifiable maps?
I'm asking because, if a non-modifiable map is acceptable, the loop could be simplified a bit, like:

for (KafkaSupportedProtocol kafkaSupportedProtocol : KafkaSupportedProtocol.values()) {
      supportedProtocols.add(
              Map.of("name", kafkaSupportedProtocol.getName(), 
                      "value", kafkaSupportedProtocol.getValue()));
    }

Contributor Author:
Indeed this is a non-modifiable map; updated the code, thanks.

}

return supportedProtocols;
}
}