From 5e204fb05de5df584fdd68e12985c3274a56d114 Mon Sep 17 00:00:00 2001 From: yun Date: Fri, 18 Oct 2024 02:27:20 -0700 Subject: [PATCH] add more striction --- core/sling/task_run.go | 20 ++++++++++++-------- core/sling/task_run_write.go | 34 ++++++++++++++++++---------------- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/core/sling/task_run.go b/core/sling/task_run.go index 77a4230f..9f341129 100644 --- a/core/sling/task_run.go +++ b/core/sling/task_run.go @@ -361,19 +361,22 @@ func (t *TaskExecution) runFileToDB() (err error) { } if t.Config.Target.Type == dbio.TypeDbProton && t.Config.Mode == IncrementalMode { - existed, err := database.TableExists(tgtConn, t.Config.Target.Object) - if err != nil { - return g.Error(err, "could not check if final table exists in incremental mode") - } - if !existed { - return g.Error(err, "final table %s not found in incremental mode, please create table %s first", t.Config.Target.Object, t.Config.Target.Object) - } + t.Config.Target.Object = setSchema(cast.ToString(t.Config.Target.Data["schema"]), t.Config.Target.Object) targetTable, err := database.ParseTableName(t.Config.Target.Object, tgtConn.GetType()) if err != nil { return g.Error(err, "could not parse target table") } + existed, err := database.TableExists(tgtConn, targetTable.FullName()) + if err != nil { + return g.Error(err, "could not check if final table exists in incremental mode") + } + if !existed { + err = g.Error("final table %s not found in incremental mode, please create table %s first", t.Config.Target.Object, t.Config.Target.Object) + return err + } + if targetTable.Columns, err = tgtConn.GetSQLColumns(targetTable); err != nil { return g.Error(err, "could not get table columns, when write to timeplusd database in incremental mode, final table %s need created first", targetTable.FullName()) } @@ -638,7 +641,8 @@ func (t *TaskExecution) runProtonToProton(srcConn, tgtConn database.Connection) return g.Error(err, "could not check if final table exists in incremental mode") } if !existed { - return g.Error(err, "final table %s not found in incremental mode, please create table %s first", t.Config.Target.Object, t.Config.Target.Object) + err = g.Error("final table %s not found in incremental mode, please create table %s first", t.Config.Target.Object, t.Config.Target.Object) + return err } } diff --git a/core/sling/task_run_write.go b/core/sling/task_run_write.go index 1c151405..b85cc5a3 100644 --- a/core/sling/task_run_write.go +++ b/core/sling/task_run_write.go @@ -373,10 +373,12 @@ func (t *TaskExecution) writeDirectly(cfg *Config, df *iop.Dataflow, tgtConn dat if cfg.Target.Type == dbio.TypeDbProton && cfg.Mode == IncrementalMode { existed, err := database.TableExists(tgtConn, targetTable.FullName()) if err != nil { - return 0, g.Error(err, "could not check if final table exists in incremental mode") + err = g.Error(err, "could not check if final table exists in incremental mode") + return 0, err } if !existed { - return 0, g.Error(err, "final table %s not found in incremental mode, please create table %s first", targetTable.FullName(), targetTable.FullName()) + err = g.Error("final table %s not found in incremental mode, please create table %s first", targetTable.FullName(), targetTable.FullName()) + return 0, err } } @@ -447,20 +449,6 @@ func (t *TaskExecution) writeDirectly(cfg *Config, df *iop.Dataflow, tgtConn dat return 0, err } - // Validate data - tCnt, err := tgtConn.GetCount(targetTable.FullName()) - if err != nil { - err = g.Error(err, "could not get count from final table %s", targetTable.FullName()) - return 0, err - } - if cnt != tCnt { - err = g.Error(err, "inserted into final table but table count (%d) != stream count (%d). Records missing/mismatch. Aborting", tCnt, cnt) - return 0, err - } else if tCnt == 0 && len(sampleData.Rows) > 0 { - err = g.Error(err, "Loaded 0 records while sample data has %d records. Exiting.", len(sampleData.Rows)) - return 0, err - } - // Handle empty data case if cnt == 0 && !cast.ToBool(os.Getenv("SLING_ALLOW_EMPTY_TABLES")) && !cast.ToBool(os.Getenv("SLING_ALLOW_EMPTY")) { g.Warn("No data or records found in stream. Nothing to do. To allow Sling to create empty tables, set SLING_ALLOW_EMPTY=TRUE") @@ -482,6 +470,20 @@ func (t *TaskExecution) writeDirectly(cfg *Config, df *iop.Dataflow, tgtConn dat } } + // Validate data + tCnt, err := tgtConn.GetCount(targetTable.FullName()) + if err != nil { + err = g.Error(err, "could not get count from final table %s", targetTable.FullName()) + return 0, err + } else { + if cnt != tCnt { + g.Warn("inserted into final table but target table %s count (%d) != new inserted count (%d). Records missing/mismatch.", targetTable.FullName(), tCnt, cnt) + } else if tCnt == 0 && len(sampleData.Rows) > 0 { + err = g.Error("Loaded 0 records while sample data has %d records. Exiting.", len(sampleData.Rows)) + return 0, err + } + } + // Commit final transaction if err := tgtConn.Commit(); err != nil { err = g.Error(err, "could not commit final transaction")