Fix broken build and clean up KafkaSource class. (#3469)
Signed-off-by: David Venable <dlv@amazon.com>
dlvenable authored Oct 10, 2023 · 1 parent 68875c4 · commit c06e545
Showing 2 changed files with 13 additions and 176 deletions.
@@ -49,7 +49,6 @@
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -7,23 +7,18 @@

 import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
 import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
 import io.confluent.kafka.serializers.KafkaAvroDeserializer;
 import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
 import io.confluent.kafka.serializers.KafkaJsonDeserializer;
 import kafka.common.BrokerEndPointNotAvailableException;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.admin.AdminClient;
-import org.apache.kafka.clients.admin.KafkaAdminClient;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.BrokerNotAvailableException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.connect.json.JsonDeserializer;
 import org.opensearch.dataprepper.metrics.PluginMetrics;
@@ -50,24 +50,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.HttpURLConnection;
-import java.net.InetAddress;
-import java.net.Socket;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Properties;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
Expand All @@ -87,15 +71,14 @@ public class KafkaSource implements Source<Record<Event>> {
private static final long RETRY_SLEEP_INTERVAL = 30000;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
private final KafkaSourceConfig sourceConfig;
private AtomicBoolean shutdownInProgress;
private final AtomicBoolean shutdownInProgress;
private ExecutorService executorService;
private final PluginMetrics pluginMetrics;
private KafkaCustomConsumer consumer;
private KafkaConsumer kafkaConsumer;
private String pipelineName;
private final String pipelineName;
private String consumerGroupID;
private String schemaType = MessageFormat.PLAINTEXT.toString();
private static final String SCHEMA_TYPE = "schemaType";
private final AcknowledgementSetManager acknowledgementSetManager;
private static CachedSchemaRegistryClient schemaRegistryClient;
private GlueSchemaRegistryKafkaDeserializer glueDeserializer;
@@ -176,7 +159,7 @@ public void start(Buffer<Record<Event>> buffer) {
                 case JSON:
                     return new KafkaConsumer<String, JsonNode>(consumerProperties);
                 case AVRO:
-                return new KafkaConsumer<String, GenericRecord>(consumerProperties);
+                    return new KafkaConsumer<String, GenericRecord>(consumerProperties);
                 case PLAINTEXT:
                 default:
                     glueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(sourceConfig);
@@ -233,7 +216,7 @@ KafkaConsumer getConsumer() {
     }

     private Properties getConsumerProperties(final TopicConfig topicConfig, final Properties authProperties) {
-        Properties properties = (Properties)authProperties.clone();
+        Properties properties = (Properties) authProperties.clone();
         if (StringUtils.isNotEmpty(sourceConfig.getClientDnsLookup())) {
             ClientDNSLookupType dnsLookupType = ClientDNSLookupType.getDnsLookupType(sourceConfig.getClientDnsLookup());
             switch (dnsLookupType) {
@@ -254,82 +237,10 @@ private Properties getConsumerProperties(final TopicConfig topicConfig, final Properties authProperties) {
         return properties;
     }

-    private static boolean validateURL(String url) {
-        try {
-            URI uri = new URI(url);
-            if (uri.getScheme() == null || uri.getHost() == null) {
-                return false;
-            }
-            return true;
-        } catch (URISyntaxException ex) {
-            LOG.error("Invalid Schema Registry URI: ", ex);
-            return false;
-        }
-    }
-
     private String getSchemaRegistryUrl() {
         return sourceConfig.getSchemaConfig().getRegistryURL();
     }
-
-    private static String getSchemaType(final String registryUrl, final String topicName, final int schemaVersion) {
-        StringBuilder response = new StringBuilder();
-        String schemaType = MessageFormat.PLAINTEXT.toString();
-        try {
-            String urlPath = registryUrl + "subjects/" + topicName + "-value/versions/" + schemaVersion;
-            URL url = new URL(urlPath);
-            HttpURLConnection connection = (HttpURLConnection) url.openConnection();
-            connection.setRequestMethod("GET");
-            int responseCode = connection.getResponseCode();
-            if (responseCode == HttpURLConnection.HTTP_OK) {
-                BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
-                String inputLine;
-                while ((inputLine = reader.readLine()) != null) {
-                    response.append(inputLine);
-                }
-                reader.close();
-                ObjectMapper mapper = new ObjectMapper();
-                Object json = mapper.readValue(response.toString(), Object.class);
-                String indented = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
-                JsonNode rootNode = mapper.readTree(indented);
-                // If the entry exists but schema type doesn't exist then
-                // the schemaType defaults to AVRO
-                if (rootNode.has(SCHEMA_TYPE)) {
-                    JsonNode node = rootNode.findValue(SCHEMA_TYPE);
-                    schemaType = node.textValue();
-                } else {
-                    schemaType = MessageFormat.AVRO.toString();
-                }
-            } else {
-                InputStream errorStream = connection.getErrorStream();
-                String errorMessage = readErrorMessage(errorStream);
-                // Plaintext is not a valid schematype in schema registry
-                // So, if it doesn't exist in schema regitry, default
-                // the schemaType to PLAINTEXT
-                LOG.error("GET request failed while fetching the schema registry. Defaulting to schema type PLAINTEXT");
-                return MessageFormat.PLAINTEXT.toString();
-            }
-        } catch (IOException e) {
-            LOG.error("An error while fetching the schema registry details : ", e);
-            throw new RuntimeException();
-        }
-        return schemaType;
-    }
-
-    private static String readErrorMessage(InputStream errorStream) throws IOException {
-        if (errorStream == null) {
-            return null;
-        }
-        BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream));
-        StringBuilder errorMessage = new StringBuilder();
-        String line;
-        while ((line = reader.readLine()) != null) {
-            errorMessage.append(line);
-        }
-        reader.close();
-        errorStream.close();
-        return errorMessage.toString();
-    }
-
     private void setSchemaRegistryProperties(Properties properties, TopicConfig topicConfig) {
         SchemaConfig schemaConfig = sourceConfig.getSchemaConfig();
         if (Objects.isNull(schemaConfig)) {
@@ -394,24 +305,24 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topicConfig) {

     private void setConsumerTopicProperties(Properties properties, TopicConfig topicConfig) {
         properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID);
-        properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int)topicConfig.getMaxPartitionFetchBytes());
-        properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue());
-        properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue());
+        properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int) topicConfig.getMaxPartitionFetchBytes());
+        properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long) topicConfig.getRetryBackoff().toMillis()).intValue());
+        properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long) topicConfig.getReconnectBackoff().toMillis()).intValue());
         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
                 topicConfig.getAutoCommit());
         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
