From 6dd83b70346e35ada3aaed82f14de00447d43277 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Fri, 19 Jul 2024 14:47:18 -0400 Subject: [PATCH] NIFI-13563: Updated Provenance Repository so that instead of returning the single latest event for a component, we return the events from the latest invocation / session. Added system tests to verify the behavior. Also, when replaying latest event, attempt all of those events until one succeeds or all fail Signed-off-by: Matt Burgess This closes #9095 --- .../provenance/NoOpProvenanceRepository.java | 5 +- .../WriteAheadProvenanceRepository.java | 5 +- .../nifi/provenance/index/EventIndex.java | 6 +- .../provenance/index/lucene/CachedQuery.java | 9 +- .../lucene/LatestEventsPerProcessorQuery.java | 32 +++++-- .../index/lucene/LatestEventsQuery.java | 13 ++- .../index/lucene/LuceneEventIndex.java | 38 ++++---- .../VolatileProvenanceRepository.java | 10 +- .../nifi/provenance/ProvenanceRepository.java | 7 +- .../provenance/LatestProvenanceEventsDTO.java | 50 ++++++++++ .../entity/LatestProvenanceEventsEntity.java | 37 ++++++++ .../http/StandardHttpResponseMapper.java | 4 +- .../LatestProvenanceEventsMerger.java | 80 ++++++++++++++++ .../provenance/MockProvenanceRepository.java | 5 +- .../apache/nifi/web/NiFiServiceFacade.java | 8 ++ .../nifi/web/StandardNiFiServiceFacade.java | 11 +++ .../nifi/web/api/ProvenanceEventResource.java | 52 +++++++++-- .../nifi/web/api/ProvenanceResource.java | 8 +- .../nifi/web/controller/ControllerFacade.java | 68 +++++++++++--- .../StatelessProvenanceRepository.java | 5 +- .../ClusteredGetLatestProvenanceEventsIT.java | 28 ++++++ .../GetLatestProvenanceEventsIT.java | 91 +++++++++++++++++++ .../impl/client/nifi/ProvenanceClient.java | 3 + .../nifi/impl/JerseyProvenanceClient.java | 19 +++- 24 files changed, 510 insertions(+), 84 deletions(-) create mode 100644 nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/provenance/LatestProvenanceEventsDTO.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/LatestProvenanceEventsEntity.java create mode 100644 nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/LatestProvenanceEventsMerger.java create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ClusteredGetLatestProvenanceEventsIT.java create mode 100644 nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/GetLatestProvenanceEventsIT.java diff --git a/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java b/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java index 6444c42fbc3a..0379d728216d 100644 --- a/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java +++ b/minifi/minifi-nar-bundles/minifi-provenance-repository-bundle/minifi-provenance-repositories/src/main/java/org/apache/nifi/provenance/NoOpProvenanceRepository.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.List; -import java.util.Optional; import java.util.Set; import static java.util.Collections.EMPTY_SET; @@ -104,8 +103,8 @@ public QuerySubmission submitQuery(Query query, NiFiUser niFiUser) { } @Override - public Optional getLatestCachedEvent(final String componentId) throws IOException { - return Optional.empty(); + public List getLatestCachedEvents(final String componentId) { + return List.of(); } @Override diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java index 975c6b5ffc29..85d1e5c04678 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/WriteAheadProvenanceRepository.java @@ -57,7 +57,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; @@ -258,8 +257,8 @@ public QuerySubmission submitQuery(final Query query, final NiFiUser user) { } @Override - public Optional getLatestCachedEvent(final String componentId) throws IOException { - return eventIndex.getLatestCachedEvent(componentId); + public List getLatestCachedEvents(final String componentId) throws IOException { + return eventIndex.getLatestCachedEvents(componentId); } @Override diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java index a8addb6a0387..a5b3d59166bb 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/EventIndex.java @@ -28,8 +28,8 @@ import java.io.Closeable; import java.io.IOException; +import java.util.List; import java.util.Map; -import java.util.Optional; /** * An Event Index is responsible for indexing Provenance Events in such a way that the index can be quickly @@ -83,13 +83,13 @@ public interface EventIndex extends Closeable { QuerySubmission submitQuery(Query query, EventAuthorizer authorizer, String userId); /** - * Retrieves the most recent Provenance Event that is cached for the given component that is also accessible by the given user + * Retrieves the list of Provenance Events that are cached for the most recent invocation of the given component * @param componentId the ID of the component * * @return an Optional containing the event, or an empty optional if no events are available or none of the available events are accessible by the given user * @throws IOException if unable to read from the repository */ - Optional getLatestCachedEvent(String componentId) throws IOException; + List getLatestCachedEvents(String componentId) throws IOException; /** * Asynchronously computes the lineage for the FlowFile that is identified by the Provenance Event with the given ID. diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CachedQuery.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CachedQuery.java index 770c4552bf37..4c64f4cae6d9 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CachedQuery.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/CachedQuery.java @@ -17,16 +17,17 @@ package org.apache.nifi.provenance.index.lucene; -import java.util.List; -import java.util.Optional; - import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.serialization.StorageSummary; +import java.util.List; +import java.util.Map; +import java.util.Optional; + public interface CachedQuery { - void update(ProvenanceEventRecord event, StorageSummary storageSummary); + void update(Map events); Optional> evaluate(Query query); diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java index 36bfec85d127..13a3cf466b66 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsPerProcessorQuery.java @@ -24,30 +24,46 @@ import org.apache.nifi.provenance.serialization.StorageSummary; import org.apache.nifi.util.RingBuffer; -import java.util.Collections; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class LatestEventsPerProcessorQuery implements CachedQuery { private static final String COMPONENT_ID_FIELD_NAME = SearchableFields.ComponentID.getSearchableFieldName(); + // Map of component ID to a RingBuffer holding up to the last 1000 events private final ConcurrentMap> latestRecords = new ConcurrentHashMap<>(); + // Map of component ID to a List of the Event IDs for all events in the latest batch of events that have been indexed for the given component ID + private final ConcurrentMap> latestEventSet = new ConcurrentHashMap<>(); + @Override - public void update(final ProvenanceEventRecord event, final StorageSummary storageSummary) { + public void update(final Map events) { + final Map> eventsByComponent = new HashMap<>(); + + for (final Map.Entry entry : events.entrySet()) { + update(entry.getKey(), entry.getValue()); + + final String componentId = entry.getKey().getComponentId(); + final List eventSet = eventsByComponent.computeIfAbsent(componentId, id -> new ArrayList<>()); + eventSet.add(entry.getValue().getEventId()); + } + + latestEventSet.putAll(eventsByComponent); + } + + private void update(final ProvenanceEventRecord event, final StorageSummary storageSummary) { final String componentId = event.getComponentId(); final RingBuffer ringBuffer = latestRecords.computeIfAbsent(componentId, id -> new RingBuffer<>(1000)); ringBuffer.add(storageSummary.getEventId()); } public List getLatestEventIds(final String componentId) { - final RingBuffer ringBuffer = latestRecords.get(componentId); - if (ringBuffer == null) { - return Collections.emptyList(); - } - - return ringBuffer.asList(); + final List eventIds = latestEventSet.get(componentId); + return eventIds == null ? List.of() : eventIds; } @Override diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java index 60025217c576..48d9fbbeaf81 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LatestEventsQuery.java @@ -17,21 +17,24 @@ package org.apache.nifi.provenance.index.lucene; -import java.util.List; -import java.util.Optional; - import org.apache.nifi.provenance.ProvenanceEventRecord; import org.apache.nifi.provenance.search.Query; import org.apache.nifi.provenance.serialization.StorageSummary; import org.apache.nifi.util.RingBuffer; +import java.util.List; +import java.util.Map; +import java.util.Optional; + public class LatestEventsQuery implements CachedQuery { final RingBuffer latestRecords = new RingBuffer<>(1000); @Override - public void update(final ProvenanceEventRecord event, final StorageSummary storageSummary) { - latestRecords.add(storageSummary.getEventId()); + public void update(final Map events) { + for (final StorageSummary storageSummary : events.values()) { + latestRecords.add(storageSummary.getEventId()); + } } @Override diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java index 6405ed39802b..b7dcbbbc5069 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-persistent-provenance-repository/src/main/java/org/apache/nifi/provenance/index/lucene/LuceneEventIndex.java @@ -368,15 +368,15 @@ public void reindexEvents(final Map event File lastIndexDir = null; long lastEventTime = -2L; + for (final CachedQuery cachedQuery : cachedQueries) { + cachedQuery.update(events); + } + final List indexableDocs = new ArrayList<>(events.size()); for (final Map.Entry entry : events.entrySet()) { final ProvenanceEventRecord event = entry.getKey(); final StorageSummary summary = entry.getValue(); - for (final CachedQuery cachedQuery : cachedQueries) { - cachedQuery.update(event, summary); - } - final Document document = eventConverter.convert(event, summary); if (document == null) { logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event.getEventId()); @@ -424,10 +424,6 @@ public void commitChanges(final String partitionName) throws IOException { } protected void addEvent(final ProvenanceEventRecord event, final StorageSummary location) { - for (final CachedQuery cachedQuery : cachedQueries) { - cachedQuery.update(event, location); - } - final Document document = eventConverter.convert(event, location); if (document == null) { logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event.getEventId()); @@ -486,6 +482,10 @@ public void addEvents(final Map events) { for (final Map.Entry entry : events.entrySet()) { addEvent(entry.getKey(), entry.getValue()); } + + for (final CachedQuery cachedQuery : cachedQueries) { + cachedQuery.update(events); + } } @@ -643,22 +643,28 @@ public QuerySubmission submitQuery(final Query query, final EventAuthorizer auth } @Override - public Optional getLatestCachedEvent(final String componentId) throws IOException { + public List getLatestCachedEvents(final String componentId) throws IOException { final List eventIds = latestEventsPerProcessorQuery.getLatestEventIds(componentId); if (eventIds.isEmpty()) { logger.info("There are no recent Provenance Events cached for Component with ID {}", componentId); - return Optional.empty(); + return List.of(); } - final Long latestEventId = eventIds.get(eventIds.size() - 1); - final Optional latestEvent = eventStore.getEvent(latestEventId); - if (latestEvent.isPresent()) { - logger.info("Returning {} as the most recent Provenance Events cached for Component with ID {}", latestEvent.get(), componentId); - } else { + final List latestEvents = new ArrayList<>(eventIds.size()); + for (final Long eventId : eventIds) { + final Optional latestEvent = eventStore.getEvent(eventId); + if (latestEvent.isPresent()) { + latestEvents.add(latestEvent.get()); + } + } + + if (latestEvents.isEmpty()) { logger.info("There are no recent Provenance Events cached for Component with ID {}", componentId); + } else { + logger.info("Returning {} as the most recent Provenance Events cached for Component with ID {}", latestEvents, componentId); } - return latestEvent; + return latestEvents; } @Override diff --git a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java index a01c4df68c38..3b166a4ace8a 100644 --- a/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java +++ b/nifi-extension-bundles/nifi-provenance-repository-bundle/nifi-volatile-provenance-repository/src/main/java/org/apache/nifi/provenance/VolatileProvenanceRepository.java @@ -50,7 +50,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -492,14 +491,15 @@ public QuerySubmission submitQuery(final Query query, final NiFiUser user) { } @Override - public Optional getLatestCachedEvent(final String componentId) throws IOException { - final List matches = ringBuffer.getSelectedElements(event -> componentId.equals(event.getComponentId())); + public List getLatestCachedEvents(final String componentId) { + final List matches = ringBuffer.getSelectedElements( + event -> componentId.equals(event.getComponentId()), 1); if (matches.isEmpty()) { - return Optional.empty(); + return List.of(); } - return Optional.of(matches.get(matches.size() - 1)); + return List.of(matches.getLast()); } @Override diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java index 8e159d864bb9..f37c406b1a79 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/provenance/ProvenanceRepository.java @@ -26,7 +26,6 @@ import java.io.IOException; import java.util.List; -import java.util.Optional; import java.util.Set; public interface ProvenanceRepository extends ProvenanceEventRepository { @@ -95,12 +94,12 @@ public interface ProvenanceRepository extends ProvenanceEventRepository { QuerySubmission submitQuery(Query query, NiFiUser user); /** - * Retrieves the most recent Provenance Event that is cached for the given component that is also accessible by the given user + * Retrieves the Provenance Events that are cached for the most recent invocation of the given component. * @param componentId the ID of the component - * @return an Optional containing the event, or an empty optional if no events are available or none of the available events are accessible by the given user + * @return the list of events that are cached for the given component * @throws IOException if unable to read from the repository */ - Optional getLatestCachedEvent(String componentId) throws IOException; + List getLatestCachedEvents(String componentId) throws IOException; /** * @param queryIdentifier of the query diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/provenance/LatestProvenanceEventsDTO.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/provenance/LatestProvenanceEventsDTO.java new file mode 100644 index 000000000000..bcad385a32c5 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/provenance/LatestProvenanceEventsDTO.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.dto.provenance; + +import jakarta.xml.bind.annotation.XmlType; + +import java.util.List; + +@XmlType(name = "latestProvenanceEvents") +public class LatestProvenanceEventsDTO { + private String componentId; + private List provenanceEvents; + + /** + * @return the ID of the component whose latest events were fetched + */ + public String getComponentId() { + return componentId; + } + + public void setComponentId(final String componentId) { + this.componentId = componentId; + } + + /** + * @return the latest provenance events that were recorded for the associated component + */ + public List getProvenanceEvents() { + return provenanceEvents; + } + + public void setProvenanceEvents(final List provenanceEvents) { + this.provenanceEvents = provenanceEvents; + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/LatestProvenanceEventsEntity.java b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/LatestProvenanceEventsEntity.java new file mode 100644 index 000000000000..1cbb1c8a57ab --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/LatestProvenanceEventsEntity.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.web.api.entity; + +import jakarta.xml.bind.annotation.XmlRootElement; +import org.apache.nifi.web.api.dto.provenance.LatestProvenanceEventsDTO; + +@XmlRootElement(name = "latestProvenanceEventsEntity") +public class LatestProvenanceEventsEntity extends Entity { + private LatestProvenanceEventsDTO latestProvenanceEvents; + + /** + * @return latest provenance events + */ + public LatestProvenanceEventsDTO getLatestProvenanceEvents() { + return latestProvenanceEvents; + } + + public void setLatestProvenanceEvents(LatestProvenanceEventsDTO latestProvenanceEvents) { + this.latestProvenanceEvents = latestProvenanceEvents; + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java index c4d742d73e2b..e3ae59a9bcf4 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/StandardHttpResponseMapper.java @@ -51,6 +51,7 @@ import org.apache.nifi.cluster.coordination.http.endpoints.InputPortsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.LabelEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.LabelsEndpointMerger; +import org.apache.nifi.cluster.coordination.http.endpoints.LatestProvenanceEventsMerger; import org.apache.nifi.cluster.coordination.http.endpoints.ListFlowFilesEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.NarDetailsEndpointMerger; import org.apache.nifi.cluster.coordination.http.endpoints.NarSummariesEndpointMerger; @@ -111,7 +112,7 @@ public class StandardHttpResponseMapper implements HttpResponseMapper { - private Logger logger = LoggerFactory.getLogger(StandardHttpResponseMapper.class); + private final Logger logger = LoggerFactory.getLogger(StandardHttpResponseMapper.class); private final List endpointMergers = new ArrayList<>(); @@ -145,6 +146,7 @@ public StandardHttpResponseMapper(final NiFiProperties nifiProperties) { endpointMergers.add(new FlowSnippetEndpointMerger()); endpointMergers.add(new ProvenanceQueryEndpointMerger()); endpointMergers.add(new ProvenanceEventEndpointMerger()); + endpointMergers.add(new LatestProvenanceEventsMerger()); endpointMergers.add(new ControllerServiceEndpointMerger()); endpointMergers.add(new ControllerServicesEndpointMerger()); endpointMergers.add(new ControllerServiceReferenceEndpointMerger()); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/LatestProvenanceEventsMerger.java b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/LatestProvenanceEventsMerger.java new file mode 100644 index 000000000000..d36b8a04502d --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/coordination/http/endpoints/LatestProvenanceEventsMerger.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.cluster.coordination.http.endpoints; + +import org.apache.nifi.cluster.coordination.http.EndpointResponseMerger; +import org.apache.nifi.cluster.manager.NodeResponse; +import org.apache.nifi.cluster.protocol.NodeIdentifier; +import org.apache.nifi.web.api.dto.provenance.LatestProvenanceEventsDTO; +import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; +import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import java.util.regex.Pattern; + +public class LatestProvenanceEventsMerger implements EndpointResponseMerger { + public static final Pattern LATEST_EVENTS_URI = Pattern.compile("/nifi-api/provenance-events/latest/[a-f0-9\\-]{36}"); + + @Override + public boolean canHandle(final URI uri, final String method) { + if ("GET".equalsIgnoreCase(method) && LATEST_EVENTS_URI.matcher(uri.getPath()).matches()) { + return true; + } + + return false; + } + + @Override + public NodeResponse merge(final URI uri, final String method, final Set successfulResponses, final Set problematicResponses, final NodeResponse clientResponse) { + if (!canHandle(uri, method)) { + throw new IllegalArgumentException("Cannot use Endpoint Mapper of type " + getClass().getSimpleName() + " to map responses for URI " + uri + ", HTTP Method " + method); + } + + final LatestProvenanceEventsEntity responseEntity = clientResponse.getClientResponse().readEntity(LatestProvenanceEventsEntity.class); + final LatestProvenanceEventsDTO dto = responseEntity.getLatestProvenanceEvents(); + final List mergedEvents = new ArrayList<>(); + + for (final NodeResponse nodeResponse : successfulResponses) { + final NodeIdentifier nodeId = nodeResponse.getNodeId(); + + final LatestProvenanceEventsEntity nodeResponseEntity = nodeResponse.getClientResponse().readEntity(LatestProvenanceEventsEntity.class); + final List nodeEvents = nodeResponseEntity.getLatestProvenanceEvents().getProvenanceEvents(); + + // if the cluster node id or node address is not set, then we need to populate them. If they + // are already set, we don't want to populate them because it will be the case that they were populated + // by the Cluster Coordinator when it federated the request, and we are now just receiving the response + // from the Cluster Coordinator. + for (final ProvenanceEventDTO eventDto : nodeEvents) { + if (eventDto.getClusterNodeId() == null || eventDto.getClusterNodeAddress() == null) { + eventDto.setClusterNodeId(nodeId.getId()); + eventDto.setClusterNodeAddress(nodeId.getApiAddress() + ":" + nodeId.getApiPort()); + } + } + + mergedEvents.addAll(nodeEvents); + } + + dto.setProvenanceEvents(mergedEvents); + + return new NodeResponse(clientResponse, responseEntity); + } + +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/provenance/MockProvenanceRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/provenance/MockProvenanceRepository.java index 4c7b964a9f96..f15157cf291d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/provenance/MockProvenanceRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/provenance/MockProvenanceRepository.java @@ -29,7 +29,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -92,8 +91,8 @@ public QuerySubmission submitQuery(Query query, NiFiUser user) { } @Override - public Optional getLatestCachedEvent(final String componentId) throws IOException { - return Optional.empty(); + public List getLatestCachedEvents(final String componentId) { + return List.of(); } @Override diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java index 973dae7164ab..c9e6d7a99ecd 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java @@ -114,6 +114,7 @@ import org.apache.nifi.web.api.entity.FlowRegistryClientEntity; import org.apache.nifi.web.api.entity.FunnelEntity; import org.apache.nifi.web.api.entity.LabelEntity; +import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity; import org.apache.nifi.web.api.entity.NarDetailsEntity; import org.apache.nifi.web.api.entity.NarSummaryEntity; import org.apache.nifi.web.api.entity.ParameterContextEntity; @@ -322,6 +323,13 @@ public interface NiFiServiceFacade { */ ProvenanceEventDTO getProvenanceEvent(Long id); + /** + * Gets the latest provenance events for the specified component. + * @param componentId the ID of the components to retrieve the latest events for + * @return the latest provenance events + */ + LatestProvenanceEventsEntity getLatestProvenanceEvents(String componentId); + /** * Gets the configuration for this controller. * diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java index 66461519cbef..e67c50d42833 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java @@ -280,6 +280,7 @@ import org.apache.nifi.web.api.dto.diagnostics.JVMDiagnosticsSnapshotDTO; import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO; import org.apache.nifi.web.api.dto.flow.FlowDTO; +import org.apache.nifi.web.api.dto.provenance.LatestProvenanceEventsDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; @@ -325,6 +326,7 @@ import org.apache.nifi.web.api.entity.FlowRegistryClientEntity; import org.apache.nifi.web.api.entity.FunnelEntity; import org.apache.nifi.web.api.entity.LabelEntity; +import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity; import org.apache.nifi.web.api.entity.NarDetailsEntity; import org.apache.nifi.web.api.entity.NarSummaryEntity; import org.apache.nifi.web.api.entity.ParameterContextEntity; @@ -3656,6 +3658,15 @@ public ProvenanceEventDTO getProvenanceEvent(final Long id) { return controllerFacade.getProvenanceEvent(id); } + @Override + public LatestProvenanceEventsEntity getLatestProvenanceEvents(final String componentId) { + final LatestProvenanceEventsDTO dto = controllerFacade.getLatestProvenanceEvents(componentId); + + final LatestProvenanceEventsEntity entity = new LatestProvenanceEventsEntity(); + entity.setLatestProvenanceEvents(dto); + return entity; + } + @Override public ProcessGroupStatusEntity getProcessGroupStatus(final String groupId, final boolean recursive) { final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId); diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java index 42994012da75..fffa46eea9af 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceEventResource.java @@ -16,12 +16,6 @@ */ package org.apache.nifi.web.api; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.URI; -import java.util.Collections; - import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.media.Content; @@ -57,6 +51,7 @@ import org.apache.nifi.web.DownloadableContent; import org.apache.nifi.web.NiFiServiceFacade; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; +import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity; import org.apache.nifi.web.api.entity.ProvenanceEventEntity; import org.apache.nifi.web.api.entity.ReplayLastEventRequestEntity; import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity; @@ -67,6 +62,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.util.Collections; + /** * RESTful endpoint for querying data provenance. */ @@ -496,6 +497,45 @@ public Response submitReplay( return generateCreatedResponse(uri, entity).build(); } + @GET + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.APPLICATION_JSON) + @Path("latest/{componentId}") + @Operation( + summary = "Retrieves the latest cached Provenance Events for the specified component", + responses = @ApiResponse(content = @Content(schema = @Schema(implementation = LatestProvenanceEventsEntity.class))), + security = { + @SecurityRequirement(name = "Read Component Provenance Data - /provenance-data/{component-type}/{uuid}"), + @SecurityRequirement(name = "Read Component Data - /data/{component-type}/{uuid}") + } + ) + @ApiResponses( + value = { + @ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."), + @ApiResponse(responseCode = "401", description = "Client could not be authenticated."), + @ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."), + @ApiResponse(responseCode = "404", description = "The specified resource could not be found."), + @ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.") + } + ) + public Response getLatestProvenanceEvents( + @Parameter( + description = "The ID of the component to retrieve the latest Provenance Events for.", + required = true + ) + @PathParam("componentId") final String componentId + ) { + if (isReplicateRequest()) { + return replicate(HttpMethod.GET); + } + + // get the latest provenance events + final LatestProvenanceEventsEntity entity = serviceFacade.getLatestProvenanceEvents(componentId); + + // generate the response + return generateOkResponse(entity).build(); + } + // setters public void setServiceFacade(NiFiServiceFacade serviceFacade) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java index 2819cf154f8b..6599db969e47 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/ProvenanceResource.java @@ -16,10 +16,6 @@ */ package org.apache.nifi.web.api; -import java.net.URI; -import java.util.HashMap; -import java.util.Map; - import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import io.swagger.v3.oas.annotations.media.Content; @@ -55,6 +51,10 @@ import org.apache.nifi.web.api.entity.ProvenanceEntity; import org.apache.nifi.web.api.entity.ProvenanceOptionsEntity; +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + /** * RESTful endpoint for querying data provenance. diff --git a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java index c455cb345977..79a89a05f5b6 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java +++ b/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.web.controller; +import jakarta.ws.rs.WebApplicationException; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.ClassUtils; import org.apache.commons.lang3.StringUtils; @@ -66,8 +67,8 @@ import org.apache.nifi.controller.status.history.StatusHistoryRepository; import org.apache.nifi.diagnostics.StorageUsage; import org.apache.nifi.diagnostics.SystemDiagnostics; -import org.apache.nifi.flowanalysis.FlowAnalysisRule; import org.apache.nifi.flow.VersionedProcessGroup; +import org.apache.nifi.flowanalysis.FlowAnalysisRule; import org.apache.nifi.flowfile.FlowFilePrioritizer; import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.groups.ProcessGroup; @@ -107,6 +108,7 @@ import org.apache.nifi.web.api.dto.DtoFactory; import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO; import org.apache.nifi.web.api.dto.provenance.AttributeDTO; +import org.apache.nifi.web.api.dto.provenance.LatestProvenanceEventsDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO; @@ -126,7 +128,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.ws.rs.WebApplicationException; import java.io.IOException; import java.io.InputStream; import java.text.Collator; @@ -137,10 +138,10 @@ import java.util.Date; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Locale; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.SortedSet; import java.util.TimeZone; @@ -1414,20 +1415,37 @@ public ProvenanceEventDTO submitReplayLastEvent(String componentId) { } // lookup the original event - final Optional optionalEvent = flowController.getProvenanceRepository().getLatestCachedEvent(componentId); - if (!optionalEvent.isPresent()) { + final List latestEvents = flowController.getProvenanceRepository().getLatestCachedEvents(componentId); + if (latestEvents.isEmpty()) { return null; } - // Authorize the replay - final ProvenanceEventRecord event = optionalEvent.get(); - authorizeReplay(event); - - // Replay the FlowFile - flowController.replayFlowFile(event, user); + final Iterator itr = latestEvents.iterator(); + while (itr.hasNext()) { + final ProvenanceEventRecord event = itr.next(); + + try { + // Authorize the replay + authorizeReplay(event); + + // Replay the FlowFile + flowController.replayFlowFile(event, user); + + // convert the event record + return createProvenanceEventDto(event, false); + } catch (final IOException e) { + throw e; + } catch (final Exception e) { + if (itr.hasNext()) { + logger.debug("Failed to replay Provenance Event {} but will continue to try remaining events", event, e); + } else { + throw e; + } + } + } - // convert the event record - return createProvenanceEventDto(event, false); + // Won't happen, because we will have either thrown an Exception or returned the result of createProvenanceEventDto, but necessary for compiler + return null; } catch (final IOException ioe) { throw new NiFiCoreException("An error occurred while getting the specified event.", ioe); } @@ -1519,6 +1537,30 @@ public ProvenanceEventDTO getProvenanceEvent(final Long eventId) { } } + public LatestProvenanceEventsDTO getLatestProvenanceEvents(final String componentId) { + final Authorizable authorizable = flowController.getProvenanceAuthorizableFactory().createProvenanceDataAuthorizable(componentId); + final Authorizer authorizer = flowController.getAuthorizer(); + if (!authorizable.isAuthorized(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser())) { + throw new AccessDeniedException("User does not have permission to view the latest events for the specified component."); + } + + try { + final List events = flowController.getProvenanceRepository().getLatestCachedEvents(componentId); + final List eventDtos = new ArrayList<>(); + for (final ProvenanceEventRecord event : events) { + eventDtos.add(createProvenanceEventDto(event, false)); + } + + final LatestProvenanceEventsDTO dto = new LatestProvenanceEventsDTO(); + dto.setComponentId(componentId); + dto.setProvenanceEvents(eventDtos); + + return dto; + } catch (final IOException ioe) { + throw new NiFiCoreException("An error occurred while getting the latest events for the specified component.", ioe); + } + } + /** * Creates a ProvenanceEventDTO for the specified ProvenanceEventRecord. This should only be invoked once the * current user has been authorized for access to this provenance event. diff --git a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java index fcc2e797d545..c26de50f1450 100644 --- a/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java +++ b/nifi-stateless/nifi-stateless-bundle/nifi-stateless-engine/src/main/java/org/apache/nifi/stateless/repository/StatelessProvenanceRepository.java @@ -38,7 +38,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -154,8 +153,8 @@ public QuerySubmission submitQuery(final Query query, final NiFiUser user) { } @Override - public Optional getLatestCachedEvent(final String componentId) throws IOException { - return Optional.empty(); + public List getLatestCachedEvents(final String componentId) { + return List.of(); } @Override diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ClusteredGetLatestProvenanceEventsIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ClusteredGetLatestProvenanceEventsIT.java new file mode 100644 index 000000000000..96393d241a6e --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ClusteredGetLatestProvenanceEventsIT.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.provenance; + +import org.apache.nifi.tests.system.NiFiInstanceFactory; + +public class ClusteredGetLatestProvenanceEventsIT extends GetLatestProvenanceEventsIT { + + @Override + public NiFiInstanceFactory getInstanceFactory() { + return createTwoNodeInstanceFactory(); + } +} diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/GetLatestProvenanceEventsIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/GetLatestProvenanceEventsIT.java new file mode 100644 index 000000000000..16588d9b9b53 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/GetLatestProvenanceEventsIT.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.tests.system.provenance; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; +import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO; +import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class GetLatestProvenanceEventsIT extends NiFiSystemIT { + + @Test + public void testSingleEvent() throws NiFiClientException, IOException, InterruptedException { + runTest(false); + } + + @Test + public void testMultipleEvents() throws NiFiClientException, IOException, InterruptedException { + runTest(true); + } + + private void runTest(final boolean autoTerminateReverse) throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + final ProcessorEntity reverse = getClientUtil().createProcessor("ReverseContents"); + + if (autoTerminateReverse) { + getClientUtil().setAutoTerminatedRelationships(reverse, Set.of("success", "failure")); + } else { + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + getClientUtil().createConnection(reverse, terminate, "success"); + getClientUtil().setAutoTerminatedRelationships(reverse, "failure"); + } + + getClientUtil().createConnection(generate, reverse, "success"); + getClientUtil().updateProcessorProperties(generate, Map.of("Text", "Hello, World!")); + + getClientUtil().startProcessor(generate); + getClientUtil().startProcessor(reverse); + + final int expectedEventCount = getNumberOfNodes() * (autoTerminateReverse ? 2 : 1); + waitFor(() -> { + final LatestProvenanceEventsEntity entity = getNifiClient().getProvenanceClient().getLatestEvents(reverse.getId()); + final List events = entity.getLatestProvenanceEvents().getProvenanceEvents(); + return events.size() == expectedEventCount; + }); + + final LatestProvenanceEventsEntity entity = getNifiClient().getProvenanceClient().getLatestEvents(reverse.getId()); + final List events = entity.getLatestProvenanceEvents().getProvenanceEvents(); + final Map countsByEventType = new HashMap<>(); + for (final ProvenanceEventDTO event : events) { + assertEquals(reverse.getId(), event.getComponentId()); + final String eventType = event.getEventType(); + countsByEventType.put(eventType, countsByEventType.getOrDefault(eventType, 0) + 1); + + if (getNumberOfNodes() > 1) { + assertNotNull(event.getClusterNodeId()); + } + } + + if (autoTerminateReverse) { + assertEquals(getNumberOfNodes(), countsByEventType.get("DROP").intValue()); + } + assertEquals(getNumberOfNodes(), countsByEventType.get("CONTENT_MODIFIED").intValue()); + } +} diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java index 8f9c6808efc2..8da20f161849 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/ProvenanceClient.java @@ -16,6 +16,7 @@ */ package org.apache.nifi.toolkit.cli.impl.client.nifi; +import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity; import org.apache.nifi.web.api.entity.LineageEntity; import org.apache.nifi.web.api.entity.ProvenanceEntity; import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity; @@ -37,6 +38,8 @@ public interface ProvenanceClient { ReplayLastEventResponseEntity replayLastEvent(String processorId, ReplayEventNodes replayEventNodes) throws NiFiClientException, IOException; + LatestProvenanceEventsEntity getLatestEvents(String processorId) throws NiFiClientException, IOException; + enum ReplayEventNodes { PRIMARY, ALL; diff --git a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java index b9a124728b81..b1f96fb17c7e 100644 --- a/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java +++ b/nifi-toolkit/nifi-toolkit-cli/src/main/java/org/apache/nifi/toolkit/cli/impl/client/nifi/impl/JerseyProvenanceClient.java @@ -16,18 +16,19 @@ */ package org.apache.nifi.toolkit.cli.impl.client.nifi.impl; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.WebTarget; +import jakarta.ws.rs.core.MediaType; import org.apache.commons.lang3.StringUtils; import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException; import org.apache.nifi.toolkit.cli.impl.client.nifi.ProvenanceClient; import org.apache.nifi.toolkit.cli.impl.client.nifi.RequestConfig; +import org.apache.nifi.web.api.entity.LatestProvenanceEventsEntity; import org.apache.nifi.web.api.entity.LineageEntity; import org.apache.nifi.web.api.entity.ProvenanceEntity; import org.apache.nifi.web.api.entity.ReplayLastEventRequestEntity; import org.apache.nifi.web.api.entity.ReplayLastEventResponseEntity; -import jakarta.ws.rs.client.Entity; -import jakarta.ws.rs.client.WebTarget; -import jakarta.ws.rs.core.MediaType; import java.io.IOException; import java.util.Objects; @@ -138,4 +139,16 @@ public ReplayLastEventResponseEntity replayLastEvent(final String processorId, f ReplayLastEventResponseEntity.class); }); } + + @Override + public LatestProvenanceEventsEntity getLatestEvents(final String processorId) throws NiFiClientException, IOException { + if (StringUtils.isBlank(processorId)) { + throw new IllegalArgumentException("Processor ID must be specified"); + } + + return executeAction("Error getting latest events for Processor " + processorId, () -> { + final WebTarget target = provenanceEventsTarget.path("/latest/").path(processorId); + return getRequestBuilder(target).get(LatestProvenanceEventsEntity.class); + }); + } }