Skip to content

Commit

Permalink
add more striction
Browse files Browse the repository at this point in the history
  • Loading branch information
yokofly committed Oct 18, 2024
1 parent 33a7943 commit 5e204fb
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 24 deletions.
20 changes: 12 additions & 8 deletions core/sling/task_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,19 +361,22 @@ func (t *TaskExecution) runFileToDB() (err error) {
}

if t.Config.Target.Type == dbio.TypeDbProton && t.Config.Mode == IncrementalMode {
existed, err := database.TableExists(tgtConn, t.Config.Target.Object)
if err != nil {
return g.Error(err, "could not check if final table exists in incremental mode")
}
if !existed {
return g.Error(err, "final table %s not found in incremental mode, please create table %s first", t.Config.Target.Object, t.Config.Target.Object)
}
t.Config.Target.Object = setSchema(cast.ToString(t.Config.Target.Data["schema"]), t.Config.Target.Object)

targetTable, err := database.ParseTableName(t.Config.Target.Object, tgtConn.GetType())
if err != nil {
return g.Error(err, "could not parse target table")
}

existed, err := database.TableExists(tgtConn, targetTable.FullName())
if err != nil {
return g.Error(err, "could not check if final table exists in incremental mode")
}
if !existed {
err = g.Error("final table %s not found in incremental mode, please create table %s first", t.Config.Target.Object, t.Config.Target.Object)
return err
}

if targetTable.Columns, err = tgtConn.GetSQLColumns(targetTable); err != nil {
return g.Error(err, "could not get table columns, when write to timeplusd database in incremental mode, final table %s need created first", targetTable.FullName())
}
Expand Down Expand Up @@ -638,7 +641,8 @@ func (t *TaskExecution) runProtonToProton(srcConn, tgtConn database.Connection)
return g.Error(err, "could not check if final table exists in incremental mode")
}
if !existed {
return g.Error(err, "final table %s not found in incremental mode, please create table %s first", t.Config.Target.Object, t.Config.Target.Object)
err = g.Error("final table %s not found in incremental mode, please create table %s first", t.Config.Target.Object, t.Config.Target.Object)
return err
}
}

Expand Down
34 changes: 18 additions & 16 deletions core/sling/task_run_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,10 +373,12 @@ func (t *TaskExecution) writeDirectly(cfg *Config, df *iop.Dataflow, tgtConn dat
if cfg.Target.Type == dbio.TypeDbProton && cfg.Mode == IncrementalMode {
existed, err := database.TableExists(tgtConn, targetTable.FullName())
if err != nil {
return 0, g.Error(err, "could not check if final table exists in incremental mode")
err = g.Error(err, "could not check if final table exists in incremental mode")
return 0, err
}
if !existed {
return 0, g.Error(err, "final table %s not found in incremental mode, please create table %s first", targetTable.FullName(), targetTable.FullName())
err = g.Error("final table %s not found in incremental mode, please create table %s first", targetTable.FullName(), targetTable.FullName())
return 0, err
}
}

Expand Down Expand Up @@ -447,20 +449,6 @@ func (t *TaskExecution) writeDirectly(cfg *Config, df *iop.Dataflow, tgtConn dat
return 0, err
}

// Validate data
tCnt, err := tgtConn.GetCount(targetTable.FullName())
if err != nil {
err = g.Error(err, "could not get count from final table %s", targetTable.FullName())
return 0, err
}
if cnt != tCnt {
err = g.Error(err, "inserted into final table but table count (%d) != stream count (%d). Records missing/mismatch. Aborting", tCnt, cnt)
return 0, err
} else if tCnt == 0 && len(sampleData.Rows) > 0 {
err = g.Error(err, "Loaded 0 records while sample data has %d records. Exiting.", len(sampleData.Rows))
return 0, err
}

// Handle empty data case
if cnt == 0 && !cast.ToBool(os.Getenv("SLING_ALLOW_EMPTY_TABLES")) && !cast.ToBool(os.Getenv("SLING_ALLOW_EMPTY")) {
g.Warn("No data or records found in stream. Nothing to do. To allow Sling to create empty tables, set SLING_ALLOW_EMPTY=TRUE")
Expand All @@ -482,6 +470,20 @@ func (t *TaskExecution) writeDirectly(cfg *Config, df *iop.Dataflow, tgtConn dat
}
}

// Validate data
tCnt, err := tgtConn.GetCount(targetTable.FullName())
if err != nil {
err = g.Error(err, "could not get count from final table %s", targetTable.FullName())
return 0, err
} else {
if cnt != tCnt {
g.Warn("inserted into final table but target table %s count (%d) != new inserted count (%d). Records missing/mismatch.", targetTable.FullName(), tCnt, cnt)
} else if tCnt == 0 && len(sampleData.Rows) > 0 {
err = g.Error("Loaded 0 records while sample data has %d records. Exiting.", len(sampleData.Rows))
return 0, err
}
}

// Commit final transaction
if err := tgtConn.Commit(); err != nil {
err = g.Error(err, "could not commit final transaction")
Expand Down

0 comments on commit 5e204fb

Please sign in to comment.