Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#6858 downgrade kafka in-loop debug info to DEBUG from INFO - too noisy #6863

Merged
merged 1 commit into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public void run()
//The connector queue is too big. Wait until the size goes down until
//polling again. If we let the events just accumulate, we will
//eventually run out of memory if the consumer cannot keep up.
log.info("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", nUnprocessedEvents, maxQueueSize);
log.debug("Skipping Kafka polling since unprocessed message queue size {} is greater than {}", nUnprocessedEvents, maxQueueSize);
awaitNextPollingTime();
continue;

Expand All @@ -183,7 +183,7 @@ public void run()
String json = consumerRecord.value();
log.debug("Received message: {}" ,json);
countReceivedMessages++;
log.info("Metrics: receivedMessages: {}", countReceivedMessages);
log.debug("Metrics: receivedMessages: {}", countReceivedMessages);
final KafkaIncomingEvent event = new KafkaIncomingEvent(json, consumerRecord.offset());
final String recordKey=consumerRecord.key();
final String recordValue=consumerRecord.value();
Expand All @@ -194,12 +194,12 @@ public void run()
addUnprocessedEvent(consumerRecord.partition(), consumerRecord.topic(), event);
connector.distributeToListeners(event);
countMessagesToProcess++;
log.info("Metrics: messagesToProcess: {}", countMessagesToProcess);
log.debug("Metrics: messagesToProcess: {}", countMessagesToProcess);
}
catch (Exception error)
{
countMessagesFailedToProcess++;
log.info("Metrics: messagesFailedToProcess: {}", countMessagesFailedToProcess);
log.debug("Metrics: messagesFailedToProcess: {}", countMessagesFailedToProcess);
log.warn("Error distributing inbound event: {}", error.getMessage());

if (auditLog != null)
Expand All @@ -217,7 +217,7 @@ public void run()
{
log.debug("Ignoring message with key: {} and value: {}",recordKey, recordValue);
countIgnoredMessages++;
log.info("Metrics: ignoredMessages: {}", countIgnoredMessages);
log.debug("Metrics: ignoredMessages: {}", countIgnoredMessages);
}

if ( isAutoCommitEnabled) {
Expand All @@ -231,14 +231,14 @@ public void run()
final TopicPartition partition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
currentOffsets.put(partition, new OffsetAndMetadata(consumerRecord.offset() + 1));
countCommits++;
log.info("Metrics: messageCommits: {}", countCommits);
log.debug("Metrics: messageCommits: {}", countCommits);

}
}
}
catch (WakeupException e)
{
log.info("Received wakeup call, proceeding with graceful shutdown");
log.debug("Received wakeup call, proceeding with graceful shutdown");
}
catch (Exception error)
{
Expand Down Expand Up @@ -313,7 +313,7 @@ public void run()
}
consumer = null;
}
log.debug("Exiting main loop for topic {} & cleaning up", this.topicToSubscribe);
log.info("Exiting main loop for topic {} & cleaning up", this.topicToSubscribe);

}

