From 5e96dbc75e4c8572a7b9d687b4a43449cbbde067 Mon Sep 17 00:00:00 2001 From: Fritz Larco Date: Wed, 23 Oct 2024 10:50:05 -0300 Subject: [PATCH] use CleanTableName for temp folder names --- core/dbio/database/database_bigquery.go | 2 +- core/dbio/database/database_duckdb.go | 2 +- core/dbio/database/database_duckdb_unix.go | 2 +- core/dbio/database/database_oracle.go | 6 +++--- core/dbio/database/database_snowflake.go | 4 ++-- core/dbio/database/database_sqlserver.go | 4 ++-- core/dbio/database/database_starrocks.go | 2 +- core/dbio/iop/datastream.go | 12 ++++-------- core/env/env.go | 4 ++++ 9 files changed, 19 insertions(+), 19 deletions(-) diff --git a/core/dbio/database/database_bigquery.go b/core/dbio/database/database_bigquery.go index c5c7cde6..a51a7134 100755 --- a/core/dbio/database/database_bigquery.go +++ b/core/dbio/database/database_bigquery.go @@ -573,7 +573,7 @@ func (conn *BigQueryConn) importViaLocalStorage(tableFName string, df *iop.Dataf return } - localPath := path.Join(env.GetTempFolder(), "bigquery", tableFName, g.NowFileStr()) + localPath := path.Join(env.GetTempFolder(), "bigquery", env.CleanTableName(tableFName), g.NowFileStr()) err = filesys.Delete(fs, localPath) if err != nil { return count, g.Error(err, "Could not Delete: "+localPath) diff --git a/core/dbio/database/database_duckdb.go b/core/dbio/database/database_duckdb.go index 94b9192c..99593f76 100644 --- a/core/dbio/database/database_duckdb.go +++ b/core/dbio/database/database_duckdb.go @@ -171,7 +171,7 @@ func (conn *DuckDbConn) importViaTempCSVs(tableFName string, df *iop.Dataflow) ( return } - folderPath := path.Join(env.GetTempFolder(), "duckdb", "import", g.NowFileStr()) + folderPath := path.Join(env.GetTempFolder(), "duckdb", "import", env.CleanTableName(tableFName), g.NowFileStr()) fileReadyChn := make(chan filesys.FileReady, 3) go func() { diff --git a/core/dbio/database/database_duckdb_unix.go b/core/dbio/database/database_duckdb_unix.go index e3829cfc..5ce4107c 100644 --- a/core/dbio/database/database_duckdb_unix.go +++ b/core/dbio/database/database_duckdb_unix.go @@ -32,7 +32,7 @@ func (conn *DuckDbConn) importViaNamedPipe(tableFName string, df *iop.Dataflow) } // Create a named pipe - folderPath := path.Join(env.GetTempFolder(), "duckdb", "import", g.NowFileStr()) + folderPath := path.Join(env.GetTempFolder(), "duckdb", "import", env.CleanTableName(tableFName), g.NowFileStr()) if err = os.MkdirAll(folderPath, 0755); err != nil { return 0, g.Error(err, "could not create temp folder: %s", folderPath) } diff --git a/core/dbio/database/database_oracle.go b/core/dbio/database/database_oracle.go index 18551b41..7cbd7a27 100755 --- a/core/dbio/database/database_oracle.go +++ b/core/dbio/database/database_oracle.go @@ -279,7 +279,7 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui } // write to ctlPath - ctlPath := path.Join(env.GetTempFolder(), g.NewTsID("oracle.data.sqlldr")+".ctl") + ctlPath := path.Join(env.GetTempFolder(), "oracle", env.CleanTableName(tableFName), g.NewTsID("oracle.data.sqlldr")+".ctl") ctlStr := g.R( conn.BaseConn.GetTemplateValue("core.sqlldr"), "table", tableFName, @@ -310,8 +310,8 @@ func (conn *OracleConn) SQLLoad(tableFName string, ds *iop.Datastream) (count ui postUpdates := cmap.New[int]() if runtime.GOOS == "windows" { - dataPath = path.Join(env.GetTempFolder(), g.NewTsID("oracle.data.temp")+".csv") - logPath = path.Join(env.GetTempFolder(), g.NewTsID("oracle.log.temp")) + dataPath = path.Join(env.GetTempFolder(), "oracle", env.CleanTableName(tableFName), g.NewTsID("oracle.data.temp")+".csv") + logPath = path.Join(env.GetTempFolder(), "oracle", env.CleanTableName(tableFName), g.NewTsID("oracle.log.temp")) file, err := os.Create(dataPath) if err != nil { diff --git a/core/dbio/database/database_snowflake.go b/core/dbio/database/database_snowflake.go index 97136203..4a7105c7 100755 --- a/core/dbio/database/database_snowflake.go +++ b/core/dbio/database/database_snowflake.go @@ -735,7 +735,7 @@ func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (co } // Write the ds to a temp file - folderPath := path.Join(env.GetTempFolder(), "snowflake", "put", tableFName, g.NowFileStr()) + folderPath := path.Join(env.GetTempFolder(), "snowflake", "put", env.CleanTableName(tableFName), g.NowFileStr()) // delete folder when done df.Defer(func() { env.RemoveAllLocalTempFile(folderPath) }) @@ -759,7 +759,7 @@ func (conn *SnowflakeConn) CopyViaStage(tableFName string, df *iop.Dataflow) (co }() // Import to staging - stageFolderPath := g.F("@%s.%s/%s/%s", conn.GetProp("schema"), conn.GetProp("internalStage"), tableFName, g.NowFileStr()) + stageFolderPath := g.F("@%s.%s/%s/%s", conn.GetProp("schema"), conn.GetProp("internalStage"), env.CleanTableName(tableFName), g.NowFileStr()) conn.Exec("USE SCHEMA " + conn.GetProp("schema")) _, err = conn.Exec("REMOVE " + stageFolderPath) if err != nil { diff --git a/core/dbio/database/database_sqlserver.go b/core/dbio/database/database_sqlserver.go index e1a31514..375450e8 100755 --- a/core/dbio/database/database_sqlserver.go +++ b/core/dbio/database/database_sqlserver.go @@ -334,7 +334,7 @@ func (conn *MsSQLServerConn) BcpImportFileParrallel(tableFName string, ds *iop.D // Write the ds to a temp file - filePath := path.Join(env.GetTempFolder(), g.NewTsID("sqlserver")+g.F("%d.csv", len(ds.Batches))) + filePath := path.Join(env.GetTempFolder(), "sqlserver", env.CleanTableName(tableFName), g.NewTsID("sqlserver")+g.F("%d.csv", len(ds.Batches))) csvRowCnt, err := writeCsvWithoutQuotes(filePath, batch, fileRowLimit) if err != nil { @@ -456,7 +456,7 @@ func (conn *MsSQLServerConn) BcpImportFile(tableFName, filePath string) (count u } errPath := "/dev/stderr" if runtime.GOOS == "windows" || true { - errPath = path.Join(env.GetTempFolder(), g.NewTsID("sqlserver")+".error") + errPath = path.Join(env.GetTempFolder(), "sqlserver", env.CleanTableName(tableFName), g.NewTsID("sqlserver")+".error") defer os.Remove(errPath) } diff --git a/core/dbio/database/database_starrocks.go b/core/dbio/database/database_starrocks.go index 7ea83728..e4187098 100755 --- a/core/dbio/database/database_starrocks.go +++ b/core/dbio/database/database_starrocks.go @@ -431,7 +431,7 @@ func (conn *StarRocksConn) StreamLoad(feURL, tableFName string, df *iop.Dataflow fs.SetProp("file_max_bytes", val) } - localPath := path.Join(env.GetTempFolder(), "starrocks", table.Schema, table.Name, g.NowFileStr()) + localPath := path.Join(env.GetTempFolder(), "starrocks", env.CleanTableName(tableFName), g.NowFileStr()) // TODO: use reader to fead HTTP directly. Need to get proper redirected URL first. // for ds := range df.StreamCh { diff --git a/core/dbio/iop/datastream.go b/core/dbio/iop/datastream.go index afaa2220..67397a57 100644 --- a/core/dbio/iop/datastream.go +++ b/core/dbio/iop/datastream.go @@ -1420,8 +1420,7 @@ func (ds *Datastream) ConsumeParquetReaderSeeker(reader *os.File) (err error) { // ConsumeParquetReader uses the provided reader to stream rows func (ds *Datastream) ConsumeParquetReader(reader io.Reader) (err error) { // need to write to temp file prior - tempDir := env.GetTempFolder() - parquetPath := path.Join(tempDir, g.NewTsID("parquet.temp")+".parquet") + parquetPath := path.Join(env.GetTempFolder(), g.NewTsID("parquet.temp")+".parquet") ds.Defer(func() { env.RemoveLocalTempFile(parquetPath) }) file, err := os.Create(parquetPath) @@ -1549,8 +1548,7 @@ func (ds *Datastream) ConsumeAvroReaderSeeker(reader io.ReadSeeker) (err error) // ConsumeAvroReader uses the provided reader to stream rows func (ds *Datastream) ConsumeAvroReader(reader io.Reader) (err error) { // need to write to temp file prior - tempDir := env.GetTempFolder() - avroPath := path.Join(tempDir, g.NewTsID("avro.temp")+".avro") + avroPath := path.Join(env.GetTempFolder(), g.NewTsID("avro.temp")+".avro") ds.Defer(func() { env.RemoveLocalTempFile(avroPath) }) file, err := os.Create(avroPath) @@ -1596,8 +1594,7 @@ func (ds *Datastream) ConsumeSASReaderSeeker(reader io.ReadSeeker) (err error) { // ConsumeSASReader uses the provided reader to stream rows func (ds *Datastream) ConsumeSASReader(reader io.Reader) (err error) { // need to write to temp file prior - tempDir := env.GetTempFolder() - sasPath := path.Join(tempDir, g.NewTsID("sas.temp")+".sas7bdat") + sasPath := path.Join(env.GetTempFolder(), g.NewTsID("sas.temp")+".sas7bdat") ds.Defer(func() { env.RemoveLocalTempFile(sasPath) }) file, err := os.Create(sasPath) @@ -1661,8 +1658,7 @@ func (ds *Datastream) ConsumeExcelReaderSeeker(reader io.ReadSeeker, props map[s // ConsumeSASReader uses the provided reader to stream rows func (ds *Datastream) ConsumeExcelReader(reader io.Reader, props map[string]string) (err error) { // need to write to temp file prior - tempDir := env.GetTempFolder() - excelPath := path.Join(tempDir, g.NewTsID("excel.temp")+".xlsx") + excelPath := path.Join(env.GetTempFolder(), g.NewTsID("excel.temp")+".xlsx") ds.Defer(func() { env.RemoveLocalTempFile(excelPath) }) file, err := os.Create(excelPath) diff --git a/core/env/env.go b/core/env/env.go index c20c4341..bb0c5d3f 100755 --- a/core/env/env.go +++ b/core/env/env.go @@ -260,6 +260,10 @@ func GetTempFolder() string { return CleanWindowsPath(tempDir) } +func CleanTableName(tableName string) string { + return strings.ToLower(strings.ReplaceAll(strings.ReplaceAll(tableName, `"`, ``), "`", "")) +} + func CleanWindowsPath(path string) string { return strings.ReplaceAll(path, `\`, `/`) }