Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Handle external type changes in stream resources #3164

Merged
merged 2 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/resources/stream_on_directory_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ resource "snowflake_stream_on_directory_table" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.

<a id="nestedatt--describe_output"></a>
### Nested Schema for `describe_output`
Expand Down
1 change: 1 addition & 0 deletions docs/resources/stream_on_external_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ resource "snowflake_stream_on_external_table" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.

<a id="nestedblock--at"></a>
### Nested Schema for `at`
Expand Down
1 change: 1 addition & 0 deletions docs/resources/stream_on_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ resource "snowflake_stream_on_table" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.

<a id="nestedblock--at"></a>
### Nested Schema for `at`
Expand Down
1 change: 1 addition & 0 deletions docs/resources/stream_on_view.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ resource "snowflake_stream_on_view" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.

<a id="nestedblock--at"></a>
### Nested Schema for `at`
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions pkg/acceptance/helpers/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ func (c *StreamClient) CreateOnTableWithRequest(t *testing.T, req *sdk.CreateOnT
return stream, c.DropFunc(t, req.GetName())
}

func (c *StreamClient) CreateOnViewWithRequest(t *testing.T, req *sdk.CreateOnViewStreamRequest) (*sdk.Stream, func()) {
t.Helper()
ctx := context.Background()

err := c.client().CreateOnView(ctx, req)
require.NoError(t, err)

stream, err := c.client().ShowByID(ctx, req.GetName())
require.NoError(t, err)

return stream, c.DropFunc(t, req.GetName())
}

func (c *StreamClient) Update(t *testing.T, request *sdk.AlterStreamRequest) {
t.Helper()
ctx := context.Background()
Expand Down
32 changes: 32 additions & 0 deletions pkg/resources/custom_diffs.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,38 @@ func RecreateWhenSecretTypeChangedExternally(secretType sdk.SecretType) schema.C
}
}

// RecreateWhenStreamTypeChangedExternally recreates a stream when argument streamType is different than in the state.
func RecreateWhenStreamTypeChangedExternally(streamType sdk.StreamSourceType) schema.CustomizeDiffFunc {
return RecreateWhenResourceTypeChangedExternally("stream_type", streamType, sdk.ToStreamSourceType)
}

// RecreateWhenResourceTypeChangedExternally recreates a resource when argument wantType is different than the value in typeField.
func RecreateWhenResourceTypeChangedExternally[T ~string](typeField string, wantType T, toType func(string) (T, error)) schema.CustomizeDiffFunc {
return func(_ context.Context, diff *schema.ResourceDiff, _ interface{}) error {
if n := diff.Get(typeField); n != nil {
logging.DebugLogger.Printf("[DEBUG] new external value for %s: %s\n", typeField, n.(string))

gotTypeRaw := n.(string)
// if the type is empty, the state is empty - do not recreate
if gotTypeRaw == "" {
return nil
}

gotType, err := toType(gotTypeRaw)
if err != nil {
return fmt.Errorf("unknown type: %w", err)
}
if gotType != wantType {
sfc-gh-asawicki marked this conversation as resolved.
Show resolved Hide resolved
// we have to set here a value instead of just SetNewComputed
// because with empty value (default snowflake behavior for type) ForceNew fails
// because there are no changes (at least from the SDKv2 point of view) for typeField
return errors.Join(diff.SetNew(typeField, "<changed externally>"), diff.ForceNew(typeField))
}
}
return nil
}
}

