-
Notifications
You must be signed in to change notification settings - Fork 0
/
aspects.go
67 lines (60 loc) · 1.58 KB
/
aspects.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package main
import (
"context"
"github.com/jackc/pgx/v4"
"os"
"sync"
)
func doAspectsBatch(data [][][]interface{}) {
batchQueryG := &pgx.Batch{}
batchQueryLoc := &pgx.Batch{}
batchQueryLeg := &pgx.Batch{}
batchQueryInv := &pgx.Batch{}
operationsCache, _ := cache.nomenclators.Load("operations_ref")
op := operationsCache.(map[int]int)
for _, datum := range data{
assetId := int(datum[2][1].(int64))
operationId := op[assetId]
if operationId <= 0{
debugger.debug("Invalid operation id ", operationId, " assetId ", assetId)
}
datum[1][8] = operationId
datum[3][10] = operationId
makeAspectBatch(datum, batchQueryG,batchQueryLoc,batchQueryLeg,batchQueryInv)
}
wg := &sync.WaitGroup{}
wg.Add(4)
go runAspectBatch(batchQueryG, wg)
go runAspectBatch(batchQueryLoc, wg)
go runAspectBatch(batchQueryLeg, wg)
go runAspectBatch(batchQueryInv, wg)
wg.Wait()
}
// makeAspectBatch queues one statement per supplied batch.
//
// For every position i in batch, data[i] is read as a statement slot: its
// first element must be the SQL text (a string) and the remaining elements
// are passed as that statement's arguments. The slot is appended to batch[i].
// The function panics if data has fewer slots than there are batches, if a
// slot is empty, or if a slot's first element is not a string.
func makeAspectBatch(data [][]interface{}, batch ...*pgx.Batch) {
	for slot, target := range batch {
		stmt := data[slot]
		// stmt[0] is the query text; everything after it is its argument list.
		target.Queue(stmt[0].(string), stmt[1:]...)
	}
}
// runAspectBatch sends every statement queued in batchQuery over a connection
// acquired from cache.Conn and checks the result of each statement.
//
// wg.Done is called when the function returns, so it can be launched as a
// goroutine tracked by wg. Any failure is logged and terminates the process
// via os.Exit (status 2 when no connection can be acquired, status 1 when a
// statement or the final Close fails); os.Exit does not run deferred calls.
func runAspectBatch(batchQuery *pgx.Batch, wg *sync.WaitGroup) {
	defer wg.Done()

	batchLen := batchQuery.Len()
	if batchLen == 0 {
		// Nothing was queued, so there is no need to take a connection.
		return
	}

	ctx := context.Background()
	conn, err := cache.Conn.Acquire(ctx)
	if err != nil {
		debugger.error(err.Error())
		os.Exit(2)
	}
	defer conn.Release()

	results := conn.SendBatch(ctx, batchQuery)
	// Read exactly one result per queued statement.
	for i := 0; i < batchLen; i++ {
		if _, execErr := results.Exec(); execErr != nil {
			debugger.error("Error inserting aspects ", execErr.Error())
			os.Exit(1)
		}
	}

	// The batch results must be closed before the connection is handed back
	// (the deferred Release runs after this point); Close also reports any
	// error that was not surfaced by the Exec calls above.
	if closeErr := results.Close(); closeErr != nil {
		debugger.error("Error closing aspects batch ", closeErr.Error())
		os.Exit(1)
	}
}