Skip to content

Commit

Permalink
Handle external type changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sfc-gh-jmichalak committed Oct 31, 2024
1 parent b18bf30 commit 2d1bc60
Show file tree
Hide file tree
Showing 20 changed files with 303 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/resources/stream_on_directory_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ resource "snowflake_stream_on_directory_table" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.

<a id="nestedatt--describe_output"></a>
### Nested Schema for `describe_output`
Expand Down
1 change: 1 addition & 0 deletions docs/resources/stream_on_external_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ resource "snowflake_stream_on_external_table" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.

<a id="nestedblock--at"></a>
### Nested Schema for `at`
Expand Down
1 change: 1 addition & 0 deletions docs/resources/stream_on_table.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ resource "snowflake_stream_on_table" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.

<a id="nestedblock--at"></a>
### Nested Schema for `at`
Expand Down
1 change: 1 addition & 0 deletions docs/resources/stream_on_view.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ resource "snowflake_stream_on_view" "stream" {
- `id` (String) The ID of this resource.
- `show_output` (List of Object) Outputs the result of `SHOW STREAMS` for the given stream. (see [below for nested schema](#nestedatt--show_output))
- `stale` (Boolean) Indicated if the stream is stale. When Terraform detects that the stream is stale, the stream is recreated with `CREATE OR REPLACE`. Read more on stream staleness in Snowflake [docs](https://docs.snowflake.com/en/user-guide/streams-intro#data-retention-period-and-staleness).
- `stream_type` (String) Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.

<a id="nestedblock--at"></a>
### Nested Schema for `at`
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 13 additions & 0 deletions pkg/acceptance/helpers/stream_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ func (c *StreamClient) CreateOnTableWithRequest(t *testing.T, req *sdk.CreateOnT
return stream, c.DropFunc(t, req.GetName())
}

func (c *StreamClient) CreateOnViewWithRequest(t *testing.T, req *sdk.CreateOnViewStreamRequest) (*sdk.Stream, func()) {
t.Helper()
ctx := context.Background()

err := c.client().CreateOnView(ctx, req)
require.NoError(t, err)

stream, err := c.client().ShowByID(ctx, req.GetName())
require.NoError(t, err)

return stream, c.DropFunc(t, req.GetName())
}

func (c *StreamClient) Update(t *testing.T, request *sdk.AlterStreamRequest) {
t.Helper()
ctx := context.Background()
Expand Down
29 changes: 29 additions & 0 deletions pkg/resources/custom_diffs.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,35 @@ func RecreateWhenSecretTypeChangedExternally(secretType sdk.SecretType) schema.C
}
}

// RecreateWhenStreamTypeChangedExternally recreates a stream when argument streamType is different than in the state.
func RecreateWhenStreamTypeChangedExternally(streamType sdk.StreamSourceType) schema.CustomizeDiffFunc {
return RecreateWhenResourceTypeChangedExternally("stream_type", streamType, sdk.ToStreamSourceType)
}

// RecreateWhenResourceTypeChangedExternally recreates a resource when argument wantType is different than the value in typeField.
func RecreateWhenResourceTypeChangedExternally[T ~string](typeField string, wantType T, toType func(string) (T, error)) schema.CustomizeDiffFunc {
return func(_ context.Context, diff *schema.ResourceDiff, _ interface{}) error {
if n := diff.Get(typeField); n != nil {
logging.DebugLogger.Printf("[DEBUG] new external value for %s: %s\n", typeField, n.(string))

gotTypeRaw := n.(string)
// if the type is empty, the state is empty - do not recreate
if gotTypeRaw == "" {
return nil
}

gotType, err := toType(gotTypeRaw)
if err != nil {
return fmt.Errorf("unknown type: %w", err)
}
if gotType != wantType {
return errors.Join(diff.SetNew(typeField, "<changed externally>"), diff.ForceNew(typeField))
}
}
return nil
}
}

// RecreateWhenStreamIsStale detects when the stream is stale, and sets a `false` value for `stale` field.
// This means that the provider can detect that change in `stale` from `true` to `false`, where `false` is our desired state.
func RecreateWhenStreamIsStale() schema.CustomizeDiffFunc {
Expand Down
6 changes: 6 additions & 0 deletions pkg/resources/stream_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ var streamCommonSchema = map[string]*schema.Schema{
Optional: true,
Description: "Specifies a comment for the stream.",
},
"stream_type": {
Type: schema.TypeString,
Computed: true,
Description: "Specifies a type for the stream. This field is used for checking external changes and recreating the resources if needed.",
},
ShowOutputAttributeName: {
Type: schema.TypeList,
Computed: true,
Expand Down Expand Up @@ -203,6 +208,7 @@ func handleStreamRead(d *schema.ResourceData,
) error {
return errors.Join(
d.Set("comment", stream.Comment),
d.Set("stream_type", stream.SourceType),
d.Set(ShowOutputAttributeName, []map[string]any{schemas.StreamToSchema(stream)}),
d.Set(DescribeOutputAttributeName, []map[string]any{schemas.StreamDescriptionToSchema(*streamDescription)}),
d.Set(FullyQualifiedNameAttributeName, id.FullyQualifiedName()),
Expand Down
1 change: 1 addition & 0 deletions pkg/resources/stream_on_directory_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func StreamOnDirectoryTable() *schema.Resource {
ComputedIfAnyAttributeChanged(streamOnDirectoryTableSchema, ShowOutputAttributeName, "stage", "comment"),
ComputedIfAnyAttributeChanged(streamOnDirectoryTableSchema, DescribeOutputAttributeName, "stage", "comment"),
RecreateWhenStreamIsStale(),
RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeStage),
),

Schema: streamOnDirectoryTableSchema,
Expand Down
53 changes: 53 additions & 0 deletions pkg/resources/stream_on_directory_table_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,3 +459,56 @@ func TestAcc_StreamOnDirectoryTable_InvalidConfiguration(t *testing.T) {
},
})
}

func TestAcc_StreamOnDirectoryTable_ExternalStreamTypeChange(t *testing.T) {
id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
acc.TestAccPreCheck(t)
stage, cleanupStage := acc.TestClient().Stage.CreateStageWithDirectory(t)
t.Cleanup(cleanupStage)
model := model.StreamOnDirectoryTable("test", id.DatabaseName(), id.Name(), id.SchemaName(), stage.ID().FullyQualifiedName())

resource.Test(t, resource.TestCase{
ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
TerraformVersionChecks: []tfversion.TerraformVersionCheck{
tfversion.RequireAbove(tfversion.Version1_5_0),
},
CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
Steps: []resource.TestStep{
{
Config: config.FromModel(t, model),
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnDirectoryTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeStage)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeStage),
),
),
},
// external change with a different type
{
PreConfig: func() {
table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
t.Cleanup(cleanupTable)
acc.TestClient().Stream.DropFunc(t, id)()
_, cleanup := acc.TestClient().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id, table.ID()))
t.Cleanup(cleanup)
},
Config: config.FromModel(t, model),
ConfigPlanChecks: resource.ConfigPlanChecks{
PreApply: []plancheck.PlanCheck{
plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
},
},
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnDirectoryTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeStage)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeStage),
),
),
},
},
})
}
1 change: 1 addition & 0 deletions pkg/resources/stream_on_external_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func StreamOnExternalTable() *schema.Resource {
ComputedIfAnyAttributeChanged(streamOnExternalTableSchema, ShowOutputAttributeName, "external_table", "insert_only", "comment"),
ComputedIfAnyAttributeChanged(streamOnExternalTableSchema, DescribeOutputAttributeName, "external_table", "insert_only", "comment"),
RecreateWhenStreamIsStale(),
RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeExternalTable),
),

