Skip to content

Commit

Permalink
[Module Sync] Store CM-handle IDs in work queue
Browse files Browse the repository at this point in the history
This fixes bug CPS-2474, handling various edge cases, such as
CM handles being deleted during module sync.

- Change moduleSyncWorkQueue to store CmHandleId instead of DataNode.
- Freshly fetch Cm Handles in module sync task, so latest CM-handle
  state is used, and only process ADVISED CM handles in module sync.

Issue-ID: CPS-2474
Signed-off-by: danielhanrahan <daniel.hanrahan@est.tech>
Change-Id: I53d5796c56014a2bfbe5b1c3f17d3991e4feef53
  • Loading branch information
danielhanrahan committed Dec 6, 2024
1 parent 2dac434 commit 42dfa67
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ Collection<String> queryCmHandlesByTrustLevel(Map<String, String> trustLevelProp
boolean outputAlternateId);

/**
* Method which returns cm handles by the cm handles state.
* Method which returns cm handle ids by the cm handles state.
*
* @param cmHandleState cm handle state
* @return a list of data nodes representing the cm handles.
* @return a list of cm handle ids.
*/
Collection<DataNode> queryCmHandlesByState(CmHandleState cmHandleState);
Collection<String> queryCmHandleIdsByState(CmHandleState cmHandleState);

