Skip to content

Commit

Permalink
NIFI-14230 Implement start/stop flow C2 operations in MiNiFi
Browse files Browse the repository at this point in the history
  • Loading branch information
pkedvessy committed Feb 5, 2025
1 parent 46cbada commit 1a11074
Show file tree
Hide file tree
Showing 16 changed files with 704 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.net.SocketException;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -48,13 +47,10 @@
import org.apache.nifi.c2.protocol.api.AgentRepositories;
import org.apache.nifi.c2.protocol.api.AgentResourceConsumption;
import org.apache.nifi.c2.protocol.api.AgentStatus;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.ProcessorStatus;
import org.apache.nifi.c2.protocol.api.ResourceInfo;
import org.apache.nifi.c2.protocol.api.C2Heartbeat;
import org.apache.nifi.c2.protocol.api.DeviceInfo;
import org.apache.nifi.c2.protocol.api.FlowInfo;
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.NetworkInfo;
import org.apache.nifi.c2.protocol.api.SupportedOperation;
import org.apache.nifi.c2.protocol.api.ResourcesGlobalHash;
Expand Down Expand Up @@ -92,7 +88,7 @@ public C2Heartbeat create(RuntimeInfoWrapper runtimeInfoWrapper) {

heartbeat.setAgentInfo(getAgentInfo(runtimeInfoWrapper.getAgentRepositories(), runtimeInfoWrapper.getManifest()));
heartbeat.setDeviceInfo(generateDeviceInfo());
heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper.getQueueStatus(), runtimeInfoWrapper.getProcessorBulletins(), runtimeInfoWrapper.getProcessorStatus()));
heartbeat.setFlowInfo(getFlowInfo(runtimeInfoWrapper));
heartbeat.setCreated(System.currentTimeMillis());

ResourceInfo resourceInfo = new ResourceInfo();
Expand All @@ -102,11 +98,12 @@ public C2Heartbeat create(RuntimeInfoWrapper runtimeInfoWrapper) {
return heartbeat;
}

