Skip to content

Commit

Permalink
🐛 fix(replication): update replication test and snowflake stage handling
Browse files Browse the repository at this point in the history
- Update replication test to use new create_dt column instead of create_date
- Improve snowflake stage handling to create stage in correct schema if necessary
- Remove unnecessary schema switch in CopyViaStage function
- Update stageFolderPath to correctly include schema and staging area
  • Loading branch information
flarco committed Dec 15, 2024
1 parent 48b6627 commit 89bc2da
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 10 deletions.
6 changes: 3 additions & 3 deletions cmd/sling/tests/replications/r.12.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@ defaults:
add_new_columns: true

streams:
public.test1k:
public.test1k_postgres_pg:
primary_key: [id]
update_key: create_date
update_key: create_dt
target_options:
table_keys:
cluster: [ date(CREATE_DATE) ]
cluster: [ date(create_dt) ]
29 changes: 22 additions & 7 deletions core/dbio/database/database_snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,21 +174,36 @@ func getEncodedPrivateKey(pemStr, passphrase string) (epk string, err error) {
}

func (conn *SnowflakeConn) getOrCreateStage(schema string) string {
if conn.GetProp("internal_stage") == "" {
defStaging := "sling_staging"
internalStage := conn.GetProp("internal_stage")

createNew:
if internalStage == "" {
if schema == "" {
schema = conn.GetProp("schema")
}
conn.Exec("USE SCHEMA " + schema + noDebugKey)
_, err := conn.Exec("CREATE STAGE IF NOT EXISTS " + defStaging + noDebugKey)

// use Table struct, but is a Snowflake Internal Stage
defStaging := Table{
Schema: schema,
Name: "SLING_STAGING",
Dialect: dbio.TypeDbSnowflake,
}
conn.Exec("USE SCHEMA " + defStaging.Schema + noDebugKey)
_, err := conn.Exec("CREATE STAGE IF NOT EXISTS " + defStaging.FullName())
if err != nil {
g.Warn("Tried to create Internal Snowflake Stage but failed.\n" + g.ErrMsgSimple(err))
return ""
}
conn.SetProp("schema", schema)
conn.SetProp("internal_stage", defStaging)
conn.SetProp("internal_stage", defStaging.FullName())
} else {
conn.Exec("USE SCHEMA " + schema + noDebugKey)
defStaging, _ := ParseTableName(internalStage, dbio.TypeDbSnowflake)
if defStaging.Schema != schema {
// create new staging if schema is different
internalStage = ""
goto createNew
}
conn.Exec("USE SCHEMA " + defStaging.Schema + noDebugKey)
}
return conn.GetProp("internal_stage")
}
Expand Down Expand Up @@ -765,7 +780,7 @@ func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (co
}()

// Import to staging
stageFolderPath := g.F("@%s.%s/%s/%s", conn.GetProp("schema"), conn.GetProp("internal_stage"), env.CleanTableName(tableFName), g.NowFileStr())
stageFolderPath := g.F("@%s/%s/%s", conn.GetProp("internal_stage"), env.CleanTableName(tableFName), g.NowFileStr())
conn.Exec("USE SCHEMA " + conn.GetProp("schema"))
_, err = conn.Exec("REMOVE " + stageFolderPath)
if err != nil {
Expand Down

0 comments on commit 89bc2da

Please sign in to comment.