Skip to content

Commit

Permalink
:fix: object serializer
Browse files Browse the repository at this point in the history
Signed-off-by: riccardomodanese <riccardo.modanese@eurotech.com>
  • Loading branch information
riccardomodanese committed Feb 4, 2025
1 parent 18f2abc commit 388c8f8
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2020, 2022 Eurotech and/or its affiliates and others
* Copyright (c) 2020, 2025 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -22,6 +22,7 @@
import org.slf4j.LoggerFactory;

import javax.inject.Inject;

import java.text.ParseException;
import java.util.Base64;
import java.util.Date;
Expand All @@ -41,12 +42,12 @@ public class ErrorMessageListener {
private static final String EMPTY_FIELD = "N/A";

private final MetricsCamel metrics;
private final ObjectDeserializer objectDeserializer;
private final ObjectSerializer objectSerializer;

@Inject
public ErrorMessageListener(ObjectDeserializer objectDeserializer, MetricsCamel metricsCamel) {
public ErrorMessageListener(ObjectSerializer objectSerializer, MetricsCamel metricsCamel) {
this.metrics = metricsCamel;
this.objectDeserializer = objectDeserializer;
this.objectSerializer = objectSerializer;
}

/**
Expand All @@ -70,13 +71,16 @@ private String getMessage(Exchange exchange) {
Message message = exchange.getIn();
String clientId = message.getHeader(MessageConstants.HEADER_KAPUA_CLIENT_ID, String.class);
Long timestamp = message.getHeader(MessageConstants.HEADER_KAPUA_RECEIVED_TIMESTAMP, Long.class);
String originalTopic = message.getHeader(MessageConstants.PROPERTY_ORIGINAL_TOPIC, String.class);
String encodedMsg = getMessageBody(message.getBody());
String messageLogged = String.format("%s %s %s",
String messageLogged = String.format("%s %s %s %s",
clientId != null ? clientId : EMPTY_FIELD,
originalTopic,
getDate(timestamp),
encodedMsg);
logger.info("body: {} / headers: {}", exchange.getIn().getBody(), exchange.getIn().getHeaders());
logger.info("discarded message from: {} - client id: {}", originalTopic, clientId);
if (logger.isDebugEnabled()) {
logger.debug("body: {} / headers: {}", message.getBody(), message.getHeaders());
logger.debug(messageLogged);
}
return messageLogged;
Expand All @@ -91,27 +95,14 @@ private String getDate(Long timestamp) {
}

private String getMessageBody(Object body) {
if (body instanceof byte[]) {
return getBody(objectDeserializer.convertToCamelKapuaMessage((byte[]) body));
}
else if (body instanceof String) {
return getBody((String) body);
}
else {
return getBody(body);
}
}

private String getBody(Object body) {
if (body instanceof byte[]) {
return Base64.getEncoder().encodeToString((byte[]) body);
} else if (body instanceof String) {
return Base64.getEncoder().encodeToString(((String) body).getBytes());
} else {
byte[] convertedBody = objectSerializer.convertToBytes(body);
if (convertedBody==null) {
//something wrong happened! Anyway try to get the message to be stored
logger.error("Wrong message type! Cannot convert message of type {} to byte[]", body != null ? body.getClass() : "N/A");
logger.warn("Wrong message type! Cannot convert message of type {} to byte[]", body != null ? body.getClass() : "N/A");
metrics.getUnknownBodyType().inc();
return EMPTY_ENCODED_MESSAGE;
} else {
return Base64.getEncoder().encodeToString(convertedBody);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,22 @@
import java.io.InputStream;
import java.io.ObjectInputStream;

import org.eclipse.kapua.service.camel.message.CamelKapuaMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ObjectDeserializer {

private static final Logger logger = LoggerFactory.getLogger(ObjectDeserializer.class);

public CamelKapuaMessage<?> convertToCamelKapuaMessage(byte[] message) {
Object convertedObj = toObject(message);
if (convertedObj instanceof CamelKapuaMessage<?>) {
return (CamelKapuaMessage<?>)convertedObj;
}
else {
//return null to allow DLQ message processing to end the flow (anyway the message is not processable)
logger.warn("Cannot convert byte[] message to CamelKapuaMessage. Bad class: {}", convertedObj);
return null;
}
}

protected Object toObject(byte[] bytes) {
InputStream is = new ByteArrayInputStream(bytes);
try (ObjectInputStream ois = new ObjectInputStream(is)) {
return ois.readObject();
} catch (Exception e) {
logger.warn("Cannot convert byte[] message to Object. Error: {}", e.getMessage(), e);
logger.warn("Cannot convert byte[] message to Object. Error: {}", e.getMessage());
logger.debug("", e);
//the caller perform an instance of that returns false if the instance is null
return null;
return bytes;
}
}

Expand Down

0 comments on commit 388c8f8

Please sign in to comment.