private FlowInfo getFlowInfo(Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin> processorBulletins, List<ProcessorStatus> processorStatus) {
private FlowInfo getFlowInfo(RuntimeInfoWrapper runtimeInfoWrapper) {
FlowInfo flowInfo = new FlowInfo();
flowInfo.setQueues(queueStatus);
flowInfo.setProcessorBulletins(processorBulletins);
flowInfo.setProcessorStatuses(processorStatus);
flowInfo.setQueues(runtimeInfoWrapper.getQueueStatus());
flowInfo.setProcessorBulletins(runtimeInfoWrapper.getProcessorBulletins());
flowInfo.setProcessorStatuses(runtimeInfoWrapper.getProcessorStatus());
flowInfo.setRunStatus(runtimeInfoWrapper.getRunStatus());
Optional.ofNullable(flowIdHolder.getFlowId()).ifPresent(flowInfo::setFlowId);
return flowInfo;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.nifi.c2.protocol.api.FlowQueueStatus;
import org.apache.nifi.c2.protocol.api.ProcessorBulletin;
import org.apache.nifi.c2.protocol.api.ProcessorStatus;
import org.apache.nifi.c2.protocol.api.RunStatus;
import org.apache.nifi.c2.protocol.component.api.RuntimeManifest;

public class RuntimeInfoWrapper {
Expand All @@ -30,14 +31,16 @@ public class RuntimeInfoWrapper {
final Map<String, FlowQueueStatus> queueStatus;
final List<ProcessorBulletin> processorBulletins;
final List<ProcessorStatus> processorStatus;
final RunStatus runStatus;

public RuntimeInfoWrapper(AgentRepositories repos, RuntimeManifest manifest, Map<String, FlowQueueStatus> queueStatus, List<ProcessorBulletin> processorBulletins,
List<ProcessorStatus> processorStatus) {
List<ProcessorStatus> processorStatus, RunStatus runStatus) {
this.repos = repos;
this.manifest = manifest;
this.queueStatus = queueStatus;
this.processorBulletins = processorBulletins;
this.processorStatus = processorStatus;
this.runStatus = runStatus;
}

public AgentRepositories getAgentRepositories() {
Expand All @@ -59,4 +62,8 @@ public List<ProcessorBulletin> getProcessorBulletins() {
public List<ProcessorStatus> getProcessorStatus() {
return processorStatus;
}

public RunStatus getRunStatus() {
return runStatus;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.c2.client.service.operation;

import org.apache.nifi.c2.protocol.api.C2OperationState.OperationState;

public interface FlowStateStrategy {

OperationState start();

OperationState stop();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.c2.client.service.operation;

import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;

import java.util.Collections;
import java.util.Map;

import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.EMPTY;

public class StartFlowOperationHandler implements C2OperationHandler {

public static final String NOT_APPLIED_DETAILS = "Failed to start flow, please check the log for errors";
public static final String PARTIALLY_APPLIED_DETAILS = "Some components failed to start, please check the log for errors";
public static final String FULLY_APPLIED_DETAILS = "Flow started";
public static final String UNEXPECTED_DETAILS = "Unexpected status, please check the log for errors";

private final FlowStateStrategy flowStateStrategy;

public StartFlowOperationHandler(FlowStateStrategy flowStateStrategy) {
this.flowStateStrategy = flowStateStrategy;
}

@Override
public OperationType getOperationType() {
return OperationType.START;
}

@Override
public OperandType getOperandType() {
return OperandType.FLOW;
}

@Override
public Map<String, Object> getProperties() {
return Collections.emptyMap();
}

@Override
public C2OperationAck handle(C2Operation operation) {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
C2OperationState.OperationState operationState = flowStateStrategy.start();

C2OperationState resultState = operationState(
operationState,
switch (operationState) {
case NOT_APPLIED -> NOT_APPLIED_DETAILS;
case FULLY_APPLIED -> FULLY_APPLIED_DETAILS;
case PARTIALLY_APPLIED -> PARTIALLY_APPLIED_DETAILS;
default -> UNEXPECTED_DETAILS;
}
);

return operationAck(operationId, resultState);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.c2.client.service.operation;

import org.apache.nifi.c2.protocol.api.C2Operation;
import org.apache.nifi.c2.protocol.api.C2OperationAck;
import org.apache.nifi.c2.protocol.api.C2OperationState;
import org.apache.nifi.c2.protocol.api.OperandType;
import org.apache.nifi.c2.protocol.api.OperationType;

import java.util.Collections;
import java.util.Map;

import static java.util.Optional.ofNullable;
import static org.apache.commons.lang3.StringUtils.EMPTY;

public class StopFlowOperationHandler implements C2OperationHandler {

public static final String NOT_APPLIED_DETAILS = "Failed to stop flow, please check the log for errors";
public static final String FULLY_APPLIED_DETAILS = "Flow stopped";
public static final String PARTIALLY_APPLIED_DETAILS = "Some components failed to stop, please check the log for errors";
public static final String UNEXPECTED_DETAILS = "Unexpected status, please check the log for errors";


private final FlowStateStrategy flowStateStrategy;

public StopFlowOperationHandler(FlowStateStrategy flowStateStrategy) {
this.flowStateStrategy = flowStateStrategy;
}

@Override
public OperationType getOperationType() {
return OperationType.STOP;
}

@Override
public OperandType getOperandType() {
return OperandType.FLOW;
}

@Override
public Map<String, Object> getProperties() {
return Collections.emptyMap();
}

@Override
public C2OperationAck handle(C2Operation operation) {
String operationId = ofNullable(operation.getIdentifier()).orElse(EMPTY);
C2OperationState.OperationState operationState = flowStateStrategy.stop();

C2OperationState resultState = operationState(
operationState,
switch (operationState) {
case NOT_APPLIED -> NOT_APPLIED_DETAILS;
case FULLY_APPLIED -> FULLY_APPLIED_DETAILS;
case PARTIALLY_APPLIED -> PARTIALLY_APPLIED_DETAILS;
default -> UNEXPECTED_DETAILS;
}
);

return operationAck(operationId, resultState);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.nifi.c2.client.service;

import static org.apache.nifi.c2.protocol.api.RunStatus.RUNNING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
Expand Down Expand Up @@ -124,13 +125,14 @@ void testCreatePopulatesFromRuntimeInfoWrapperForFullHeartbeat() {
List<ProcessorBulletin> processorBulletins = new ArrayList<>();
List<ProcessorStatus> processorStatus = new ArrayList<>();

C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins, processorStatus));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins, processorStatus, RUNNING));

assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
assertEquals(manifest, heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
assertEquals(processorBulletins, heartbeat.getFlowInfo().getProcessorBulletins());
assertEquals(processorStatus, heartbeat.getFlowInfo().getProcessorStatuses());
assertEquals(RUNNING, heartbeat.getFlowInfo().getRunStatus());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}

Expand All @@ -145,13 +147,14 @@ void testCreatePopulatesFromRuntimeInfoWrapperForLightHeartbeat() {
List<ProcessorBulletin> processorBulletins = new ArrayList<>();
List<ProcessorStatus> processorStatus = new ArrayList<>();

C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins, processorStatus));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(repos, manifest, queueStatus, processorBulletins, processorStatus, RUNNING));

assertEquals(repos, heartbeat.getAgentInfo().getStatus().getRepositories());
assertNull(heartbeat.getAgentInfo().getAgentManifest());
assertEquals(queueStatus, heartbeat.getFlowInfo().getQueues());
assertEquals(processorBulletins, heartbeat.getFlowInfo().getProcessorBulletins());
assertEquals(processorStatus, heartbeat.getFlowInfo().getProcessorStatuses());
assertEquals(RUNNING, heartbeat.getFlowInfo().getRunStatus());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
}

Expand All @@ -168,10 +171,11 @@ void testAgentManifestHashIsPopulatedInCaseOfRuntimeManifest() {
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), Collections.emptySet())).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());

C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>(), new ArrayList<>()));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>(), new ArrayList<>(), RUNNING));

assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
assertEquals(RUNNING, heartbeat.getFlowInfo().getRunStatus());
}

@Test
Expand All @@ -184,10 +188,11 @@ void testAgentManifestHashIsPopulatedInCaseOfAgentManifest() {
when(manifestHashProvider.calculateManifestHash(manifest.getBundles(), supportedOperations)).thenReturn(MANIFEST_HASH);
when(resourcesGlobalHashSupplier.get()).thenReturn(createResourcesGlobalHash());

C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>(), new ArrayList<>()));
C2Heartbeat heartbeat = c2HeartbeatFactory.create(new RuntimeInfoWrapper(new AgentRepositories(), manifest, new HashMap<>(), new ArrayList<>(), new ArrayList<>(), RUNNING));

assertEquals(MANIFEST_HASH, heartbeat.getAgentInfo().getAgentManifestHash());
assertEquals(RESOURCE_HASH, heartbeat.getResourceInfo().getHash());
assertEquals(RUNNING, heartbeat.getFlowInfo().getRunStatus());
}

private RuntimeManifest createManifest() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.nifi.c2.client.service.operation;

import static org.apache.nifi.c2.protocol.api.RunStatus.RUNNING;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -58,7 +59,7 @@ void testDescribeManifestOperationHandlerCreateSuccess() {
void testDescribeManifestOperationHandlerPopulatesAckSuccessfully() {
RuntimeManifest manifest = new RuntimeManifest();
manifest.setIdentifier("manifestId");
RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null, null, null);
RuntimeInfoWrapper runtimeInfoWrapper = new RuntimeInfoWrapper(null, manifest, null, null, null, RUNNING);

C2Heartbeat heartbeat = new C2Heartbeat();
AgentInfo agentInfo = new AgentInfo();
Expand Down
Loading

0 comments on commit 1a11074

Please sign in to comment.