From 3c32ec2862da438fc630ed8c0971146d33ca159a Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Tue, 7 Jun 2016 15:49:54 -0400 Subject: [PATCH] Add DNS support Instances that are suitably tagged will be configured in Route53, if --zone-name is passed to the controller --- cmd/aws-controller/main.go | 48 ++--- images/aws-controller/Dockerfile | 8 + images/route-controller/Dockerfile | 8 - .../instances/instancescontroller.go | 182 +++++++++------- pkg/kope/cloud.go | 8 + pkg/kope/kopeaws/awscloud.go | 199 ++++++++++++++++++ pkg/kope/kopeaws/dns.go | 129 ++++++++++++ pkg/kope/utils/utils.go | 14 ++ 8 files changed, 486 insertions(+), 110 deletions(-) create mode 100644 images/aws-controller/Dockerfile delete mode 100644 images/route-controller/Dockerfile create mode 100644 pkg/kope/cloud.go create mode 100644 pkg/kope/kopeaws/awscloud.go create mode 100644 pkg/kope/kopeaws/dns.go create mode 100644 pkg/kope/utils/utils.go diff --git a/cmd/aws-controller/main.go b/cmd/aws-controller/main.go index 47f104f..2f7d8d8 100644 --- a/cmd/aws-controller/main.go +++ b/cmd/aws-controller/main.go @@ -29,17 +29,13 @@ import ( "github.com/golang/glog" "github.com/spf13/pflag" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" - "github.com/aws/aws-sdk-go/aws/ec2metadata" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/ec2" "github.com/kopeio/aws-controller/pkg/awscontroller/instances" + "github.com/kopeio/aws-controller/pkg/kope" + "github.com/kopeio/aws-controller/pkg/kope/kopeaws" ) const ( - healthPort = 10249 + healthPort = 10245 ) var ( @@ -57,7 +53,8 @@ var ( //kubeConfig = flags.String("kubeconfig", "", "Path to kubeconfig file with authorization information.") //nodeName = flags.String("node-name", "", "name of this node") - flagClusterID = flags.String("cluster-id", "", "cluster id") + flagZoneName = flags.String("zone-name", "", "DNS zone name to 
use (if managing DNS)") + flagClusterID = flags.String("cluster-id", "", "cluster id") //systemUUIDPath = flags.String("system-uuid", "", "path to file containing system-uuid (as set in node status)") //bootIDPath = flags.String("boot-id", "", "path to file containing boot-id (as set in node status)") //providerID = flags.String("provider", "gre", "route backend to use") @@ -70,10 +67,6 @@ var ( profiling = flags.Bool("profiling", true, `Enable profiling via web interface host:port/debug/pprof/`) ) -// The tag name we use to differentiate multiple logically independent clusters running in the same AZ -const TagNameKubernetesCluster = "KubernetesCluster" - - func main() { flags.AddGoFlagSet(flag.CommandLine) @@ -81,29 +74,26 @@ func main() { glog.Infof("Using build: %v - %v", gitRepo, version) + cloud, err := kopeaws.NewAWSCloud() + if err != nil { + glog.Fatalf("error building cloud: %v", err) + } + clusterID := *flagClusterID + if clusterID == "" && cloud != nil { + clusterID = cloud.ClusterID() + } if clusterID == "" { glog.Fatalf("cluster-id flag must be set") } - tags := make(map[string]string) - tags[TagNameKubernetesCluster] = clusterID - - creds := credentials.NewChainCredentials( - []credentials.Provider{ - &credentials.EnvProvider{}, - &ec2rolecreds.EC2RoleProvider{ - Client: ec2metadata.New(session.New(&aws.Config{})), - }, - &credentials.SharedCredentialsProvider{}, - }) - - ec2 := ec2.New(session.New(&aws.Config{ - //Region: &regionName, - Credentials: creds, - })) + var dns kope.DNSProvider + zoneName := *flagZoneName + if zoneName != "" { + dns = kopeaws.NewRoute53DNSProvider(zoneName) + } - c := instances.NewInstancesController(ec2, tags) + c := instances.NewInstancesController(cloud, *resyncPeriod, dns) sourceDestCheck := false c.SourceDestCheck = &sourceDestCheck diff --git a/images/aws-controller/Dockerfile b/images/aws-controller/Dockerfile new file mode 100644 index 0000000..eb8d29c --- /dev/null +++ b/images/aws-controller/Dockerfile @@ -0,0 +1,8
@@ +FROM debian:jessie + +RUN apt-get update && apt-get install --yes ca-certificates + +COPY /.build/artifacts/aws-controller /usr/bin/aws-controller + +CMD /usr/bin/aws-controller + diff --git a/images/route-controller/Dockerfile b/images/route-controller/Dockerfile deleted file mode 100644 index 9be7c00..0000000 --- a/images/route-controller/Dockerfile +++ /dev/null @@ -1,8 +0,0 @@ -FROM debian:jessie - -RUN apt-get update && apt-get install --yes ca-certificates - -COPY /.build/artifacts/route-controller /usr/bin/route-controller - -CMD /usr/bin/route-controller - diff --git a/pkg/awscontroller/instances/instancescontroller.go b/pkg/awscontroller/instances/instancescontroller.go index ec50a44..cf0d765 100644 --- a/pkg/awscontroller/instances/instancescontroller.go +++ b/pkg/awscontroller/instances/instancescontroller.go @@ -5,36 +5,43 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/ec2" "github.com/golang/glog" + "github.com/kopeio/aws-controller/pkg/kope" + "github.com/kopeio/aws-controller/pkg/kope/kopeaws" "k8s.io/kubernetes/pkg/util/runtime" "k8s.io/kubernetes/pkg/util/wait" + "sort" "sync" "time" ) type InstancesController struct { SourceDestCheck *bool - ec2 *ec2.EC2 - filterTags map[string]string + cloud *kopeaws.AWSCloud - period time.Duration + period time.Duration - instances map[string]*instance - sequence int + instances map[string]*instance + sequence int + + // dnsState holds the last configured DNS state + dns kope.DNSProvider + dnsState map[string][]string // stopLock is used to enforce only a single call to Stop is active. // Needed because we allow stopping through an http endpoint and // allowing concurrent stoppers leads to stack traces. 
- stopLock sync.Mutex - shutdown bool - stopCh chan struct{} + stopLock sync.Mutex + shutdown bool + stopCh chan struct{} } -func NewInstancesController(ec2 *ec2.EC2, filterTags map[string]string) *InstancesController { +func NewInstancesController(cloud *kopeaws.AWSCloud, period time.Duration, dns kope.DNSProvider) *InstancesController { c := &InstancesController{ - ec2: ec2, + cloud: cloud, instances: make(map[string]*instance), - period: 10 * time.Second, - filterTags: filterTags, + period: period, + dns: dns, + dnsState: make(map[string][]string), } return c } @@ -78,65 +85,33 @@ func (c *InstancesController) Run() { glog.Infof("shutting down route controller") } -func newEc2Filter(name string, value string) *ec2.Filter { - filter := &ec2.Filter{ - Name: aws.String(name), - Values: []*string{ - aws.String(value), - }, - } - return filter -} - - -// Add additional filters, to match on our tags -// This lets us run multiple k8s clusters in a single EC2 AZ -func (c *InstancesController) addFilterTags(filters []*ec2.Filter) []*ec2.Filter { - for k, v := range c.filterTags { - filters = append(filters, newEc2Filter("tag:" + k, v)) - } - if len(filters) == 0 { - // We can't pass a zero-length Filters to AWS (it's an error) - // So if we end up with no filters; just return nil - return nil - } - - return filters -} - func (c *InstancesController) runOnce() error { - request := &ec2.DescribeInstancesInput{ - Filters: c.addFilterTags(nil), + instances, err := c.cloud.DescribeInstances() + if err != nil { + return err } - glog.Infof("Querying EC2 instances") - c.sequence = c.sequence + 1 sequence := c.sequence - err := c.ec2.DescribeInstancesPages(request, func(p *ec2.DescribeInstancesOutput, lastPage bool) bool { - for _, r := range p.Reservations { - for _, awsInstance := range r.Instances { - id := aws.StringValue(awsInstance.InstanceId) - if id == "" { - runtime.HandleError(fmt.Errorf("skipping instance with empty instanceid: %v", awsInstance)) - continue - } - - i 
:= c.instances[id] - if i == nil { - i = &instance{ - ID: id, - } - c.instances[id] = i - } - - i.status = awsInstance - i.sequence = sequence + for _, awsInstance := range instances { + id := aws.StringValue(awsInstance.InstanceId) + if id == "" { + runtime.HandleError(fmt.Errorf("skipping instance with empty instanceid: %v", awsInstance)) + continue + } + + i := c.instances[id] + if i == nil { + i = &instance{ + ID: id, } + c.instances[id] = i } - return true - }) + + i.status = awsInstance + i.sequence = sequence + } if err != nil { return fmt.Errorf("error doing EC2 describe instances: %v", err) @@ -153,7 +128,7 @@ func (c *InstancesController) runOnce() error { canSetSourceDestCheck := false instanceStateName := aws.StringValue(i.status.State.Name) - switch (instanceStateName) { + switch instanceStateName { case "pending": glog.V(2).Infof("Ignoring pending instance: %q", id) case "running": @@ -172,7 +147,7 @@ func (c *InstancesController) runOnce() error { } if canSetSourceDestCheck && c.SourceDestCheck != nil && *c.SourceDestCheck != aws.BoolValue(i.status.SourceDestCheck) { - err := c.configureInstanceSourceDestCheck(i.ID, *c.SourceDestCheck) + err := c.cloud.ConfigureInstanceSourceDestCheck(i.ID, *c.SourceDestCheck) if err != nil { runtime.HandleError(fmt.Errorf("failed to configure SourceDestCheck for instance %q: %v", i.ID, err)) } else { @@ -193,20 +168,81 @@ func (c *InstancesController) runOnce() error { glog.Infof("Found %d instances", len(c.instances)) + if c.dns != nil { + err = c.configureDNS(c.instances) + if err != nil { + return err + } + } + return nil } -// Sets the instance attribute "source-dest-check" to the specified value -func (c *InstancesController) configureInstanceSourceDestCheck(instanceID string, sourceDestCheck bool) error { - glog.Infof("Configuring SourceDestCheck on %q to %v", instanceID, sourceDestCheck) +func (c *InstancesController) configureDNS(instances map[string]*instance) error { + dnsState := make(map[string][]string) 
+ + for _, i := range instances { + internalName, _ := kopeaws.FindTag(i.status, kopeaws.TagNameKubernetesDnsInternal) + if internalName != "" { + internalIP := aws.StringValue(i.status.PrivateIpAddress) + if internalIP != "" { + dnsState[internalName] = append(dnsState[internalName], internalIP) + } + } + publicName, _ := kopeaws.FindTag(i.status, kopeaws.TagNameKubernetesDnsPublic) + if publicName != "" { + publicIP := aws.StringValue(i.status.PublicIpAddress) + if publicIP != "" { + dnsState[publicName] = append(dnsState[publicName], publicIP) + } + } + } + + var changes map[string][]string + if c.dnsState == nil { + if len(dnsState) == 0 { + glog.V(2).Infof("No dns configuration to apply") + c.dnsState = dnsState + return nil + } else { + changes = dnsState + } + } else { + changes = make(map[string][]string) + for k, v := range dnsState { + sort.Strings(v) + lastV := c.dnsState[k] + if !StringSlicesEqual(lastV, v) { + glog.V(2).Infof("DNS change %s: %v -> %v", k, lastV, v) + changes[k] = v + } + } - request := &ec2.ModifyInstanceAttributeInput{} - request.InstanceId = aws.String(instanceID) - request.SourceDestCheck = &ec2.AttributeBooleanValue{Value: aws.Bool(sourceDestCheck)} + if len(changes) == 0 { + glog.V(2).Infof("DNS configuration unchanged") + return nil + } + } - _, err := c.ec2.ModifyInstanceAttribute(request) + err := c.dns.ApplyDNSChanges(changes) if err != nil { - return fmt.Errorf("error configuring source-dest-check on instance %q: %v", instanceID, err) + return fmt.Errorf("error applying DNS changes: %v", err) } + + glog.V(2).Infof("Applied DNS changes to %d hosts", len(changes)) + + c.dnsState = dnsState return nil } + +func StringSlicesEqual(l, r []string) bool { + if len(l) != len(r) { + return false + } + for i, v := range l { + if r[i] != v { + return false + } + } + return true +} diff --git a/pkg/kope/cloud.go b/pkg/kope/cloud.go new file mode 100644 index 0000000..27f185e --- /dev/null +++ b/pkg/kope/cloud.go @@ -0,0 +1,8 @@ +package 
kope + +type Cloud interface { +} + +type DNSProvider interface { + ApplyDNSChanges(records map[string][]string) error +} diff --git a/pkg/kope/kopeaws/awscloud.go b/pkg/kope/kopeaws/awscloud.go new file mode 100644 index 0000000..7c08595 --- /dev/null +++ b/pkg/kope/kopeaws/awscloud.go @@ -0,0 +1,199 @@ +package kopeaws + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/ec2" + "github.com/golang/glog" + "github.com/kopeio/aws-controller/pkg/kope" + "net" +) + +// The tag name we use to differentiate multiple logically independent clusters running in the same region +const TagNameKubernetesCluster = "KubernetesCluster" + +// Set to expose the public IP of this instance via DNS +const TagNameKubernetesDnsPublic = "k8s.io/dns/public" + +// Set to expose the internal IP of this instance via DNS +const TagNameKubernetesDnsInternal = "k8s.io/dns/internal" + +type AWSCloud struct { + ec2 *ec2.EC2 + metadata *ec2metadata.EC2Metadata + + zone string + instanceID string + + self *ec2.Instance + clusterID string + internalIP net.IP +} + +var _ kope.Cloud = &AWSCloud{} + +func NewAWSCloud() (*AWSCloud, error) { + a := &AWSCloud{} + + s := session.New() + s.Handlers.Send.PushFront(func(r *request.Request) { + // Log requests + glog.V(4).Infof("AWS API Request: %s/%s", r.ClientInfo.ServiceName, r.Operation.Name) + }) + + config := aws.NewConfig() + a.metadata = ec2metadata.New(s, config) + + region, err := a.metadata.Region() + if err != nil { + return nil, fmt.Errorf("error querying ec2 metadata service (for az/region): %v", err) + } + + a.zone, err = a.metadata.GetMetadata("placement/availability-zone") + if err != nil { + return nil, fmt.Errorf("error querying ec2 metadata service (for az): %v", err) + } + + a.instanceID, err = a.metadata.GetMetadata("instance-id") + if err != nil { + return nil, 
fmt.Errorf("error querying ec2 metadata service (for instance-id): %v", err) + } + + a.ec2 = ec2.New(s, config.WithRegion(region)) + + err = a.getSelfInstance() + if err != nil { + return nil, err + } + + return a, nil +} + +func (a *AWSCloud) ClusterID() string { + return a.clusterID +} + +func (a *AWSCloud) getSelfInstance() error { + instance, err := a.describeInstance(a.instanceID) + if err != nil { + return err + } + + a.self = instance + + clusterID, _ := FindTag(instance, TagNameKubernetesCluster) + if clusterID == "" { + return fmt.Errorf("Cluster tag %q not found on this instance (%q)", TagNameKubernetesCluster, a.instanceID) + } + + a.clusterID = clusterID + + a.internalIP = net.ParseIP(aws.StringValue(instance.PrivateIpAddress)) + if a.internalIP == nil { + return fmt.Errorf("Internal IP not found on this instance (%q)", a.instanceID) + } + + return nil +} + +func (a *AWSCloud) describeInstance(instanceID string) (*ec2.Instance, error) { + request := &ec2.DescribeInstancesInput{} + request.InstanceIds = []*string{&instanceID} + + var instances []*ec2.Instance + err := a.ec2.DescribeInstancesPages(request, func(p *ec2.DescribeInstancesOutput, lastPage bool) (shouldContinue bool) { + for _, r := range p.Reservations { + instances = append(instances, r.Instances...) 
+ } + return true + }) + + if err != nil { + return nil, fmt.Errorf("error querying for EC2 instance %q: %v", instanceID, err) + } + + if len(instances) != 1 { + return nil, fmt.Errorf("unexpected number of instances found with id %q: %d", instanceID, len(instances)) + } + + return instances[0], nil +} + +// Add additional filters, to match on our tags +// This lets us run multiple k8s clusters in a single EC2 AZ +func (a *AWSCloud) addFilterTags(filters []*ec2.Filter) []*ec2.Filter { + //for k, v := range c.filterTags { + filters = append(filters, newEc2Filter("tag:"+TagNameKubernetesCluster, a.clusterID)) + //} + if len(filters) == 0 { + // We can't pass a zero-length Filters to AWS (it's an error) + // So if we end up with no filters; just return nil + return nil + } + + return filters +} + +func (a *AWSCloud) DescribeInstances() ([]*ec2.Instance, error) { + request := &ec2.DescribeInstancesInput{ + Filters: a.addFilterTags(nil), + } + + glog.Infof("Querying EC2 instances") + + var instances []*ec2.Instance + + err := a.ec2.DescribeInstancesPages(request, func(p *ec2.DescribeInstancesOutput, lastPage bool) bool { + for _, r := range p.Reservations { + for _, i := range r.Instances { + instances = append(instances, i) + } + } + return true + }) + + if err != nil { + return nil, fmt.Errorf("error doing EC2 describe instances: %v", err) + } + + return instances, nil +} + +// Sets the instance attribute "source-dest-check" to the specified value +func (a *AWSCloud) ConfigureInstanceSourceDestCheck(instanceID string, sourceDestCheck bool) error { + glog.Infof("Configuring SourceDestCheck on %q to %v", instanceID, sourceDestCheck) + + request := &ec2.ModifyInstanceAttributeInput{} + request.InstanceId = aws.String(instanceID) + request.SourceDestCheck = &ec2.AttributeBooleanValue{Value: aws.Bool(sourceDestCheck)} + + _, err := a.ec2.ModifyInstanceAttribute(request) + if err != nil { + return fmt.Errorf("error configuring source-dest-check on instance %q: %v", 
instanceID, err) + } + return nil +} + +func newEc2Filter(name string, value string) *ec2.Filter { + filter := &ec2.Filter{ + Name: aws.String(name), + Values: []*string{ + aws.String(value), + }, + } + return filter +} + +func FindTag(instance *ec2.Instance, name string) (string, bool) { + for _, tag := range instance.Tags { + k := aws.StringValue(tag.Key) + if k == name { + return aws.StringValue(tag.Value), true + } + } + + return "", false +} diff --git a/pkg/kope/kopeaws/dns.go b/pkg/kope/kopeaws/dns.go new file mode 100644 index 0000000..57f1e7f --- /dev/null +++ b/pkg/kope/kopeaws/dns.go @@ -0,0 +1,129 @@ +package kopeaws + +import ( + "fmt" + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/route53" + "github.com/golang/glog" + "github.com/kopeio/aws-controller/pkg/kope" + "github.com/kopeio/aws-controller/pkg/kope/utils" + "strings" + "time" +) + +var defaultTTL = time.Minute + +// TODO: Replace with k8s built-in helpers + +type Route53DNSProvider struct { + zoneName string + route53 *route53.Route53 + + zone *route53.HostedZone +} + +var _ kope.DNSProvider = &Route53DNSProvider{} + +func NewRoute53DNSProvider(zoneName string) *Route53DNSProvider { + s := session.New() + s.Handlers.Send.PushFront(func(r *request.Request) { + // Log requests + glog.V(4).Infof("AWS API Request: %s/%s", r.ClientInfo.ServiceName, r.Operation.Name) + }) + + config := aws.NewConfig() + + route53 := route53.New(s, config) + + return &Route53DNSProvider{ + route53: route53, + zoneName: zoneName, + } +} + +func (d *Route53DNSProvider) ApplyDNSChanges(dns map[string][]string) error { + return d.set(dns, defaultTTL) +} + +func (d *Route53DNSProvider) getZone() (*route53.HostedZone, error) { + if d.zone != nil { + return d.zone, nil + } + + findZone := d.zoneName + if !strings.HasSuffix(findZone, ".") { + findZone += "." 
+ } + request := &route53.ListHostedZonesByNameInput{ + DNSName: aws.String(findZone), + } + + response, err := d.route53.ListHostedZonesByName(request) + if err != nil { + return nil, fmt.Errorf("error querying for DNS HostedZones %q: %v", findZone, err) + } + + var zones []*route53.HostedZone + for _, zone := range response.HostedZones { + if aws.StringValue(zone.Name) == findZone { + zones = append(zones, zone) + } + } + if len(zones) == 0 { + return nil, nil + } + if len(zones) != 1 { + return nil, fmt.Errorf("found multiple hosted zones matched name %q", findZone) + } + + d.zone = zones[0] + + return d.zone, nil +} + +func (d *Route53DNSProvider) set(records map[string][]string, ttl time.Duration) error { + zone, err := d.getZone() + if err != nil { + return err + } + + changeBatch := &route53.ChangeBatch{} + for name, hosts := range records { + rrs := &route53.ResourceRecordSet{ + Name: aws.String(name), + Type: aws.String("A"), + TTL: aws.Int64(int64(ttl.Seconds())), + } + + for _, host := range hosts { + rr := &route53.ResourceRecord{ + Value: aws.String(host), + } + rrs.ResourceRecords = append(rrs.ResourceRecords, rr) + } + + change := &route53.Change{ + Action: aws.String("UPSERT"), + ResourceRecordSet: rrs, + } + changeBatch.Changes = append(changeBatch.Changes, change) + } + + request := &route53.ChangeResourceRecordSetsInput{} + request.HostedZoneId = zone.Id + request.ChangeBatch = changeBatch + + glog.V(2).Infof("Updating DNS records %q", records) + glog.V(4).Infof("route53 request: %s", utils.DebugString(request)) + + response, err := d.route53.ChangeResourceRecordSets(request) + if err != nil { + return fmt.Errorf("error creating ResourceRecordSets: %v", err) + } + + glog.V(2).Infof("Change id is %q", aws.StringValue(response.ChangeInfo.Id)) + + return nil +} diff --git a/pkg/kope/utils/utils.go b/pkg/kope/utils/utils.go new file mode 100644 index 0000000..4c16440 --- /dev/null +++ b/pkg/kope/utils/utils.go @@ -0,0 +1,14 @@ +package utils + 
+import ( + "encoding/json" + "fmt" +) + +func DebugString(o interface{}) string { + b, err := json.Marshal(o) + if err != nil { + return fmt.Sprintf("error marshaling %T: %v", o, err) + } + return string(b) +}