Skip to content

Commit

Permalink
NIFI-13563: Updated Provenance Repository so that instead of returnin…
Browse files Browse the repository at this point in the history
…g the single latest event for a component, we return the events from the latest invocation / session. Added system tests to verify the behavior. Also, when replaying latest event, attempt all of those events until one succeeds or all fail

Signed-off-by: Matt Burgess <mattyb149@apache.org>

This closes #9095
  • Loading branch information
markap14 authored and mattyb149 committed Jul 22, 2024
1 parent f50fce5 commit 6dd83b7
Show file tree
Hide file tree
Showing 24 changed files with 510 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static java.util.Collections.EMPTY_SET;
Expand Down Expand Up @@ -104,8 +103,8 @@ public QuerySubmission submitQuery(Query query, NiFiUser niFiUser) {
}

@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
return Optional.empty();
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) {
    // This implementation never reports cached events: whatever the component ID,
    // the caller receives an empty, immutable list.
    final List<ProvenanceEventRecord> noCachedEvents = List.of();
    return noCachedEvents;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;


Expand Down Expand Up @@ -258,8 +257,8 @@ public QuerySubmission submitQuery(final Query query, final NiFiUser user) {
}

@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
return eventIndex.getLatestCachedEvent(componentId);
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) throws IOException {
    // Pure delegation: the event index is asked for the cached events of the component,
    // and its answer (or its IOException) is passed straight through to the caller.
    final List<ProvenanceEventRecord> cachedEvents = eventIndex.getLatestCachedEvents(componentId);
    return cachedEvents;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
* An Event Index is responsible for indexing Provenance Events in such a way that the index can be quickly
Expand Down Expand Up @@ -83,13 +83,13 @@ public interface EventIndex extends Closeable {
QuerySubmission submitQuery(Query query, EventAuthorizer authorizer, String userId);

/**
* Retrieves the most recent Provenance Event that is cached for the given component that is also accessible by the given user
* Retrieves the list of Provenance Events that are cached for the most recent invocation of the given component
* @param componentId the ID of the component
*
* @return the list of events cached for the most recent invocation of the component, or an empty list if no such events are available
* @throws IOException if unable to read from the repository
*/
Optional<ProvenanceEventRecord> getLatestCachedEvent(String componentId) throws IOException;
List<ProvenanceEventRecord> getLatestCachedEvents(String componentId) throws IOException;

/**
* Asynchronously computes the lineage for the FlowFile that is identified by the Provenance Event with the given ID.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@

package org.apache.nifi.provenance.index.lucene;

import java.util.List;
import java.util.Optional;

import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.serialization.StorageSummary;

import java.util.List;
import java.util.Map;
import java.util.Optional;

public interface CachedQuery {

void update(ProvenanceEventRecord event, StorageSummary storageSummary);
void update(Map<ProvenanceEventRecord, StorageSummary> events);

Optional<List<Long>> evaluate(Query query);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,46 @@
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.util.RingBuffer;

import java.util.Collections;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class LatestEventsPerProcessorQuery implements CachedQuery {
private static final String COMPONENT_ID_FIELD_NAME = SearchableFields.ComponentID.getSearchableFieldName();
// Map of component ID to a RingBuffer holding up to the last 1000 events
private final ConcurrentMap<String, RingBuffer<Long>> latestRecords = new ConcurrentHashMap<>();

// Map of component ID to a List of the Event IDs for all events in the latest batch of events that have been indexed for the given component ID
private final ConcurrentMap<String, List<Long>> latestEventSet = new ConcurrentHashMap<>();

@Override
public void update(final ProvenanceEventRecord event, final StorageSummary storageSummary) {
public void update(final Map<ProvenanceEventRecord, StorageSummary> events) {
    // Event IDs of this batch, grouped by the ID of the component that each event belongs to
    final Map<String, List<Long>> batchIdsByComponent = new HashMap<>();

    events.forEach((event, summary) -> {
        // Record the event in the per-component ring buffer first ...
        update(event, summary);

        // ... then add its ID to the group collected for its component in this batch
        batchIdsByComponent.computeIfAbsent(event.getComponentId(), id -> new ArrayList<>())
            .add(summary.getEventId());
    });

    // For every component present in this batch, replace the previously stored
    // "latest" ID list with the list gathered from this batch
    latestEventSet.putAll(batchIdsByComponent);
}

private void update(final ProvenanceEventRecord event, final StorageSummary storageSummary) {
    // Look up the ring buffer for the event's component, creating one with capacity 1000
    // the first time the component is seen, and append the stored event's ID to it
    latestRecords.computeIfAbsent(event.getComponentId(), id -> new RingBuffer<>(1000))
        .add(storageSummary.getEventId());
}

public List<Long> getLatestEventIds(final String componentId) {
final RingBuffer<Long> ringBuffer = latestRecords.get(componentId);
if (ringBuffer == null) {
return Collections.emptyList();
}

return ringBuffer.asList();
final List<Long> eventIds = latestEventSet.get(componentId);
return eventIds == null ? List.of() : eventIds;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,24 @@

package org.apache.nifi.provenance.index.lucene;

import java.util.List;
import java.util.Optional;

import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.search.Query;
import org.apache.nifi.provenance.serialization.StorageSummary;
import org.apache.nifi.util.RingBuffer;

import java.util.List;
import java.util.Map;
import java.util.Optional;

public class LatestEventsQuery implements CachedQuery {

final RingBuffer<Long> latestRecords = new RingBuffer<>(1000);

@Override
public void update(final ProvenanceEventRecord event, final StorageSummary storageSummary) {
latestRecords.add(storageSummary.getEventId());
public void update(final Map<ProvenanceEventRecord, StorageSummary> events) {
    // Only the storage summaries are needed here: the event ID of each one is appended
    // to the ring buffer of latest records, in the iteration order of the map's values
    events.values().forEach(summary -> latestRecords.add(summary.getEventId()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,15 +368,15 @@ public void reindexEvents(final Map<ProvenanceEventRecord, StorageSummary> event
File lastIndexDir = null;
long lastEventTime = -2L;

for (final CachedQuery cachedQuery : cachedQueries) {
cachedQuery.update(events);
}

final List<IndexableDocument> indexableDocs = new ArrayList<>(events.size());
for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : events.entrySet()) {
final ProvenanceEventRecord event = entry.getKey();
final StorageSummary summary = entry.getValue();

for (final CachedQuery cachedQuery : cachedQueries) {
cachedQuery.update(event, summary);
}

final Document document = eventConverter.convert(event, summary);
if (document == null) {
logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event.getEventId());
Expand Down Expand Up @@ -424,10 +424,6 @@ public void commitChanges(final String partitionName) throws IOException {
}

protected void addEvent(final ProvenanceEventRecord event, final StorageSummary location) {
for (final CachedQuery cachedQuery : cachedQueries) {
cachedQuery.update(event, location);
}

final Document document = eventConverter.convert(event, location);
if (document == null) {
logger.debug("Received Provenance Event {} to index but it contained no information that should be indexed, so skipping it", event.getEventId());
Expand Down Expand Up @@ -486,6 +482,10 @@ public void addEvents(final Map<ProvenanceEventRecord, StorageSummary> events) {
for (final Map.Entry<ProvenanceEventRecord, StorageSummary> entry : events.entrySet()) {
addEvent(entry.getKey(), entry.getValue());
}

for (final CachedQuery cachedQuery : cachedQueries) {
cachedQuery.update(events);
}
}


Expand Down Expand Up @@ -643,22 +643,28 @@ public QuerySubmission submitQuery(final Query query, final EventAuthorizer auth
}

@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) throws IOException {
final List<Long> eventIds = latestEventsPerProcessorQuery.getLatestEventIds(componentId);
if (eventIds.isEmpty()) {
logger.info("There are no recent Provenance Events cached for Component with ID {}", componentId);
return Optional.empty();
return List.of();
}

final Long latestEventId = eventIds.get(eventIds.size() - 1);
final Optional<ProvenanceEventRecord> latestEvent = eventStore.getEvent(latestEventId);
if (latestEvent.isPresent()) {
logger.info("Returning {} as the most recent Provenance Events cached for Component with ID {}", latestEvent.get(), componentId);
} else {
final List<ProvenanceEventRecord> latestEvents = new ArrayList<>(eventIds.size());
for (final Long eventId : eventIds) {
final Optional<ProvenanceEventRecord> latestEvent = eventStore.getEvent(eventId);
if (latestEvent.isPresent()) {
latestEvents.add(latestEvent.get());
}
}

if (latestEvents.isEmpty()) {
logger.info("There are no recent Provenance Events cached for Component with ID {}", componentId);
} else {
logger.info("Returning {} as the most recent Provenance Events cached for Component with ID {}", latestEvents, componentId);
}

return latestEvent;
return latestEvents;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -492,14 +491,15 @@ public QuerySubmission submitQuery(final Query query, final NiFiUser user) {
}

@Override
public Optional<ProvenanceEventRecord> getLatestCachedEvent(final String componentId) throws IOException {
final List<ProvenanceEventRecord> matches = ringBuffer.getSelectedElements(event -> componentId.equals(event.getComponentId()));
public List<ProvenanceEventRecord> getLatestCachedEvents(final String componentId) {
final List<ProvenanceEventRecord> matches = ringBuffer.getSelectedElements(
event -> componentId.equals(event.getComponentId()), 1);

if (matches.isEmpty()) {
return Optional.empty();
return List.of();
}

return Optional.of(matches.get(matches.size() - 1));
return List.of(matches.getLast());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public interface ProvenanceRepository extends ProvenanceEventRepository {
Expand Down Expand Up @@ -95,12 +94,12 @@ public interface ProvenanceRepository extends ProvenanceEventRepository {
QuerySubmission submitQuery(Query query, NiFiUser user);

/**
* Retrieves the most recent Provenance Event that is cached for the given component that is also accessible by the given user
* Retrieves the Provenance Events that are cached for the most recent invocation of the given component.
* @param componentId the ID of the component
* @return an Optional containing the event, or an empty optional if no events are available or none of the available events are accessible by the given user
* @return the list of events that are cached for the given component
* @throws IOException if unable to read from the repository
*/
Optional<ProvenanceEventRecord> getLatestCachedEvent(String componentId) throws IOException;
List<ProvenanceEventRecord> getLatestCachedEvents(String componentId) throws IOException;

/**
* @param queryIdentifier of the query
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.web.api.dto.provenance;

import jakarta.xml.bind.annotation.XmlType;

import java.util.List;

/**
* Data transfer object that pairs a component's ID with the latest provenance events
* that were recorded for that component.
*/
@XmlType(name = "latestProvenanceEvents")
public class LatestProvenanceEventsDTO {
// ID of the component whose latest events this object describes
private String componentId;
// The latest provenance events recorded for that component
private List<ProvenanceEventDTO> provenanceEvents;

/**
* @return the ID of the component whose latest events were fetched
*/
public String getComponentId() {
return componentId;
}

/**
* @param componentId the ID of the component whose latest events were fetched
*/
public void setComponentId(final String componentId) {
this.componentId = componentId;
}

/**
* @return the latest provenance events that were recorded for the associated component
*/
public List<ProvenanceEventDTO> getProvenanceEvents() {
return provenanceEvents;
}

/**
* @param provenanceEvents the latest provenance events that were recorded for the associated component;
* the given list is stored as-is, without copying
*/
public void setProvenanceEvents(final List<ProvenanceEventDTO> provenanceEvents) {
this.provenanceEvents = provenanceEvents;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.web.api.entity;

import jakarta.xml.bind.annotation.XmlRootElement;
import org.apache.nifi.web.api.dto.provenance.LatestProvenanceEventsDTO;

/**
* Entity wrapper whose payload is a {@link LatestProvenanceEventsDTO}.
*/
@XmlRootElement(name = "latestProvenanceEventsEntity")
public class LatestProvenanceEventsEntity extends Entity {
// The wrapped DTO holding the latest provenance events
private LatestProvenanceEventsDTO latestProvenanceEvents;

/**
* @return latest provenance events
*/
public LatestProvenanceEventsDTO getLatestProvenanceEvents() {
return latestProvenanceEvents;
}

/**
* @param latestProvenanceEvents the latest provenance events to wrap in this entity
*/
public void setLatestProvenanceEvents(LatestProvenanceEventsDTO latestProvenanceEvents) {
this.latestProvenanceEvents = latestProvenanceEvents;
}
}
Loading

0 comments on commit 6dd83b7

Please sign in to comment.