Skip to content

Commit

Permalink
添加累加器
Browse files Browse the repository at this point in the history
  • Loading branch information
zgwit committed Jul 13, 2024
1 parent 43b640a commit 8205dd8
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 42 deletions.
134 changes: 134 additions & 0 deletions accumulate/accumulate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package accumulate

import (
"context"
"github.com/PaesslerAG/gval"
"github.com/god-jason/bucket/pkg/calc"
"strings"
)

type Result struct {
Target string
Filter map[string]any
Document map[string]any
}

type Field struct {
Key string
Value any

_key gval.Evaluable
_value gval.Evaluable
}

type Accumulation struct {
Target string `json:"target"`
Filter map[string]any `json:"filter"`
Document map[string]any `json:"document"`

_target gval.Evaluable
_filter map[string]gval.Evaluable
_fields []*Field
}

func (a *Accumulation) Init() (err error) {
if expr, has := strings.CutPrefix(a.Target, "="); has {
a._target, err = calc.New(expr)
if err != nil {
return err
}
}

a._filter = make(map[string]gval.Evaluable)
for key, value := range a.Filter {
switch val := value.(type) {
case string:
if expr, has := strings.CutPrefix(val, "="); has {
a._filter[key], err = calc.New(expr)
if err != nil {
return err
}
}
default:
}
}

for key, value := range a.Document {

f := &Field{Key: key, Value: value}

//键
if expr, has := strings.CutPrefix(key, "="); has {
f._key, err = calc.New(expr)
if err != nil {
return err
}
}

//值
switch val := value.(type) {
case string:
if expr, has := strings.CutPrefix(val, "="); has {
f._value, err = calc.New(expr)
if err != nil {
return err
}
}
}

a._fields = append(a._fields, f)
}

return nil
}

func (a *Accumulation) Evaluate(args any) (result *Result, err error) {
var ret Result

//目标
if a._target != nil {
ret.Target, err = a._target.EvalString(context.Background(), args)
if err != nil {
return
}
} else {
ret.Target = a.Target
}

//过滤器
ret.Filter = make(map[string]any)
for key, value := range a._filter {
if value != nil {
ret.Filter[key], err = a._target(context.Background(), args)
if err != nil {
return
}
} else {
ret.Filter[key] = a.Filter[key]
}
}

ret.Document = make(map[string]any)

for _, f := range a._fields {
key := f.Key
if f._key != nil {
key, err = f._key.EvalString(context.Background(), args)
if err != nil {
return
}
}

val := f.Value
if f._value != nil {
val, err = f._value.EvalString(context.Background(), args)
if err != nil {
return
}
}

ret.Document[key] = val
}

return &ret, nil
}
5 changes: 3 additions & 2 deletions db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,12 @@ func UpdateOne(tab string, filter any, update any, upsert bool) (int64, error) {
return ret.ModifiedCount, nil
}

