Skip to content

Commit

Permalink
azurerm_stream_analytics_output_servicebus_queue - support new proper…
Browse files Browse the repository at this point in the history
…ties property_columns and system_property_columns (#16572)
  • Loading branch information
Neil Ye authored Apr 29, 2022
1 parent 77497cd commit ff6470a
Show file tree
Hide file tree
Showing 3 changed files with 207 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@ func resourceStreamAnalyticsOutputServiceBusQueue() *pluginsdk.Resource {
},

"serialization": schemaStreamAnalyticsOutputSerialization(),

"property_columns": {
Type: pluginsdk.TypeList,
Optional: true,
Elem: &pluginsdk.Schema{
Type: pluginsdk.TypeString,
ValidateFunc: validation.StringIsNotEmpty,
},
},

"system_property_columns": {
Type: pluginsdk.TypeMap,
Optional: true,
Elem: &pluginsdk.Schema{
Type: pluginsdk.TypeString,
ValidateFunc: validation.StringIsNotEmpty,
},
},
},
}
}
Expand Down Expand Up @@ -123,6 +141,8 @@ func resourceStreamAnalyticsOutputServiceBusQueueCreateUpdate(d *pluginsdk.Resou
ServiceBusNamespace: utils.String(serviceBusNamespace),
SharedAccessPolicyKey: utils.String(sharedAccessPolicyKey),
SharedAccessPolicyName: utils.String(sharedAccessPolicyName),
PropertyColumns: utils.ExpandStringSlice(d.Get("property_columns").([]interface{})),
SystemPropertyColumns: d.Get("system_property_columns").(map[string]interface{}),
},
},
Serialization: serialization,
Expand Down Expand Up @@ -177,6 +197,8 @@ func resourceStreamAnalyticsOutputServiceBusQueueRead(d *pluginsdk.ResourceData,
d.Set("queue_name", v.QueueName)
d.Set("servicebus_namespace", v.ServiceBusNamespace)
d.Set("shared_access_policy_name", v.SharedAccessPolicyName)
d.Set("property_columns", v.PropertyColumns)
d.Set("system_property_columns", v.SystemPropertyColumns)

if err := d.Set("serialization", flattenStreamAnalyticsOutputSerialization(props.Serialization)); err != nil {
return fmt.Errorf("setting `serialization`: %+v", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,78 @@ func TestAccStreamAnalyticsOutputServiceBusQueue_requiresImport(t *testing.T) {
})
}

func TestAccStreamAnalyticsOutputServiceBusQueue_propertyColumns(t *testing.T) {
data := acceptance.BuildTestData(t, "azurerm_stream_analytics_output_servicebus_queue", "test")
r := StreamAnalyticsOutputServiceBusQueueResource{}

data.ResourceTest(t, r, []acceptance.TestStep{
{
Config: r.csv(data),
Check: acceptance.ComposeTestCheckFunc(
check.That(data.ResourceName).ExistsInAzure(r),
),
},
data.ImportStep("shared_access_policy_key"),
{
Config: r.propertyColumns(data),
Check: acceptance.ComposeTestCheckFunc(
check.That(data.ResourceName).ExistsInAzure(r),
),
},
data.ImportStep("shared_access_policy_key"),
{
Config: r.updatePropertyColumns(data),
Check: acceptance.ComposeTestCheckFunc(
check.That(data.ResourceName).ExistsInAzure(r),
),
},
data.ImportStep("shared_access_policy_key"),
{
Config: r.csv(data),
Check: acceptance.ComposeTestCheckFunc(
check.That(data.ResourceName).ExistsInAzure(r),
),
},
data.ImportStep("shared_access_policy_key"),
})
}

func TestAccStreamAnalyticsOutputServiceBusQueue_systemPropertyColumns(t *testing.T) {
data := acceptance.BuildTestData(t, "azurerm_stream_analytics_output_servicebus_queue", "test")
r := StreamAnalyticsOutputServiceBusQueueResource{}

data.ResourceTest(t, r, []acceptance.TestStep{
{
Config: r.csv(data),
Check: acceptance.ComposeTestCheckFunc(
check.That(data.ResourceName).ExistsInAzure(r),
),
},
data.ImportStep("shared_access_policy_key"),
{
Config: r.systemPropertyColumns(data),
Check: acceptance.ComposeTestCheckFunc(
check.That(data.ResourceName).ExistsInAzure(r),
),
},
data.ImportStep("shared_access_policy_key"),
{
Config: r.updateSystemPropertyColumns(data),
Check: acceptance.ComposeTestCheckFunc(
check.That(data.ResourceName).ExistsInAzure(r),
),
},
data.ImportStep("shared_access_policy_key"),
{
Config: r.csv(data),
Check: acceptance.ComposeTestCheckFunc(
check.That(data.ResourceName).ExistsInAzure(r),
),
},
data.ImportStep("shared_access_policy_key"),
})
}

func (r StreamAnalyticsOutputServiceBusQueueResource) Exists(ctx context.Context, client *clients.Client, state *pluginsdk.InstanceState) (*bool, error) {
name := state.Attributes["name"]
jobName := state.Attributes["stream_analytics_job_name"]
Expand Down Expand Up @@ -211,6 +283,113 @@ resource "azurerm_stream_analytics_output_servicebus_queue" "test" {
`, template, data.RandomInteger, data.RandomInteger, data.RandomInteger)
}

func (r StreamAnalyticsOutputServiceBusQueueResource) propertyColumns(data acceptance.TestData) string {
template := r.template(data)
return fmt.Sprintf(`
%s
resource "azurerm_stream_analytics_output_servicebus_queue" "test" {
name = "acctestinput-%d"
stream_analytics_job_name = azurerm_stream_analytics_job.test.name
resource_group_name = azurerm_stream_analytics_job.test.resource_group_name
queue_name = azurerm_servicebus_queue.test.name
servicebus_namespace = azurerm_servicebus_namespace.test.name
shared_access_policy_key = azurerm_servicebus_namespace.test.default_primary_key
shared_access_policy_name = "RootManageSharedAccessKey"
serialization {
type = "Csv"
encoding = "UTF8"
field_delimiter = ","
}
property_columns = ["column1", "column2", "column3"]
}
`, template, data.RandomInteger)
}

func (r StreamAnalyticsOutputServiceBusQueueResource) updatePropertyColumns(data acceptance.TestData) string {
template := r.template(data)
return fmt.Sprintf(`
%s
resource "azurerm_stream_analytics_output_servicebus_queue" "test" {
name = "acctestinput-%d"
stream_analytics_job_name = azurerm_stream_analytics_job.test.name
resource_group_name = azurerm_stream_analytics_job.test.resource_group_name
queue_name = azurerm_servicebus_queue.test.name
servicebus_namespace = azurerm_servicebus_namespace.test.name
shared_access_policy_key = azurerm_servicebus_namespace.test.default_primary_key
shared_access_policy_name = "RootManageSharedAccessKey"
serialization {
type = "Csv"
encoding = "UTF8"
field_delimiter = ","
}
property_columns = ["column1", "column3"]
}
`, template, data.RandomInteger)
}

func (r StreamAnalyticsOutputServiceBusQueueResource) systemPropertyColumns(data acceptance.TestData) string {
template := r.template(data)
return fmt.Sprintf(`
%s
resource "azurerm_stream_analytics_output_servicebus_queue" "test" {
name = "acctestinput-%d"
stream_analytics_job_name = azurerm_stream_analytics_job.test.name
resource_group_name = azurerm_stream_analytics_job.test.resource_group_name
queue_name = azurerm_servicebus_queue.test.name
servicebus_namespace = azurerm_servicebus_namespace.test.name
shared_access_policy_key = azurerm_servicebus_namespace.test.default_primary_key
shared_access_policy_name = "RootManageSharedAccessKey"
serialization {
type = "Csv"
encoding = "UTF8"
field_delimiter = ","
}
system_property_columns = {
TimeToLive = "100"
Label = "Test"
CorrelationId = "79b839ac-be78-4542-8185-098170483986"
}
}
`, template, data.RandomInteger)
}

func (r StreamAnalyticsOutputServiceBusQueueResource) updateSystemPropertyColumns(data acceptance.TestData) string {
template := r.template(data)
return fmt.Sprintf(`
%s
resource "azurerm_stream_analytics_output_servicebus_queue" "test" {
name = "acctestinput-%d"
stream_analytics_job_name = azurerm_stream_analytics_job.test.name
resource_group_name = azurerm_stream_analytics_job.test.resource_group_name
queue_name = azurerm_servicebus_queue.test.name
servicebus_namespace = azurerm_servicebus_namespace.test.name
shared_access_policy_key = azurerm_servicebus_namespace.test.default_primary_key
shared_access_policy_name = "RootManageSharedAccessKey"
serialization {
type = "Csv"
encoding = "UTF8"
field_delimiter = ","
}
system_property_columns = {
TimeToLive = "101"
CorrelationId = "79b839ac-be78-4542-8185-098170483986"
}
}
`, template, data.RandomInteger)
}

func (r StreamAnalyticsOutputServiceBusQueueResource) requiresImport(data acceptance.TestData) string {
template := r.json(data)
return fmt.Sprintf(`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ The following arguments are supported:

* `serialization` - (Required) A `serialization` block as defined below.

* `property_columns` - (Optional) A list of property columns to add to the Service Bus Queue output.

* `system_property_columns` - (Optional) A key-value pair of system property columns that will be attached to the outgoing messages for the Service Bus Queue Output.

-> **NOTE:** The acceptable keys are `ContentType`, `CorrelationId`, `Label`, `MessageId`, `PartitionKey`, `ReplyTo`, `ReplyToSessionId`, `ScheduledEnqueueTimeUtc`, `SessionId`, `TimeToLive` and `To`.

---

A `serialization` block supports the following:
Expand Down

0 comments on commit ff6470a

Please sign in to comment.