Skip to content

Commit

Permalink
refactor(kafka): reduce updates for storage_info
Browse files Browse the repository at this point in the history
  • Loading branch information
GlennChia committed May 17, 2022
1 parent 9766872 commit 917a661
Showing 1 changed file with 5 additions and 41 deletions.
46 changes: 5 additions & 41 deletions internal/service/kafka/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,13 +641,14 @@ func resourceClusterRead(ctx context.Context, d *schema.ResourceData, meta inter
func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
conn := meta.(*conns.AWSClient).KafkaConn

if d.HasChange("broker_node_group_info.0.ebs_volume_size") {
if d.HasChanges("broker_node_group_info.0.ebs_volume_size", "broker_node_group_info.0.storage_info") {
input := &kafka.UpdateBrokerStorageInput{
ClusterArn: aws.String(d.Id()),
CurrentVersion: aws.String(d.Get("current_version").(string)),
}
// case: deprecated ebs_volume_size replaced with storage_info
if d.HasChange("broker_node_group_info.0.storage_info") {
// case 1: deprecated ebs_volume_size replaced with storage_info
// case 2: regular update of storage_info
ebsVolumeInfo := &kafka.BrokerEBSVolumeInfo{
KafkaBrokerNodeId: aws.String("All"),
VolumeSizeGB: aws.Int64(int64(d.Get("broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size").(int))),
Expand All @@ -657,7 +658,7 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, meta int
}
input.TargetBrokerEBSVolumeInfo = []*kafka.BrokerEBSVolumeInfo{ebsVolumeInfo}
} else {
// case: regular update of deprecated ebs_volume_size
// case 3: regular update of deprecated ebs_volume_size
input.TargetBrokerEBSVolumeInfo = []*kafka.BrokerEBSVolumeInfo{
{
KafkaBrokerNodeId: aws.String("All"),
Expand All @@ -670,44 +671,7 @@ func resourceClusterUpdate(ctx context.Context, d *schema.ResourceData, meta int

// the following error is thrown if previous ebs_volume_size and new storage_info.ebs_storage_info.volume_size have the same value:
// BadRequestException: The request does not include any updates to the EBS volumes of the cluster. Verify the request, then try again
// ignore this error to allow users to replace deprecated ebs_volume_size with storage_info
if err != nil && !tfawserr.ErrMessageContains(err, kafka.ErrCodeBadRequestException, "The request does not include any updates to the EBS volumes of the cluster") {
return diag.Errorf("updating MSK Cluster (%s) broker storage: %s", d.Id(), err)
}

clusterOperationARN := aws.StringValue(output.ClusterOperationArn)

// when there are no changes, output.ClusterOperationArn is not returned leading to
// InvalidParameter: 1 validation error(s) found. - minimum field size of 1, DescribeClusterOperationInput.ClusterOperationArn.
// skip the wait if the EBS volume size is unchanged
if !tfawserr.ErrMessageContains(err, kafka.ErrCodeBadRequestException, "The request does not include any updates to the EBS volumes of the cluster") {
_, err = waitClusterOperationCompleted(ctx, conn, clusterOperationARN, d.Timeout(schema.TimeoutUpdate))

if err != nil {
return diag.Errorf("waiting for MSK Cluster (%s) operation (%s): %s", d.Id(), clusterOperationARN, err)
}
}
}

if d.HasChange("broker_node_group_info.0.storage_info") {
input := &kafka.UpdateBrokerStorageInput{
ClusterArn: aws.String(d.Id()),
CurrentVersion: aws.String(d.Get("current_version").(string)),
}
ebsVolumeInfo := &kafka.BrokerEBSVolumeInfo{
KafkaBrokerNodeId: aws.String("All"),
VolumeSizeGB: aws.Int64(int64(d.Get("broker_node_group_info.0.storage_info.0.ebs_storage_info.0.volume_size").(int))),
}
if v, ok := d.GetOk("broker_node_group_info.0.storage_info.0.ebs_storage_info.0.provisioned_throughput"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
ebsVolumeInfo.ProvisionedThroughput = expandProvisionedThroughput(v.([]interface{})[0].(map[string]interface{}))
}
input.TargetBrokerEBSVolumeInfo = []*kafka.BrokerEBSVolumeInfo{ebsVolumeInfo}

output, err := conn.UpdateBrokerStorageWithContext(ctx, input)

// the following error is thrown if previous ebs_volume_size and new storage_info.ebs_storage_info.volume_size have the same value:
// BadRequestException: The request does not include any updates to the EBS volumes of the cluster. Verify the request, then try again
// ignore this error to allow users to replace deprecated ebs_volume_size with storage_info
// ignore this error to allow users to replace deprecated ebs_volume_size with storage_info - Address case 1
if err != nil && !tfawserr.ErrMessageContains(err, kafka.ErrCodeBadRequestException, "The request does not include any updates to the EBS volumes of the cluster") {
return diag.Errorf("updating MSK Cluster (%s) broker storage: %s", d.Id(), err)
}
Expand Down

0 comments on commit 917a661

Please sign in to comment.