diff --git a/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEntry.java b/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEntry.java new file mode 100644 index 00000000000..1d2632629d1 --- /dev/null +++ b/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEntry.java @@ -0,0 +1,35 @@ +/******************************************************************************* + * Copyright (c) 2018 Eurotech and/or its affiliates and others + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Eurotech - initial API and implementation + *******************************************************************************/ +package org.eclipse.kapua.commons.event; + +/** + * Service entry used buy the {@link ServiceEventModule} and the {@link ServiceEventHousekeeper} to bind/unbind the services to the correct addresses + * + */ +public class ServiceEntry { + + private String serviceName; + private String address; + + public ServiceEntry(String serviceName, String address) { + this.serviceName = serviceName; + this.address = address; + } + + public String getServiceName() { + return serviceName; + } + + public String getAddress() { + return address; + } +} diff --git a/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEventHousekeeper.java b/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEventHousekeeper.java index f5d7516ce43..70f088d4b20 100644 --- a/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEventHousekeeper.java +++ b/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEventHousekeeper.java @@ -65,8 +65,7 @@ private enum EventsProcessType { private EntityManager manager; private ServiceEventBus eventbus; - private String serviceInternalEventAddress; - private String[] servicesNames; + private List servicesEntryList; private boolean running; /** @@ -74,15 +73,12 @@ private enum EventsProcessType { * * @param entityManagerFactory * @param eventbus - * @param serviceInternalEventAddress - * @param servicesNameList + * @param servicesEntryList * @throws KapuaException */ - public ServiceEventHousekeeper(EntityManagerFactory entityManagerFactory, ServiceEventBus eventbus, String serviceInternalEventAddress, List servicesNameList) throws KapuaException { + public ServiceEventHousekeeper(EntityManagerFactory entityManagerFactory, ServiceEventBus eventbus, List servicesEntryList) throws KapuaException { this.eventbus = eventbus; - this.serviceInternalEventAddress = serviceInternalEventAddress; - servicesNames = new String[0]; - servicesNames = servicesNameList.toArray(servicesNames); + this.servicesEntryList = servicesEntryList; manager = entityManagerFactory.createEntityManager(); kapuaEventService = new EventStoreServiceImpl(entityManagerFactory); } @@ -93,11 +89,11 @@ public void run() { running = true; while (running) { waitStep(); - for (String serviceName : servicesNames) { + for (ServiceEntry serviceEntry : servicesEntryList) { try { if (running) { KapuaSecurityUtils.doPrivileged(() -> { - processServiceEvents(serviceName); + processServiceEvents(serviceEntry.getServiceName()); }); } } catch (KapuaException e) { @@ -141,12 +137,14 @@ private void findAndSendUnsentEvents(String serviceName, EventsProcessType event if (!unsentMessagesList.isEmpty()) { for (EventStoreRecord kapuaEvent : unsentMessagesList.getItems()) { try { - LOGGER.info("publish event: service '{}' - operation '{}' - id '{}'", + String address = ServiceMap.getAddress(serviceName); + LOGGER.info("publish event: service '{}' - address '{}' - operation '{}' - id '{}'", kapuaEvent.getService(), + address, kapuaEvent.getOperation(), kapuaEvent.getContextId()); - eventbus.publish(serviceInternalEventAddress, ServiceEventUtil.toServiceEventBus(kapuaEvent)); + eventbus.publish(address, ServiceEventUtil.toServiceEventBus(kapuaEvent)); //if message was sent successfully then confirm the event in the event table //if something goes wrong during this update the event message may be raised twice (but this condition should happens rarely and it is compliant to the contract of the service events) //this is done in a different transaction diff --git a/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEventModule.java b/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEventModule.java index 75c0d4dda09..d9a833ecef4 100644 --- a/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEventModule.java +++ b/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceEventModule.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2017 Eurotech and/or its affiliates and others + * Copyright (c) 2017, 2018 Eurotech and/or its affiliates and others * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -11,6 +11,7 @@ *******************************************************************************/ package org.eclipse.kapua.commons.event; +import org.apache.commons.lang3.StringUtils; import org.eclipse.kapua.KapuaException; import org.eclipse.kapua.commons.core.ServiceModule; import org.eclipse.kapua.event.ServiceEventBus; @@ -19,6 +20,7 @@ import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -59,12 +61,19 @@ public void start() throws KapuaException { LOGGER.info("Starting service event module... initialize event bus"); ServiceEventBus eventbus = ServiceEventBusManager.getInstance(); LOGGER.info("Starting service event module... initialize event subscriptions"); + List servicesEntryList = new ArrayList<>(); if (serviceEventModuleConfiguration.getServiceEventClientConfigurations() != null) { for (ServiceEventClientConfiguration selc : serviceEventModuleConfiguration.getServiceEventClientConfigurations()) { + //get the specific service address... if empty switch to use the default configuration address + String address = selc.getAddress(); + if (StringUtils.isEmpty(selc.getAddress())) { + address = serviceEventModuleConfiguration.getInternalAddress(); + } // Listen to upstream service events if (selc.getEventListener() != null) { - eventbus.subscribe(selc.getAddress(), getSubscriptionName(selc.getAddress(), selc.getClientName()), selc.getEventListener()); + eventbus.subscribe(address, getSubscriptionName(address, selc.getClientName()), selc.getEventListener()); } + servicesEntryList.add(new ServiceEntry(selc.getClientName(), address)); subscriberNames.add(selc.getClientName()); // Set because names must be unique } } else { @@ -73,7 +82,7 @@ public void start() throws KapuaException { // register events to the service map LOGGER.info("Starting service event module... register services names"); - ServiceMap.registerServices(serviceEventModuleConfiguration.getInternalAddress(), new ArrayList<>(subscriberNames)); + ServiceMap.registerServices(serviceEventModuleConfiguration.getInternalAddress(), servicesEntryList); // Start the House keeper LOGGER.info("Starting service event module... start housekeeper"); @@ -81,17 +90,12 @@ public void start() throws KapuaException { houseKeeperJob = new ServiceEventHousekeeper( serviceEventModuleConfiguration.getEntityManagerFactory(), eventbus, - serviceEventModuleConfiguration.getInternalAddress(), - new ArrayList<>(subscriberNames)); + servicesEntryList); // Start time can be made random from 0 to 30 seconds houseKeeperHandler = houseKeeperScheduler.scheduleAtFixedRate(houseKeeperJob, SCHEDULED_EXECUTION_TIME_WINDOW, SCHEDULED_EXECUTION_TIME_WINDOW, TimeUnit.SECONDS); LOGGER.info("Starting service event module... DONE"); } - public String getEventAddress(String serviceName) { - return null; - } - @Override public void stop() throws KapuaException { LOGGER.info("Stopping service event module... {}", this.getClass().getName()); diff --git a/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceMap.java b/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceMap.java index 1c81594f90c..5321af04692 100644 --- a/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceMap.java +++ b/commons/src/main/java/org/eclipse/kapua/commons/event/ServiceMap.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2017 Eurotech and/or its affiliates and others + * Copyright (c) 2017, 2018 Eurotech and/or its affiliates and others * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -38,26 +38,26 @@ private ServiceMap() { /** * Register the list of services to the provided address * - * @param serviceAddress - * @param servicesNames + * @param serviceDefaultAddress + * @param servicesEntryList */ - public static synchronized void registerServices(String serviceAddress, List servicesNames) { - for (String serviceName : servicesNames) { + public static synchronized void registerServices(String serviceDefaultAddress, List servicesEntryList) { + for (ServiceEntry serviceEntry : servicesEntryList) { //register service name - String tmpServiceName = AVAILABLE_SERVICES.get(serviceName); + String tmpServiceName = AVAILABLE_SERVICES.get(serviceEntry.getServiceName()); if (tmpServiceName == null) { - AVAILABLE_SERVICES.put(serviceName, serviceAddress); - LOG.info("Bound service '{}' to address '{}'", serviceName, serviceAddress); - } else if (!serviceAddress.equals(tmpServiceName)) { - LOG.warn("The service '{}' is already registered with a different address (old '{}' - new '{}'). No change will be made", serviceName, tmpServiceName, serviceAddress); + AVAILABLE_SERVICES.put(serviceEntry.getServiceName(), serviceEntry.getAddress()); + LOG.info("Bound service '{}' to address '{}'", serviceEntry.getServiceName(), serviceEntry.getAddress()); + } else if (!serviceEntry.getAddress().equals(tmpServiceName)) { + LOG.warn("The service '{}' is already registered with a different address (old '{}' - new '{}'). No change will be made", serviceEntry.getServiceName(), tmpServiceName, serviceEntry.getAddress()); } else { - LOG.info("The service '{}' is already registered with address '{}'", serviceName, serviceAddress); + LOG.info("The service '{}' is already registered with address '{}'", serviceEntry.getServiceName(), serviceEntry.getAddress()); } } } /** - * Unregister the provided services from the addess map + * Unregister the provided services from the address map * * @param servicesNames */