diff --git a/service/camel/src/main/java/org/eclipse/kapua/service/camel/listener/error/ErrorMessageListener.java b/service/camel/src/main/java/org/eclipse/kapua/service/camel/listener/error/ErrorMessageListener.java index 708cef84cd7..7ebd1530211 100644 --- a/service/camel/src/main/java/org/eclipse/kapua/service/camel/listener/error/ErrorMessageListener.java +++ b/service/camel/src/main/java/org/eclipse/kapua/service/camel/listener/error/ErrorMessageListener.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2020, 2022 Eurotech and/or its affiliates and others + * Copyright (c) 2020, 2025 Eurotech and/or its affiliates and others * * This program and the accompanying materials are made * available under the terms of the Eclipse Public License 2.0 @@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory; import javax.inject.Inject; + import java.text.ParseException; import java.util.Base64; import java.util.Date; @@ -41,12 +42,12 @@ public class ErrorMessageListener { private static final String EMPTY_FIELD = "N/A"; private final MetricsCamel metrics; - private final ObjectDeserializer objectDeserializer; + private final ObjectSerializer objectSerializer; @Inject - public ErrorMessageListener(ObjectDeserializer objectDeserializer, MetricsCamel metricsCamel) { + public ErrorMessageListener(ObjectSerializer objectSerializer, MetricsCamel metricsCamel) { this.metrics = metricsCamel; - this.objectDeserializer = objectDeserializer; + this.objectSerializer = objectSerializer; } /** @@ -70,13 +71,16 @@ private String getMessage(Exchange exchange) { Message message = exchange.getIn(); String clientId = message.getHeader(MessageConstants.HEADER_KAPUA_CLIENT_ID, String.class); Long timestamp = message.getHeader(MessageConstants.HEADER_KAPUA_RECEIVED_TIMESTAMP, Long.class); + String originalTopic = message.getHeader(MessageConstants.PROPERTY_ORIGINAL_TOPIC, String.class); String encodedMsg = getMessageBody(message.getBody()); - String messageLogged = String.format("%s %s %s", + String messageLogged = String.format("%s %s %s %s", clientId != null ? clientId : EMPTY_FIELD, + originalTopic, getDate(timestamp), encodedMsg); - logger.info("body: {} / headers: {}", exchange.getIn().getBody(), exchange.getIn().getHeaders()); + logger.info("discarded message from: {} - client id: {}", originalTopic, clientId); if (logger.isDebugEnabled()) { + logger.debug("body: {} / headers: {}", message.getBody(), message.getHeaders()); logger.debug(messageLogged); } return messageLogged; @@ -91,27 +95,14 @@ private String getDate(Long timestamp) { } private String getMessageBody(Object body) { - if (body instanceof byte[]) { - return getBody(objectDeserializer.convertToCamelKapuaMessage((byte[]) body)); - } - else if (body instanceof String) { - return getBody((String) body); - } - else { - return getBody(body); - } - } - - private String getBody(Object body) { - if (body instanceof byte[]) { - return Base64.getEncoder().encodeToString((byte[]) body); - } else if (body instanceof String) { - return Base64.getEncoder().encodeToString(((String) body).getBytes()); - } else { + byte[] convertedBody = objectSerializer.convertToBytes(body); + if (convertedBody==null) { //something wrong happened! Anyway try to get the message to be stored - logger.error("Wrong message type! Cannot convert message of type {} to byte[]", body != null ? body.getClass() : "N/A"); + logger.warn("Wrong message type! Cannot convert message of type {} to byte[]", body != null ? body.getClass() : "N/A"); metrics.getUnknownBodyType().inc(); return EMPTY_ENCODED_MESSAGE; + } else { + return Base64.getEncoder().encodeToString(convertedBody); } } diff --git a/service/camel/src/main/java/org/eclipse/kapua/service/camel/listener/error/ObjectDeserializer.java b/service/camel/src/main/java/org/eclipse/kapua/service/camel/listener/error/ObjectDeserializer.java index 42f2c6507bd..2a433287301 100644 --- a/service/camel/src/main/java/org/eclipse/kapua/service/camel/listener/error/ObjectDeserializer.java +++ b/service/camel/src/main/java/org/eclipse/kapua/service/camel/listener/error/ObjectDeserializer.java @@ -16,7 +16,6 @@ import java.io.InputStream; import java.io.ObjectInputStream; -import org.eclipse.kapua.service.camel.message.CamelKapuaMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,26 +23,15 @@ public class ObjectDeserializer { private static final Logger logger = LoggerFactory.getLogger(ObjectDeserializer.class); - public CamelKapuaMessage convertToCamelKapuaMessage(byte[] message) { - Object convertedObj = toObject(message); - if (convertedObj instanceof CamelKapuaMessage) { - return (CamelKapuaMessage)convertedObj; - } - else { - //return null to allow DLQ message processing to end the flow (anyway the message is not processable) - logger.warn("Cannot convert byte[] message to CamelKapuaMessage. Bad class: {}", convertedObj); - return null; - } - } - protected Object toObject(byte[] bytes) { InputStream is = new ByteArrayInputStream(bytes); try (ObjectInputStream ois = new ObjectInputStream(is)) { return ois.readObject(); } catch (Exception e) { - logger.warn("Cannot convert byte[] message to Object. Error: {}", e.getMessage(), e); + logger.warn("Cannot convert byte[] message to Object. Error: {}", e.getMessage()); + logger.debug("", e); //the caller perform an instance of that returns false if the instance is null - return null; + return bytes; } }