Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

azurerm_data_factory_pipeline: Support for activities #6224

Merged
merged 9 commits into from
Jun 2, 2020
37 changes: 37 additions & 0 deletions azurerm/internal/services/datafactory/data_factory.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datafactory

import (
"encoding/json"
"fmt"
"log"
"regexp"
Expand All @@ -9,6 +10,7 @@ import (

"github.com/Azure/azure-sdk-for-go/services/datafactory/mgmt/2018-06-01/datafactory"
"github.com/hashicorp/terraform-plugin-sdk/helper/schema"
"github.com/terraform-providers/terraform-provider-azurerm/azurerm/helpers/azure"
)

func validateAzureRMDataFactoryLinkedServiceDatasetName(v interface{}, k string) (warnings []string, errors []error) {
Expand Down Expand Up @@ -190,3 +192,38 @@ func flattenDataFactoryStructureColumns(input interface{}) []interface{} {
}
return output
}

func deserializeDataFactoryPipelineActivities(jsonData string) (*[]datafactory.BasicActivity, error) {
jsonData = fmt.Sprintf(`{ "activities": %s }`, jsonData)
pipeline := &datafactory.Pipeline{}
err := pipeline.UnmarshalJSON([]byte(jsonData))
if err != nil {
return nil, err
}
return pipeline.Activities, nil
}

func serializeDataFactoryPipelineActivities(activities *[]datafactory.BasicActivity) (string, error) {
pipeline := &datafactory.Pipeline{Activities: activities}
result, err := pipeline.MarshalJSON()
if err != nil {
return "nil", err
}

var m map[string]*json.RawMessage
err = json.Unmarshal(result, &m)
if err != nil {
return "", err
}

activitiesJson, err := json.Marshal(m["activities"])
if err != nil {
return "", err
}

return string(activitiesJson), nil
}

func suppressJsonOrderingDifference(_, old, new string, _ *schema.ResourceData) bool {
return azure.NormalizeJson(old) == azure.NormalizeJson(new)
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ func resourceArmDataFactoryPipeline() *schema.Resource {
Optional: true,
},

"activities_json": {
Type: schema.TypeString,
Optional: true,
StateFunc: azure.NormalizeJson,
DiffSuppressFunc: suppressJsonOrderingDifference,
},

"annotations": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -107,11 +114,18 @@ func resourceArmDataFactoryPipelineCreateUpdate(d *schema.ResourceData, meta int
}
}

activitiesJson := d.Get("activities_json").(string)
activities, err := deserializeDataFactoryPipelineActivities(activitiesJson)
if err != nil {
return fmt.Errorf("Error parsing 'activities_json' for Data Factory Pipeline %q (Resource Group %q / Data Factory %q) ID: %+v", name, resourceGroupName, dataFactoryName, err)
}

description := d.Get("description").(string)
pipeline := &datafactory.Pipeline{
Parameters: expandDataFactoryParameters(d.Get("parameters").(map[string]interface{})),
Variables: expandDataFactoryVariables(d.Get("variables").(map[string]interface{})),
Description: &description,
Activities: activities,
}

if v, ok := d.GetOk("annotations"); ok {
Expand Down Expand Up @@ -187,6 +201,14 @@ func resourceArmDataFactoryPipelineRead(d *schema.ResourceData, meta interface{}
if err := d.Set("variables", variables); err != nil {
return fmt.Errorf("Error setting `variables`: %+v", err)
}

activitiesJson, err := serializeDataFactoryPipelineActivities(props.Activities)
if err != nil {
return fmt.Errorf("Error serializing `activities_json`: %+v", err)
}
if err := d.Set("activities_json", activitiesJson); err != nil {
return fmt.Errorf("Error setting `activities_json`: %+v", err)
}
}

return nil
Expand Down
165 changes: 165 additions & 0 deletions azurerm/internal/services/datafactory/data_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,168 @@ func TestAzureRmDataFactoryLinkedServiceConnectionStringDiff(t *testing.T) {
}
}
}

func TestAzureRmDataFactoryDeserializePipelineActivities(t *testing.T) {
cases := []struct {
Json string
ExpectActivityCount int
ExpectErr bool
}{
{
Json: "{}",
ExpectActivityCount: 0,
ExpectErr: true,
},
{
Json: `[
{
"type": "ForEach",
"typeProperties": {
"isSequential": true,
"items": {
"value": "@pipeline().parameters.OutputBlobNameList",
"type": "Expression"
},
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "BlobSource"
},
"sink": {
"type": "BlobSink"
},
"dataIntegrationUnits": 32
},
"inputs": [
{
"referenceName": "exampleDataset",
"parameters": {
"MyFolderPath": "examplecontainer",
"MyFileName": "examplecontainer.csv"
},
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "exampleDataset",
"parameters": {
"MyFolderPath": "examplecontainer",
"MyFileName": {
"value": "@item()",
"type": "Expression"
}
},
"type": "DatasetReference"
}
],
"name": "ExampleCopyActivity"
}
]
},
"name": "ExampleForeachActivity"
}
]`,
ExpectActivityCount: 1,
ExpectErr: false,
},
}

for _, tc := range cases {
items, err := deserializeDataFactoryPipelineActivities(tc.Json)
if err != nil {
if tc.ExpectErr {
t.Log("Expected error and got error")
return
}

t.Fatal(err)
}

if items == nil && !tc.ExpectErr {
t.Fatal("Expected items got nil")
}

if len(*items) != tc.ExpectActivityCount {
t.Fatal("Failed to deserialise pipeline")
}
}
}