Schema: streamOnExternalTableSchema,
Expand Down
58 changes: 58 additions & 0 deletions pkg/resources/stream_on_external_table_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -890,3 +890,61 @@ func TestAcc_StreamOnExternalTable_InvalidConfiguration(t *testing.T) {
},
})
}

func TestAcc_StreamOnExternalTable_ExternalStreamTypeChange(t *testing.T) {
id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
acc.TestAccPreCheck(t)
stageID := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
stageLocation := fmt.Sprintf("@%s", stageID.FullyQualifiedName())
_, stageCleanup := acc.TestClient().Stage.CreateStageWithURL(t, stageID)
t.Cleanup(stageCleanup)

externalTable, externalTableCleanup := acc.TestClient().ExternalTable.CreateWithLocation(t, stageLocation)
t.Cleanup(externalTableCleanup)
model := model.StreamOnExternalTableBase("test", id, externalTable.ID())

resource.Test(t, resource.TestCase{
ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
TerraformVersionChecks: []tfversion.TerraformVersionCheck{
tfversion.RequireAbove(tfversion.Version1_5_0),
},
CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
Steps: []resource.TestStep{
{
Config: config.FromModel(t, model),
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnExternalTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeExternalTable)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeExternalTable),
),
),
},
// external change with a different type
{
PreConfig: func() {
table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
t.Cleanup(cleanupTable)
acc.TestClient().Stream.DropFunc(t, id)()
_, cleanup := acc.TestClient().Stream.CreateOnTableWithRequest(t, sdk.NewCreateOnTableStreamRequest(id, table.ID()))
t.Cleanup(cleanup)
},
Config: config.FromModel(t, model),
ConfigPlanChecks: resource.ConfigPlanChecks{
PreApply: []plancheck.PlanCheck{
plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
},
},
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnExternalTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeExternalTable)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeExternalTable),
),
),
},
},
})
}
1 change: 1 addition & 0 deletions pkg/resources/stream_on_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func StreamOnTable() *schema.Resource {
ComputedIfAnyAttributeChanged(streamOnTableSchema, ShowOutputAttributeName, "table", "append_only", "comment"),
ComputedIfAnyAttributeChanged(streamOnTableSchema, DescribeOutputAttributeName, "table", "append_only", "comment"),
RecreateWhenStreamIsStale(),
RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeTable),
),

