
Fix broken build and clean up KafkaSource class. #3469

Merged
Oct 10, 2023
@@ -49,7 +49,6 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -7,23 +7,18 @@

import com.amazonaws.services.schemaregistry.deserializers.GlueSchemaRegistryKafkaDeserializer;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import io.confluent.kafka.serializers.KafkaJsonDeserializer;
import kafka.common.BrokerEndPointNotAvailableException;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.BrokerNotAvailableException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
Expand All @@ -50,24 +45,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.Socket;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -87,15 +71,14 @@ public class KafkaSource implements Source<Record<Event>> {
private static final long RETRY_SLEEP_INTERVAL = 30000;
private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
private final KafkaSourceConfig sourceConfig;
private AtomicBoolean shutdownInProgress;
private final AtomicBoolean shutdownInProgress;
private ExecutorService executorService;
private final PluginMetrics pluginMetrics;
private KafkaCustomConsumer consumer;
private KafkaConsumer kafkaConsumer;
private String pipelineName;
private final String pipelineName;
private String consumerGroupID;
private String schemaType = MessageFormat.PLAINTEXT.toString();
private static final String SCHEMA_TYPE = "schemaType";
private final AcknowledgementSetManager acknowledgementSetManager;
private static CachedSchemaRegistryClient schemaRegistryClient;
private GlueSchemaRegistryKafkaDeserializer glueDeserializer;
@@ -176,7 +159,7 @@ public void start(Buffer<Record<Event>> buffer) {
case JSON:
return new KafkaConsumer<String, JsonNode>(consumerProperties);
case AVRO:
return new KafkaConsumer<String, GenericRecord>(consumerProperties);
return new KafkaConsumer<String, GenericRecord>(consumerProperties);
case PLAINTEXT:
default:
glueDeserializer = KafkaSecurityConfigurer.getGlueSerializer(sourceConfig);
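
For context when skimming the diff: the switch above picks a value deserializer per message format before constructing the KafkaConsumer. A minimal standalone sketch of that pattern is shown below; the helper name and format strings are hypothetical, the AVRO and Glue registry branches are omitted, and the supplied Properties are assumed to already carry bootstrap.servers (and usually group.id).

    import com.fasterxml.jackson.databind.JsonNode;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.connect.json.JsonDeserializer;

    import java.util.Properties;

    class ConsumerByFormatSketch {
        // Hypothetical helper: choose a value deserializer that matches the topic's format.
        // props is expected to already contain bootstrap.servers and a consumer group id.
        static KafkaConsumer<String, ?> consumerFor(final String format, final Properties props) {
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            switch (format) {
                case "json":
                    // org.apache.kafka.connect.json.JsonDeserializer produces JsonNode values.
                    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
                    return new KafkaConsumer<String, JsonNode>(props);
                case "plaintext":
                default:
                    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
                    return new KafkaConsumer<String, String>(props);
            }
        }
    }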
@@ -233,7 +216,7 @@ KafkaConsumer getConsumer() {
}

private Properties getConsumerProperties(final TopicConfig topicConfig, final Properties authProperties) {
Properties properties = (Properties)authProperties.clone();
Properties properties = (Properties) authProperties.clone();
if (StringUtils.isNotEmpty(sourceConfig.getClientDnsLookup())) {
ClientDNSLookupType dnsLookupType = ClientDNSLookupType.getDnsLookupType(sourceConfig.getClientDnsLookup());
switch (dnsLookupType) {
@@ -254,82 +237,10 @@ private Properties getConsumerProperties(final TopicConfig topicConfig, final Pr
return properties;
}

private static boolean validateURL(String url) {
try {
URI uri = new URI(url);
if (uri.getScheme() == null || uri.getHost() == null) {
return false;
}
return true;
} catch (URISyntaxException ex) {
LOG.error("Invalid Schema Registry URI: ", ex);
return false;
}
}

private String getSchemaRegistryUrl() {
return sourceConfig.getSchemaConfig().getRegistryURL();
}

private static String getSchemaType(final String registryUrl, final String topicName, final int schemaVersion) {
StringBuilder response = new StringBuilder();
String schemaType = MessageFormat.PLAINTEXT.toString();
try {
String urlPath = registryUrl + "subjects/" + topicName + "-value/versions/" + schemaVersion;
URL url = new URL(urlPath);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestMethod("GET");
int responseCode = connection.getResponseCode();
if (responseCode == HttpURLConnection.HTTP_OK) {
BufferedReader reader = new BufferedReader(new InputStreamReader(connection.getInputStream()));
String inputLine;
while ((inputLine = reader.readLine()) != null) {
response.append(inputLine);
}
reader.close();
ObjectMapper mapper = new ObjectMapper();
Object json = mapper.readValue(response.toString(), Object.class);
String indented = mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json);
JsonNode rootNode = mapper.readTree(indented);
// If the entry exists but schema type doesn't exist then
// the schemaType defaults to AVRO
if (rootNode.has(SCHEMA_TYPE)) {
JsonNode node = rootNode.findValue(SCHEMA_TYPE);
schemaType = node.textValue();
} else {
schemaType = MessageFormat.AVRO.toString();
}
} else {
InputStream errorStream = connection.getErrorStream();
String errorMessage = readErrorMessage(errorStream);
// Plaintext is not a valid schema type in the schema registry,
// so if the entry doesn't exist in the schema registry, default
// the schemaType to PLAINTEXT.
LOG.error("GET request failed while fetching the schema registry. Defaulting to schema type PLAINTEXT");
return MessageFormat.PLAINTEXT.toString();
}
} catch (IOException e) {
LOG.error("An error while fetching the schema registry details : ", e);
throw new RuntimeException();
}
return schemaType;
}

private static String readErrorMessage(InputStream errorStream) throws IOException {
if (errorStream == null) {
return null;
}
BufferedReader reader = new BufferedReader(new InputStreamReader(errorStream));
StringBuilder errorMessage = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
errorMessage.append(line);
}
reader.close();
errorStream.close();
return errorMessage.toString();
}

private void setSchemaRegistryProperties(Properties properties, TopicConfig topicConfig) {
SchemaConfig schemaConfig = sourceConfig.getSchemaConfig();
if (Objects.isNull(schemaConfig)) {
@@ -394,24 +305,24 @@ private void setPropertiesForSchemaType(Properties properties, TopicConfig topic

private void setConsumerTopicProperties(Properties properties, TopicConfig topicConfig) {
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupID);
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int)topicConfig.getMaxPartitionFetchBytes());
properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long)topicConfig.getRetryBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long)topicConfig.getReconnectBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, (int) topicConfig.getMaxPartitionFetchBytes());
properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, ((Long) topicConfig.getRetryBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, ((Long) topicConfig.getReconnectBackoff().toMillis()).intValue());
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
topicConfig.getAutoCommit());
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,
((Long)topicConfig.getCommitInterval().toMillis()).intValue());
((Long) topicConfig.getCommitInterval().toMillis()).intValue());
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
topicConfig.getAutoOffsetReset());
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
topicConfig.getConsumerMaxPollRecords());
properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
((Long)topicConfig.getMaxPollInterval().toMillis()).intValue());
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long)topicConfig.getSessionTimeOut().toMillis()).intValue());
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long)topicConfig.getHeartBeatInterval().toMillis()).intValue());
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int)topicConfig.getFetchMaxBytes());
((Long) topicConfig.getMaxPollInterval().toMillis()).intValue());
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, ((Long) topicConfig.getSessionTimeOut().toMillis()).intValue());
properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, ((Long) topicConfig.getHeartBeatInterval().toMillis()).intValue());
properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, (int) topicConfig.getFetchMaxBytes());
properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, topicConfig.getFetchMaxWait());
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int)topicConfig.getFetchMinBytes());
properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, (int) topicConfig.getFetchMinBytes());
}
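
Purely as an illustration of the shape of the Properties that setConsumerTopicProperties builds, the sketch below fills the same ConsumerConfig keys with hand-picked values; the numbers are hypothetical stand-ins, not the values TopicConfig actually supplies.

    import org.apache.kafka.clients.consumer.ConsumerConfig;

    import java.util.Properties;

    class ConsumerTopicPropertiesSketch {
        public static void main(final String[] args) {
            final Properties properties = new Properties();
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "example-consumer-group"); // consumerGroupID
            properties.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1048576); // 1 MiB
            properties.put(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, 10000);
            properties.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 10000);
            properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
            properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
            properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
            properties.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);       // 5 minutes
            properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 45000);
            properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
            properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 52428800);          // 50 MiB
            properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
            properties.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);
            System.out.println(properties);
        }
    }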

private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
@@ -436,79 +347,6 @@ private void setPropertiesForSchemaRegistryConnectivity(Properties properties) {
}
}

private void isTopicExists(String topicName, String bootStrapServer, Properties properties) {
List<String> bootStrapServers = new ArrayList<>();
String servers[];
if (bootStrapServer.contains(",")) {
servers = bootStrapServer.split(",");
bootStrapServers.addAll(Arrays.asList(servers));
} else {
bootStrapServers.add(bootStrapServer);
}
properties.put("connections.max.idle.ms", 5000);
properties.put("request.timeout.ms", 10000);
try (AdminClient client = KafkaAdminClient.create(properties)) {
boolean topicExists = client.listTopics().names().get().stream().anyMatch(name -> name.equalsIgnoreCase(topicName));
} catch (InterruptedException | ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
LOG.error("Topic does not exist: " + topicName);
}
throw new RuntimeException("Exception while checking the topics availability...");
}
}

private boolean isKafkaClusterExists(String bootStrapServers) {
Socket socket = null;
String[] serverDetails = new String[0];
String[] servers = new String[0];
int counter = 0;
try {
if (bootStrapServers.contains(",")) {
servers = bootStrapServers.split(",");
} else {
servers = new String[]{bootStrapServers};
}
if (CollectionUtils.isNotEmpty(Arrays.asList(servers))) {
for (String bootstrapServer : servers) {
if (bootstrapServer.contains(":")) {
serverDetails = bootstrapServer.split(":");
if (StringUtils.isNotEmpty(serverDetails[0])) {
InetAddress inetAddress = InetAddress.getByName(serverDetails[0]);
socket = new Socket(inetAddress, Integer.parseInt(serverDetails[1]));
}
}
}
}
} catch (IOException e) {
counter++;
LOG.error("Kafka broker : {} is not available...", getMaskedBootStrapDetails(serverDetails[0]));
} finally {
if (socket != null) {
try {
socket.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
if (counter == servers.length) {
return true;
}
return false;
}

private String getMaskedBootStrapDetails(String serverIP) {
if (serverIP == null || serverIP.length() <= 4) {
return serverIP;
}
int maskedLength = serverIP.length() - 4;
StringBuilder maskedString = new StringBuilder(maskedLength);
for (int i = 0; i < maskedLength; i++) {
maskedString.append('*');
}
return maskedString.append(serverIP.substring(maskedLength)).toString();
}
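
A quick worked example of the masking above: every character except the last four is replaced with '*', so a hypothetical server name "broker-1" would be logged as "****er-1". A compact equivalent sketch (hypothetical class and method names, assuming Java 11+ for String.repeat):

    class MaskSketch {
        // Keep only the last four characters of the host visible, as getMaskedBootStrapDetails does.
        static String mask(final String serverIP) {
            if (serverIP == null || serverIP.length() <= 4) {
                return serverIP;
            }
            final int maskedLength = serverIP.length() - 4;
            return "*".repeat(maskedLength) + serverIP.substring(maskedLength);
        }

        public static void main(final String[] args) {
            System.out.println(mask("broker-1")); // prints ****er-1
        }
    }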

protected void sleep(final long millis) throws InterruptedException {
Thread.sleep(millis);
}