Skip to content

Commit

Permalink
fix: issue in insert statement, decode receipt
Browse files Browse the repository at this point in the history
  • Loading branch information
schoenenberg committed Jul 4, 2024
1 parent 7650ed4 commit 2bbbc9a
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package com.truzzt.extension.logginghouse.client.events.messages;

public record LogMessageReceipt (String data) {}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.truzzt.extension.logginghouse.client.events.messages;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.truzzt.extension.logginghouse.client.multipart.ids.jsonld.JsonLd;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.CalendarUtil;
import com.truzzt.extension.logginghouse.client.multipart.ids.multipart.IdsConstants;
Expand All @@ -32,9 +33,10 @@
import org.eclipse.edc.spi.monitor.Monitor;
import org.json.JSONObject;

import java.io.IOException;
import java.util.List;

public class LogMessageSender implements MultipartSenderDelegate<LogMessage, String> {
public class LogMessageSender implements MultipartSenderDelegate<LogMessage, LogMessageReceipt> {

Monitor monitor;
String connectorId;
Expand Down Expand Up @@ -70,8 +72,8 @@ public String buildMessagePayload(LogMessage logMessage) {
}

@Override
public MultipartResponse<String> getResponseContent(IdsMultipartParts parts) throws Exception {
return ResponseUtil.parseMultipartStringResponse(parts, JsonLd.getObjectMapper());
public MultipartResponse<LogMessageReceipt> getResponseContent(IdsMultipartParts parts) throws Exception {
return parseLogMessageReceiptResponse(parts, JsonLd.getObjectMapper());
}

@Override
Expand Down Expand Up @@ -132,4 +134,15 @@ private String buildTransferProcessPayload(TransferProcess transferProcess) {

return jo.toString();
}

public static MultipartResponse<LogMessageReceipt> parseLogMessageReceiptResponse(IdsMultipartParts parts, ObjectMapper objectMapper) throws IOException {
var header = objectMapper.readValue(parts.getHeader(), Message.class);

LogMessageReceipt payload = null;
if (parts.getPayload() != null) {
payload = objectMapper.readValue(parts.getPayload(), LogMessageReceipt.class);
}

return new MultipartResponse<>(header, payload);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public void save(LoggingHouseMessage event) {
event.getProcessId(),
event.getConsumerId(),
event.getProviderId(),
event.getStatus().getCode(),
mapFromZonedDateTime(event.getCreatedAt())
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.truzzt.extension.logginghouse.client.events.messages.CreateProcessMessage;
import com.truzzt.extension.logginghouse.client.events.messages.LogMessage;
import com.truzzt.extension.logginghouse.client.events.messages.LogMessageReceipt;
import com.truzzt.extension.logginghouse.client.spi.store.LoggingHouseMessageStore;
import com.truzzt.extension.logginghouse.client.spi.types.LoggingHouseMessage;
import org.eclipse.edc.spi.EdcException;
Expand Down Expand Up @@ -78,7 +79,7 @@ public void process(LoggingHouseMessage message) {
createProcess(message, extendedProcessUrl).join();
} catch (Exception e) {
// TODO: Not fail when process already exists
monitor.severe("CreateProcess returned error: " + e.getMessage());
monitor.warning("CreateProcess returned error (ignore it when the process already exists): " + e.getMessage());
//throw new EdcException("Could not create process in LoggingHouse", e);
}
}
Expand All @@ -89,7 +90,7 @@ public void process(LoggingHouseMessage message) {
var response = logMessage(message, extendedLogUrl).join();
response.onSuccess(msg -> {
monitor.info("Received receipt successfully from LoggingHouse for message with id " + message.getEventId());
store.updateSent(message.getId(), msg);
store.updateSent(message.getId(), msg.data());
});
} catch (Exception e) {
throw new EdcException("Could not log message to LoggingHouse", e);
Expand All @@ -111,12 +112,12 @@ public CompletableFuture<StatusResult<Object>> createProcess(LoggingHouseMessage
return dispatcherRegistry.dispatch(Object.class, logMessage);
}

public CompletableFuture<StatusResult<String>> logMessage(LoggingHouseMessage message, URL clearingHouseLogUrl) {
public CompletableFuture<StatusResult<LogMessageReceipt>> logMessage(LoggingHouseMessage message, URL clearingHouseLogUrl) {

monitor.info("Logging message to LoggingHouse with type " + message.getEventType() + " and id " + message.getEventId());
var logMessage = new LogMessage(clearingHouseLogUrl, connectorBaseUrl, message.getEventToLog());

return dispatcherRegistry.dispatch(String.class, logMessage);
return dispatcherRegistry.dispatch(LogMessageReceipt.class, logMessage);
}

}

0 comments on commit 2bbbc9a

Please sign in to comment.