forked from jtriley/StarCluster
-
Notifications
You must be signed in to change notification settings - Fork 13
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Mich
committed
Jul 24, 2014
1 parent
f19f1ae
commit 0824e39
Showing
4 changed files
with
142 additions
and
64 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -23,6 +23,7 @@ | |
import warnings | ||
import datetime | ||
import json | ||
from collections import Counter | ||
|
||
import iptools | ||
|
||
|
@@ -423,7 +424,7 @@ def __init__(self, | |
cluster_group=None, | ||
force_spot_master=False, | ||
disable_cloudinit=False, | ||
subnet_id=None, | ||
subnet_ids=[], | ||
public_ips=None, | ||
plugins_order=[], | ||
config_on_master=False, | ||
|
@@ -456,14 +457,15 @@ def __init__(self, | |
|
||
self._cluster_group = None | ||
self._placement_group = None | ||
self._subnet = None | ||
self._zone = None | ||
self._master = None | ||
self._nodes = [] | ||
self._pool = None | ||
self._progress_bar = None | ||
self.__default_plugin = None | ||
self.__sge_plugin = None | ||
self._vpc_id = None | ||
self._subnet_zones_mapping = None | ||
|
||
def __repr__(self): | ||
return '<Cluster: %s (%s-node)>' % (self.cluster_tag, | ||
|
@@ -665,11 +667,15 @@ def __getstate__(self): | |
def _security_group(self): | ||
return static.SECURITY_GROUP_TEMPLATE % self.cluster_tag | ||
|
||
def get_vpc_id(self, subnet_id): | ||
subnet = self.ec2.get_subnet(self.subnet_ids[0]) | ||
return subnet.vpc_id | ||
|
||
@property | ||
def subnet(self): | ||
if not self._subnet and self.subnet_id: | ||
self._subnet = self.ec2.get_subnet(self.subnet_id) | ||
return self._subnet | ||
def vpc_id(self): | ||
if not self._vpc_id and self.subnet_ids: | ||
self._vpc_id = self.get_vpc_id(self.subnet_ids[0]) | ||
return self._vpc_id | ||
|
||
@property | ||
def cluster_group(self): | ||
|
@@ -678,19 +684,27 @@ def cluster_group(self): | |
sg = self.ec2.get_group_or_none(self._security_group) | ||
if not sg: | ||
desc = 'StarCluster-%s' % static.VERSION.replace('.', '_') | ||
if self.subnet: | ||
if self.subnet_ids: | ||
desc += ' (VPC)' | ||
vpc_id = getattr(self.subnet, 'vpc_id', None) | ||
sg = self.ec2.create_group(self._security_group, | ||
description=desc, | ||
auth_ssh=True, | ||
auth_group_traffic=True, | ||
vpc_id=vpc_id) | ||
vpc_id=self.vpc_id) | ||
self._add_tags_to_sg(sg) | ||
self._add_permissions_to_sg(sg) | ||
self._cluster_group = sg | ||
return sg | ||
|
||
@property | ||
def subnets_mapping(self): | ||
if self._subnet_zones_mapping is None: | ||
mapping = {} | ||
for subnet_id in self.subnet_ids: | ||
mapping[subnet_id] = self.ec2.get_subnet(subnet_id) | ||
self._subnet_zones_mapping = mapping | ||
return self._subnet_zones_mapping | ||
|
||
def _add_permissions_to_sg(self, sg): | ||
ssh_port = static.DEFAULT_SSH_PORT | ||
for p in self.permissions: | ||
|
@@ -730,7 +744,7 @@ def _get_settings(self): | |
node_instance_type=self.node_instance_type, | ||
availability_zone=self.availability_zone, | ||
dns_prefix=self.dns_prefix, | ||
subnet_id=self.subnet_id, | ||
subnet_ids=self.subnet_ids, | ||
public_ips=self.public_ips, | ||
disable_queue=self.disable_queue, | ||
disable_cloudinit=self.disable_cloudinit, | ||
|
@@ -985,7 +999,7 @@ def spot_requests(self): | |
states = ['active', 'open'] | ||
filters = {'state': states} | ||
vpc_id = self.cluster_group.vpc_id | ||
if vpc_id and self.subnet_id: | ||
if vpc_id and self.subnet_ids: | ||
# According to the EC2 API docs this *should* be | ||
# launch.network-interface.group-id but it doesn't work | ||
filters['network-interface.group-id'] = group_id | ||
|
@@ -1055,7 +1069,13 @@ def create_nodes(self, aliases, image_id=None, instance_type=None, | |
launch_group = availability_zone_group | ||
|
||
if spot_bid and not placement_group and zone is None: | ||
zone, price = self.ec2.get_spot_cheapest_zone(instance_type) | ||
zones_filter = None | ||
if self.subnet_ids: | ||
zones_filter = [s_net.availability_zone | ||
for s_net in self.subnets_mapping.values()] | ||
|
||
zone, price = self.ec2.get_spot_cheapest_zone(instance_type, | ||
zones_filter) | ||
log.info("Min price of %f found in zone %s", price, zone) | ||
if price > spot_bid: | ||
# Let amazon pick the first zone where the prices goes | ||
|
@@ -1075,10 +1095,19 @@ def create_nodes(self, aliases, image_id=None, instance_type=None, | |
placement=zone, | ||
user_data=user_data, | ||
placement_group=placement_group) | ||
if self.subnet_id: | ||
if self.subnet_ids: | ||
subnet_id = self.subnet_ids[0] # default subnet | ||
if zone: | ||
found = False | ||
for s_id, s_net in self.subnets_mapping.iteritems(): | ||
if s_net.availability_zone == zone: | ||
found = True | ||
subnet_id = s_id | ||
break | ||
assert found | ||
netif = self.ec2.get_network_spec( | ||
device_index=0, associate_public_ip_address=self.public_ips, | ||
subnet_id=self.subnet_id, groups=[self.cluster_group.id]) | ||
subnet_id=subnet_id, groups=[self.cluster_group.id]) | ||
kwargs.update( | ||
network_interfaces=self.ec2.get_network_collection(netif)) | ||
else: | ||
|
@@ -1087,7 +1116,7 @@ def create_nodes(self, aliases, image_id=None, instance_type=None, | |
if spot_bid: | ||
security_group_id = self.cluster_group.id | ||
for alias in aliases: | ||
if not self.subnet_id: | ||
if not self.subnet_ids: | ||
kwargs['security_group_ids'] = [security_group_id] | ||
kwargs['user_data'] = self._get_cluster_userdata([alias]) | ||
resvs.extend(self.ec2.request_instances(image_id, **kwargs)) | ||
|
@@ -1149,12 +1178,18 @@ def add_nodes(self, num_nodes, aliases=None, image_id=None, | |
raise exception.ClusterValidationError( | ||
"worker nodes cannot have master as an alias") | ||
if not no_create: | ||
if self.subnet: | ||
ip_count = self.subnet.available_ip_address_count | ||
if ip_count < len(aliases): | ||
raise exception.ClusterValidationError( | ||
"Not enough IP addresses available in %s (%d)" % | ||
(self.subnet.id, ip_count)) | ||
if self.subnet_ids: | ||
subnet = None | ||
tested = False | ||
for s_net in self.subnets_mapping.values(): | ||
if zone is None or s_net.availability_zone == zone: | ||
tested = True | ||
ip_count = s_net.available_ip_address_count | ||
if ip_count < len(aliases): | ||
raise exception.ClusterValidationError( | ||
"Not enough IP addresses available in " | ||
"%s (%d)" % (subnet.id, ip_count)) | ||
assert tested | ||
for node in running_pending: | ||
if node.alias in aliases: | ||
raise exception.ClusterValidationError( | ||
|
@@ -1323,7 +1358,7 @@ def create_cluster(self): | |
Launches all EC2 instances based on this cluster's settings. | ||
""" | ||
log.info("Launching a %d-node %s" % (self.cluster_size, ' '.join( | ||
['VPC' if self.subnet_id else '', 'cluster...']).strip())) | ||
['VPC' if self.subnet_ids else '', 'cluster...']).strip())) | ||
mtype = self.master_instance_type or self.node_instance_type | ||
self.master_instance_type = mtype | ||
if self.spot_bid: | ||
|
@@ -2514,44 +2549,75 @@ def validate_userdata(self): | |
"to store internal metadata" % ud_size_kb) | ||
|
||
def validate_vpc(self): | ||
if self.cluster.subnet_id: | ||
try: | ||
assert self.cluster.subnet is not None | ||
except exception.SubnetDoesNotExist as e: | ||
raise exception.ClusterValidationError(e) | ||
azone = self.cluster.availability_zone | ||
szone = self.cluster.subnet.availability_zone | ||
if not self.cluster.subnet_ids: | ||
return | ||
if self.cluster.public_ips is False: | ||
raise exception.ClusterValidationError( | ||
"Only VPC clusters can disable public IP addresses") | ||
|
||
try: | ||
assert self.cluster.vpc_id is not None | ||
except exception.SubnetDoesNotExist as e: | ||
raise exception.ClusterValidationError(e) | ||
|
||
azone = self.cluster.availability_zone | ||
if len(self.cluster.subnet_ids) > 1 and azone: | ||
This comment has been minimized.
Sorry, something went wrong.
This comment has been minimized.
Sorry, something went wrong.
FinchPowers
|
||
raise exception.ClusterValidationError( | ||
"Cannot define a cluster availability_zone with multiple " | ||
"subnets at the same time.") | ||
|
||
nodes = self.cluster.nodes | ||
zones = [] | ||
for subnet_id in self.cluster.subnet_ids: | ||
subnet = self.cluster.ec2.get_subnet(subnet_id) | ||
|
||
if subnet is None: | ||
raise exception.ClusterValidationError( | ||
"Subnet id {} not found".format(subnet_id)) | ||
vpc_id = self.cluster.get_vpc_id(subnet_id) | ||
assert vpc_id is not None | ||
if vpc_id != self.cluster.vpc_id: | ||
raise exception.ClusterValidationError( | ||
"All subnets must be in the same vpc") | ||
szone = subnet.availability_zone | ||
zones.append(szone) | ||
if azone and szone != azone: | ||
raise exception.ClusterValidationError( | ||
"The cluster availability_zone (%s) does not match the " | ||
"subnet zone (%s)" % (azone, szone)) | ||
ip_count = self.cluster.subnet.available_ip_address_count | ||
nodes = self.cluster.nodes | ||
"The cluster availability_zone (%s) does not match " | ||
"the subnet zone (%s)" % (azone, szone)) | ||
|
||
ip_count = subnet.available_ip_address_count | ||
if not nodes and ip_count < self.cluster.cluster_size: | ||
raise exception.ClusterValidationError( | ||
"Not enough IP addresses available in %s (%d)" % | ||
(self.cluster.subnet.id, ip_count)) | ||
if self.cluster.public_ips: | ||
gws = self.cluster.ec2.get_internet_gateways(filters={ | ||
'attachment.vpc-id': self.cluster.subnet.vpc_id}) | ||
if not gws: | ||
raise exception.ClusterValidationError( | ||
"No internet gateway attached to VPC: %s" % | ||
self.cluster.subnet.vpc_id) | ||
rtables = self.cluster.ec2.get_route_tables(filters={ | ||
'association.subnet-id': self.cluster.subnet_id, | ||
'route.destination-cidr-block': static.WORLD_CIDRIP, | ||
'route.gateway-id': gws[0].id}) | ||
if not rtables: | ||
raise exception.ClusterValidationError( | ||
"No route to %s found for subnet: %s" % | ||
(static.WORLD_CIDRIP, self.cluster.subnet_id)) | ||
else: | ||
log.warn(user_msgs.public_ips_disabled % | ||
dict(vpc_id=self.cluster.subnet.vpc_id)) | ||
elif self.cluster.public_ips is False: | ||
raise exception.ClusterValidationError( | ||
"Only VPC clusters can disable public IP addresses") | ||
|
||
zones_count = Counter(zones) | ||
for zone, count in zones_count.iteritems(): | ||
if count > 1: | ||
raise exception.ClusterValidationError( | ||
"You cannot define 2 or more subnets in the same zone. " | ||
"({})".format(zone)) | ||
|
||
if self.cluster.public_ips: | ||
gws = self.cluster.ec2.get_internet_gateways(filters={ | ||
'attachment.vpc-id': self.cluster.vpc_id}) | ||
if not gws: | ||
raise exception.ClusterValidationError( | ||
"No internet gateway attached to VPC: %s" % | ||
self.cluster.vpc_id) | ||
# for subnet_id in self.cluster.subnet_ids: | ||
# rtables = self.cluster.ec2.get_route_tables(filters={ | ||
# 'association.subnet-id': subnet_id, | ||
# 'route.destination-cidr-block': static.WORLD_CIDRIP, | ||
# 'route.gateway-id': gws[0].id}) | ||
# if not rtables: | ||
# raise exception.ClusterValidationError( | ||
# "No route to %s found for subnet: %s" % | ||
# (static.WORLD_CIDRIP, subnet_id)) | ||
else: | ||
log.warn(user_msgs.public_ips_disabled % | ||
dict(vpc_id=self.cluster.vpc_id)) | ||
|
||
|
||
if __name__ == "__main__": | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@FinchPowers I'm trying to use multiple subnets in my config file with the "SUBNET_IDS" key. The validation code here seems to require that the key only have a single entry. Is there a specific way/flag to use multiple subnets for spot instances?