-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsetter.go
71 lines (62 loc) · 2.35 KB
/
setter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package dalgo2sql
import (
"context"
"fmt"
"github.com/dal-go/dalgo/dal"
)
func (dtb *database) Set(ctx context.Context, record dal.Record) error {
return setSingle(ctx, dtb.options, record, dtb.db.Query, dtb.db.ExecContext)
}
func (t transaction) Set(ctx context.Context, record dal.Record) error {
return setSingle(ctx, t.sqlOptions, record, t.tx.Query, t.tx.ExecContext)
}
func (dtb *database) SetMulti(ctx context.Context, records []dal.Record) error {
err := dtb.RunReadwriteTransaction(ctx, func(ctx context.Context, tx dal.ReadwriteTransaction) error {
return setMulti(ctx, dtb.options, records, dtb.db.Query, dtb.db.ExecContext)
})
return err
}
func (t transaction) SetMulti(ctx context.Context, records []dal.Record) error {
return setMulti(ctx, t.sqlOptions, records, t.tx.Query, t.tx.ExecContext)
}
func setSingle(ctx context.Context, options Options, record dal.Record, execQuery queryExecutor, exec statementExecutor) error {
key := record.Key()
exists, err := existsSingle(options, key, execQuery)
if err != nil {
return fmt.Errorf("failed to check if record exists: %w", err)
}
var o operation
if exists {
o = update
} else {
o = insert
}
qry := buildSingleRecordQuery(o, options, record)
if _, err := exec(ctx, qry.text, qry.args...); err != nil {
return err
}
return nil
}
func setMulti(ctx context.Context, options Options, records []dal.Record, execQuery queryExecutor, execStatement statementExecutor) error {
// TODO(help-wanted): insert of multiple rows at once as: "INSERT INTO table (colA, colB) VALUES (a1, b2), (a2, b2)"
for i, record := range records {
if err := setSingle(ctx, options, record, execQuery, execStatement); err != nil {
return fmt.Errorf("failed to set record #%d of %d: %w", i+1, len(records), err)
}
}
return nil
}
func existsSingle(options Options, key *dal.Key, execQuery queryExecutor) (bool, error) {
collection := key.Collection()
pk := options.PrimaryKeyFieldNames(key)
var where string
if len(pk) == 1 {
where = pk[0] + " = ?"
} else {
return false, fmt.Errorf("%w: composite primary keys are not suported yet", dal.ErrNotImplementedYet)
}
// `SELECT 1` is not supported by some SQL drivers so select 1st column from primary key
queryText := fmt.Sprintf("SELECT %s FROM %s WHERE %s", pk[0], collection, where)
rows, err := execQuery(queryText, key.ID)
return err == nil && rows.Next(), err
}