-                ((Long)topicConfig.getCommitInterval().toMillis()).intValue());
+                ((Long) topicConfig.getCommitInterval().toMillis()).intValue());
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                 topicConfig.getAutoOffsetReset());
         properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
                 topicConfig.getConsumerMaxPollRecords());
         properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
-                ((Long)topicConfig.getMaxPollInterval().toMillis()).intValue());
-        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long)topicConfig.getSessionTimeOut().toMillis()).intValue());
-        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long)topicConfig.getHeartBeatInterval().toMillis()).intValue());
-        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int)topicConfig.getFetchMaxBytes());
+                ((Long) topicConfig.getMaxPollInterval().toMillis()).intValue());
+        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long) topicConfig.getSessionTimeOut().toMillis()).intValue());
+        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long) topicConfig.getHeartBeatInterval().toMillis()).intValue());
+        properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int) topicConfig.getFetchMaxBytes());
         properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
-        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int)topicConfig.getFetchMinBytes());
+        properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int) topicConfig.getFetchMinBytes());
     }

     private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
@@ -436,79 +347,6 @@ private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
         }
     }

-    private void isTopicExists(String topicName, String bootStrapServer, Properties properties) {
-        List<String> bootStrapServers = new ArrayList<>();
-        String servers[];
-        if (bootStrapServer.contains(",")) {
-            servers = bootStrapServer.split(",");
-            bootStrapServers.addAll(Arrays.asList(servers));
-        } else {
-            bootStrapServers.add(bootStrapServer);
-        }
-        properties.put("connections.max.idle.ms", 5000);
-        properties.put("request.timeout.ms", 10000);
-        try (AdminClient client = KafkaAdminClient.create(properties)) {
-            boolean topicExists = client.listTopics().names().get().stream().anyMatch(name -> name.equalsIgnoreCase(topicName));
-        } catch (InterruptedException | ExecutionException e) {
-            if (e.getCause() instanceof UnknownTopicOrPartitionException) {
-                LOG.error("Topic does not exist: " + topicName);
-            }
-            throw new RuntimeException("Exception while checking the topics availability...");
-        }
-    }
-
-    private boolean isKafkaClusterExists(String bootStrapServers) {
-        Socket socket = null;
-        String[] serverDetails = new String[0];
-        String[] servers = new String[0];
-        int counter = 0;
-        try {
-            if (bootStrapServers.contains(",")) {
-                servers = bootStrapServers.split(",");
-            } else {
-                servers = new String[]{bootStrapServers};
-            }
-            if (CollectionUtils.isNotEmpty(Arrays.asList(servers))) {
-                for (String bootstrapServer : servers) {
-                    if (bootstrapServer.contains(":")) {
-                        serverDetails = bootstrapServer.split(":");
-                        if (StringUtils.isNotEmpty(serverDetails[0])) {
-                            InetAddress inetAddress = InetAddress.getByName(serverDetails[0]);
-                            socket = new Socket(inetAddress, Integer.parseInt(serverDetails[1]));
-                        }
-                    }
-                }
-            }
-        } catch (IOException e) {
-            counter++;
-            LOG.error("Kafka broker : {} is not available...", getMaskedBootStrapDetails(serverDetails[0]));
-        } finally {
-            if (socket != null) {
-                try {
-                    socket.close();
-                } catch (IOException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-        if (counter == servers.length) {
-            return true;
-        }
-        return false;
-    }
-
-    private String getMaskedBootStrapDetails(String serverIP) {
-        if (serverIP == null || serverIP.length() <= 4) {
-            return serverIP;
-        }
-        int maskedLength = serverIP.length() - 4;
-        StringBuilder maskedString = new StringBuilder(maskedLength);
-        for (int i = 0; i < maskedLength; i++) {
-            maskedString.append('*');
-        }
-        return maskedString.append(serverIP.substring(maskedLength)).toString();
-    }
-
     protected void sleep(final long millis) throws InterruptedException {
         Thread.sleep(millis);
     }