diff --git a/packages/appserver-service/carbonio-mailbox-policies.json b/packages/appserver-service/carbonio-mailbox-policies.json
index c8ab75323c8..8bc664b2ab8 100644
--- a/packages/appserver-service/carbonio-mailbox-policies.json
+++ b/packages/appserver-service/carbonio-mailbox-policies.json
@@ -3,6 +3,9 @@
{
"carbonio-mailbox/": {
"policy": "read"
+ },
+ "carbonio-message-broker/": {
+ "policy": "read"
}
}
],
diff --git a/packages/appserver-service/carbonio-mailbox.hcl b/packages/appserver-service/carbonio-mailbox.hcl
index 704a2e35b56..1adec2d2ce6 100644
--- a/packages/appserver-service/carbonio-mailbox.hcl
+++ b/packages/appserver-service/carbonio-mailbox.hcl
@@ -33,6 +33,11 @@ services {
local_bind_port = 20004
local_bind_address = "127.78.0.7"
},
+ {
+ destination_name = "carbonio-message-broker"
+ local_bind_port = 20005
+ local_bind_address = "127.78.0.7"
+ },
]
}
}
diff --git a/pom.xml b/pom.xml
index dbf1cb89b44..7a47661506a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -774,6 +774,11 @@
4.2.0
runtime
+
+ com.zextras.carbonio.message-broker
+ carbonio-message-broker-sdk
+ ${carbonio-message-broker-sdk.version}
+
@@ -813,6 +818,7 @@
2.3.2
1.11
1.2.1
+ 0.0.3
-SNAPSHOT
24.12.0
diff --git a/store/pom.xml b/store/pom.xml
index d0369b453ae..5b4a6daf169 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -91,6 +91,11 @@
org.apache.curator
+
+ com.zextras.carbonio.message-broker
+ carbonio-message-broker-sdk
+
+
guava
@@ -125,6 +130,12 @@
1.17.6
test
+
+ org.testcontainers
+ rabbitmq
+ 1.19.8
+ test
+
com.github.docker-java
diff --git a/store/src/main/java/com/zextras/mailbox/client/ServiceDiscoverHttpClient.java b/store/src/main/java/com/zextras/mailbox/client/ServiceDiscoverHttpClient.java
new file mode 100644
index 00000000000..b4d9531662c
--- /dev/null
+++ b/store/src/main/java/com/zextras/mailbox/client/ServiceDiscoverHttpClient.java
@@ -0,0 +1,78 @@
+// SPDX-FileCopyrightText: 2024 Zextras
+//
+// SPDX-License-Identifier: AGPL-3.0-only
+
+package com.zextras.mailbox.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.zextras.carbonio.files.exceptions.InternalServerError;
+import com.zextras.carbonio.files.exceptions.UnAuthorized;
+import io.vavr.control.Try;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClients;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class ServiceDiscoverHttpClient {
+
+ private final Logger logger = LoggerFactory.getLogger(ServiceDiscoverHttpClient.class);
+
+ private final String serviceDiscoverURL;
+ private String token;
+
+ ServiceDiscoverHttpClient(String serviceDiscoverURL) {
+ this.serviceDiscoverURL = serviceDiscoverURL;
+ this.token = System.getenv("CONSUL_HTTP_TOKEN"); // default: get from env
+ }
+
+ public static ServiceDiscoverHttpClient atURL(
+ String url,
+ String serviceName
+ ) {
+ return new ServiceDiscoverHttpClient(url + "/v1/kv/" + serviceName + "/");
+ }
+
+ public static ServiceDiscoverHttpClient defaultURL(String serviceName) {
+ return new ServiceDiscoverHttpClient("http://localhost:8500/v1/kv/" + serviceName + "/");
+ }
+
+ public ServiceDiscoverHttpClient withToken(String token) {
+ this.token = token;
+ return this;
+ }
+
+ public Try getConfig(String configKey) {
+ try (CloseableHttpClient httpClient = HttpClients.createMinimal()) {
+ HttpGet request = new HttpGet(serviceDiscoverURL + configKey);
+ request.setHeader("X-Consul-Token", token);
+ try(CloseableHttpResponse response = httpClient.execute(request)) {
+ if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
+ String bodyResponse = IOUtils.toString(
+ response.getEntity().getContent(),
+ StandardCharsets.UTF_8
+ );
+
+ String value = new ObjectMapper().readTree(bodyResponse).get(0).get("Value").asText();
+ String valueDecoded = new String(Base64.decodeBase64(value), StandardCharsets.UTF_8).trim();
+
+ return Try.success(valueDecoded);
+ }
+
+ logger.error("Service discover didn't respond with 200 when requesting a config (received {})",
+ response.getStatusLine().getStatusCode());
+ return Try.failure(new UnAuthorized());
+ }
+ } catch (IOException exception) {
+ logger.error("Exception trying to get config from service discover: ", exception);
+ return Try.failure(new InternalServerError(exception));
+ }
+ }
+}
diff --git a/store/src/main/java/com/zextras/mailbox/servlet/HealthServletModule.java b/store/src/main/java/com/zextras/mailbox/servlet/HealthServletModule.java
index 1c8264c728a..b509829bec7 100644
--- a/store/src/main/java/com/zextras/mailbox/servlet/HealthServletModule.java
+++ b/store/src/main/java/com/zextras/mailbox/servlet/HealthServletModule.java
@@ -8,7 +8,10 @@
import com.google.inject.servlet.ServletModule;
import com.zextras.mailbox.health.DatabaseServiceDependency;
import com.zextras.mailbox.health.HealthUseCase;
+import com.zextras.mailbox.health.ServiceDependency;
import com.zimbra.cs.db.DbPool;
+
+import java.util.ArrayList;
import java.util.List;
import javax.inject.Singleton;
@@ -28,7 +31,9 @@ DbPool provideDatabasePool() {
@Provides
@Singleton
HealthUseCase provideHealthService(DbPool dbPool) {
- return new HealthUseCase(
- List.of(new DatabaseServiceDependency(dbPool, System::currentTimeMillis)));
+ List serviceDependencies = new ArrayList<>();
+ serviceDependencies.add(new DatabaseServiceDependency(dbPool, System::currentTimeMillis));
+
+ return new HealthUseCase(serviceDependencies);
}
}
diff --git a/store/src/main/java/com/zimbra/cs/account/callback/AccountStatus.java b/store/src/main/java/com/zimbra/cs/account/callback/AccountStatus.java
index 263b9c809f7..7abb77e1491 100644
--- a/store/src/main/java/com/zimbra/cs/account/callback/AccountStatus.java
+++ b/store/src/main/java/com/zimbra/cs/account/callback/AccountStatus.java
@@ -5,9 +5,17 @@
package com.zimbra.cs.account.callback;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Map;
import java.util.Set;
+import com.zextras.carbonio.message_broker.MessageBrokerClient;
+import com.zextras.carbonio.message_broker.config.enums.Service;
+import com.zextras.carbonio.message_broker.events.services.mailbox.UserStatusChanged;
+import com.zextras.mailbox.client.ServiceDiscoverHttpClient;
import com.zimbra.common.account.Key;
import com.zimbra.common.service.ServiceException;
import com.zimbra.common.util.ZimbraLog;
@@ -55,7 +63,6 @@ public void preModify(CallbackContext context, String attrName, Object value,
@Override
public void postModify(CallbackContext context, String attrName, Entry entry) {
-
if (context.isDoneAndSetIfNot(AccountStatus.class)) {
return;
}
@@ -63,15 +70,52 @@ public void postModify(CallbackContext context, String attrName, Entry entry) {
if (!context.isCreate()) {
if (entry instanceof Account) {
try {
+ publishStatusChangedEvent((Account)entry);
handleAccountStatusClosed((Account)entry);
- } catch (ServiceException se) {
- // all exceptions are already swallowed by LdapProvisioning, just to be safe here.
- ZimbraLog.account.warn("unable to remove account address and aliases from all DLs for closed account", se);
+ } catch (Exception e) {
+ ZimbraLog.account.warn("Exception thrown on account status changed callback", e);
}
}
}
}
+ private void publishStatusChangedEvent(Account account) {
+ Provisioning prov = Provisioning.getInstance();
+ String status = account.getAccountStatus(prov);
+ String userId = account.getId();
+
+ Path filePath = Paths.get("/etc/carbonio/mailbox/service-discover/token");
+ String token;
+ try {
+ token = Files.readString(filePath);
+ } catch (IOException e) {
+ throw new RuntimeException("Can't read consul token from file", e);
+ }
+
+ ServiceDiscoverHttpClient serviceDiscoverHttpClient =
+ ServiceDiscoverHttpClient.defaultURL("carbonio-message-broker")
+ .withToken(token);
+
+ try {
+ MessageBrokerClient messageBrokerClient = MessageBrokerClient.fromConfig(
+ "127.78.0.7",
+ 20005,
+ serviceDiscoverHttpClient.getConfig("default/username").get(),
+ serviceDiscoverHttpClient.getConfig("default/password").get()
+ )
+ .withCurrentService(Service.MAILBOX);
+
+ boolean result = messageBrokerClient.publish(new UserStatusChanged(userId, status.toUpperCase()));
+ if (result) {
+ ZimbraLog.account.info("Published status changed event for user: " + userId);
+ } else {
+ ZimbraLog.account.error("Failed to publish status changed event for user: " + userId);
+ }
+ } catch (Exception e){
+ ZimbraLog.account.error("Exception while publishing status changed event for user: " + userId, e);
+ }
+ }
+
private void handleAccountStatusClosed(Account account) throws ServiceException {
Provisioning prov = Provisioning.getInstance();
String status = account.getAccountStatus(prov);
diff --git a/store/src/test/java/com/zextras/mailbox/callbacks/AccountStatusChangedCallbackTest.java b/store/src/test/java/com/zextras/mailbox/callbacks/AccountStatusChangedCallbackTest.java
new file mode 100644
index 00000000000..4b38564f3c7
--- /dev/null
+++ b/store/src/test/java/com/zextras/mailbox/callbacks/AccountStatusChangedCallbackTest.java
@@ -0,0 +1,86 @@
+// SPDX-FileCopyrightText: 2024 Zextras
+//
+// SPDX-License-Identifier: GPL-2.0-only
+
+package com.zextras.mailbox.callbacks;
+
+import com.zextras.carbonio.message_broker.MessageBrokerClient;
+import com.zextras.carbonio.message_broker.config.enums.Service;
+import com.zextras.carbonio.message_broker.events.services.mailbox.UserStatusChanged;
+import com.zextras.mailbox.client.ServiceDiscoverHttpClient;
+import com.zextras.mailbox.util.MailboxTestUtil;
+import com.zimbra.cs.account.Account;
+import com.zimbra.cs.account.Provisioning;
+import com.zimbra.cs.account.callback.AccountStatus;
+import com.zimbra.cs.account.callback.CallbackContext;
+import io.vavr.control.Try;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.any;
+
+class AccountStatusChangedCallbackTest {
+
+ AccountStatus accountStatus;
+
+ @BeforeEach
+ void setUp() throws Exception {
+ MailboxTestUtil.setUp();
+ accountStatus = new AccountStatus();
+ }
+
+ /**
+ * A lot of calls are mocked since they are external calls to service discover or message broker, this
+ * just tests that if calls are successful no other exceptions are thrown.
+ */
+ @Test
+ void shouldNotFailWhenExecutingUserStatusChangedCallback(){
+ CallbackContext context = Mockito.mock(CallbackContext.class);
+ String attrName = "fake";
+ Account entry = Mockito.mock(Account.class);
+
+ ServiceDiscoverHttpClient serviceDiscoverHttpClient = Mockito.mock(ServiceDiscoverHttpClient.class);
+ MessageBrokerClient messageBrokerClient = Mockito.mock(MessageBrokerClient.class);
+
+ Mockito.when(context.isDoneAndSetIfNot(AccountStatus.class)).thenReturn(false);
+ Mockito.when(context.isCreate()).thenReturn(false);
+ Mockito.when(entry.getAccountStatus(any(Provisioning.class))).thenReturn("active");
+ Mockito.when(entry.getId()).thenReturn("fake-account-id");
+
+ try(MockedStatic mockedFiles = Mockito.mockStatic(Files.class);
+ MockedStatic mockedServiceDiscoverStatic = Mockito.mockStatic(ServiceDiscoverHttpClient.class);
+ MockedStatic mockedMessageBrokerClientStatic = Mockito.mockStatic(MessageBrokerClient.class)) {
+
+ mockedFiles.when(() -> Files.readString(any(Path.class))).thenReturn("fake-token");
+ mockedServiceDiscoverStatic.when(() -> ServiceDiscoverHttpClient.defaultURL("carbonio-message-broker"))
+ .thenReturn(serviceDiscoverHttpClient);
+ Mockito.when(serviceDiscoverHttpClient.withToken("fake-token")).thenReturn(serviceDiscoverHttpClient);
+
+ Mockito.when(serviceDiscoverHttpClient.getConfig("default/username")).thenReturn(Try.success("fake-username"));
+ Mockito.when(serviceDiscoverHttpClient.getConfig("default/password")).thenReturn(Try.success("fake-password"));
+
+ mockedMessageBrokerClientStatic.when(() -> MessageBrokerClient.fromConfig(
+ "127.78.0.7",
+ 20005,
+ "fake-username",
+ "fake-password"
+ )).thenReturn(messageBrokerClient);
+
+ Mockito.when(messageBrokerClient.withCurrentService(Service.MAILBOX)).thenReturn(messageBrokerClient);
+ Mockito.when(messageBrokerClient.publish(any(UserStatusChanged.class))).thenReturn(true);
+
+ accountStatus.postModify(context, attrName, entry);
+
+ assertTrue(true);
+ }catch(Exception e) {
+ e.printStackTrace();
+ fail();
+ }
+ }
+}