Skip to content

Commit

Permalink
provider/aws: Added S3 Bucket replication
Browse files Browse the repository at this point in the history
  • Loading branch information
Ninir committed Dec 7, 2016
1 parent 5445990 commit 14419bb
Show file tree
Hide file tree
Showing 5 changed files with 755 additions and 11 deletions.
240 changes: 239 additions & 1 deletion builtin/providers/aws/resource_aws_s3_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,65 @@ func resourceAwsS3Bucket() *schema.Resource {
ValidateFunc: validateS3BucketRequestPayerType,
},

"replication_configuration": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"role": {
Type: schema.TypeString,
Required: true,
},
"rules": {
Type: schema.TypeSet,
Required: true,
Set: rulesHash,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"id": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validateS3BucketReplicationRuleId,
},
"destination": {
Type: schema.TypeSet,
MaxItems: 1,
MinItems: 1,
Required: true,
Set: destinationHash,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"bucket": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validateArn,
},
"storage_class": {
Type: schema.TypeString,
Optional: true,
ValidateFunc: validateS3BucketReplicationDestinationStorageClass,
},
},
},
},
"prefix": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validateS3BucketReplicationRulePrefix,
},
"status": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validateS3BucketReplicationRuleStatus,
},
},
},
},
},
},
},

"tags": tagsSchema(),
},
}
Expand Down Expand Up @@ -436,6 +495,12 @@ func resourceAwsS3BucketUpdate(d *schema.ResourceData, meta interface{}) error {
}
}

if d.HasChange("replication_configuration") {
if err := resourceAwsS3BucketReplicationConfigurationUpdate(s3conn, d); err != nil {
return err
}
}

return resourceAwsS3BucketRead(d, meta)
}

Expand Down Expand Up @@ -763,6 +828,24 @@ func resourceAwsS3BucketRead(d *schema.ResourceData, meta interface{}) error {
}
}

// Read the bucket replication configuration
replication, err := s3conn.GetBucketReplication(&s3.GetBucketReplicationInput{
Bucket: aws.String(d.Id()),
})
if err != nil {
if awsError, ok := err.(awserr.RequestFailure); ok && awsError.StatusCode() != 404 {
return err
}
}

log.Printf("[DEBUG] S3 Bucket: %s, read replication configuration: %v", d.Id(), replication)
if r := replication.ReplicationConfiguration; r != nil {
if err := d.Set("replication_configuration", flattenAwsS3BucketReplicationConfiguration(replication.ReplicationConfiguration)); err != nil {
log.Printf("[DEBUG] Error setting replication configuration: %s", err)
return err
}
}

