Feature/allow direct insert #411

Merged · 3 commits · Oct 24, 2024
Changes from all commits
136 changes: 136 additions & 0 deletions core/sling/task_run_write.go
@@ -140,6 +140,12 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
return 0, err
}

allowDirectInsert := cast.ToBool(os.Getenv("SLING_ALLOW_DIRECT_INSERT"))

if allowDirectInsert {
return t.writeDirectly(cfg, df, tgtConn)
}

// Initialize target and temp tables
targetTable, err := initializeTargetTable(cfg, tgtConn)
if err != nil {
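
The gate added above is driven entirely by the SLING_ALLOW_DIRECT_INSERT environment variable, parsed with spf13/cast. As a standalone illustration (not part of this PR), cast.ToBool hands string values to strconv.ParseBool and treats anything unparsable as false, which keeps the default temp-table write path:

package main

import (
    "fmt"
    "os"

    "github.com/spf13/cast"
)

func main() {
    // "1", "t", "true", "TRUE" (and their variants) enable the gate; an unset
    // variable, "", "0", "false", or an unparsable value such as "yes"
    // evaluates to false, so the regular temp-table path is used.
    for _, v := range []string{"true", "1", "false", "", "yes"} {
        os.Setenv("SLING_ALLOW_DIRECT_INSERT", v)
        fmt.Printf("%q -> %v\n", v, cast.ToBool(os.Getenv("SLING_ALLOW_DIRECT_INSERT")))
    }
}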
@@ -342,6 +348,136 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
return cnt, nil
}

func (t *TaskExecution) writeDirectly(cfg *Config, df *iop.Dataflow, tgtConn database.Connection) (cnt uint64, err error) {
// Initialize target table
targetTable, err := initializeTargetTable(cfg, tgtConn)
if err != nil {
return 0, err
}

// Ensure schema exists
if err := ensureSchemaExists(tgtConn, targetTable.Schema); err != nil {
return 0, err
}

// Pause dataflow to set up DDL and handlers
if paused := df.Pause(); !paused {
err = g.Error(err, "could not pause streams to infer columns")
return 0, err
}

// Prepare dataflow
sampleData, err := prepareDataflow(t, df, tgtConn)
if err != nil {
return 0, err
}

// Set table keys
if err := targetTable.SetKeys(cfg.Source.PrimaryKey(), cfg.Source.UpdateKey, cfg.Target.Options.TableKeys); err != nil {
err = g.Error(err, "could not set keys for "+targetTable.FullName())
return 0, err
}

// Execute pre-SQL
if err := executeSQL(t, tgtConn, cfg.Target.Options.PreSQL, "pre"); err != nil {
return cnt, err
}

// Create final table
if err := createTable(t, tgtConn, targetTable, sampleData, false); err != nil {
return 0, err
}

cfg.Target.Columns = sampleData.Columns
df.Columns = sampleData.Columns
setStage("Direct Insert - prepare-final")

// Begin transaction for final table operations
txOptions := determineTxOptions(tgtConn.GetType())
if err := tgtConn.BeginContext(df.Context.Ctx, &txOptions); err != nil {
err = g.Error(err, "could not open transaction to write to final table")
return 0, err
}

defer tgtConn.Rollback()

// Configure column handlers (if applicable)
if err := configureColumnHandlers(t, cfg, df, tgtConn, targetTable); err != nil {
return 0, err
}

df.Unpause() // Resume dataflow
t.SetProgress("streaming data directly into final table")

// Set batch limit if specified
if batchLimit := cfg.Target.Options.BatchLimit; batchLimit != nil {
df.SetBatchLimit(*batchLimit)
}

// Bulk import data directly into final table
cnt, err = tgtConn.BulkImportFlow(targetTable.FullName(), df)
if err != nil {
tgtConn.Rollback()
err = g.Error(err, "could not insert into "+targetTable.FullName())
return 0, err
}

// Validate data
tCnt, err := tgtConn.GetCount(targetTable.FullName())
if err != nil {
err = g.Error(err, "could not get count from final table %s", targetTable.FullName())
return 0, err
}
if cnt != tCnt {
err = g.Error(err, "inserted into final table but table count (%d) != stream count (%d). Records missing/mismatch. Aborting", tCnt, cnt)
return 0, err
} else if tCnt == 0 && len(sampleData.Rows) > 0 {
err = g.Error(err, "Loaded 0 records while sample data has %d records. Exiting.", len(sampleData.Rows))
return 0, err
}

// Handle empty data case
if cnt == 0 && !cast.ToBool(os.Getenv("SLING_ALLOW_EMPTY_TABLES")) && !cast.ToBool(os.Getenv("SLING_ALLOW_EMPTY")) {
g.Warn("No data or records found in stream. Nothing to do. To allow Sling to create empty tables, set SLING_ALLOW_EMPTY=TRUE")
return 0, nil
} else if cnt > 0 {
// FIXME: find root cause of why columns don't sync while streaming
df.SyncColumns()

// Aggregate stats from stream processors
df.Inferred = !cfg.sourceIsFile() // Re-infer if source is file
df.SyncStats()

// Checksum comparison for data quality. Limited via env var SLING_CHECKSUM_ROWS, since the sums grow too large over many rows
if val := cast.ToUint64(os.Getenv("SLING_CHECKSUM_ROWS")); val > 0 && df.Count() <= val {
err = tgtConn.CompareChecksums(targetTable.FullName(), df.Columns)
if err != nil {
return
}
}
}

// Commit final transaction
if err := tgtConn.Commit(); err != nil {
err = g.Error(err, "could not commit final transaction")
return 0, err
}

// Execute post-SQL
if err := executeSQL(t, tgtConn, cfg.Target.Options.PostSQL, "post"); err != nil {
return cnt, err
}

// Finalize progress
if err := df.Err(); err != nil {
setStage("6 - closing")
return cnt, err
}

setStage("6 - closing")
return cnt, nil
}
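
writeDirectly wraps the bulk load in a single transaction: it begins the transaction before configuring column handlers, defers a rollback as a safety net, validates the loaded row count (and optionally checksums) against the stream, and commits only after validation passes. Below is a minimal standalone sketch of that shape using plain database/sql instead of sling's database.Connection wrapper; the loadDirect helper, table name, and two-column row shape are illustrative assumptions, not code from this PR:

package example

import (
    "context"
    "database/sql"
    "fmt"
)

// loadDirect sketches the begin / defer-rollback / validate / commit flow.
func loadDirect(ctx context.Context, db *sql.DB, rows [][2]any) (uint64, error) {
    tx, err := db.BeginTx(ctx, &sql.TxOptions{})
    if err != nil {
        return 0, fmt.Errorf("could not open transaction: %w", err)
    }
    // Rollback after a successful Commit returns sql.ErrTxDone and does
    // nothing, so one deferred call covers every early-return path below.
    defer tx.Rollback()

    var cnt uint64
    for _, r := range rows {
        if _, err := tx.ExecContext(ctx, "insert into final_table values ($1, $2)", r[0], r[1]); err != nil {
            return 0, fmt.Errorf("could not insert into final_table: %w", err)
        }
        cnt++
    }

    // Validate inside the open transaction, mirroring the GetCount check:
    // the loaded count must match the streamed count before committing.
    var tCnt uint64
    if err := tx.QueryRowContext(ctx, "select count(*) from final_table").Scan(&tCnt); err != nil {
        return 0, fmt.Errorf("could not count final_table: %w", err)
    }
    if tCnt != cnt {
        return 0, fmt.Errorf("table count (%d) != stream count (%d)", tCnt, cnt)
    }

    if err := tx.Commit(); err != nil {
        return 0, fmt.Errorf("could not commit: %w", err)
    }
    return cnt, nil
}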

func determineTxOptions(dbType dbio.Type) sql.TxOptions {
switch dbType {
case dbio.TypeDbSnowflake, dbio.TypeDbDuckDb: