Skip to content

Commit

Permalink
NIFI-13556 Fixed logic for identifying components from uploaded NAR
Browse files Browse the repository at this point in the history
- Return copied set of Extensions from ExtensionManager to avoid concurrent modification

This closes #9089

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
bbende authored and exceptionfactory committed Jul 17, 2024
1 parent 0e7e511 commit 8e5cf99
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,22 @@ public boolean test(final ComponentNode componentNode) {
}

private <T extends ComponentNode> boolean isComponentFromType(final T componentNode, final ExtensionDefinition extensionDefinition) {
final String componentType = componentNode.getComponentType();
final String componentClassName = componentNode.getCanonicalClassName();
final BundleCoordinate componentCoordinate = componentNode.getBundleCoordinate();

final String extensionDefinitionType = extensionDefinition.getImplementationClassName();
final String extensionDefinitionClassName = extensionDefinition.getImplementationClassName();
final BundleCoordinate extensionDefinitionCoordinate = extensionDefinition.getBundle().getBundleDetails().getCoordinate();

if (PythonBundle.isPythonCoordinate(componentCoordinate)) {
final String componentType = componentNode.getComponentType();
final String pythonComponentType = "python." + componentType;
return pythonComponentType.equals(extensionDefinitionType) && componentCoordinate.equals(extensionDefinitionCoordinate);
return pythonComponentType.equals(extensionDefinitionClassName) && componentCoordinate.equals(extensionDefinitionCoordinate);
} else if (componentNode.isExtensionMissing()) {
return componentType.equals(extensionDefinitionType)
return componentClassName.equals(extensionDefinitionClassName)
&& componentCoordinate.getGroup().equals(extensionDefinitionCoordinate.getGroup())
&& componentCoordinate.getId().equals(extensionDefinitionCoordinate.getId());
} else {
return componentType.equals(extensionDefinitionType) && componentCoordinate.equals(extensionDefinitionCoordinate);
return componentClassName.equals(extensionDefinitionClassName) && componentCoordinate.equals(extensionDefinitionCoordinate);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@

public class ComponentNodeDefinitionPredicateTest {

private static final String STANDARD_PROCESSOR_TYPE = "org.apache.nifi.MyProcessor";
private static final String STANDARD_PROCESSOR_COMPONENT_TYPE = "MyProcessor";
private static final String STANDARD_PROCESSOR_CLASS_NAME = "org.apache.nifi." + STANDARD_PROCESSOR_COMPONENT_TYPE;
private static final BundleCoordinate STANDARD_BUNDLE_COORDINATE_V1 = new BundleCoordinate("org.apache.nifi", "my-processors-nar", "1.0.0");
private static final BundleCoordinate STANDARD_BUNDLE_COORDINATE_V2 = new BundleCoordinate("org.apache.nifi", "my-processors-nar", "2.0.0");

Expand All @@ -55,7 +56,7 @@ public void setup() {
when(standardBundle.getBundleDetails()).thenReturn(standardBundleDetails);

standardProcessorDefinition = mock(ExtensionDefinition.class);
when(standardProcessorDefinition.getImplementationClassName()).thenReturn(STANDARD_PROCESSOR_TYPE);
when(standardProcessorDefinition.getImplementationClassName()).thenReturn(STANDARD_PROCESSOR_CLASS_NAME);
when(standardProcessorDefinition.getBundle()).thenReturn(standardBundle);

final BundleDetails pythonBundleDetails = mock(BundleDetails.class);
Expand All @@ -72,7 +73,7 @@ public void setup() {
@Test
public void testWhenComponentNodeMatchesDefinition() {
final ComponentNode componentNode = mock(ComponentNode.class);
when(componentNode.getComponentType()).thenReturn(STANDARD_PROCESSOR_TYPE);
when(componentNode.getCanonicalClassName()).thenReturn(STANDARD_PROCESSOR_CLASS_NAME);
when(componentNode.getBundleCoordinate()).thenReturn(STANDARD_BUNDLE_COORDINATE_V1);

final Predicate<ComponentNode> predicate = new ComponentNodeDefinitionPredicate(Set.of(standardProcessorDefinition));
Expand All @@ -82,7 +83,7 @@ public void testWhenComponentNodeMatchesDefinition() {
@Test
public void testWhenComponentNodeTypeDoesNotMatchDefinition() {
final ComponentNode componentNode = mock(ComponentNode.class);
when(componentNode.getComponentType()).thenReturn("com.SomeOtherProcessor");
when(componentNode.getCanonicalClassName()).thenReturn("com.SomeOtherProcessor");
when(componentNode.getBundleCoordinate()).thenReturn(STANDARD_BUNDLE_COORDINATE_V1);

final Predicate<ComponentNode> predicate = new ComponentNodeDefinitionPredicate(Set.of(standardProcessorDefinition));
Expand All @@ -92,7 +93,7 @@ public void testWhenComponentNodeTypeDoesNotMatchDefinition() {
@Test
public void testWhenComponentNodeCoordinateDoesMatchDefinition() {
final ComponentNode componentNode = mock(ComponentNode.class);
when(componentNode.getComponentType()).thenReturn(STANDARD_PROCESSOR_TYPE);
when(componentNode.getCanonicalClassName()).thenReturn(STANDARD_PROCESSOR_CLASS_NAME);
when(componentNode.getBundleCoordinate()).thenReturn(STANDARD_BUNDLE_COORDINATE_V2);

final Predicate<ComponentNode> predicate = new ComponentNodeDefinitionPredicate(Set.of(standardProcessorDefinition));
Expand All @@ -102,7 +103,7 @@ public void testWhenComponentNodeCoordinateDoesMatchDefinition() {
@Test
public void testWhenComponentNodeIsMissingAndCompatibleBundle() {
final ComponentNode componentNode = mock(ComponentNode.class);
when(componentNode.getComponentType()).thenReturn(STANDARD_PROCESSOR_TYPE);
when(componentNode.getCanonicalClassName()).thenReturn(STANDARD_PROCESSOR_CLASS_NAME);
when(componentNode.getBundleCoordinate()).thenReturn(STANDARD_BUNDLE_COORDINATE_V2);
when(componentNode.isExtensionMissing()).thenReturn(true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ public synchronized Set<ExtensionDefinition> getExtensions(final Class<?> defini
throw new IllegalArgumentException("Class cannot be null");
}
final Set<ExtensionDefinition> extensions = definitionMap.get(definition);
return (extensions == null) ? Collections.emptySet() : extensions;
return (extensions == null) ? Collections.emptySet() : new HashSet<>(extensions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,11 @@
import org.apache.nifi.web.api.entity.NarDetailsEntity;
import org.apache.nifi.web.api.entity.NarSummariesEntity;
import org.apache.nifi.web.api.entity.NarSummaryEntity;
import org.apache.nifi.web.api.entity.ProcessorEntity;
import org.apache.nifi.web.api.entity.ProcessorTypesEntity;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
Expand All @@ -43,8 +46,11 @@

public class NarUploadStandaloneIT extends NiFiSystemIT {

private static final Logger logger = LoggerFactory.getLogger(NarUploadStandaloneIT.class);

private static final String NAR_PROVIDER_NARS_LOCATION = "target/nifi-nar-provider-nars";
private static final String PROCESSORS_NAR_ID = "nifi-nar-provider-processors-nar";
private static final String PROCESSOR_CLASS_NAME = "org.apache.nifi.nar.provider.GetClassLoaderInfo";
private static final String CONTROLLER_SERVICE_API_NAR_ID = "nifi-nar-provider-service-api-nar";
private static final String CONTROLLER_SERVICE_NAR_ID = "nifi-nar-provider-service-nar";

Expand Down Expand Up @@ -116,6 +122,14 @@ public void testUploadStandardNars() throws NiFiClientException, IOException, In
assertNotNull(serviceApiNarDetails.getDependentCoordinates());
assertEquals(2, serviceApiNarDetails.getDependentCoordinates().size());

// Create instance of the custom processor
final NarCoordinateDTO uploadedProcessorCoordinate = uploadedProcessorsNar.getCoordinate();
final ProcessorEntity customProcessor = getClientUtil().createProcessor(PROCESSOR_CLASS_NAME, uploadedProcessorCoordinate.getGroup(),
uploadedProcessorCoordinate.getArtifact(), uploadedProcessorCoordinate.getVersion());
assertNotNull(customProcessor.getComponent());
assertNotNull(customProcessor.getComponent().getExtensionMissing());
assertFalse(customProcessor.getComponent().getExtensionMissing());

// Verify service API NAR can't be replaced while other NARs depend on it
assertThrows(NiFiClientException.class, () -> narUploadUtil.uploadNar(narsLocation, CONTROLLER_SERVICE_API_NAR_ID));

Expand All @@ -137,6 +151,27 @@ public void testUploadStandardNars() throws NiFiClientException, IOException, In

// Verify no NARs exist
narUploadUtil.verifyNarSummaries(0);

// Verify custom processor is ghosted
final String customProcessorId = customProcessor.getId();
waitFor(() -> {
final ProcessorEntity customProcessorAfterDelete = getNifiClient().getProcessorClient().getProcessor(customProcessorId);
logger.info("Waiting for processor {} to be considered missing", customProcessorId);
return customProcessorAfterDelete.getComponent().getExtensionMissing();
});

// Restore NARs
narUploadUtil.uploadNar(narsLocation, CONTROLLER_SERVICE_API_NAR_ID);
narUploadUtil.uploadNar(narsLocation, CONTROLLER_SERVICE_NAR_ID);
final NarSummaryDTO restoredProcessorsNar = narUploadUtil.uploadNar(narsLocation, PROCESSORS_NAR_ID);
waitFor(narUploadUtil.getWaitForNarStateSupplier(restoredProcessorsNar.getIdentifier(), NarState.INSTALLED));

// Verify processor is un-ghosted
waitFor(() -> {
final ProcessorEntity customProcessorAfterDelete = getNifiClient().getProcessorClient().getProcessor(customProcessorId);
logger.info("Waiting for processor {} to be considered not missing", customProcessorId);
return !customProcessorAfterDelete.getComponent().getExtensionMissing();
});
}

private boolean matchingBundles(final BundleDTO bundleDTO, final NarCoordinateDTO narCoordinateDTO) {
Expand Down

0 comments on commit 8e5cf99

Please sign in to comment.