Skip to content

Commit

Permalink
use CleanTableName for temp folder names
Browse files Browse the repository at this point in the history
  • Loading branch information
flarco committed Oct 23, 2024
1 parent ba710fc commit 5e96dbc
Show file tree
Hide file tree
Showing 9 changed files with 19 additions and 19 deletions.
2 changes: 1 addition & 1 deletion core/dbio/database/database_bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func (conn *BigQueryConn) importViaLocalStorage(tableFName string, df *iop.Dataf
return
}

localPath := path.Join(env.GetTempFolder(), "bigquery", tableFName, g.NowFileStr())
localPath := path.Join(env.GetTempFolder(), "bigquery", env.CleanTableName(tableFName), g.NowFileStr())
err = filesys.Delete(fs, localPath)
if err != nil {
return count, g.Error(err, "Could not Delete: "+localPath)
Expand Down
2 changes: 1 addition & 1 deletion core/dbio/database/database_duckdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func (conn *DuckDbConn) importViaTempCSVs(tableFName string, df *iop.Dataflow) (
return
}

folderPath := path.Join(env.GetTempFolder(), "duckdb", "import", g.NowFileStr())
folderPath := path.Join(env.GetTempFolder(), "duckdb", "import", env.CleanTableName(tableFName), g.NowFileStr())
fileReadyChn := make(chan filesys.FileReady, 3)

go func() {
Expand Down
2 changes: 1 addition & 1 deletion core/dbio/database/database_duckdb_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (conn *DuckDbConn) importViaNamedPipe(tableFName string, df *iop.Dataflow)
}

// Create a named pipe
folderPath := path.Join(env.GetTempFolder(), "duckdb", "import", g.NowFileStr())
folderPath := path.Join(env.GetTempFolder(), "duckdb", "import", env.CleanTableName(tableFName), g.NowFileStr())
if err = os.MkdirAll(folderPath, 0755); err != nil {
return 0, g.Error(err, "could not create temp folder: %s", folderPath)
}
Expand Down
6 changes: 3 additions & 3 deletions core/dbio/database/database_oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui
}

