Skip to content

Commit

Permalink
Merge "CPS NCMP: Add state tags to cps.ncmp.lcm.events.publish metrics"
Browse files Browse the repository at this point in the history
  • Loading branch information
danielhanrahan authored and Gerrit Code Review committed Jul 17, 2024
2 parents b08422d + 05da891 commit 3643790
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@

package org.onap.cps.ncmp.impl.inventory.sync.lcm;

import io.micrometer.core.annotation.Timed;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.onap.cps.events.EventsPublisher;
import org.onap.cps.ncmp.events.lcm.v1.LcmEvent;
import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader;
import org.onap.cps.ncmp.events.lcm.v1.Values;
import org.onap.cps.utils.JsonObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.KafkaException;
Expand All @@ -41,8 +46,12 @@
@RequiredArgsConstructor
public class LcmEventsService {

private static final Tag TAG_METHOD = Tag.of("method", "publishLcmEvent");
private static final Tag TAG_CLASS = Tag.of("class", LcmEventsService.class.getName());
private static final String UNAVAILABLE_CM_HANDLE_STATE = "N/A";
private final EventsPublisher<LcmEvent> eventsPublisher;
private final JsonObjectMapper jsonObjectMapper;
private final MeterRegistry meterRegistry;

@Value("${app.lcm.events.topic:ncmp-events}")
private String topicName;
Expand All @@ -51,24 +60,58 @@ public class LcmEventsService {
private boolean notificationsEnabled;

/**
* Publish the LcmEvent with header to the public topic.
* Publishes an LCM event to the dedicated topic with optional notification headers.
* Capture and log KafkaException If an error occurs while publishing the event to Kafka
*
* @param cmHandleId Cm Handle Id
* @param lcmEvent Lcm Event
* @param lcmEventHeader Lcm Event Header
* @param cmHandleId Cm Handle Id associated with the LCM event
* @param lcmEvent The LCM event object to be published
* @param lcmEventHeader Optional headers associated with the LCM event
*/
@Timed(value = "cps.ncmp.lcm.events.publish", description = "Time taken to publish a LCM event")
public void publishLcmEvent(final String cmHandleId, final LcmEvent lcmEvent, final LcmEventHeader lcmEventHeader) {

if (notificationsEnabled) {
final Timer.Sample timerSample = Timer.start(meterRegistry);
try {
final Map<String, Object> lcmEventHeadersMap =
jsonObjectMapper.convertToValueType(lcmEventHeader, Map.class);
eventsPublisher.publishEvent(topicName, cmHandleId, lcmEventHeadersMap, lcmEvent);
} catch (final KafkaException e) {
log.error("Unable to publish message to topic : {} and cause : {}", topicName, e.getMessage());
} finally {
recordMetrics(lcmEvent, timerSample);
}
} else {
log.debug("Notifications disabled.");
}
}

private void recordMetrics(final LcmEvent lcmEvent, final Timer.Sample timerSample) {
final List<Tag> tags = new ArrayList<>(4);
tags.add(TAG_CLASS);
tags.add(TAG_METHOD);

final String oldCmHandleState = extractCmHandleStateValue(lcmEvent.getEvent().getOldValues());
tags.add(Tag.of("oldCmHandleState", oldCmHandleState));

final String newCmHandleState = extractCmHandleStateValue(lcmEvent.getEvent().getNewValues());
tags.add(Tag.of("newCmHandleState", newCmHandleState));

timerSample.stop(Timer.builder("cps.ncmp.lcm.events.publish")
.description("Time taken to publish a LCM event")
.tags(tags)
.register(meterRegistry));
}

/**
* Extracts the CM handle state value from the given Values object.
* If the provided Values object or its CM handle state is null, returns a default value.
*
* @param values The Values object containing CM handle state information.
* @return The CM handle state value as a string, or a default value if null.
*/
private String extractCmHandleStateValue(final Values values) {
return (values != null && values.getCmHandleState() != null)
? values.getCmHandleState().value()
: UNAVAILABLE_CM_HANDLE_STATE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,16 @@

package org.onap.cps.ncmp.impl.inventory.sync.lcm

import static org.onap.cps.ncmp.events.lcm.v1.Values.CmHandleState.ADVISED
import static org.onap.cps.ncmp.events.lcm.v1.Values.CmHandleState.READY

import io.micrometer.core.instrument.Tag
import io.micrometer.core.instrument.simple.SimpleMeterRegistry
import org.onap.cps.events.EventsPublisher
import org.onap.cps.ncmp.events.lcm.v1.Event
import org.onap.cps.ncmp.events.lcm.v1.LcmEvent
import org.onap.cps.ncmp.events.lcm.v1.LcmEventHeader
import org.onap.cps.ncmp.events.lcm.v1.Values
import org.onap.cps.utils.JsonObjectMapper
import org.springframework.kafka.KafkaException
import spock.lang.Specification
Expand All @@ -31,14 +38,16 @@ class LcmEventsServiceSpec extends Specification {

def mockLcmEventsPublisher = Mock(EventsPublisher)
def mockJsonObjectMapper = Mock(JsonObjectMapper)
def meterRegistry = new SimpleMeterRegistry()

def objectUnderTest = new LcmEventsService(mockLcmEventsPublisher, mockJsonObjectMapper)
def objectUnderTest = new LcmEventsService(mockLcmEventsPublisher, mockJsonObjectMapper, meterRegistry)

def 'Create and Publish lcm event where events are #scenario'() {
given: 'a cm handle id, Lcm Event, and headers'
def cmHandleId = 'test-cm-handle-id'
def eventId = UUID.randomUUID().toString()
def lcmEvent = new LcmEvent(eventId: eventId, eventCorrelationId: cmHandleId)
def event = getEventWithCmHandleState(ADVISED, READY)
def lcmEvent = new LcmEvent(event: event, eventId: eventId, eventCorrelationId: cmHandleId)
and: 'we also have a lcm event header'
def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
and: 'notificationsEnabled is #notificationsEnabled and it will be true as default'
Expand All @@ -57,6 +66,16 @@ class LcmEventsServiceSpec extends Specification {
assert eventHeaders.get('eventCorrelationId') == cmHandleId
}
}
and: 'metrics are recorded with correct tags'
def timer = meterRegistry.find('cps.ncmp.lcm.events.publish').timer()
if (notificationsEnabled) {
assert timer != null
assert timer.count() == expectedTimesMethodCalled
def tags = timer.getId().getTags()
assert tags.containsAll(Tag.of('oldCmHandleState', ADVISED.value()), Tag.of('newCmHandleState', READY.value()))
} else {
assert timer == null
}
where: 'the following values are used'
scenario | notificationsEnabled || expectedTimesMethodCalled
'enabled' | true || 1
Expand All @@ -67,7 +86,8 @@ class LcmEventsServiceSpec extends Specification {
given: 'a cm handle id and Lcm Event and notification enabled'
def cmHandleId = 'test-cm-handle-id'
def eventId = UUID.randomUUID().toString()
def lcmEvent = new LcmEvent(eventId: eventId, eventCorrelationId: cmHandleId)
and: 'event #event'
def lcmEvent = new LcmEvent(event: event, eventId: eventId, eventCorrelationId: cmHandleId)
def lcmEventHeader = new LcmEventHeader(eventId: eventId, eventCorrelationId: cmHandleId)
objectUnderTest.notificationsEnabled = true
when: 'publisher set to throw an exception'
Expand All @@ -76,6 +96,35 @@ class LcmEventsServiceSpec extends Specification {
objectUnderTest.publishLcmEvent(cmHandleId, lcmEvent, lcmEventHeader)
then: 'the exception is just logged and not bubbled up'
noExceptionThrown()
and: 'metrics are recorded with error tags'
def timer = meterRegistry.find('cps.ncmp.lcm.events.publish').timer()
assert timer != null
assert timer.count() == 1
def expectedTags = [Tag.of('oldCmHandleState', 'N/A'), Tag.of('newCmHandleState', 'N/A')]
def tags = timer.getId().getTags()
assert tags.containsAll(expectedTags)
where: 'the following values are used'
scenario | event
'without values' | new Event()
'without cm handle state' | getEvent()
}

def getEvent() {
def event = new Event()
def values = new Values()
event.setOldValues(values)
event.setNewValues(values)
event
}

def getEventWithCmHandleState(oldCmHandleState, newCmHandleState) {
def event = new Event()
def advisedCmHandleStateValues = new Values()
advisedCmHandleStateValues.setCmHandleState(oldCmHandleState)
event.setOldValues(advisedCmHandleStateValues)
def readyCmHandleStateValues = new Values()
readyCmHandleStateValues.setCmHandleState(newCmHandleState)
event.setNewValues(readyCmHandleStateValues)
return event
}
}

0 comments on commit 3643790

Please sign in to comment.