Skip to content

Commit

Permalink
NSX Integration fixes (#8906)
Browse files Browse the repository at this point in the history
* Prevent addition of duplicate PF rules on scale up and no rules left behind on scale down (#32)

* fix missing dependency injection

* NSX: Fix concurrency issues on port forwarding rules deletion (#37)

* Fix concurrency issues on port forwarding rules deletion

* Refactor objectExists

* Fix unit test

* Fix test

* Small fixes

* CKS: Externalize control and worker node setup wait time and installation attempts (#38)

* NSX: Add shared network support (#41)

* NSX: Fix number of physical networks for Guest traffic checks and leftover rules on CKS cluster deletion (#45)

* Fix pf rules removal on CKS cluster deletion

* Fix check for number of physical networks for guest traffic

* Fix unit test

* fix logger

* NSX: Handle CheckHealthCommand to avoid host disconnection and errors on APIs

* NSX: Handle CheckHealthCommand to avoid host disconnection and errors on APIs

* Remove unused string

* fix logger

* Update UDP active monitor to ICMP

* Fix NPE on restarting VPC with additional public IPs

* NSX / VPC: Reuse Source NAT IP from systemVM range on restarts

* CKS: Public IP not found for VPC networks

* Externalize retries and inverval for NSX segment deletion (#67)

* remove unused import

* remove duplicate imports

* remove unused import

* revert externalizing cks settings

* fix test

* Refactor log messages

* Address comments

* Fix issue caused due to forward merge: 90fe1d

---------

Co-authored-by: Nicolas Vazquez <nicovazquez90@gmail.com>
Co-authored-by: Rohit Yadav <rohit.yadav@shapeblue.com>
  • Loading branch information
3 people authored Sep 6, 2024
1 parent f156c4e commit f8d8a9c
Show file tree
Hide file tree
Showing 24 changed files with 338 additions and 140 deletions.
10 changes: 10 additions & 0 deletions api/src/main/java/com/cloud/network/nsx/NsxService.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,19 @@

import com.cloud.network.IpAddress;
import com.cloud.network.vpc.Vpc;
import org.apache.cloudstack.framework.config.ConfigKey;

public interface NsxService {

ConfigKey<Integer> NSX_API_FAILURE_RETRIES = new ConfigKey<>("Advanced", Integer.class,
"nsx.api.failure.retries", "30",
"Number of retries for NSX API operations in case of failures",
true, ConfigKey.Scope.Zone);
ConfigKey<Integer> NSX_API_FAILURE_INTERVAL = new ConfigKey<>("Advanced", Integer.class,
"nsx.api.failure.interval", "60",
"Waiting time (in seconds) before retrying an NSX API operation in case of failure",
true, ConfigKey.Scope.Zone);

boolean createVpcNetwork(Long zoneId, long accountId, long domainId, Long vpcId, String vpcName, boolean sourceNatEnabled);
boolean updateVpcSourceNatIp(Vpc vpc, IpAddress address);
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,16 @@ public interface VpcManager {
throws ConcurrentOperationException, InsufficientCapacityException, ResourceAllocationException;

/**
* Assigns source nat public IP address to VPC
* Assigns source nat public IP address to VPC.
* In case of NSX backed VPCs: CloudStack deploys VRs with Public NIC IP different to the VPC source NAT IP, the source NAT IP is on the NSX Public range
*
* @param owner
* @param vpc
* @return public IP address object
* @throws InsufficientAddressCapacityException
* @throws ConcurrentOperationException
*/
PublicIp assignSourceNatIpAddressToVpc(Account owner, Vpc vpc) throws InsufficientAddressCapacityException, ConcurrentOperationException;
PublicIp assignSourceNatIpAddressToVpc(Account owner, Vpc vpc, Long podId) throws InsufficientAddressCapacityException, ConcurrentOperationException;

/**
* Validates network offering to find if it can be used for network creation in VPC
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1645,7 +1645,7 @@ public void implementNetworkElementsAndResources(final DeployDestination dest, f
if (ips.isEmpty()) {
final Vpc vpc = _vpcMgr.getActiveVpc(network.getVpcId());
logger.debug("Creating a source nat ip for vpc {}", vpc);
_vpcMgr.assignSourceNatIpAddressToVpc(owner, vpc);
_vpcMgr.assignSourceNatIpAddressToVpc(owner, vpc, null);
}
} else {
ips = _ipAddressDao.listByAssociatedNetwork(network.getId(), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,8 @@ public long getResourceId() {
public boolean isDisplay() {
return display;
}

public void setValue(String value) {
this.value = value;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@ public void process(VirtualMachineProfile vmProfile, DeploymentPlan plan, Exclud
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
_affinityGroupDao.listByIds(affinityGroupIdList, true);
if (!affinityGroupIdList.isEmpty()) {
_affinityGroupDao.listByIds(affinityGroupIdList, true);
}
for (AffinityGroupVMMapVO vmGroupMapping : vmGroupMappings) {
processAffinityGroup(vmGroupMapping, plan, vm, vmList);
}
Expand Down Expand Up @@ -149,7 +151,9 @@ public boolean check(VirtualMachineProfile vmProfile, DeployDestination plannedD
return Transaction.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
_affinityGroupDao.listByIds(affinityGroupIds, true);
if (!affinityGroupIds.isEmpty()) {
_affinityGroupDao.listByIds(affinityGroupIds, true);
}
for (AffinityGroupVMMapVO vmGroupMapping : vmGroupMappings) {
if (!checkAffinityGroup(vmGroupMapping, vm, plannedHostId)) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ public void process(VirtualMachineProfile vmProfile, DeploymentPlan plan, Exclud
Transaction.execute(new TransactionCallbackNoReturn() {
@Override
public void doInTransactionWithoutResult(TransactionStatus status) {
_affinityGroupDao.listByIds(affinityGroupIds, true);
if (!affinityGroupIds.isEmpty()) {
_affinityGroupDao.listByIds(affinityGroupIds, true);
}
for (AffinityGroupVMMapVO vmGroupMapping : vmGroupMappings) {
processAffinityGroup(vmGroupMapping, avoid, vm);
}
Expand Down Expand Up @@ -165,7 +167,9 @@ public boolean check(VirtualMachineProfile vmProfile, DeployDestination plannedD
return Transaction.execute(new TransactionCallback<Boolean>() {
@Override
public Boolean doInTransaction(TransactionStatus status) {
_affinityGroupDao.listByIds(affinityGroupIds, true);
if (!affinityGroupIds.isEmpty()) {
_affinityGroupDao.listByIds(affinityGroupIds, true);
}
for (AffinityGroupVMMapVO vmGroupMapping : vmGroupMappings) {
// if more than 1 VM's are present in the group then check for
// conflict due to parallel deployment
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ protected IpAddress getVpcTierKubernetesPublicIp(Network network) {
return null;
}
IpAddress address = ipAddressDao.findByUuid(detailsVO.getValue());
if (address == null || network.getVpcId() != address.getVpcId()) {
if (address == null || !Objects.equals(network.getVpcId(), address.getVpcId())) {
logger.warn(String.format("Public IP with ID: %s linked to the Kubernetes cluster: %s is not usable", detailsVO.getValue(), kubernetesCluster.getName()));
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,31 @@

package com.cloud.kubernetes.cluster.actionworkers;

import static com.cloud.utils.NumbersUtil.toHumanReadableSize;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import javax.inject.Inject;

import com.cloud.network.rules.FirewallManager;
import com.cloud.offering.NetworkOffering;
import com.cloud.offerings.dao.NetworkOfferingDao;
import org.apache.cloudstack.api.ApiConstants;
import org.apache.cloudstack.api.BaseCmd;
import org.apache.cloudstack.api.command.user.firewall.CreateFirewallRuleCmd;
import org.apache.cloudstack.api.command.user.network.CreateNetworkACLCmd;
import org.apache.cloudstack.api.command.user.volume.ResizeVolumeCmd;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;

import com.cloud.capacity.CapacityManager;
import com.cloud.dc.ClusterDetailsDao;
import com.cloud.dc.ClusterDetailsVO;
Expand Down Expand Up @@ -61,9 +86,7 @@
import com.cloud.network.vpc.NetworkACLItemDao;
import com.cloud.network.vpc.NetworkACLItemVO;
import com.cloud.network.vpc.NetworkACLService;
import com.cloud.offering.NetworkOffering;
import com.cloud.offering.ServiceOffering;
import com.cloud.offerings.dao.NetworkOfferingDao;
import com.cloud.resource.ResourceManager;
import com.cloud.storage.Volume;
import com.cloud.storage.VolumeApiService;
Expand All @@ -88,29 +111,9 @@
import com.cloud.vm.VmDetailConstants;
import com.cloud.vm.dao.VMInstanceDao;
import org.apache.cloudstack.api.ApiCommandResourceType;
import org.apache.cloudstack.api.ApiConstants;
import org.apache.cloudstack.api.BaseCmd;
import org.apache.cloudstack.api.command.user.firewall.CreateFirewallRuleCmd;
import org.apache.cloudstack.api.command.user.network.CreateNetworkACLCmd;
import org.apache.cloudstack.api.command.user.volume.ResizeVolumeCmd;
import org.apache.cloudstack.context.CallContext;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Level;

import javax.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import static com.cloud.utils.NumbersUtil.toHumanReadableSize;

public class KubernetesClusterResourceModifierActionWorker extends KubernetesClusterActionWorker {

@Inject
Expand All @@ -134,6 +137,8 @@ public class KubernetesClusterResourceModifierActionWorker extends KubernetesClu
@Inject
protected RulesService rulesService;
@Inject
protected FirewallManager firewallManager;
@Inject
protected PortForwardingRulesDao portForwardingRulesDao;
@Inject
protected ResourceManager resourceManager;
Expand Down Expand Up @@ -169,6 +174,7 @@ private String getKubernetesNodeConfig(final String joinIp, final boolean ejectI
final String joinIpKey = "{{ k8s_control_node.join_ip }}";
final String clusterTokenKey = "{{ k8s_control_node.cluster.token }}";
final String ejectIsoKey = "{{ k8s.eject.iso }}";

String pubKey = "- \"" + configurationDao.getValue("ssh.publickey") + "\"";
String sshKeyPair = kubernetesCluster.getKeyPair();
if (StringUtils.isNotEmpty(sshKeyPair)) {
Expand All @@ -181,7 +187,6 @@ private String getKubernetesNodeConfig(final String joinIp, final boolean ejectI
k8sNodeConfig = k8sNodeConfig.replace(joinIpKey, joinIp);
k8sNodeConfig = k8sNodeConfig.replace(clusterTokenKey, KubernetesClusterUtil.generateClusterToken(kubernetesCluster));
k8sNodeConfig = k8sNodeConfig.replace(ejectIsoKey, String.valueOf(ejectIso));

k8sNodeConfig = updateKubeConfigWithRegistryDetails(k8sNodeConfig);

return k8sNodeConfig;
Expand Down Expand Up @@ -522,17 +527,22 @@ protected FirewallRule removeSshFirewallRule(final IpAddress publicIp) {

protected void removePortForwardingRules(final IpAddress publicIp, final Network network, final Account account, final List<Long> removedVMIds) throws ResourceUnavailableException {
if (!CollectionUtils.isEmpty(removedVMIds)) {
List<PortForwardingRuleVO> pfRules = new ArrayList<>();
List<PortForwardingRuleVO> revokedRules = new ArrayList<>();
for (Long vmId : removedVMIds) {
List<PortForwardingRuleVO> pfRules = portForwardingRulesDao.listByNetwork(network.getId());
pfRules.addAll(portForwardingRulesDao.listByNetwork(network.getId()));
for (PortForwardingRuleVO pfRule : pfRules) {
if (pfRule.getVirtualMachineId() == vmId) {
portForwardingRulesDao.remove(pfRule.getId());
logger.trace("Marking PF rule {} with Revoke state", pfRule);
pfRule.setState(FirewallRule.State.Revoke);
revokedRules.add(pfRule);
logger.debug("The Port forwarding rule [%s] with the id [%s] was removed.", pfRule.getName(), pfRule.getId());
break;
}
}
}
rulesService.applyPortForwardingRules(publicIp.getId(), account);
firewallManager.applyRules(revokedRules, false, true);
}
}

Expand All @@ -542,10 +552,11 @@ protected void removePortForwardingRules(final IpAddress publicIp, final Network
for (PortForwardingRuleVO pfRule : pfRules) {
if (startPort <= pfRule.getSourcePortStart() && pfRule.getSourcePortStart() <= endPort) {
portForwardingRulesDao.remove(pfRule.getId());
logger.debug("The Port forwarding rule [{}] with the id [{}] was removed.", pfRule.getName(), pfRule.getId());
logger.debug("The Port forwarding rule [{}] with the id [{}] was mark as revoked.", pfRule.getName(), pfRule.getId());
pfRule.setState(FirewallRule.State.Revoke);
}
}
rulesService.applyPortForwardingRules(publicIp.getId(), account);
firewallManager.applyRules(pfRules, false, true);
}

protected void removeLoadBalancingRule(final IpAddress publicIp, final Network network,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ private String getKubernetesControlNodeConfig(final String controlNodeIp, final
final String clusterToken = "{{ k8s_control_node.cluster.token }}";
final String clusterInitArgsKey = "{{ k8s_control_node.cluster.initargs }}";
final String ejectIsoKey = "{{ k8s.eject.iso }}";

final List<String> addresses = new ArrayList<>();
addresses.add(controlNodeIp);
if (!serverIp.equals(controlNodeIp)) {
Expand Down Expand Up @@ -243,6 +244,7 @@ private String getKubernetesAdditionalControlNodeConfig(final String joinIp, fin
final String sshPubKey = "{{ k8s.ssh.pub.key }}";
final String clusterHACertificateKey = "{{ k8s_control_node.cluster.ha.certificate.key }}";
final String ejectIsoKey = "{{ k8s.eject.iso }}";

String pubKey = "- \"" + configurationDao.getValue("ssh.publickey") + "\"";
String sshKeyPair = kubernetesCluster.getKeyPair();
if (StringUtils.isNotEmpty(sshKeyPair)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
import com.cloud.agent.api.Command;

public class NsxAnswer extends Answer {

private boolean objectExists;

public NsxAnswer(final Command command, final boolean success, final String details) {
super(command, success, details);
}
Expand All @@ -28,4 +31,11 @@ public NsxAnswer(final Command command, final Exception e) {
super(command, e);
}

public boolean isObjectExistent() {
return objectExists;
}

public void setObjectExists(boolean objectExisted) {
this.objectExists = objectExisted;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.cloud.agent.IAgentControl;
import com.cloud.agent.api.Answer;
import com.cloud.agent.api.CheckHealthAnswer;
import com.cloud.agent.api.CheckHealthCommand;
import com.cloud.agent.api.Command;
import com.cloud.agent.api.PingCommand;
import com.cloud.agent.api.ReadyAnswer;
Expand Down Expand Up @@ -102,6 +104,8 @@ public PingCommand getCurrentStatus(long id) {
public Answer executeRequest(Command cmd) {
if (cmd instanceof ReadyCommand) {
return executeRequest((ReadyCommand) cmd);
} else if (cmd instanceof CheckHealthCommand) {
return executeRequest((CheckHealthCommand) cmd);
} else if (cmd instanceof DeleteNsxTier1GatewayCommand) {
return executeRequest((DeleteNsxTier1GatewayCommand) cmd);
} else if (cmd instanceof DeleteNsxSegmentCommand) {
Expand Down Expand Up @@ -293,6 +297,10 @@ private Answer executeRequest(ReadyCommand cmd) {
return new ReadyAnswer(cmd);
}

private Answer executeRequest(CheckHealthCommand cmd) {
return new CheckHealthAnswer(cmd, nsxApiClient.isNsxControllerActive());
}

private Answer executeRequest(CreateNsxTier1GatewayCommand cmd) {
String tier1GatewayName = NsxControllerUtils.getTier1GatewayName(cmd.getDomainId(), cmd.getAccountId(), cmd.getZoneId(), cmd.getNetworkResourceId(), cmd.isResourceVpc());
boolean sourceNatEnabled = cmd.isSourceNatEnabled();
Expand Down Expand Up @@ -385,16 +393,21 @@ private NsxAnswer executeRequest(CreateNsxPortForwardRuleCommand cmd) {
cmd.getNetworkResourceId(), cmd.isResourceVpc());
try {
String privatePort = cmd.getPrivatePort();
String service = privatePort.contains("-") ? nsxApiClient.getServicePath(ruleName, privatePort, cmd.getProtocol(), null, null) :
nsxApiClient.getNsxInfraServices(ruleName, privatePort, cmd.getProtocol(), null, null);
logger.debug("Checking if the rule {} exists on Tier 1 Gateway: {}", ruleName, tier1GatewayName);
if (nsxApiClient.doesPfRuleExist(ruleName, tier1GatewayName)) {
logger.debug(String.format("Port forward rule for port: %s exits on NSX, not adding it again", privatePort));
return new NsxAnswer(cmd, true, null);
String msg = String.format("Port forward rule for port: %s (%s) exits on NSX, not adding it again", ruleName, privatePort);
logger.debug(msg);
NsxAnswer answer = new NsxAnswer(cmd, true, msg);
answer.setObjectExists(true);
return answer;
}
String service = privatePort.contains("-") ? nsxApiClient.getServicePath(ruleName, privatePort, cmd.getProtocol(), null, null) :
nsxApiClient.getNsxInfraServices(ruleName, privatePort, cmd.getProtocol(), null, null);
nsxApiClient.createPortForwardingRule(ruleName, tier1GatewayName, cmd.getNetworkResourceName(), cmd.getPublicIp(),
cmd.getVmIp(), cmd.getPublicPort(), service);
} catch (Exception e) {
logger.error(String.format("Failed to add NSX port forward rule %s for network: %s", ruleName, cmd.getNetworkResourceName()));
String msg = String.format("Failed to add NSX port forward rule %s for network: %s", ruleName, cmd.getNetworkResourceName());
logger.error(msg, e);
return new NsxAnswer(cmd, new CloudRuntimeException(e.getMessage()));
}
return new NsxAnswer(cmd, true, null);
Expand All @@ -415,8 +428,9 @@ private NsxAnswer executeRequest(DeleteNsxNatRuleCommand cmd) {
nsxApiClient.deleteNatRule(cmd.getService(), cmd.getPrivatePort(), cmd.getProtocol(),
cmd.getNetworkResourceName(), tier1GatewayName, ruleName);
} catch (Exception e) {
logger.error(String.format("Failed to add NSX static NAT rule %s for network: %s", ruleName, cmd.getNetworkResourceName()));
return new NsxAnswer(cmd, new CloudRuntimeException(e.getMessage()));
String msg = String.format("Failed to delete NSX rule %s for network %s: due to %s", ruleName, cmd.getNetworkResourceName(), e.getMessage());
logger.error(msg, e);
return new NsxAnswer(cmd, new CloudRuntimeException(msg));
}
return new NsxAnswer(cmd, true, null);
}
Expand Down
Loading

0 comments on commit f8d8a9c

Please sign in to comment.