// write to ctlPath
ctlPath := path.Join(env.GetTempFolder(), g.NewTsID("oracle.data.sqlldr")+".ctl")
ctlPath := path.Join(env.GetTempFolder(), "oracle", env.CleanTableName(tableFName), g.NewTsID("oracle.data.sqlldr")+".ctl")
ctlStr := g.R(
conn.BaseConn.GetTemplateValue("core.sqlldr"),
"table", tableFName,
Expand Down Expand Up @@ -310,8 +310,8 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui
postUpdates := cmap.New[int]()

if runtime.GOOS == "windows" {
dataPath = path.Join(env.GetTempFolder(), g.NewTsID("oracle.data.temp")+".csv")
logPath = path.Join(env.GetTempFolder(), g.NewTsID("oracle.log.temp"))
dataPath = path.Join(env.GetTempFolder(), "oracle", env.CleanTableName(tableFName), g.NewTsID("oracle.data.temp")+".csv")
logPath = path.Join(env.GetTempFolder(), "oracle", env.CleanTableName(tableFName), g.NewTsID("oracle.log.temp"))

file, err := os.Create(dataPath)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions core/dbio/database/database_snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (co
}

// Write the ds to a temp file
folderPath := path.Join(env.GetTempFolder(), "snowflake", "put", tableFName, g.NowFileStr())
folderPath := path.Join(env.GetTempFolder(), "snowflake", "put", env.CleanTableName(tableFName), g.NowFileStr())

// delete folder when done
df.Defer(func() { env.RemoveAllLocalTempFile(folderPath) })
Expand All @@ -759,7 +759,7 @@ func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (co
}()

// Import to staging
stageFolderPath := g.F("@%s.%s/%s/%s", conn.GetProp("schema"), conn.GetProp("internalStage"), tableFName, g.NowFileStr())
stageFolderPath := g.F("@%s.%s/%s/%s", conn.GetProp("schema"), conn.GetProp("internalStage"), env.CleanTableName(tableFName), g.NowFileStr())
conn.Exec("USE SCHEMA " + conn.GetProp("schema"))
_, err = conn.Exec("REMOVE " + stageFolderPath)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions core/dbio/database/database_sqlserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (conn *MsSQLServerConn) BcpImportFileParrallel(tableFName string, ds *iop.D

// Write the ds to a temp file

filePath := path.Join(env.GetTempFolder(), g.NewTsID("sqlserver")+g.F("%d.csv", len(ds.Batches)))
filePath := path.Join(env.GetTempFolder(), "sqlserver", env.CleanTableName(tableFName), g.NewTsID("sqlserver")+g.F("%d.csv", len(ds.Batches)))
csvRowCnt, err := writeCsvWithoutQuotes(filePath, batch, fileRowLimit)

if err != nil {
Expand Down Expand Up @@ -456,7 +456,7 @@ func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count u
}
errPath := "/dev/stderr"
if runtime.GOOS == "windows" || true {
errPath = path.Join(env.GetTempFolder(), g.NewTsID("sqlserver")+".error")
errPath = path.Join(env.GetTempFolder(), "sqlserver", env.CleanTableName(tableFName), g.NewTsID("sqlserver")+".error")
defer os.Remove(errPath)
}

Expand Down
2 changes: 1 addition & 1 deletion core/dbio/database/database_starrocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,7 +431,7 @@ func (conn *StarRocksConn) StreamLoad(feURL, tableFName string, df *iop.Dataflow
fs.SetProp("file_max_bytes", val)
}

localPath := path.Join(env.GetTempFolder(), "starrocks", table.Schema, table.Name, g.NowFileStr())
localPath := path.Join(env.GetTempFolder(), "starrocks", env.CleanTableName(tableFName), g.NowFileStr())

// TODO: use reader to fead HTTP directly. Need to get proper redirected URL first.
// for ds := range df.StreamCh {
Expand Down
12 changes: 4 additions & 8 deletions core/dbio/iop/datastream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,8 +1420,7 @@ func (ds *Datastream) ConsumeParquetReaderSeeker(reader *os.File) (err error) {
// ConsumeParquetReader uses the provided reader to stream rows
func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error) {
// need to write to temp file prior
tempDir := env.GetTempFolder()
parquetPath := path.Join(tempDir, g.NewTsID("parquet.temp")+".parquet")
parquetPath := path.Join(env.GetTempFolder(), g.NewTsID("parquet.temp")+".parquet")
ds.Defer(func() { env.RemoveLocalTempFile(parquetPath) })

file, err := os.Create(parquetPath)
Expand Down Expand Up @@ -1549,8 +1548,7 @@ func (ds *Datastream) ConsumeAvroReaderSeeker(reader io.ReadSeeker) (err error)
// ConsumeAvroReader uses the provided reader to stream rows
func (ds *Datastream) ConsumeAvroReader(reader io.Reader) (err error) {
// need to write to temp file prior
tempDir := env.GetTempFolder()
avroPath := path.Join(tempDir, g.NewTsID("avro.temp")+".avro")
avroPath := path.Join(env.GetTempFolder(), g.NewTsID("avro.temp")+".avro")
ds.Defer(func() { env.RemoveLocalTempFile(avroPath) })

file, err := os.Create(avroPath)
Expand Down Expand Up @@ -1596,8 +1594,7 @@ func (ds *Datastream) ConsumeSASReaderSeeker(reader io.ReadSeeker) (err error) {
// ConsumeSASReader uses the provided reader to stream rows
func (ds *Datastream) ConsumeSASReader(reader io.Reader) (err error) {
// need to write to temp file prior
tempDir := env.GetTempFolder()
sasPath := path.Join(tempDir, g.NewTsID("sas.temp")+".sas7bdat")
sasPath := path.Join(env.GetTempFolder(), g.NewTsID("sas.temp")+".sas7bdat")
ds.Defer(func() { env.RemoveLocalTempFile(sasPath) })

file, err := os.Create(sasPath)
Expand Down Expand Up @@ -1661,8 +1658,7 @@ func (ds *Datastream) ConsumeExcelReaderSeeker(reader io.ReadSeeker, props map[s
// ConsumeSASReader uses the provided reader to stream rows
func (ds *Datastream) ConsumeExcelReader(reader io.Reader, props map[string]string) (err error) {
// need to write to temp file prior
tempDir := env.GetTempFolder()
excelPath := path.Join(tempDir, g.NewTsID("excel.temp")+".xlsx")
excelPath := path.Join(env.GetTempFolder(), g.NewTsID("excel.temp")+".xlsx")
ds.Defer(func() { env.RemoveLocalTempFile(excelPath) })

file, err := os.Create(excelPath)
Expand Down
4 changes: 4 additions & 0 deletions core/env/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,10 @@ func GetTempFolder() string {
return CleanWindowsPath(tempDir)
}

func CleanTableName(tableName string) string {
return strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(tableName, `"`, ``), "`", ""))
}

func CleanWindowsPath(path string) string {
return strings.ReplaceAll(path, `\`, `/`)
}
Expand Down

0 comments on commit 5e96dbc

Please sign in to comment.