Skip to content

Commit

Permalink
try to use csv as intermediate layout (#27)
Browse files Browse the repository at this point in the history
todo:
1. handle ReadFromDB failures
2. insert directly into Proton instead of going through a temp table
3. for 2, if the insert fails we can retry it idempotently; if the retries keep failing, we need to drop the CSV and retry.
  • Loading branch information
yokofly authored Oct 17, 2024
1 parent 10e99b4 commit 49d78fa
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 0 deletions.
142 changes: 142 additions & 0 deletions core/sling/task_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ import (

"github.com/flarco/g"
"github.com/slingdata-io/sling-cli/core/dbio"
"github.com/slingdata-io/sling-cli/core/dbio/connection"
"github.com/slingdata-io/sling-cli/core/dbio/database"
"github.com/slingdata-io/sling-cli/core/dbio/filesys"
"github.com/slingdata-io/sling-cli/core/env"
"github.com/spf13/cast"
)
Expand Down Expand Up @@ -527,6 +529,10 @@ func (t *TaskExecution) runDbToDb() (err error) {
}
}

if srcConn.GetType() == dbio.TypeDbProton && tgtConn.GetType() == dbio.TypeDbProton {
return t.runProtonToProton(srcConn, tgtConn)
}

t.SetProgress("reading from source database")
t.df, err = t.ReadFromDB(t.Config, srcConn)
if err != nil {
Expand Down Expand Up @@ -564,3 +570,139 @@ func (t *TaskExecution) runDbToDb() (err error) {
}
return
}

// createIntermediateConfig returns a shallow copy of t.Config whose target is
// a newly created local temporary CSV file, delimited by "~".
//
// It returns nil when the temporary file cannot be created or its handle
// cannot be closed. On success the file exists (empty) on disk at
// Target.Object; the caller is responsible for removing it.
func (t *TaskExecution) createIntermediateConfig() *Config {
	tempFile, err := os.CreateTemp("", "proton_transfer_*.csv")
	if err != nil {
		// NOTE(review): the error built here is not returned; the signature
		// can only signal failure through a nil result.
		g.Error(err, "Could not create temporary file")
		return nil
	}
	path := tempFile.Name()

	// Only the reserved path is needed from here on. Close the handle so the
	// descriptor is not leaked and so the file can later be rewritten and
	// removed on platforms that refuse to delete open files.
	if err = tempFile.Close(); err != nil {
		_ = os.Remove(path) // best effort: do not leave a stray temp file behind
		return nil
	}

	fileURL := "file://" + path

	intermediateConfig := *t.Config // shallow copy of the original config

	intermediateConfig.Target = Target{
		Conn:   "LOCAL",
		Type:   "file",
		Object: path,
		Options: &TargetOptions{
			Format:    dbio.FileTypeCsv,
			Delimiter: "~", // Use ~ as delimiter for safety
		},
		Data: map[string]interface{}{
			"type": "file",
			"url":  fileURL,
		},
	}

	intermediateConfig.TgtConn = connection.Connection{
		Name: "LOCAL",
		Type: "file",
		Data: map[string]interface{}{
			"type": "file",
			"url":  fileURL,
		},
		File: &filesys.LocalFileSysClient{},
	}

	return &intermediateConfig
}

// runProtonToProton moves data between two databases by staging it in a
// temporary local CSV file: the source is exported with runDbToFile, then the
// file is loaded into the target with runFileToDB. Each phase is attempted up
// to maxRetries times with retryDelay between attempts.
//
// srcConn and tgtConn are accepted for the caller's convenience but are not
// referenced by this body; the phases work off t.Config.
//
// It returns nil without importing anything when the exported file is empty.
// t.Config, t.Config.Source and t.Config.SrcConn are restored to their
// original values before returning.
func (t *TaskExecution) runProtonToProton(srcConn, tgtConn database.Connection) (err error) {
	const (
		maxRetries = 3
		retryDelay = 5 * time.Second
	)
	startedAt := time.Now()

	// Phase 1: build the staging configuration (creates the temp file).
	t.SetProgress("Creating intermediate configuration")
	stagingCfg := t.createIntermediateConfig()
	if stagingCfg == nil {
		return g.Error("Failed to create intermediate config")
	}
	defer os.Remove(stagingCfg.Target.Object) // best-effort temp file cleanup

	// Phase 2: export the source into the staging file. The task config is
	// swapped only for the duration of the export.
	t.SetProgress("Exporting data from source Proton database")
	savedConfig := t.Config
	t.Config = stagingCfg
	err = retry(maxRetries, retryDelay, t.runDbToFile)
	t.Config = savedConfig
	if err != nil {
		return g.Error(err, "Failed to export data from source Proton database")
	}

	// Nothing was written: there is nothing to import.
	t.SetProgress("Checking exported data")
	exported, err := os.Stat(stagingCfg.Target.Object)
	if err != nil {
		return g.Error(err, "Failed to get file info for intermediate file")
	}
	if exported.Size() == 0 {
		t.SetProgress("No new data to transfer")
		return nil
	}

	// Phase 3: point the task's source at the staging file and import it.
	t.SetProgress("Preparing to import data to target Proton database")
	savedSource, savedSrcConn := t.Config.Source, t.Config.SrcConn

	format := dbio.FileTypeCsv
	hasHeader := true
	t.Config.Source = Source{
		Conn:   "LOCAL",
		Type:   "file",
		Stream: stagingCfg.Target.Object,
		Options: &SourceOptions{
			Format:    &format,
			Delimiter: "~",
			Header:    &hasHeader,
		},
		Data: stagingCfg.Target.Data,
	}
	t.Config.SrcConn = stagingCfg.TgtConn

	t.SetProgress("Importing data to target Proton database")
	err = retry(maxRetries, retryDelay, t.runFileToDB)

	// Put the original source back before reporting the outcome.
	t.Config.Source = savedSource
	t.Config.SrcConn = savedSrcConn
	if err != nil {
		return g.Error(err, "Failed to import data to target Proton database after %d retries", maxRetries)
	}

	elapsed := int(time.Since(startedAt).Seconds())
	cnt := t.df.Count()
	t.SetProgress("Transferred %d rows between Proton databases in %d secs [%s r/s]", cnt, elapsed, getRate(cnt))

	return nil
}

// retry calls f until it succeeds or has been called attempts times,
// sleeping for the given duration between consecutive calls. f is always
// called at least once, even when attempts is zero or negative.
//
// It returns nil on the first success, otherwise the error from the last call.
func retry(attempts int, sleep time.Duration, f func() error) (err error) {
	for attempt := 1; ; attempt++ {
		if err = f(); err == nil {
			return nil
		}

		// Out of attempts: surface the most recent failure.
		if attempt >= attempts {
			return err
		}

		time.Sleep(sleep)
		g.Debug("Retrying after error: %v", err)
	}
}
5 changes: 5 additions & 0 deletions core/sling/task_run_write.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ func (t *TaskExecution) WriteToDb(cfg *Config, df *iop.Dataflow, tgtConn databas
return
}
cfg.Target.Options.TableDDL = g.String(tableTmp.DDL)
defer func() {
// reset table ddl (when retrying)
cfg.Target.Options.TableDDL = nil
}()

cfg.Target.TmpTableCreated = true
df.Columns = sampleData.Columns
setStage("4 - load-into-temp")
Expand Down

0 comments on commit 49d78fa

Please sign in to comment.