// Add the region as an attribute
location, err := s3conn.GetBucketLocation(
&s3.GetBucketLocationInput{
Expand Down Expand Up @@ -1270,6 +1353,92 @@ func resourceAwsS3BucketRequestPayerUpdate(s3conn *s3.S3, d *schema.ResourceData
return nil
}

func resourceAwsS3BucketReplicationConfigurationUpdate(s3conn *s3.S3, d *schema.ResourceData) error {
bucket := d.Get("bucket").(string)
replicationConfiguration := d.Get("replication_configuration").([]interface{})

if len(replicationConfiguration) == 0 {
i := &s3.DeleteBucketReplicationInput{
Bucket: aws.String(bucket),
}

err := resource.Retry(1*time.Minute, func() *resource.RetryError {
if _, err := s3conn.DeleteBucketReplication(i); err != nil {
return resource.NonRetryableError(err)
}
return nil
})
if err != nil {
return fmt.Errorf("Error removing S3 bucket replication: %s", err)
}
return nil
}

hasVersioning := false
// Validate that bucket versioning is enabled
if versioning, ok := d.GetOk("versioning"); ok {
v := versioning.(*schema.Set).List()

if v[0].(map[string]interface{})["enabled"].(bool) {
hasVersioning = true
}
}

if !hasVersioning {
return fmt.Errorf("versioning must be enabled to allow S3 bucket replication")
}

c := replicationConfiguration[0].(map[string]interface{})

rc := &s3.ReplicationConfiguration{}
if val, ok := c["role"]; ok {
rc.Role = aws.String(val.(string))
}

rcRules := c["rules"].(*schema.Set).List()
rules := []*s3.ReplicationRule{}
for _, v := range rcRules {
rr := v.(map[string]interface{})
rcRule := &s3.ReplicationRule{
Prefix: aws.String(rr["prefix"].(string)),
Status: aws.String(rr["status"].(string)),
}

if rrid, ok := rr["id"]; ok {
rcRule.ID = aws.String(rrid.(string))
}

ruleDestination := &s3.Destination{}
if destination, ok := rr["destination"]; ok {
dest := destination.(*schema.Set).List()

bd := dest[0].(map[string]interface{})
ruleDestination.Bucket = aws.String(bd["bucket"].(string))

if storageClass, ok := bd["storage_class"]; ok {
ruleDestination.StorageClass = aws.String(storageClass.(string))
}
}
rcRule.Destination = ruleDestination

rules = append(rules, rcRule)
}

rc.Rules = rules
i := &s3.PutBucketReplicationInput{
Bucket: aws.String(bucket),
ReplicationConfiguration: rc,
}
log.Printf("[DEBUG] S3 put bucket replication configuration: %#v", i)

_, err := s3conn.PutBucketReplication(i)
if err != nil {
return fmt.Errorf("Error putting S3 replication configuration: %s", err)
}

return nil
}

func resourceAwsS3BucketLifecycleUpdate(s3conn *s3.S3, d *schema.ResourceData) error {
bucket := d.Get("bucket").(string)

Expand All @@ -1287,7 +1456,7 @@ func resourceAwsS3BucketLifecycleUpdate(s3conn *s3.S3, d *schema.ResourceData) e
return nil
})
if err != nil {
return fmt.Errorf("Error putting S3 lifecycle: %s", err)
return fmt.Errorf("Error removing S3 lifecycle: %s", err)
}
return nil
}
Expand Down Expand Up @@ -1418,6 +1587,46 @@ func resourceAwsS3BucketLifecycleUpdate(s3conn *s3.S3, d *schema.ResourceData) e
return nil
}

func flattenAwsS3BucketReplicationConfiguration(r *s3.ReplicationConfiguration) []map[string]interface{} {
replication_configuration := make([]map[string]interface{}, 0, 1)
m := make(map[string]interface{})

if r.Role != nil && *r.Role != "" {
m["role"] = *r.Role
}

rules := make([]interface{}, 0, len(r.Rules))
for _, v := range r.Rules {
t := make(map[string]interface{})
if v.Destination != nil {
rd := make(map[string]interface{})
if v.Destination.Bucket != nil {
rd["bucket"] = *v.Destination.Bucket
}
if v.Destination.StorageClass != nil {
rd["storage_class"] = *v.Destination.StorageClass
}
t["destination"] = schema.NewSet(destinationHash, []interface{}{rd})
}

if v.ID != nil {
t["id"] = *v.ID
}
if v.Prefix != nil {
t["prefix"] = *v.Prefix
}
if v.Status != nil {
t["status"] = *v.Status
}
rules = append(rules, t)
}
m["rules"] = schema.NewSet(rulesHash, rules)

replication_configuration = append(replication_configuration, m)

return replication_configuration
}

func normalizeRoutingRules(w []*s3.RoutingRule) (string, error) {
withNulls, err := json.Marshal(w)
if err != nil {
Expand Down Expand Up @@ -1537,6 +1746,35 @@ func transitionHash(v interface{}) int {
return hashcode.String(buf.String())
}

func rulesHash(v interface{}) int {
var buf bytes.Buffer
m := v.(map[string]interface{})

if v, ok := m["id"]; ok {
buf.WriteString(fmt.Sprintf("%s-", v.(string)))
}
if v, ok := m["prefix"]; ok {
buf.WriteString(fmt.Sprintf("%s-", v.(string)))
}
if v, ok := m["status"]; ok {
buf.WriteString(fmt.Sprintf("%s-", v.(string)))
}
return hashcode.String(buf.String())
}

func destinationHash(v interface{}) int {
var buf bytes.Buffer
m := v.(map[string]interface{})

if v, ok := m["bucket"]; ok {
buf.WriteString(fmt.Sprintf("%s-", v.(string)))
}
if v, ok := m["storage_class"]; ok {
buf.WriteString(fmt.Sprintf("%s-", v.(string)))
}
return hashcode.String(buf.String())
}

type S3Website struct {
Endpoint, Domain string
}
Loading

0 comments on commit 14419bb

Please sign in to comment.