// RecreateWhenStreamIsStale detects when the stream is stale, and sets a `false` value for `stale` field.
// This means that the provider can detect that change in `stale` from `true` to `false`, where `false` is our desired state.
func RecreateWhenStreamIsStale() schema.CustomizeDiffFunc {
Expand Down
6 changes: 6 additions & 0 deletions pkg/resources/stream_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ var streamCommonSchema = map[string]*schema.Schema{
Optional: true,
Description: "Specifies a comment for the stream.",
},
"stream_type": {
Type: schema.TypeString,
Computed: true,
Description: "Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.",
},
ShowOutputAttributeName: {
Type: schema.TypeList,
Computed: true,
Expand Down Expand Up @@ -203,6 +208,7 @@ func handleStreamRead(d *schema.ResourceData,
) error {
return errors.Join(
d.Set("comment", stream.Comment),
d.Set("stream_type", stream.SourceType),
d.Set(ShowOutputAttributeName, []map[string]any{schemas.StreamToSchema(stream)}),
d.Set(DescribeOutputAttributeName, []map[string]any{schemas.StreamDescriptionToSchema(*streamDescription)}),
d.Set(FullyQualifiedNameAttributeName, id.FullyQualifiedName()),
Expand Down
1 change: 1 addition & 0 deletions pkg/resources/stream_on_directory_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func StreamOnDirectoryTable() *schema.Resource {
ComputedIfAnyAttributeChanged(streamOnDirectoryTableSchema, ShowOutputAttributeName, "stage", "comment"),
ComputedIfAnyAttributeChanged(streamOnDirectoryTableSchema, DescribeOutputAttributeName, "stage", "comment"),
RecreateWhenStreamIsStale(),
RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeStage),
),

Schema: streamOnDirectoryTableSchema,
Expand Down
55 changes: 55 additions & 0 deletions pkg/resources/stream_on_directory_table_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/plancheck"
"github.com/hashicorp/terraform-plugin-testing/tfversion"
"github.com/stretchr/testify/require"
)

func TestAcc_StreamOnDirectoryTable_Basic(t *testing.T) {
Expand Down Expand Up @@ -459,3 +460,57 @@ func TestAcc_StreamOnDirectoryTable_InvalidConfiguration(t *testing.T) {
},
})
}

func TestAcc_StreamOnDirectoryTable_ExternalStreamTypeChange(t *testing.T) {
id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
acc.TestAccPreCheck(t)
stage, cleanupStage := acc.TestClient().Stage.CreateStageWithDirectory(t)
t.Cleanup(cleanupStage)
model := model.StreamOnDirectoryTable("test", id.DatabaseName(), id.Name(), id.SchemaName(), stage.ID().FullyQualifiedName())

resource.Test(t, resource.TestCase{
ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
TerraformVersionChecks: []tfversion.TerraformVersionCheck{
tfversion.RequireAbove(tfversion.Version1_5_0),
},
CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
Steps: []resource.TestStep{
{
Config: config.FromModel(t, model),
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnDirectoryTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeStage)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeStage),
),
),
},
// external change with a different type
{
PreConfig: func() {
table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
t.Cleanup(cleanupTable)
acc.TestClient().Stream.DropFunc(t, id)()
externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id, table.ID()))
t.Cleanup(cleanup)
sfc-gh-asawicki marked this conversation as resolved.
Show resolved Hide resolved
require.Equal(t, sdk.StreamSourceTypeTable, *externalChangeStream.SourceType)
},
Config: config.FromModel(t, model),
ConfigPlanChecks: resource.ConfigPlanChecks{
PreApply: []plancheck.PlanCheck{
plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
},
},
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnDirectoryTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeStage)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeStage),
),
),
},
},
})
}
1 change: 1 addition & 0 deletions pkg/resources/stream_on_external_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func StreamOnExternalTable() *schema.Resource {
ComputedIfAnyAttributeChanged(streamOnExternalTableSchema, ShowOutputAttributeName, "external_table", "insert_only", "comment"),
ComputedIfAnyAttributeChanged(streamOnExternalTableSchema, DescribeOutputAttributeName, "external_table", "insert_only", "comment"),
RecreateWhenStreamIsStale(),
RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeExternalTable),
),

Schema: streamOnExternalTableSchema,
Expand Down
60 changes: 60 additions & 0 deletions pkg/resources/stream_on_external_table_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/plancheck"
"github.com/hashicorp/terraform-plugin-testing/tfversion"
"github.com/stretchr/testify/require"
)

func TestAcc_StreamOnExternalTable_Basic(t *testing.T) {
Expand Down Expand Up @@ -890,3 +891,62 @@ func TestAcc_StreamOnExternalTable_InvalidConfiguration(t *testing.T) {
},
})
}