/**
* Method to return data nodes with ancestor representing the cm handles.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

package org.onap.cps.ncmp.impl.inventory;

import static org.onap.cps.api.parameters.FetchDescendantsOption.INCLUDE_ALL_DESCENDANTS;
import static org.onap.cps.api.parameters.FetchDescendantsOption.OMIT_DESCENDANTS;
import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DATASPACE_NAME;
import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DMI_REGISTRY_ANCHOR;
Expand All @@ -45,6 +44,7 @@
import org.onap.cps.ncmp.impl.inventory.models.ModelledDmiServiceLeaves;
import org.onap.cps.ncmp.impl.inventory.models.PropertyType;
import org.onap.cps.ncmp.impl.inventory.trustlevel.TrustLevelCacheConfig;
import org.onap.cps.ncmp.impl.utils.YangDataConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

Expand Down Expand Up @@ -87,14 +87,18 @@ public Collection<String> queryCmHandlesByTrustLevel(final Map<String, String> t
}

@Override
public Collection<DataNode> queryCmHandlesByState(final CmHandleState cmHandleState) {
return queryCmHandleAncestorsByCpsPath("//state[@cm-handle-state=\"" + cmHandleState + "\"]",
INCLUDE_ALL_DESCENDANTS);
public Collection<String> queryCmHandleIdsByState(final CmHandleState cmHandleState) {
final Collection<DataNode> cmHandlesAsDataNodes =
queryNcmpRegistryByCpsPath("//state[@cm-handle-state='" + cmHandleState + "']", OMIT_DESCENDANTS);
return cmHandlesAsDataNodes.stream()
.map(DataNode::getXpath)
.map(YangDataConverter::extractCmHandleIdFromXpath)
.toList();
}

@Override
public Collection<DataNode> queryNcmpRegistryByCpsPath(final String cpsPath,
final FetchDescendantsOption fetchDescendantsOption) {
final FetchDescendantsOption fetchDescendantsOption) {
return cpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR, cpsPath,
fetchDescendantsOption);
}
Expand Down Expand Up @@ -232,5 +236,3 @@ private DataNode getCmHandleState(final String cmHandleId) {
xpath, OMIT_DESCENDANTS).iterator().next();
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,10 @@ public class ModuleOperationsUtils {
/**
* Query data nodes for cm handles with an "ADVISED" cm handle state.
*
* @return cm handles (data nodes) in ADVISED state (empty list if none found)
* @return cm handle ids in ADVISED state (empty list if none found)
*/
public Collection<DataNode> getAdvisedCmHandles() {
final Collection<DataNode> advisedCmHandlesAsDataNodes =
cmHandleQueryService.queryCmHandlesByState(CmHandleState.ADVISED);
log.debug("Total number of fetched advised cm handle(s) is (are) {}", advisedCmHandlesAsDataNodes.size());
return advisedCmHandlesAsDataNodes;
public Collection<String> getAdvisedCmHandleIds() {
return cmHandleQueryService.queryCmHandleIdsByState(CmHandleState.ADVISED);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@
import java.util.concurrent.atomic.AtomicInteger;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.model.DataNode;
import org.onap.cps.api.exceptions.DataNodeNotFoundException;
import org.onap.cps.ncmp.api.inventory.models.CompositeState;
import org.onap.cps.ncmp.impl.inventory.InventoryPersistence;
import org.onap.cps.ncmp.impl.inventory.models.CmHandleState;
import org.onap.cps.ncmp.impl.inventory.models.LockReasonCategory;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
import org.onap.cps.ncmp.impl.inventory.sync.lcm.LcmEventsCmHandleStateHandler;
import org.onap.cps.ncmp.impl.utils.YangDataConverter;
import org.springframework.stereotype.Component;

@RequiredArgsConstructor
Expand All @@ -51,21 +50,29 @@ public class ModuleSyncTasks {
/**
* Perform module sync on a batch of cm handles.
*
* @param cmHandlesAsDataNodes a batch of Data nodes representing cm handles to perform module sync on
* @param cmHandleIds a batch of cm handle ids to perform module sync on
* @param batchCounter the number of batches currently being processed, will be decreased when
* task is finished or fails
* @return completed future to handle post-processing
*/
public CompletableFuture<Void> performModuleSync(final Collection<DataNode> cmHandlesAsDataNodes,
public CompletableFuture<Void> performModuleSync(final Collection<String> cmHandleIds,
final AtomicInteger batchCounter) {
final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle =
new HashMap<>(cmHandlesAsDataNodes.size());
final Map<YangModelCmHandle, CmHandleState> cmHandleStatePerCmHandle = new HashMap<>(cmHandleIds.size());
try {
cmHandlesAsDataNodes.forEach(cmHandleAsDataNode -> {
final YangModelCmHandle yangModelCmHandle = YangDataConverter.toYangModelCmHandle(cmHandleAsDataNode);
final CmHandleState cmHandleState = processCmHandle(yangModelCmHandle);
cmHandleStatePerCmHandle.put(yangModelCmHandle, cmHandleState);
});
for (final String cmHandleId : cmHandleIds) {
try {
final YangModelCmHandle yangModelCmHandle = inventoryPersistence.getYangModelCmHandle(cmHandleId);
if (isCmHandleInAdvisedState(yangModelCmHandle)) {
final CmHandleState newCmHandleState = processCmHandle(yangModelCmHandle);
cmHandleStatePerCmHandle.put(yangModelCmHandle, newCmHandleState);
} else {
log.warn("Skipping module sync for CM handle '{}' as it is in {} state", cmHandleId,
yangModelCmHandle.getCompositeState().getCmHandleState().name());
}
} catch (final DataNodeNotFoundException dataNodeNotFoundException) {
log.warn("Skipping module sync for CM handle '{}' as it does not exist", cmHandleId);
}
}
} finally {
batchCounter.getAndDecrement();
lcmEventsCmHandleStateHandler.updateCmHandleStateBatch(cmHandleStatePerCmHandle);
Expand Down Expand Up @@ -96,7 +103,7 @@ public void setCmHandlesToAdvised(final Collection<YangModelCmHandle> yangModelC
}

private CmHandleState processCmHandle(final YangModelCmHandle yangModelCmHandle) {
final CompositeState compositeState = inventoryPersistence.getCmHandleState(yangModelCmHandle.getId());
final CompositeState compositeState = yangModelCmHandle.getCompositeState();
final boolean inUpgrade = ModuleOperationsUtils.inUpgradeOrUpgradeFailed(compositeState);
try {
if (inUpgrade) {
Expand All @@ -105,27 +112,25 @@ private CmHandleState processCmHandle(final YangModelCmHandle yangModelCmHandle)
moduleSyncService.deleteSchemaSetIfExists(yangModelCmHandle.getId());
moduleSyncService.syncAndCreateSchemaSetAndAnchor(yangModelCmHandle);
}
yangModelCmHandle.getCompositeState().setLockReason(null);
compositeState.setLockReason(null);
return CmHandleState.READY;
} catch (final Exception e) {
log.warn("Processing of {} module failed due to reason {}.", yangModelCmHandle.getId(), e.getMessage());
final LockReasonCategory lockReasonCategory = inUpgrade ? LockReasonCategory.MODULE_UPGRADE_FAILED
: LockReasonCategory.MODULE_SYNC_FAILED;
moduleOperationsUtils.updateLockReasonWithAttempts(compositeState,
lockReasonCategory, e.getMessage());
setCmHandleStateLocked(yangModelCmHandle, compositeState.getLockReason());
final LockReasonCategory lockReasonCategory = inUpgrade
? LockReasonCategory.MODULE_UPGRADE_FAILED
: LockReasonCategory.MODULE_SYNC_FAILED;
moduleOperationsUtils.updateLockReasonWithAttempts(compositeState, lockReasonCategory, e.getMessage());
return CmHandleState.LOCKED;
}
}

private void setCmHandleStateLocked(final YangModelCmHandle advisedCmHandle,
final CompositeState.LockReason lockReason) {
advisedCmHandle.getCompositeState().setLockReason(lockReason);
}

private void removeResetCmHandleFromModuleSyncMap(final String resetCmHandleId) {
if (moduleSyncStartedOnCmHandles.remove(resetCmHandleId) != null) {
log.info("{} removed from in progress map", resetCmHandleId);
}
}

private static boolean isCmHandleInAdvisedState(final YangModelCmHandle yangModelCmHandle) {
return yangModelCmHandle.getCompositeState().getCmHandleState() == CmHandleState.ADVISED;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.model.DataNode;
import org.onap.cps.ncmp.impl.inventory.models.YangModelCmHandle;
import org.onap.cps.ncmp.impl.utils.Sleeper;
import org.springframework.scheduling.annotation.Scheduled;
Expand All @@ -43,7 +42,7 @@
public class ModuleSyncWatchdog {

private final ModuleOperationsUtils moduleOperationsUtils;
private final BlockingQueue<DataNode> moduleSyncWorkQueue;
private final BlockingQueue<String> moduleSyncWorkQueue;
private final IMap<String, Object> moduleSyncStartedOnCmHandles;
private final ModuleSyncTasks moduleSyncTasks;
private final AsyncTaskExecutor asyncTaskExecutor;
Expand All @@ -70,7 +69,7 @@ public void moduleSyncAdvisedCmHandles() {
populateWorkQueueIfNeeded();
while (!moduleSyncWorkQueue.isEmpty()) {
if (batchCounter.get() <= asyncTaskExecutor.getAsyncTaskParallelismLevel()) {
final Collection<DataNode> nextBatch = prepareNextBatch();
final Collection<String> nextBatch = prepareNextBatch();
log.info("Processing module sync batch of {}. {} batch(es) active.",
nextBatch.size(), batchCounter.get());
if (!nextBatch.isEmpty()) {
Expand Down Expand Up @@ -104,14 +103,14 @@ public void populateWorkQueueIfNeeded() {
}

private void populateWorkQueue() {
final Collection<DataNode> advisedCmHandles = moduleOperationsUtils.getAdvisedCmHandles();
if (advisedCmHandles.isEmpty()) {
final Collection<String> advisedCmHandleIds = moduleOperationsUtils.getAdvisedCmHandleIds();
if (advisedCmHandleIds.isEmpty()) {
log.debug("No advised CM handles found in DB.");
} else {
log.info("Fetched {} advised CM handles from DB. Adding them to the work queue.", advisedCmHandles.size());
advisedCmHandles.forEach(advisedCmHandle -> {
final String cmHandleId = String.valueOf(advisedCmHandle.getLeaves().get("id"));
if (moduleSyncWorkQueue.offer(advisedCmHandle)) {
log.info("Fetched {} advised CM handles from DB. Adding them to the work queue.",
advisedCmHandleIds.size());
advisedCmHandleIds.forEach(cmHandleId -> {
if (moduleSyncWorkQueue.offer(cmHandleId)) {
log.info("CM handle {} added to the work queue.", cmHandleId);
} else {
log.warn("Failed to add CM handle {} to the work queue.", cmHandleId);
Expand All @@ -133,21 +132,20 @@ private void setPreviouslyLockedCmHandlesToAdvised() {
}
}

private Collection<DataNode> prepareNextBatch() {
final Collection<DataNode> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
final Collection<DataNode> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
private Collection<String> prepareNextBatch() {
final Collection<String> nextBatchCandidates = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
final Collection<String> nextBatch = new HashSet<>(MODULE_SYNC_BATCH_SIZE);
moduleSyncWorkQueue.drainTo(nextBatchCandidates, MODULE_SYNC_BATCH_SIZE);
log.info("nextBatchCandidates size : {}", nextBatchCandidates.size());
for (final DataNode batchCandidate : nextBatchCandidates) {
final String cmHandleId = String.valueOf(batchCandidate.getLeaves().get("id"));
for (final String cmHandleId : nextBatchCandidates) {
final boolean alreadyAddedToInProgressMap = VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP.equals(
moduleSyncStartedOnCmHandles.putIfAbsent(cmHandleId, VALUE_FOR_HAZELCAST_IN_PROGRESS_MAP,
SynchronizationCacheConfig.MODULE_SYNC_STARTED_TTL_SECS, TimeUnit.SECONDS));
if (alreadyAddedToInProgressMap) {
log.info("module sync for {} already in progress by other instance", cmHandleId);
} else {
log.info("Adding cmHandle : {} to current batch", cmHandleId);
nextBatch.add(batchCandidate);
nextBatch.add(cmHandleId);
}
}
log.info("nextBatch size : {}", nextBatch.size());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
* ============LICENSE_START========================================================
* Copyright (C) 2022-2023 Nordix Foundation
* Copyright (C) 2022-2024 Nordix Foundation
* ================================================================================
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,7 +28,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.Lock;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.api.model.DataNode;
import org.onap.cps.ncmp.impl.cache.HazelcastCacheConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -53,10 +52,10 @@ public class SynchronizationCacheConfig extends HazelcastCacheConfig {
/**
* Module Sync Distributed Queue Instance.
*
* @return queue of cm handles (data nodes) that need module sync
* @return queue of cm handle ids that need module sync
*/
@Bean
public BlockingQueue<DataNode> moduleSyncWorkQueue() {
public BlockingQueue<String> moduleSyncWorkQueue() {
return getOrCreateHazelcastInstance(commonQueueConfig).getQueue("moduleSyncWorkQueue");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import org.onap.cps.impl.utils.CpsValidator
import org.onap.cps.ncmp.api.inventory.models.TrustLevel
import org.onap.cps.ncmp.impl.inventory.models.CmHandleState
import org.onap.cps.api.model.DataNode
import spock.lang.Shared
import spock.lang.Specification

import static org.onap.cps.ncmp.impl.inventory.NcmpPersistence.NCMP_DATASPACE_NAME
Expand All @@ -40,17 +39,14 @@ class CmHandleQueryServiceImplSpec extends Specification {

def mockCpsQueryService = Mock(CpsQueryService)
def mockCpsDataService = Mock(CpsDataService)

def trustLevelPerDmiPlugin = [:]

def trustLevelPerCmHandleId = [ 'PNFDemo': TrustLevel.COMPLETE, 'PNFDemo2': TrustLevel.NONE, 'PNFDemo4': TrustLevel.NONE ]

def mockCpsValidator = Mock(CpsValidator)

def objectUnderTest = new CmHandleQueryServiceImpl(mockCpsDataService, mockCpsQueryService, trustLevelPerDmiPlugin, trustLevelPerCmHandleId, mockCpsValidator)

@Shared
def static sampleDataNodes = [new DataNode()]
def static sampleDataNodes = [new DataNode(xpath: "/dmi-registry/cm-handles[@id='ch-1']"),
new DataNode(xpath: "/dmi-registry/cm-handles[@id='ch-2']")]

def dataNodeWithPrivateField = '//additional-properties[@name=\"Contact3\" and @value=\"newemailforstore3@bookstore.com\"]/ancestor::cm-handles'

Expand Down Expand Up @@ -117,16 +113,16 @@ class CmHandleQueryServiceImplSpec extends Specification {
result.size() == 1
}

def 'Get CmHandles by it\'s state.'() {
def 'Get Ids of CmHandles by state.'() {
given: 'a cm handle state to query'
def cmHandleState = CmHandleState.ADVISED
and: 'the persistence service returns a list of data nodes'
mockCpsQueryService.queryDataNodes(NCMP_DATASPACE_NAME, NCMP_DMI_REGISTRY_ANCHOR,
'//state[@cm-handle-state="ADVISED"]/ancestor::cm-handles', INCLUDE_ALL_DESCENDANTS) >> sampleDataNodes
"//state[@cm-handle-state='ADVISED']", OMIT_DESCENDANTS) >> sampleDataNodes
when: 'cm handles are fetched by state'
def result = objectUnderTest.queryCmHandlesByState(cmHandleState)
def result = objectUnderTest.queryCmHandleIdsByState(cmHandleState)
then: 'the returned result matches the result from the persistence service'
assert result == sampleDataNodes
assert result.toSet() == ['ch-1', 'ch-2'].toSet()
}

def 'Check the state of a cmHandle when #scenario.'() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,15 @@ class ModuleOperationsUtilsSpec extends Specification{

def 'Get an advised Cm-Handle where ADVISED cm handle #scenario'() {
given: 'the inventory persistence service returns a collection of data nodes'
mockCmHandleQueries.queryCmHandlesByState(CmHandleState.ADVISED) >> dataNodeCollection
when: 'get advised cm handles are fetched'
def yangModelCmHandles = objectUnderTest.getAdvisedCmHandles()
then: 'the returned data node collection is the correct size'
yangModelCmHandles.size() == expectedDataNodeSize
mockCmHandleQueries.queryCmHandleIdsByState(CmHandleState.ADVISED) >> cmHandleIds
when: 'advised cm handle ids are fetched'
def advisedCmHandleIds = objectUnderTest.getAdvisedCmHandleIds()
then: 'the expected cm handle ids are returned'
advisedCmHandleIds == cmHandleIds
where: 'the following scenarios are used'
scenario | dataNodeCollection || expectedCallsToGetYangModelCmHandle | expectedDataNodeSize
'exists' | [dataNode] || 1 | 1
'does not exist' | [] || 0 | 0
scenario | cmHandleIds
'exists' | ['cm-handle-123']
'does not exist' | []
}

def 'Update Lock Reason, Details and Attempts where lock reason #scenario'() {
Expand Down
Loading

0 comments on commit 42dfa67

Please sign in to comment.