func UpdateMany(tab string, filter any, update any) (int64, error) {
func UpdateMany(tab string, filter any, update any, upsert bool) (int64, error) {
if db == nil {
return 0, ErrDisconnect
}
ret, err := db.Collection(tab).UpdateMany(context.Background(), filter, update)
opts := options.Update().SetUpsert(upsert)
ret, err := db.Collection(tab).UpdateMany(context.Background(), filter, update, opts)
if err != nil {
return 0, exception.Wrap(err)
}
Expand Down
149 changes: 109 additions & 40 deletions table/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package table
import (
"errors"
"github.com/dop251/goja"
"github.com/god-jason/bucket/accumulate"
"github.com/god-jason/bucket/db"
"github.com/god-jason/bucket/log"
"github.com/god-jason/bucket/pkg/exception"
"github.com/god-jason/bucket/pkg/javascript"
"github.com/santhosh-tekuri/jsonschema/v6"
Expand All @@ -18,12 +20,17 @@ type Table struct {
Name string `json:"name,omitempty"`
Fields []*Field `json:"fields,omitempty"`

//Json Schema
Schema string `json:"schema,omitempty"`
schema *jsonschema.Schema

//脚本
Scripts map[string]string `json:"scripts,omitempty"`
scripts map[string]*goja.Program

//累加器
Accumulations []*accumulate.Accumulation `json:"accumulations,omitempty"`

TimeSeries *options.TimeSeriesOptions `json:"-"` //时间序列参数
Hook *Hook `json:"-"`
}
Expand All @@ -47,6 +54,14 @@ func (t *Table) init() (err error) {
}
}

//初始化累加器
for _, a := range t.Accumulations {
err = a.Init()
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -124,26 +139,38 @@ func (t *Table) Insert(doc any) (id string, err error) {
}
}

//累加器
for _, a := range t.Accumulations {
ret, err := a.Evaluate(doc)
if err != nil {
log.Error(err)
continue
}
_, err = db.UpdateMany(ret.Target, ret.Filter, bson.M{"$inc": ret.Document}, true)
if err != nil {
log.Error(err)
}
}

return ret.Hex(), nil
}

func (t *Table) Import(docs []any) (ids []string, err error) {

//没有hook,则直接InsertMany
if t.Hook == nil || t.Hook.BeforeInsert == nil && t.Hook.AfterInsert == nil {
if _, ok := t.scripts["before.insert"]; !ok {
if _, ok := t.scripts["after.insert"]; !ok {
oids, err := db.InsertMany(t.Name, docs)
if err != nil {
return nil, err
}
for _, id := range oids {
ids = append(ids, id.Hex())
}
return ids, nil
}
}
}
//if t.Hook == nil || t.Hook.BeforeInsert == nil && t.Hook.AfterInsert == nil {
// if _, ok := t.scripts["before.insert"]; !ok {
// if _, ok := t.scripts["after.insert"]; !ok {
// oids, err := db.InsertMany(t.Name, docs)
// if err != nil {
// return nil, err
// }
// for _, id := range oids {
// ids = append(ids, id.Hex())
// }
// return ids, nil
// }
// }
//}

//依次插入
for _, doc := range docs {
Expand All @@ -157,26 +184,25 @@ func (t *Table) Import(docs []any) (ids []string, err error) {
}

func (t *Table) ImportDocument(docs []db.Document) (ids []string, err error) {

//没有hook,则直接InsertMany
if t.Hook == nil || t.Hook.BeforeInsert == nil && t.Hook.AfterInsert == nil {
if _, ok := t.scripts["before.insert"]; !ok {
if _, ok := t.scripts["after.insert"]; !ok {
ds := make([]any, 0, len(docs))
for _, doc := range docs {
ds = append(ds, doc)
}
oids, err := db.InsertMany(t.Name, ds)
if err != nil {
return nil, err
}
for _, id := range oids {
ids = append(ids, id.Hex())
}
return ids, nil
}
}
}
//if t.Hook == nil || t.Hook.BeforeInsert == nil && t.Hook.AfterInsert == nil {
// if _, ok := t.scripts["before.insert"]; !ok {
// if _, ok := t.scripts["after.insert"]; !ok {
// ds := make([]any, 0, len(docs))
// for _, doc := range docs {
// ds = append(ds, doc)
// }
// oids, err := db.InsertMany(t.Name, ds)
// if err != nil {
// return nil, err
// }
// for _, id := range oids {
// ids = append(ids, id.Hex())
// }
// return ids, nil
// }
// }
//}

//依次插入
for _, doc := range docs {
Expand Down Expand Up @@ -243,6 +269,19 @@ func (t *Table) Delete(id string) error {
}
}

//累加器
for _, a := range t.Accumulations {
ret, err := a.Evaluate(result)
if err != nil {
log.Error(err)
continue
}
_, err = db.UpdateMany(ret.Target, ret.Filter, bson.M{"$dec": ret.Document}, false)
if err != nil {
log.Error(err)
}
}

return err
}

Expand Down Expand Up @@ -283,8 +322,8 @@ func (t *Table) Update(id string, update any) error {
}
db.ParseDocumentObjectId(update)

var result db.Document
has, err := db.FindOneAndUpdate(t.Name, bson.D{{"_id", oid}}, bson.D{{"$set", update}}, &result)
var base db.Document
has, err := db.FindOneAndUpdate(t.Name, bson.D{{"_id", oid}}, bson.D{{"$set", update}}, &base)
if err != nil {
return err
}
Expand All @@ -293,15 +332,15 @@ func (t *Table) Update(id string, update any) error {
}

//把差异保存到修改历史表
_, _ = db.InsertOne(t.Name+".change", bson.M{"object_id": oid, "base": result, "change": update})
_, _ = db.InsertOne(t.Name+".change", bson.M{"object_id": oid, "base": base, "change": update})

//转换—_id
db.StringifyDocumentObjectId(update)
db.StringifyDocumentObjectId(result)
db.StringifyDocumentObjectId(base)

//after update
if t.Hook != nil && t.Hook.AfterUpdate != nil {
err := t.Hook.AfterUpdate(id, update, result)
err := t.Hook.AfterUpdate(id, update, base)
if err != nil {
return exception.Wrap(err)
}
Expand All @@ -310,13 +349,43 @@ func (t *Table) Update(id string, update any) error {
vm := javascript.Runtime()
_ = vm.Set("_id", id)
_ = vm.Set("change", update)
_ = vm.Set("base", result)
_ = vm.Set("base", base)
_, err = vm.RunProgram(hook)
if err != nil {
return exception.Wrap(err)
}
}

//累加器,先减,再加
for _, a := range t.Accumulations {
ret, err := a.Evaluate(base)
if err != nil {
log.Error(err)
continue
}
_, err = db.UpdateMany(ret.Target, ret.Filter, bson.M{"$dec": ret.Document}, true)
if err != nil {
log.Error(err)
}
}
//补充字段,base已经被污染
if u, ok := update.(map[string]any); ok {
for k, v := range u {
base[k] = v
}
}
for _, a := range t.Accumulations {
ret, err := a.Evaluate(update)
if err != nil {
log.Error(err)
continue
}
_, err = db.UpdateMany(ret.Target, ret.Filter, bson.M{"$inc": ret.Document}, true)
if err != nil {
log.Error(err)
}
}

return err
}

Expand Down

0 comments on commit 8205dd8

Please sign in to comment.