Expand Down Expand Up @@ -358,7 +358,7 @@ private boolean checkForFullyProcessedMessages() {
if (isAutoCommitEnabled) {
return false;
}
log.info("Checking for fully processed messages whose offsets need to be committed");
log.debug("Checking for fully processed messages whose offsets need to be committed");

//Check all the queues to see they have events initial events
//that are fully processed
Expand All @@ -375,7 +375,7 @@ private boolean checkForFullyProcessedMessages() {

if (! commitData.isEmpty()) {
currentOffsets.putAll(commitData);
log.info("Committing: {}", commitData);
log.debug("Committing: {}", commitData);
try {
consumer.commitSync(commitData);
return true;
Expand Down Expand Up @@ -417,15 +417,15 @@ private KafkaIncomingEvent removeFullyProcessedEventsFromBeginningOfQueue(Queue<
//The message at the beginning of the queue has been fully processed. Remove
//it from the queue and repeat the check.
lastRemoved = queue.remove();
log.info("Message with offset {} has been fully processed.",lastRemoved.getOffset() );
log.debug("Message with offset {} has been fully processed.",lastRemoved.getOffset() );
countCommits++;
log.info("Metrics: commits: {}", countCommits);
log.debug("Metrics: commits: {}", countCommits);
}
KafkaIncomingEvent firstEvent = queue.peek();
if (firstEvent != null) {
//Queue is not empty, so we're waiting for the processing of first message in
//the queue to finish
log.info("Waiting for completing of processing of message with offset {}",firstEvent.getOffset());
log.debug("Waiting for completing of processing of message with offset {}",firstEvent.getOffset());
}
return lastRemoved;
}
Expand Down Expand Up @@ -551,12 +551,12 @@ public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Check if we need to rewind to handle initial startup case -- but only on first assignment
try {
if (initialPartitionAssignment) {
log.info("Received initial PartitionsAssigned event");
log.debug("Received initial PartitionsAssigned event");

long partitionCount = partitions.size();

if (partitionCount != 1) {
log.info("Received PartitionsAssigned event with {} partitions. This is not supported.",partitionCount);
log.warn("Received PartitionsAssigned event with {} partitions. This is not supported.",partitionCount);
} else {
// there is only one partition, so we can just grab the first one - and we'll try this once only
initialPartitionAssignment = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
long eventRetryCount = 0;

messagePublishRequestCount++;
log.info("Metrics: messagePublishRequestCount {}", messagePublishRequestCount);
log.debug("Metrics: messagePublishRequestCount {}", messagePublishRequestCount);

if (producer == null) {
try {
Expand All @@ -107,11 +107,11 @@ private void publishEvent(String event) throws ConnectorCheckedException {
log.debug("Sending message try {} [0 based] : {}", eventRetryCount,event);
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName, localServerId, event);
kafkaSendAttemptCount++;
log.info("Metrics: kafkaSendAttemptCount {}", kafkaSendAttemptCount);
log.debug("Metrics: kafkaSendAttemptCount {}", kafkaSendAttemptCount);
producer.send(producerRecord).get();
eventSent = true;
messageSendCount++;
log.info("Metrics: messageSendCount {}", messageSendCount);
log.debug("Metrics: messageSendCount {}", messageSendCount);
} catch (ExecutionException error) {
kafkaSendFailCount++;
log.debug("Metrics: kafkaSendFailCount {}", kafkaSendFailCount);
Expand All @@ -129,7 +129,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
producer = null;

messageFailedSendCount++;
log.info(messageFailedCountString, messageFailedSendCount);
log.warn(messageFailedCountString, messageFailedSendCount);

throw new ConnectorCheckedException(
KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(
Expand All @@ -141,7 +141,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
producer.close();
producer = null;
messageFailedSendCount++;
log.info(messageFailedCountString, messageFailedSendCount);
log.warn(messageFailedCountString, messageFailedSendCount);
log.error("Retryable Exception closed producer after {} tries", eventRetryCount);
break;
} else {
Expand Down Expand Up @@ -171,7 +171,7 @@ private void publishEvent(String event) throws ConnectorCheckedException {
}

messageFailedSendCount++;
log.info(messageFailedCountString, messageFailedSendCount);
log.warn(messageFailedCountString, messageFailedSendCount);

throw new ConnectorCheckedException(
KafkaOpenMetadataTopicConnectorErrorCode.ERROR_SENDING_EVENT.getMessageDefinition(
Expand Down Expand Up @@ -225,7 +225,7 @@ public void run() {
}
}
} catch (InterruptedException error) {
log.info("Woken up from sleep ");
log.debug("Woken up from sleep ");
Thread.currentThread().interrupt();
} catch (Exception error) {
log.warn("Bad exception from sending events: {}",error.getMessage());
Expand All @@ -240,7 +240,7 @@ public void run() {
}
}
}
log.debug("Exiting main loop for topic {} & cleaning up", topicName);
log.info("Exiting main loop for topic {} & cleaning up", topicName);

/* producer may have already closed by exception handler in publishEvent */
if (producer != null) {
Expand All @@ -265,8 +265,8 @@ public void run() {
*/
private void putEvent(String newEvent) {
inmemoryPutMessageCount++;
log.info("Metrics: inmemoryPutMessageCount {}", inmemoryPutMessageCount);
log.info("Metrics: sendBufferSize {}", sendBuffer.size());
log.debug("Metrics: inmemoryPutMessageCount {}", inmemoryPutMessageCount);
log.debug("Metrics: sendBufferSize {}", sendBuffer.size());
sendBuffer.add(newEvent);
}

Expand Down