From 9fd8f887401c0931bbbd18d3b3c4d770b8410fd4 Mon Sep 17 00:00:00 2001 From: Jakub Michalak Date: Thu, 31 Oct 2024 16:53:00 +0100 Subject: [PATCH] fix: Handle external type changes in stream resources (#3164) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - handle external type changes in stream resources ## Test Plan * [x] acceptance tests * [ ] … ## References --- docs/resources/stream_on_directory_table.md | 1 + docs/resources/stream_on_external_table.md | 1 + docs/resources/stream_on_table.md | 1 + docs/resources/stream_on_view.md | 1 + .../stream_on_directory_table_resource_gen.go | 5 ++ .../stream_on_external_table_resource_gen.go | 5 ++ .../stream_on_table_resource_gen.go | 5 ++ .../stream_on_view_resource_gen.go | 5 ++ pkg/acceptance/helpers/stream_client.go | 13 ++++ pkg/resources/custom_diffs.go | 32 ++++++++++ pkg/resources/stream_common.go | 6 ++ pkg/resources/stream_on_directory_table.go | 1 + ...ream_on_directory_table_acceptance_test.go | 55 +++++++++++++++++ pkg/resources/stream_on_external_table.go | 1 + ...tream_on_external_table_acceptance_test.go | 60 +++++++++++++++++++ pkg/resources/stream_on_table.go | 1 + .../stream_on_table_acceptance_test.go | 57 ++++++++++++++++++ pkg/resources/stream_on_view.go | 1 + .../stream_on_view_acceptance_test.go | 59 ++++++++++++++++++ pkg/sdk/streams_dto_gen.go | 4 ++ 20 files changed, 314 insertions(+) diff --git a/docs/resources/stream_on_directory_table.md b/docs/resources/stream_on_directory_table.md index 490ba2eae1..ec6abfdb5e 100644 --- a/docs/resources/stream_on_directory_table.md +++ b/docs/resources/stream_on_directory_table.md @@ -73,6 +73,7 @@ resource "snowflake_stream_on_directory_table" "stream" { - `id` (String) The ID of this resource. - `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output)) - `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness). +- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed. ### Nested Schema for `describe_output` diff --git a/docs/resources/stream_on_external_table.md b/docs/resources/stream_on_external_table.md index 3b773cbfcd..e885a93a91 100644 --- a/docs/resources/stream_on_external_table.md +++ b/docs/resources/stream_on_external_table.md @@ -90,6 +90,7 @@ resource "snowflake_stream_on_external_table" "stream" { - `id` (String) The ID of this resource. - `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output)) - `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness). +- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed. ### Nested Schema for `at` diff --git a/docs/resources/stream_on_table.md b/docs/resources/stream_on_table.md index 3af9472bd3..99ec11a4be 100644 --- a/docs/resources/stream_on_table.md +++ b/docs/resources/stream_on_table.md @@ -75,6 +75,7 @@ resource "snowflake_stream_on_table" "stream" { - `id` (String) The ID of this resource. - `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output)) - `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness). +- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed. ### Nested Schema for `at` diff --git a/docs/resources/stream_on_view.md b/docs/resources/stream_on_view.md index b8e5d908e7..6795301683 100644 --- a/docs/resources/stream_on_view.md +++ b/docs/resources/stream_on_view.md @@ -79,6 +79,7 @@ resource "snowflake_stream_on_view" "stream" { - `id` (String) The ID of this resource. - `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output)) - `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness). +- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed. ### Nested Schema for `at` diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_directory_table_resource_gen.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_directory_table_resource_gen.go index 8e4cd10770..f72e461915 100644 --- a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_directory_table_resource_gen.go +++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_directory_table_resource_gen.go @@ -115,3 +115,8 @@ func (s *StreamOnDirectoryTableResourceAssert) HasNoStale() *StreamOnDirectoryTa s.AddAssertion(assert.ValueNotSet("stale")) return s } + +func (s *StreamOnDirectoryTableResourceAssert) HasStreamTypeString(expected string) *StreamOnDirectoryTableResourceAssert { + s.AddAssertion(assert.ValueSet("stream_type", expected)) + return s +} diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_external_table_resource_gen.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_external_table_resource_gen.go index 78b963026d..6b61c0b45f 100644 --- a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_external_table_resource_gen.go +++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_external_table_resource_gen.go @@ -145,3 +145,8 @@ func (s *StreamOnExternalTableResourceAssert) HasNoStale() *StreamOnExternalTabl s.AddAssertion(assert.ValueNotSet("stale")) return s } + +func (s *StreamOnExternalTableResourceAssert) HasStreamTypeString(expected string) *StreamOnExternalTableResourceAssert { + s.AddAssertion(assert.ValueSet("stream_type", expected)) + return s +} diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_table_resource_gen.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_table_resource_gen.go index 4175844bf4..e9fe89e6b8 100644 --- a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_table_resource_gen.go +++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_table_resource_gen.go @@ -155,3 +155,8 @@ func (s *StreamOnTableResourceAssert) HasNoTable() *StreamOnTableResourceAssert s.AddAssertion(assert.ValueNotSet("table")) return s } + +func (s *StreamOnTableResourceAssert) HasStreamTypeString(expected string) *StreamOnTableResourceAssert { + s.AddAssertion(assert.ValueSet("stream_type", expected)) + return s +} diff --git a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_view_resource_gen.go b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_view_resource_gen.go index 10ba5a153f..8db3b75211 100644 --- a/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_view_resource_gen.go +++ b/pkg/acceptance/bettertestspoc/assert/resourceassert/stream_on_view_resource_gen.go @@ -155,3 +155,8 @@ func (s *StreamOnViewResourceAssert) HasNoView() *StreamOnViewResourceAssert { s.AddAssertion(assert.ValueNotSet("view")) return s } + +func (s *StreamOnViewResourceAssert) HasStreamTypeString(expected string) *StreamOnViewResourceAssert { + s.AddAssertion(assert.ValueSet("stream_type", expected)) + return s +} diff --git a/pkg/acceptance/helpers/stream_client.go b/pkg/acceptance/helpers/stream_client.go index 9403063c9c..c027299800 100644 --- a/pkg/acceptance/helpers/stream_client.go +++ b/pkg/acceptance/helpers/stream_client.go @@ -43,6 +43,19 @@ func (c *StreamClient) CreateOnTableWithRequest(t *testing.T, req *sdk.CreateOnT return stream, c.DropFunc(t, req.GetName()) } +func (c *StreamClient) CreateOnViewWithRequest(t *testing.T, req *sdk.CreateOnViewStreamRequest) (*sdk.Stream, func()) { + t.Helper() + ctx := context.Background() + + err := c.client().CreateOnView(ctx, req) + require.NoError(t, err) + + stream, err := c.client().ShowByID(ctx, req.GetName()) + require.NoError(t, err) + + return stream, c.DropFunc(t, req.GetName()) +} + func (c *StreamClient) Update(t *testing.T, request *sdk.AlterStreamRequest) { t.Helper() ctx := context.Background() diff --git a/pkg/resources/custom_diffs.go b/pkg/resources/custom_diffs.go index f884fc7e9b..5d2ac7de3c 100644 --- a/pkg/resources/custom_diffs.go +++ b/pkg/resources/custom_diffs.go @@ -228,6 +228,38 @@ func RecreateWhenSecretTypeChangedExternally(secretType sdk.SecretType) schema.C } } +// RecreateWhenStreamTypeChangedExternally recreates a stream when argument streamType is different than in the state. +func RecreateWhenStreamTypeChangedExternally(streamType sdk.StreamSourceType) schema.CustomizeDiffFunc { + return RecreateWhenResourceTypeChangedExternally("stream_type", streamType, sdk.ToStreamSourceType) +} + +// RecreateWhenResourceTypeChangedExternally recreates a resource when argument wantType is different than the value in typeField. +func RecreateWhenResourceTypeChangedExternally[T ~string](typeField string, wantType T, toType func(string) (T, error)) schema.CustomizeDiffFunc { + return func(_ context.Context, diff *schema.ResourceDiff, _ interface{}) error { + if n := diff.Get(typeField); n != nil { + logging.DebugLogger.Printf("[DEBUG] new external value for %s: %s\n", typeField, n.(string)) + + gotTypeRaw := n.(string) + // if the type is empty, the state is empty - do not recreate + if gotTypeRaw == "" { + return nil + } + + gotType, err := toType(gotTypeRaw) + if err != nil { + return fmt.Errorf("unknown type: %w", err) + } + if gotType != wantType { + // we have to set here a value instead of just SetNewComputed + // because with empty value (default snowflake behavior for type) ForceNew fails + // because there are no changes (at least from the SDKv2 point of view) for typeField + return errors.Join(diff.SetNew(typeField, ""), diff.ForceNew(typeField)) + } + } + return nil + } +} + // RecreateWhenStreamIsStale detects when the stream is stale, and sets a `false` value for `stale` field. // This means that the provider can detect that change in `stale` from `true` to `false`, where `false` is our desired state. func RecreateWhenStreamIsStale() schema.CustomizeDiffFunc { diff --git a/pkg/resources/stream_common.go b/pkg/resources/stream_common.go index b928bc743e..dde23c0ea8 100644 --- a/pkg/resources/stream_common.go +++ b/pkg/resources/stream_common.go @@ -52,6 +52,11 @@ var streamCommonSchema = map[string]*schema.Schema{ Optional: true, Description: "Specifies a comment for the stream.", }, + "stream_type": { + Type: schema.TypeString, + Computed: true, + Description: "Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.", + }, ShowOutputAttributeName: { Type: schema.TypeList, Computed: true, @@ -203,6 +208,7 @@ func handleStreamRead(d *schema.ResourceData, ) error { return errors.Join( d.Set("comment", stream.Comment), + d.Set("stream_type", stream.SourceType), d.Set(ShowOutputAttributeName, []map[string]any{schemas.StreamToSchema(stream)}), d.Set(DescribeOutputAttributeName, []map[string]any{schemas.StreamDescriptionToSchema(*streamDescription)}), d.Set(FullyQualifiedNameAttributeName, id.FullyQualifiedName()), diff --git a/pkg/resources/stream_on_directory_table.go b/pkg/resources/stream_on_directory_table.go index f96d41ca56..9277c90616 100644 --- a/pkg/resources/stream_on_directory_table.go +++ b/pkg/resources/stream_on_directory_table.go @@ -41,6 +41,7 @@ func StreamOnDirectoryTable() *schema.Resource { ComputedIfAnyAttributeChanged(streamOnDirectoryTableSchema, ShowOutputAttributeName, "stage", "comment"), ComputedIfAnyAttributeChanged(streamOnDirectoryTableSchema, DescribeOutputAttributeName, "stage", "comment"), RecreateWhenStreamIsStale(), + RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeStage), ), Schema: streamOnDirectoryTableSchema, diff --git a/pkg/resources/stream_on_directory_table_acceptance_test.go b/pkg/resources/stream_on_directory_table_acceptance_test.go index bb82e8745c..0f61963f93 100644 --- a/pkg/resources/stream_on_directory_table_acceptance_test.go +++ b/pkg/resources/stream_on_directory_table_acceptance_test.go @@ -20,6 +20,7 @@ import ( "github.com/hashicorp/terraform-plugin-testing/helper/resource" "github.com/hashicorp/terraform-plugin-testing/plancheck" "github.com/hashicorp/terraform-plugin-testing/tfversion" + "github.com/stretchr/testify/require" ) func TestAcc_StreamOnDirectoryTable_Basic(t *testing.T) { @@ -459,3 +460,57 @@ func TestAcc_StreamOnDirectoryTable_InvalidConfiguration(t *testing.T) { }, }) } + +func TestAcc_StreamOnDirectoryTable_ExternalStreamTypeChange(t *testing.T) { + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + acc.TestAccPreCheck(t) + stage, cleanupStage := acc.TestClient().Stage.CreateStageWithDirectory(t) + t.Cleanup(cleanupStage) + model := model.StreamOnDirectoryTable("test", id.DatabaseName(), id.Name(), id.SchemaName(), stage.ID().FullyQualifiedName()) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable), + Steps: []resource.TestStep{ + { + Config: config.FromModel(t, model), + Check: resource.ComposeTestCheckFunc( + assert.AssertThat(t, + resourceassert.StreamOnDirectoryTableResource(t, model.ResourceReference()). + HasStreamTypeString(string(sdk.StreamSourceTypeStage)), + resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()). + HasSourceType(sdk.StreamSourceTypeStage), + ), + ), + }, + // external change with a different type + { + PreConfig: func() { + table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t) + t.Cleanup(cleanupTable) + acc.TestClient().Stream.DropFunc(t, id)() + externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id, table.ID())) + t.Cleanup(cleanup) + require.Equal(t, sdk.StreamSourceTypeTable, *externalChangeStream.SourceType) + }, + Config: config.FromModel(t, model), + ConfigPlanChecks: resource.ConfigPlanChecks{ + PreApply: []plancheck.PlanCheck{ + plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate), + }, + }, + Check: resource.ComposeTestCheckFunc( + assert.AssertThat(t, + resourceassert.StreamOnDirectoryTableResource(t, model.ResourceReference()). + HasStreamTypeString(string(sdk.StreamSourceTypeStage)), + resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()). + HasSourceType(sdk.StreamSourceTypeStage), + ), + ), + }, + }, + }) +} diff --git a/pkg/resources/stream_on_external_table.go b/pkg/resources/stream_on_external_table.go index 9262e3595e..73748790eb 100644 --- a/pkg/resources/stream_on_external_table.go +++ b/pkg/resources/stream_on_external_table.go @@ -52,6 +52,7 @@ func StreamOnExternalTable() *schema.Resource { ComputedIfAnyAttributeChanged(streamOnExternalTableSchema, ShowOutputAttributeName, "external_table", "insert_only", "comment"), ComputedIfAnyAttributeChanged(streamOnExternalTableSchema, DescribeOutputAttributeName, "external_table", "insert_only", "comment"), RecreateWhenStreamIsStale(), + RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeExternalTable), ), Schema: streamOnExternalTableSchema, diff --git a/pkg/resources/stream_on_external_table_acceptance_test.go b/pkg/resources/stream_on_external_table_acceptance_test.go index 292c383ee3..f3d9feb28f 100644 --- a/pkg/resources/stream_on_external_table_acceptance_test.go +++ b/pkg/resources/stream_on_external_table_acceptance_test.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/terraform-plugin-testing/helper/resource" "github.com/hashicorp/terraform-plugin-testing/plancheck" "github.com/hashicorp/terraform-plugin-testing/tfversion" + "github.com/stretchr/testify/require" ) func TestAcc_StreamOnExternalTable_Basic(t *testing.T) { @@ -890,3 +891,62 @@ func TestAcc_StreamOnExternalTable_InvalidConfiguration(t *testing.T) { }, }) } + +func TestAcc_StreamOnExternalTable_ExternalStreamTypeChange(t *testing.T) { + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + acc.TestAccPreCheck(t) + stageID := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + stageLocation := fmt.Sprintf("@%s", stageID.FullyQualifiedName()) + _, stageCleanup := acc.TestClient().Stage.CreateStageWithURL(t, stageID) + t.Cleanup(stageCleanup) + + externalTable, externalTableCleanup := acc.TestClient().ExternalTable.CreateWithLocation(t, stageLocation) + t.Cleanup(externalTableCleanup) + model := model.StreamOnExternalTableBase("test", id, externalTable.ID()) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable), + Steps: []resource.TestStep{ + { + Config: config.FromModel(t, model), + Check: resource.ComposeTestCheckFunc( + assert.AssertThat(t, + resourceassert.StreamOnExternalTableResource(t, model.ResourceReference()). + HasStreamTypeString(string(sdk.StreamSourceTypeExternalTable)), + resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()). + HasSourceType(sdk.StreamSourceTypeExternalTable), + ), + ), + }, + // external change with a different type + { + PreConfig: func() { + table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t) + t.Cleanup(cleanupTable) + acc.TestClient().Stream.DropFunc(t, id)() + externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id, table.ID())) + t.Cleanup(cleanup) + require.Equal(t, sdk.StreamSourceTypeTable, *externalChangeStream.SourceType) + }, + Config: config.FromModel(t, model), + ConfigPlanChecks: resource.ConfigPlanChecks{ + PreApply: []plancheck.PlanCheck{ + plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate), + }, + }, + Check: resource.ComposeTestCheckFunc( + assert.AssertThat(t, + resourceassert.StreamOnExternalTableResource(t, model.ResourceReference()). + HasStreamTypeString(string(sdk.StreamSourceTypeExternalTable)), + resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()). + HasSourceType(sdk.StreamSourceTypeExternalTable), + ), + ), + }, + }, + }) +} diff --git a/pkg/resources/stream_on_table.go b/pkg/resources/stream_on_table.go index 4fdcceba23..7dfaced2b1 100644 --- a/pkg/resources/stream_on_table.go +++ b/pkg/resources/stream_on_table.go @@ -59,6 +59,7 @@ func StreamOnTable() *schema.Resource { ComputedIfAnyAttributeChanged(streamOnTableSchema, ShowOutputAttributeName, "table", "append_only", "comment"), ComputedIfAnyAttributeChanged(streamOnTableSchema, DescribeOutputAttributeName, "table", "append_only", "comment"), RecreateWhenStreamIsStale(), + RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeTable), ), Schema: streamOnTableSchema, diff --git a/pkg/resources/stream_on_table_acceptance_test.go b/pkg/resources/stream_on_table_acceptance_test.go index 7f18495559..bc71c43560 100644 --- a/pkg/resources/stream_on_table_acceptance_test.go +++ b/pkg/resources/stream_on_table_acceptance_test.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/terraform-plugin-testing/helper/resource" "github.com/hashicorp/terraform-plugin-testing/plancheck" "github.com/hashicorp/terraform-plugin-testing/tfversion" + "github.com/stretchr/testify/require" ) func TestAcc_StreamOnTable_Basic(t *testing.T) { @@ -835,3 +836,59 @@ func TestAcc_StreamOnTable_InvalidConfiguration(t *testing.T) { }, }) } + +func TestAcc_StreamOnTable_ExternalStreamTypeChange(t *testing.T) { + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + acc.TestAccPreCheck(t) + table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t) + t.Cleanup(cleanupTable) + + model := model.StreamOnTableBase("test", id, table.ID()) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable), + Steps: []resource.TestStep{ + { + Config: config.FromModel(t, model), + Check: resource.ComposeTestCheckFunc( + assert.AssertThat(t, + resourceassert.StreamOnTableResource(t, model.ResourceReference()). + HasStreamTypeString(string(sdk.StreamSourceTypeTable)), + resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()). + HasSourceType(sdk.StreamSourceTypeTable), + ), + ), + }, + // external change with a different type + { + PreConfig: func() { + statement := fmt.Sprintf("SELECT * FROM %s", table.ID().FullyQualifiedName()) + view, cleanupView := acc.TestClient().View.CreateView(t, statement) + t.Cleanup(cleanupView) + acc.TestClient().Stream.DropFunc(t, id)() + externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnViewWithRequest(t, sdk.NewCreateOnViewStreamRequest(id, view.ID())) + t.Cleanup(cleanup) + require.Equal(t, sdk.StreamSourceTypeView, *externalChangeStream.SourceType) + }, + Config: config.FromModel(t, model), + ConfigPlanChecks: resource.ConfigPlanChecks{ + PreApply: []plancheck.PlanCheck{ + plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate), + }, + }, + Check: resource.ComposeTestCheckFunc( + assert.AssertThat(t, + resourceassert.StreamOnTableResource(t, model.ResourceReference()). + HasStreamTypeString(string(sdk.StreamSourceTypeTable)), + resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()). + HasSourceType(sdk.StreamSourceTypeTable), + ), + ), + }, + }, + }) +} diff --git a/pkg/resources/stream_on_view.go b/pkg/resources/stream_on_view.go index bf570fa5c3..c9e1bcb3a6 100644 --- a/pkg/resources/stream_on_view.go +++ b/pkg/resources/stream_on_view.go @@ -59,6 +59,7 @@ func StreamOnView() *schema.Resource { ComputedIfAnyAttributeChanged(StreamOnViewSchema, ShowOutputAttributeName, "view", "append_only", "comment"), ComputedIfAnyAttributeChanged(StreamOnViewSchema, DescribeOutputAttributeName, "view", "append_only", "comment"), RecreateWhenStreamIsStale(), + RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeView), ), Schema: StreamOnViewSchema, diff --git a/pkg/resources/stream_on_view_acceptance_test.go b/pkg/resources/stream_on_view_acceptance_test.go index fef512a7ef..279fdb1371 100644 --- a/pkg/resources/stream_on_view_acceptance_test.go +++ b/pkg/resources/stream_on_view_acceptance_test.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/terraform-plugin-testing/helper/resource" "github.com/hashicorp/terraform-plugin-testing/plancheck" "github.com/hashicorp/terraform-plugin-testing/tfversion" + "github.com/stretchr/testify/require" ) func TestAcc_StreamOnView_Basic(t *testing.T) { @@ -861,3 +862,61 @@ func TestAcc_StreamOnView_InvalidConfiguration(t *testing.T) { }, }) } + +func TestAcc_StreamOnView_ExternalStreamTypeChange(t *testing.T) { + id := acc.TestClient().Ids.RandomSchemaObjectIdentifier() + acc.TestAccPreCheck(t) + table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t) + t.Cleanup(cleanupTable) + statement := fmt.Sprintf("SELECT * FROM %s", table.ID().FullyQualifiedName()) + view, cleanupView := acc.TestClient().View.CreateView(t, statement) + t.Cleanup(cleanupView) + + model := model.StreamOnView("test", id.DatabaseName(), id.Name(), id.SchemaName(), view.ID().FullyQualifiedName()) + + resource.Test(t, resource.TestCase{ + ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories, + TerraformVersionChecks: []tfversion.TerraformVersionCheck{ + tfversion.RequireAbove(tfversion.Version1_5_0), + }, + CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable), + Steps: []resource.TestStep{ + { + Config: config.FromModel(t, model), + Check: resource.ComposeTestCheckFunc( + assert.AssertThat(t, + resourceassert.StreamOnViewResource(t, model.ResourceReference()). + HasStreamTypeString(string(sdk.StreamSourceTypeView)), + resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()). + HasSourceType(sdk.StreamSourceTypeView), + ), + ), + }, + // external change with a different type + { + PreConfig: func() { + table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t) + t.Cleanup(cleanupTable) + acc.TestClient().Stream.DropFunc(t, id)() + externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id, table.ID())) + t.Cleanup(cleanup) + require.Equal(t, sdk.StreamSourceTypeTable, *externalChangeStream.SourceType) + }, + Config: config.FromModel(t, model), + ConfigPlanChecks: resource.ConfigPlanChecks{ + PreApply: []plancheck.PlanCheck{ + plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate), + }, + }, + Check: resource.ComposeTestCheckFunc( + assert.AssertThat(t, + resourceassert.StreamOnViewResource(t, model.ResourceReference()). + HasStreamTypeString(string(sdk.StreamSourceTypeView)), + resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()). + HasSourceType(sdk.StreamSourceTypeView), + ), + ), + }, + }, + }) +} diff --git a/pkg/sdk/streams_dto_gen.go b/pkg/sdk/streams_dto_gen.go index 25201a7d11..d657571efc 100644 --- a/pkg/sdk/streams_dto_gen.go +++ b/pkg/sdk/streams_dto_gen.go @@ -79,6 +79,10 @@ type CreateOnViewStreamRequest struct { Comment *string } +func (r *CreateOnViewStreamRequest) GetName() SchemaObjectIdentifier { + return r.name +} + type CloneStreamRequest struct { OrReplace *bool name SchemaObjectIdentifier // required