func TestNormalizeJSON(t *testing.T) {
cases := []struct {
Old string
New string
Suppress bool
}{
{
Old: `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "bob",
"value": "something"
}
}
]`,
New: `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"value": "something",
"variableName": "bob"
}
}
]`,
Suppress: true,
},
{
Old: `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "bobdifferent",
"value": "something"
}
}
]`,
New: `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"value": "something",
"variableName": "bob"
}
}
]`,
Suppress: false,
},
{
Old: `{ "notbob": "notbill" }`,
New: `{ "bob": "bill" }`,
Suppress: false,
},
}

for _, tc := range cases {
suppress := suppressJsonOrderingDifference("test", tc.Old, tc.New, nil)

if suppress != tc.Suppress {
t.Fatalf("Expected JsonOrderingDifference to be '%t' for '%s' '%s' - got '%t'", tc.Suppress, tc.Old, tc.New, suppress)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ func TestAccAzureRMDataFactoryPipeline_basic(t *testing.T) {
Config: testAccAzureRMDataFactoryPipeline_basic(data),
Check: resource.ComposeTestCheckFunc(
testCheckAzureRMDataFactoryPipelineExists(data.ResourceName),
resource.TestCheckResourceAttrSet(data.ResourceName, "activities_json"),
testCheckAzureRMDataFactoryPipelineHasAppenVarActivity(data.ResourceName, "Append variable1"),
),
},
data.ImportStep(),
Expand Down Expand Up @@ -115,6 +117,51 @@ func testCheckAzureRMDataFactoryPipelineExists(resourceName string) resource.Tes
}
}

func testCheckAzureRMDataFactoryPipelineHasAppenVarActivity(resourceName string, activityName string) resource.TestCheckFunc {
return func(s *terraform.State) error {
client := acceptance.AzureProvider.Meta().(*clients.Client).DataFactory.PipelinesClient
ctx := acceptance.AzureProvider.Meta().(*clients.Client).StopContext

rs, ok := s.RootModule().Resources[resourceName]
if !ok {
return fmt.Errorf("Not found: %s", resourceName)
}

name := rs.Primary.Attributes["name"]
dataFactoryName := rs.Primary.Attributes["data_factory_name"]
resourceGroup := rs.Primary.Attributes["resource_group_name"]

resp, err := client.Get(ctx, resourceGroup, dataFactoryName, name, "")
if err != nil {
if utils.ResponseWasNotFound(resp.Response) {
return fmt.Errorf("Bad: Data Factory Pipeline %q (Resource Group %q / Data Factory %q) does not exist", name, resourceGroup, dataFactoryName)
}
return fmt.Errorf("Bad: Get on DataFactoryPipelineClient: %+v", err)
}

activities := *resp.Activities
appvarActivity, _ := activities[0].AsAppendVariableActivity()
if *appvarActivity.Name != activityName {
return fmt.Errorf("Bad: Data Factory Pipeline %q (Resource Group %q / Data Factory %q) could not cast as activity", name, resourceGroup, dataFactoryName)
}

return nil
}
}

var activities_json = `[
{
"name": "Append variable1",
"type": "AppendVariable",
"dependsOn": [],
"userProperties": [],
"typeProperties": {
"variableName": "bob",
"value": "something"
}
}
]`

func testAccAzureRMDataFactoryPipeline_basic(data acceptance.TestData) string {
return fmt.Sprintf(`
provider "azurerm" {
Expand All @@ -136,8 +183,14 @@ resource "azurerm_data_factory_pipeline" "test" {
name = "acctest%d"
resource_group_name = azurerm_resource_group.test.name
data_factory_name = azurerm_data_factory.test.name
variables = {
"bob" = "item1"
}
activities_json = <<JSON
%s
JSON
}
`, data.RandomInteger, data.Locations.Primary, data.RandomInteger, data.RandomInteger)
`, data.RandomInteger, data.Locations.Primary, data.RandomInteger, data.RandomInteger, activities_json)
}

func testAccAzureRMDataFactoryPipeline_update1(data acceptance.TestData) string {
Expand Down