Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] - Smart bisect faild change batch #1641

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 82 additions & 39 deletions provider/aws/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,48 +415,86 @@ func (p *AWSProvider) submitChanges(ctx context.Context, changes []*route53.Chan

var failedZones []string
for z, cs := range changesByZone {
var failedUpdate bool
errors := p.applyWithBisect(ctx, cs, z, aws.StringValue(zones[z].Name))
if len(errors) > 0 {
failedZones = append(failedZones, z)
}
}

batchCs := batchChangeSet(cs, p.batchChangeSize)
if len(failedZones) > 0 {
return fmt.Errorf("failed to submit all changes for the following zones: %v", failedZones)
}

for i, b := range batchCs {
for _, c := range b {
log.Infof("Desired change: %s %s %s [Id: %s]", *c.Action, *c.ResourceRecordSet.Name, *c.ResourceRecordSet.Type, z)
}
return nil
}

if !p.dryRun {
params := &route53.ChangeResourceRecordSetsInput{
HostedZoneId: aws.String(z),
ChangeBatch: &route53.ChangeBatch{
Changes: b,
},
}
func (p *AWSProvider) applyWithBisect(ctx context.Context, changes []*route53.Change, z string, zoneName string) []error {
res := []error{}

if _, err := p.client.ChangeResourceRecordSetsWithContext(ctx, params); err != nil {
log.Errorf("Failure in zone %s [Id: %s]", aws.StringValue(zones[z].Name), z)
log.Error(err) //TODO(ideahitme): consider changing the interface in cases when this error might be a concern for other components
failedUpdate = true
} else {
// z is the R53 Hosted Zone ID already as aws.StringValue
log.Infof("%d record(s) in zone %s [Id: %s] were successfully updated", len(b), aws.StringValue(zones[z].Name), z)
}
batchCs := batchChangeSet(changes, p.batchChangeSize)

if i != len(batchCs)-1 {
time.Sleep(p.batchChangeInterval)
}
for i, b := range batchCs {
if p.ChangeZone(ctx, b, z, zoneName) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's take the opportunity to inverse the return value here (or return an error). It's unexpected that p.ChangeZone() => true means that an error happened.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could introduce a feature flag here that can be used to toggle between the old approach and the better bisect handling. I think it's fairly easy to do and it would greatly help with getting this merged.

changesByName, names := getChangesByName(b)

if len(names) == 1 {
return []error{fmt.Errorf("failed to submit changes: %v", b)}
}

size := len(names) / 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to ensure the minimum size is 2 because the main record and the ownership TXT records should be applied in the same transaction so that they either are both or applied or not at all.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For safety we should also ensure that names is even I guess. Otherwise this logic might become incorrect.

b1 := getChangesFromNames(changesByName, names[:size])
b2 := getChangesFromNames(changesByName, names[size:])

time.Sleep(p.batchChangeInterval)
res = append(res, p.applyWithBisect(ctx, b1, z, zoneName)...)
time.Sleep(p.batchChangeInterval)
res = append(res, p.applyWithBisect(ctx, b2, z, zoneName)...)
}

if failedUpdate {
failedZones = append(failedZones, z)
if i != len(batchCs)-1 {
time.Sleep(p.batchChangeInterval)
}
}

if len(failedZones) > 0 {
return fmt.Errorf("failed to submit all changes for the following zones: %v", failedZones)
return res
}

func getChangesFromNames(changesByName map[string][]*route53.Change, names []string) []*route53.Change {
res := make([]*route53.Change, 0)

for _, n := range names {
res = append(res, changesByName[n]...)
}

return nil
return res
}

func (p *AWSProvider) ChangeZone(ctx context.Context, cs []*route53.Change, z string, zoneName string) bool {
var failedUpdate bool

for _, c := range cs {
log.Infof("Desired change: %s %s %s [Id: %s]", *c.Action, *c.ResourceRecordSet.Name, *c.ResourceRecordSet.Type, z)
}

if !p.dryRun {
params := &route53.ChangeResourceRecordSetsInput{
HostedZoneId: aws.String(z),
ChangeBatch: &route53.ChangeBatch{
Changes: cs,
},
}

if _, err := p.client.ChangeResourceRecordSetsWithContext(ctx, params); err != nil {
log.Errorf("Failure in zone %s [Id: %s]", zoneName, z)
log.Error(err) //TODO(ideahitme): consider changing the interface in cases when this error might be a concern for other components
failedUpdate = true
} else {
// z is the R53 Hosted Zone ID already as aws.StringValue
log.Infof("%d record(s) in zone %s [Id: %s] were successfully updated", len(cs), zoneName, z)
}
}

return failedUpdate
}

// newChanges returns a collection of Changes based on the given records and action.
Expand Down Expand Up @@ -598,16 +636,7 @@ func batchChangeSet(cs []*route53.Change, batchSize int) [][]*route53.Change {

batchChanges := make([][]*route53.Change, 0)

changesByName := make(map[string][]*route53.Change)
for _, v := range cs {
changesByName[*v.ResourceRecordSet.Name] = append(changesByName[*v.ResourceRecordSet.Name], v)
}

names := make([]string, 0)
for v := range changesByName {
names = append(names, v)
}
sort.Strings(names)
changesByName, names := getChangesByName(cs)

for _, name := range names {
totalChangesByName := len(changesByName[name])
Expand Down Expand Up @@ -638,6 +667,20 @@ func batchChangeSet(cs []*route53.Change, batchSize int) [][]*route53.Change {
return batchChanges
}

func getChangesByName(cs []*route53.Change) (map[string][]*route53.Change, []string) {
changesByName := make(map[string][]*route53.Change)
for _, v := range cs {
changesByName[*v.ResourceRecordSet.Name] = append(changesByName[*v.ResourceRecordSet.Name], v)
}

names := make([]string, 0)
for v := range changesByName {
names = append(names, v)
}
sort.Strings(names)
return changesByName, names
}

func sortChangesByActionNameType(cs []*route53.Change) []*route53.Change {
sort.SliceStable(cs, func(i, j int) bool {
if *cs[i].Action > *cs[j].Action {
Expand Down
38 changes: 33 additions & 5 deletions provider/aws/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,21 +716,49 @@ func TestAWSsubmitChanges(t *testing.T) {
}

func TestAWSsubmitChangesError(t *testing.T) {
provider, clientStub := newAWSProvider(t, endpoint.NewDomainFilter([]string{"ext-dns-test-2.teapot.zalan.do."}), provider.NewZoneIDFilter([]string{}), provider.NewZoneTypeFilter(""), defaultEvaluateTargetHealth, false, []*endpoint.Endpoint{})
clientStub.MockMethod("ChangeResourceRecordSets", mock.Anything).Return(nil, fmt.Errorf("Mock route53 failure"))
getParams := func(changes []*route53.Change) *route53.ChangeResourceRecordSetsInput {
return &route53.ChangeResourceRecordSetsInput{
HostedZoneId: aws.String("/hostedzone/zone-1.ext-dns-test-2.teapot.zalan.do."),
ChangeBatch: &route53.ChangeBatch{
Changes: changes,
},
}
}

ep1 := endpoint.NewEndpointWithTTL("ep1.zone-1.ext-dns-test-2.teapot.zalan.do", endpoint.RecordTypeA, endpoint.TTL(recordTTL), "1.0.0.1")
ep2Del := endpoint.NewEndpointWithTTL("ep2.zone-1.ext-dns-test-2.teapot.zalan.do", endpoint.RecordTypeA, endpoint.TTL(recordTTL), "1.0.0.1")
ep2New := endpoint.NewEndpointWithTTL("ep2.zone-1.ext-dns-test-2.teapot.zalan.do", endpoint.RecordTypeA, endpoint.TTL(recordTTL), "2.0.0.2")

provider, clientStub := newAWSProvider(t, endpoint.NewDomainFilter([]string{"ext-dns-test-2.teapot.zalan.do."}), provider.NewZoneIDFilter([]string{}), provider.NewZoneTypeFilter(""), defaultEvaluateTargetHealth, false, []*endpoint.Endpoint{ep2Del})

ctx := context.Background()
zones, err := provider.Zones(ctx)
require.NoError(t, err)
records, err := provider.Records(ctx)
require.NoError(t, err)

ep := endpoint.NewEndpointWithTTL("fail.zone-1.ext-dns-test-2.teapot.zalan.do", endpoint.RecordTypeA, endpoint.TTL(recordTTL), "1.0.0.1")
cs := provider.newChanges(route53.ChangeActionCreate, []*endpoint.Endpoint{ep}, records, zones)
first := append(provider.newChanges(route53.ChangeActionDelete, []*endpoint.Endpoint{ep2Del}, records, zones),
provider.newChanges(route53.ChangeActionCreate, []*endpoint.Endpoint{ep1, ep2New}, records, zones)...)

second := append(provider.newChanges(route53.ChangeActionDelete, []*endpoint.Endpoint{ep2Del}, records, zones),
provider.newChanges(route53.ChangeActionCreate, []*endpoint.Endpoint{ep2New}, records, zones)...)

clientStub.MockMethod("ChangeResourceRecordSets", getParams(first)).Return(nil, fmt.Errorf("Mock route53 failure"))

clientStub.MockMethod("ChangeResourceRecordSets", getParams(second)).Return(nil, fmt.Errorf("Mock route53 failure"))

got := provider.submitChanges(ctx, first, zones)
require.Error(t, got)

records, err = provider.Records(ctx)
require.NoError(t, err)

validateEndpoints(t, records, []*endpoint.Endpoint{ep1, ep2Del})

require.Error(t, provider.submitChanges(ctx, cs, zones))
}



func TestAWSBatchChangeSet(t *testing.T) {
var cs []*route53.Change

Expand Down