Skip to content

Commit

Permalink
fix service event loader overrides the service event address with the…
Browse files Browse the repository at this point in the history
… module configured address - see #2263

Signed-off-by: riccardomodanese <riccardo.modanese@eurotech.com>
  • Loading branch information
riccardomodanese committed Dec 18, 2018
1 parent 341e46a commit 0a5afeb
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 33 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*******************************************************************************
* Copyright (c) 2018 Eurotech and/or its affiliates and others
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.commons.event;

/**
* Service entry used buy the {@link ServiceEventModule} and the {@link ServiceEventHousekeeper} to bind/unbind the services to the correct addresses
*
*/
public class ServiceEntry {

private String serviceName;
private String address;

public ServiceEntry(String serviceName, String address) {
this.serviceName = serviceName;
this.address = address;
}

public String getServiceName() {
return serviceName;
}

public String getAddress() {
return address;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,24 +65,20 @@ private enum EventsProcessType {
private EntityManager manager;

private ServiceEventBus eventbus;
private String serviceInternalEventAddress;
private String[] servicesNames;
private List<ServiceEntry> servicesEntryList;
private boolean running;

/**
* Default constructor
*
* @param entityManagerFactory
* @param eventbus
* @param serviceInternalEventAddress
* @param servicesNameList
* @param servicesEntryList
* @throws KapuaException
*/
public ServiceEventHousekeeper(EntityManagerFactory entityManagerFactory, ServiceEventBus eventbus, String serviceInternalEventAddress, List<String> servicesNameList) throws KapuaException {
public ServiceEventHousekeeper(EntityManagerFactory entityManagerFactory, ServiceEventBus eventbus, List<ServiceEntry> servicesEntryList) throws KapuaException {
this.eventbus = eventbus;
this.serviceInternalEventAddress = serviceInternalEventAddress;
servicesNames = new String[0];
servicesNames = servicesNameList.toArray(servicesNames);
this.servicesEntryList = servicesEntryList;
manager = entityManagerFactory.createEntityManager();
kapuaEventService = new EventStoreServiceImpl(entityManagerFactory);
}
Expand All @@ -93,11 +89,11 @@ public void run() {
running = true;
while (running) {
waitStep();
for (String serviceName : servicesNames) {
for (ServiceEntry serviceEntry : servicesEntryList) {
try {
if (running) {
KapuaSecurityUtils.doPrivileged(() -> {
processServiceEvents(serviceName);
processServiceEvents(serviceEntry.getServiceName());
});
}
} catch (KapuaException e) {
Expand Down Expand Up @@ -141,12 +137,14 @@ private void findAndSendUnsentEvents(String serviceName, EventsProcessType event
if (!unsentMessagesList.isEmpty()) {
for (EventStoreRecord kapuaEvent : unsentMessagesList.getItems()) {
try {
LOGGER.info("publish event: service '{}' - operation '{}' - id '{}'",
String address = ServiceMap.getAddress(serviceName);
LOGGER.info("publish event: service '{}' - address '{}' - operation '{}' - id '{}'",
kapuaEvent.getService(),
address,
kapuaEvent.getOperation(),
kapuaEvent.getContextId());

eventbus.publish(serviceInternalEventAddress, ServiceEventUtil.toServiceEventBus(kapuaEvent));
eventbus.publish(address, ServiceEventUtil.toServiceEventBus(kapuaEvent));
//if message was sent successfully then confirm the event in the event table
//if something goes wrong during this update the event message may be raised twice (but this condition should happens rarely and it is compliant to the contract of the service events)
//this is done in a different transaction
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2017 Eurotech and/or its affiliates and others
* Copyright (c) 2017, 2018 Eurotech and/or its affiliates and others
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
Expand All @@ -11,6 +11,7 @@
*******************************************************************************/
package org.eclipse.kapua.commons.event;

import org.apache.commons.lang3.StringUtils;
import org.eclipse.kapua.KapuaException;
import org.eclipse.kapua.commons.core.ServiceModule;
import org.eclipse.kapua.event.ServiceEventBus;
Expand All @@ -19,6 +20,7 @@

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -59,12 +61,19 @@ public void start() throws KapuaException {
LOGGER.info("Starting service event module... initialize event bus");
ServiceEventBus eventbus = ServiceEventBusManager.getInstance();
LOGGER.info("Starting service event module... initialize event subscriptions");
List<ServiceEntry> servicesEntryList = new ArrayList<>();
if (serviceEventModuleConfiguration.getServiceEventClientConfigurations() != null) {
for (ServiceEventClientConfiguration selc : serviceEventModuleConfiguration.getServiceEventClientConfigurations()) {
//get the specific service address... if empty switch to use the default configuration address
String address = selc.getAddress();
if (StringUtils.isEmpty(selc.getAddress())) {
address = serviceEventModuleConfiguration.getInternalAddress();
}
// Listen to upstream service events
if (selc.getEventListener() != null) {
eventbus.subscribe(selc.getAddress(), getSubscriptionName(selc.getAddress(), selc.getClientName()), selc.getEventListener());
eventbus.subscribe(address, getSubscriptionName(address, selc.getClientName()), selc.getEventListener());
}
servicesEntryList.add(new ServiceEntry(selc.getClientName(), address));
subscriberNames.add(selc.getClientName()); // Set because names must be unique
}
} else {
Expand All @@ -73,25 +82,20 @@ public void start() throws KapuaException {

// register events to the service map
LOGGER.info("Starting service event module... register services names");
ServiceMap.registerServices(serviceEventModuleConfiguration.getInternalAddress(), new ArrayList<>(subscriberNames));
ServiceMap.registerServices(serviceEventModuleConfiguration.getInternalAddress(), servicesEntryList);

// Start the House keeper
LOGGER.info("Starting service event module... start housekeeper");
houseKeeperScheduler = Executors.newScheduledThreadPool(1);
houseKeeperJob = new ServiceEventHousekeeper(
serviceEventModuleConfiguration.getEntityManagerFactory(),
eventbus,
serviceEventModuleConfiguration.getInternalAddress(),
new ArrayList<>(subscriberNames));
servicesEntryList);
// Start time can be made random from 0 to 30 seconds
houseKeeperHandler = houseKeeperScheduler.scheduleAtFixedRate(houseKeeperJob, SCHEDULED_EXECUTION_TIME_WINDOW, SCHEDULED_EXECUTION_TIME_WINDOW, TimeUnit.SECONDS);
LOGGER.info("Starting service event module... DONE");
}

public String getEventAddress(String serviceName) {
return null;
}

@Override
public void stop() throws KapuaException {
LOGGER.info("Stopping service event module... {}", this.getClass().getName());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*******************************************************************************
* Copyright (c) 2017 Eurotech and/or its affiliates and others
* Copyright (c) 2017, 2018 Eurotech and/or its affiliates and others
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
Expand Down Expand Up @@ -38,26 +38,26 @@ private ServiceMap() {
/**
* Register the list of services to the provided address
*
* @param serviceAddress
* @param servicesNames
* @param serviceDefaultAddress
* @param servicesEntryList
*/
public static synchronized void registerServices(String serviceAddress, List<String> servicesNames) {
for (String serviceName : servicesNames) {
public static synchronized void registerServices(String serviceDefaultAddress, List<ServiceEntry> servicesEntryList) {
for (ServiceEntry serviceEntry : servicesEntryList) {
//register service name
String tmpServiceName = AVAILABLE_SERVICES.get(serviceName);
String tmpServiceName = AVAILABLE_SERVICES.get(serviceEntry.getServiceName());
if (tmpServiceName == null) {
AVAILABLE_SERVICES.put(serviceName, serviceAddress);
LOG.info("Bound service '{}' to address '{}'", serviceName, serviceAddress);
} else if (!serviceAddress.equals(tmpServiceName)) {
LOG.warn("The service '{}' is already registered with a different address (old '{}' - new '{}'). No change will be made", serviceName, tmpServiceName, serviceAddress);
AVAILABLE_SERVICES.put(serviceEntry.getServiceName(), serviceEntry.getAddress());
LOG.info("Bound service '{}' to address '{}'", serviceEntry.getServiceName(), serviceEntry.getAddress());
} else if (!serviceEntry.getAddress().equals(tmpServiceName)) {
LOG.warn("The service '{}' is already registered with a different address (old '{}' - new '{}'). No change will be made", serviceEntry.getServiceName(), tmpServiceName, serviceEntry.getAddress());
} else {
LOG.info("The service '{}' is already registered with address '{}'", serviceName, serviceAddress);
LOG.info("The service '{}' is already registered with address '{}'", serviceEntry.getServiceName(), serviceEntry.getAddress());
}
}
}

/**
* Unregister the provided services from the addess map
* Unregister the provided services from the address map
*
* @param servicesNames
*/
Expand Down

0 comments on commit 0a5afeb

Please sign in to comment.