Schema: streamOnTableSchema,
Expand Down
55 changes: 55 additions & 0 deletions pkg/resources/stream_on_table_acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -835,3 +835,58 @@ func TestAcc_StreamOnTable_InvalidConfiguration(t *testing.T) {
},
})
}

func TestAcc_StreamOnTable_ExternalStreamTypeChange(t *testing.T) {
id := acc.TestClient().Ids.RandomSchemaObjectIdentifier()
acc.TestAccPreCheck(t)
table, cleanupTable := acc.TestClient().Table.CreateWithChangeTracking(t)
t.Cleanup(cleanupTable)

model := model.StreamOnTableBase("test", id, table.ID())

resource.Test(t, resource.TestCase{
ProtoV6ProviderFactories: acc.TestAccProtoV6ProviderFactories,
TerraformVersionChecks: []tfversion.TerraformVersionCheck{
tfversion.RequireAbove(tfversion.Version1_5_0),
},
CheckDestroy: acc.CheckDestroy(t, resources.StreamOnDirectoryTable),
Steps: []resource.TestStep{
{
Config: config.FromModel(t, model),
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeTable)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeTable),
),
),
},
// external change with a different type
{
PreConfig: func() {
statement := fmt.Sprintf("SELECT * FROM %s", table.ID().FullyQualifiedName())
view, cleanupView := acc.TestClient().View.CreateView(t, statement)
t.Cleanup(cleanupView)
acc.TestClient().Stream.DropFunc(t, id)()
_, cleanup := acc.TestClient().Stream.CreateOnViewWithRequest(t, sdk.NewCreateOnViewStreamRequest(id, view.ID()))
t.Cleanup(cleanup)
},
Config: config.FromModel(t, model),
ConfigPlanChecks: resource.ConfigPlanChecks{
PreApply: []plancheck.PlanCheck{
plancheck.ExpectResourceAction(model.ResourceReference(), plancheck.ResourceActionDestroyBeforeCreate),
},
},
Check: resource.ComposeTestCheckFunc(
assert.AssertThat(t,
resourceassert.StreamOnTableResource(t, model.ResourceReference()).
HasStreamTypeString(string(sdk.StreamSourceTypeTable)),
resourceshowoutputassert.StreamShowOutput(t, model.ResourceReference()).
HasSourceType(sdk.StreamSourceTypeTable),
),
),
},
},
})
}
1 change: 1 addition & 0 deletions pkg/resources/stream_on_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func StreamOnView() *schema.Resource {
ComputedIfAnyAttributeChanged(StreamOnViewSchema, ShowOutputAttributeName, "view", "append_only", "comment"),
ComputedIfAnyAttributeChanged(StreamOnViewSchema, DescribeOutputAttributeName, "view", "append_only", "comment"),
RecreateWhenStreamIsStale(),
RecreateWhenStreamTypeChangedExternally(sdk.StreamSourceTypeView),
),

Schema: StreamOnViewSchema,
Expand Down
Loading

0 comments on commit 2d1bc60

Please sign in to comment.