Skip to content

Commit

Permalink
Merge pull request #387 from yokofly/protondb-enhance
Browse files Browse the repository at this point in the history
Protondb enhance
  • Loading branch information
flarco authored Sep 27, 2024
2 parents 93f0525 + 99c5c1d commit 7b21780
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cmd/sling/sling_conns.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func processConns(c *g.CliSC) (ok bool, err error) {
return ok, g.Error(err, "cannot parse query")
}

if len(database.ParseSQLMultiStatements(query)) == 1 && (!sQuery.IsQuery() || strings.Contains(strings.ToLower(query), "select") || g.In(conn.Connection.Type, dbio.TypeDbPrometheus, dbio.TypeDbMongoDB)) {
if len(database.ParseSQLMultiStatements(query)) == 1 && (!sQuery.IsQuery() || (strings.Contains(strings.ToLower(query), "select") && !strings.Contains(strings.ToLower(query), "insert")) || g.In(conn.Connection.Type, dbio.TypeDbPrometheus, dbio.TypeDbMongoDB)) {

data, err := dbConn.Query(sQuery.Select(100, 0))
if err != nil {
Expand Down
8 changes: 4 additions & 4 deletions core/dbio/database/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,13 +318,13 @@ func (da *DataAnalyzer) ProcessRelationsString() (err error) {
g.Info("processing string relations: OneToOne")
err = da.GetOneToOne(uniqueCols, true)
if err != nil {
return g.Error(err, "could not run GetOneToMany")
return g.Error(err, "could not run GetOneToOne")
}

g.Info("processing string relations: ManyToMany")
err = da.GetManyToMany(nonUniqueCols, true)
if err != nil {
return g.Error(err, "could not run GetOneToMany")
return g.Error(err, "could not run GetManyToMany")
}

return
Expand Down Expand Up @@ -378,13 +378,13 @@ func (da *DataAnalyzer) ProcessRelationsInteger() (err error) {
g.Info("processing integer relations: OneToOne")
err = da.GetOneToOne(uniqueCols, false)
if err != nil {
return g.Error(err, "could not run GetOneToMany")
return g.Error(err, "could not run GetOneToOne")
}

g.Info("processing integer relations: ManyToMany")
err = da.GetManyToMany(nonUniqueCols, false)
if err != nil {
return g.Error(err, "could not run GetOneToMany")
return g.Error(err, "could not run GetManyToMany")
}

return
Expand Down
50 changes: 41 additions & 9 deletions core/dbio/database/database_proton.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (conn *ProtonConn) NewTransaction(ctx context.Context, options ...*sql.TxOp
Tx := &BaseTransaction{Tx: tx, Conn: conn.Self(), context: context}
conn.tx = Tx

// CH does not support transactions at the moment
// ProtonDB does not support transactions at the moment
// Tx := &BlankTransaction{Conn: conn.Self(), context: &context}

return Tx, nil
Expand Down Expand Up @@ -267,6 +267,27 @@ func (conn *ProtonConn) BulkImportStream(tableFName string, ds *iop.Datastream)
return count, nil
}

// ExecContext runs a sql query with context, returns `error`
func (conn *ProtonConn) ExecContext(ctx context.Context, q string, args ...interface{}) (result sql.Result, err error) {
const maxRetries = 3
retries := 0

for {
result, err = conn.BaseConn.ExecContext(ctx, q, args...)
if err == nil {
return
}

retries++
if retries >= maxRetries {
return
}

g.Info("Error (%s). Sleep 5 sec and retry", err.Error())
time.Sleep(5 * time.Second)
}
}

// GenerateInsertStatement returns the proper INSERT statement
func (conn *ProtonConn) GenerateInsertStatement(tableName string, cols iop.Columns, numRows int) string {
fields := cols.Names()
Expand All @@ -290,7 +311,7 @@ func (conn *ProtonConn) GenerateInsertStatement(tableName string, cols iop.Colum
}

statement := g.R(
"insert into {table} ({fields}) values {values}",
"insert into {table} ({fields}) values {values}",
"table", tableName,
"fields", strings.Join(qFields, ", "),
"values", strings.TrimSuffix(valuesStr, ","),
Expand All @@ -307,14 +328,8 @@ func (conn *ProtonConn) GenerateUpsertSQL(srcTable string, tgtTable string, pkFi
return
}

// proton does not support upsert with delete
sqlTempl := `
ALTER STREAM {tgt_table}
DELETE where ({pk_fields}) in (
select {pk_fields}
from table({src_table}) src
)
;
insert into {tgt_table}
({insert_fields})
select {src_fields}
Expand Down Expand Up @@ -364,3 +379,20 @@ func (conn *ProtonConn) GetCount(tableFName string) (uint64, error) {
}
return cast.ToUint64(data.Rows[0][0]), nil
}

func (conn *ProtonConn) GetNativeType(col iop.Column) (nativeType string, err error) {
nativeType, err = conn.BaseConn.GetNativeType(col)

// remove nullable if part of pk
if col.IsKeyType(iop.PrimaryKey) && strings.HasPrefix(nativeType, "nullable(") {
nativeType = strings.TrimPrefix(nativeType, "nullable(")
nativeType = strings.TrimSuffix(nativeType, ")")
}

// special case for _tp_time, Column _tp_time is reserved, expected type is non-nullable datetime64
if col.Name == "_tp_time" {
return "datetime64(3, 'UTC')", nil
}

return nativeType, err
}
2 changes: 2 additions & 0 deletions core/dbio/database/schemata.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ func (t *Table) Select(limit, offset int, fields ...string) (sql string) {
} else {
sql = t.SQL
}
} else if t.Dialect == dbio.TypeDbProton {
sql = g.F("select %s from table(%s)", fieldsStr, t.FDQN())
} else {
sql = g.F("select %s from %s", fieldsStr, t.FDQN())
}
Expand Down
2 changes: 1 addition & 1 deletion core/dbio/database/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func InsertBatchStream(conn Connection, tx Transaction, tableFName string, ds *i
} else if conn.GetType() == dbio.TypeDbTrino {
row = processTrinoInsertRow(bColumns, row)
} else if conn.GetType() == dbio.TypeDbProton {
row = processClickhouseInsertRow(bColumns, row)
row = processProtonInsertRow(bColumns, row)
}
vals = append(vals, row...)
}
Expand Down

0 comments on commit 7b21780

Please sign in to comment.