func TestAcc_StreamOnExternalTable_ExternalStreamTypeChange(t *testing.T) {
id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
acc.TestAccPreCheck(t)
stageID := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
stageLocation := fmt.Sprintf("@%s", stageID.FullyQualifiedName())
_, stageCleanup := acc.TestClient().Stage.CreateStageWithURL(t, stageID)
t.Cleanup(stageCleanup)

externalTable, externalTableCleanup := acc.TestClient().ExternalTable.CreateWithLocation(t, stageLocation)
t.Cleanup(externalTableCleanup)
model := model.StreamOnExternalTableBase("test", id, externalTable.ID())

resource.Test(t, resource.TestCase{
ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
TerraformVersionChecks: []tfversion.TerraformVersionCheck{
tfversion.RequireAbove(tfversion.Version1_5_0),
},
CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
Steps: []resource.TestStep{
{
Config: config.FromModel(t, model),
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnExternalTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeExternalTable)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeExternalTable),
),
),
},
// external change with a different type
{
PreConfig: func() {
table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
t.Cleanup(cleanupTable)
acc.TestClient().Stream.DropFunc(t, id)()
externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id, table.ID()))
t.Cleanup(cleanup)
require.Equal(t, sdk.StreamSourceTypeTable, *externalChangeStream.SourceType)
},
Config: config.FromModel(t, model),
ConfigPlanChecks: resource.ConfigPlanChecks{
PreApply: []plancheck.PlanCheck{
plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
},
},
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnExternalTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeExternalTable)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeExternalTable),
),
),
},
},
})
}
1 change: 1 addition & 0 deletions pkg/resources/stream_on_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func StreamOnTable() *schema.Resource {
ComputedIfAnyAttributeChanged(streamOnTableSchema, ShowOutputAttributeName, "table", "append_only", "comment"),
ComputedIfAnyAttributeChanged(streamOnTableSchema, DescribeOutputAttributeName, "table", "append_only", "comment"),
RecreateWhenStreamIsStale(),
RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeTable),
),

Schema: streamOnTableSchema,
Expand Down
57 changes: 57 additions & 0 deletions pkg/resources/stream_on_table_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
"github.com/hashicorp/terraform-plugin-testing/plancheck"
"github.com/hashicorp/terraform-plugin-testing/tfversion"
"github.com/stretchr/testify/require"
)

func TestAcc_StreamOnTable_Basic(t *testing.T) {
Expand Down Expand Up @@ -835,3 +836,59 @@ func TestAcc_StreamOnTable_InvalidConfiguration(t *testing.T) {
},
})
}

func TestAcc_StreamOnTable_ExternalStreamTypeChange(t *testing.T) {
id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
acc.TestAccPreCheck(t)
table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
t.Cleanup(cleanupTable)

model := model.StreamOnTableBase("test", id, table.ID())

resource.Test(t, resource.TestCase{
ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
TerraformVersionChecks: []tfversion.TerraformVersionCheck{
tfversion.RequireAbove(tfversion.Version1_5_0),
},
CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
Steps: []resource.TestStep{
{
Config: config.FromModel(t, model),
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeTable)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeTable),
),
),
},
// external change with a different type
{
PreConfig: func() {
statement := fmt.Sprintf("SELECT * FROM %s", table.ID().FullyQualifiedName())
view, cleanupView := acc.TestClient().View.CreateView(t, statement)
t.Cleanup(cleanupView)
acc.TestClient().Stream.DropFunc(t, id)()
externalChangeStream, cleanup := acc.TestClient().Stream.CreateOnViewWithRequest(t, sdk.NewCreateOnViewStreamRequest(id, view.ID()))
t.Cleanup(cleanup)
require.Equal(t, sdk.StreamSourceTypeView, *externalChangeStream.SourceType)
},
Config: config.FromModel(t, model),
ConfigPlanChecks: resource.ConfigPlanChecks{
PreApply: []plancheck.PlanCheck{
plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
},
},
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeTable)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeTable),
),
),
},
},
})
}
1 change: 1 addition & 0 deletions pkg/resources/stream_on_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func StreamOnView() *schema.Resource {
ComputedIfAnyAttributeChanged(StreamOnViewSchema, ShowOutputAttributeName, "view", "append_only", "comment"),
ComputedIfAnyAttributeChanged(StreamOnViewSchema, DescribeOutputAttributeName, "view", "append_only", "comment"),
RecreateWhenStreamIsStale(),
RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeView),
),

Schema: StreamOnViewSchema,
Expand Down
Loading
Loading