This repository has been archived by the owner on Apr 30, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 56
/
Copy pathcoordinator.go
123 lines (111 loc) · 4.07 KB
/
coordinator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package saga
import (
"fmt"
"github.com/juju/errors"
"golang.org/x/net/context"
"reflect"
"strconv"
)
// DefaultSEC is default SEC use by package method
var DefaultSEC ExecutionCoordinator = NewSEC()
// ExecutionCoordinator presents Saga Execution Coordinator.
// It manages:
// - Saga log storage.
// - Sub-transaction definition with it's parameter info.
type ExecutionCoordinator struct {
subTxDefinitions subTxDefinitions
paramTypeRegister *paramTypeRegister
}
// NewSEC creates Saga Execution Coordinator
// This method require supply a log Storage to save & lookup log during tx execute.
func NewSEC() ExecutionCoordinator {
return ExecutionCoordinator{
subTxDefinitions: make(subTxDefinitions),
paramTypeRegister: ¶mTypeRegister{
nameToType: make(map[string]reflect.Type),
typeToName: make(map[reflect.Type]string),
},
}
}
// AddSubTxDef create & add definition base on given subTxID, action and compensate, and return current SEC.
//
// This execute as Default SEC.
// subTxID identifies a sub-transaction type, it also be use to persist into saga-log and be lookup for retry
// action defines the action that sub-transaction will execute.
// compensate defines the compensate that sub-transaction will execute when sage aborted.
//
// action and compensate MUST a function that context.Context as first argument.
func AddSubTxDef(subTxID string, action interface{}, compensate interface{}) *ExecutionCoordinator {
return DefaultSEC.AddSubTxDef(subTxID, action, compensate)
}
// AddSubTxDef create & add definition base on given subTxID, action and compensate, and return current SEC.
//
// subTxID identifies a sub-transaction type, it also be use to persist into saga-log and be lookup for retry
// action defines the action that sub-transaction will execute.
// compensate defines the compensate that sub-transaction will execute when sage aborted.
//
// action and compensate MUST a function that context.Context as first argument.
func (e *ExecutionCoordinator) AddSubTxDef(subTxID string, action interface{}, compensate interface{}) *ExecutionCoordinator {
e.paramTypeRegister.addParams(action)
e.paramTypeRegister.addParams(compensate)
e.subTxDefinitions.addDefinition(subTxID, action, compensate)
return e
}
// MustFindSubTxDef returns sub transaction definition by given subTxID.
// Panic if not found sub-transaction.
func (e *ExecutionCoordinator) MustFindSubTxDef(subTxID string) subTxDefinition {
define, ok := e.subTxDefinitions.findDefinition(subTxID)
if !ok {
panic("SubTxID: " + subTxID + " not found in context")
}
return define
}
// MustFindParamName return param name by given reflect type.
// Panic if param name not found.
func (e *ExecutionCoordinator) MustFindParamName(typ reflect.Type) string {
name, ok := e.paramTypeRegister.findTypeName(typ)
if !ok {
panic("Find Param Name Panic: " + typ.String())
}
return name
}
// MustFindParamType return param type by given name.
// Panic if param type not found.
func (e *ExecutionCoordinator) MustFindParamType(name string) reflect.Type {
typ, ok := e.paramTypeRegister.findType(name)
if !ok {
panic("Find Param Type Panic: " + name)
}
return typ
}
func (e *ExecutionCoordinator) StartCoordinator() error {
logIDs, err := LogStorage().LogIDs()
if err != nil {
return errors.Annotate(err, "Fetch logs failure")
}
for _, logID := range logIDs {
lastLogData, err := LogStorage().LastLog(logID)
if err != nil {
return errors.Annotate(err, "Fetch last log panic")
}
fmt.Println(lastLogData)
}
return nil
}
// StartSaga start a new saga, returns the saga was started in Default SEC.
// This method need execute context and UNIQUE id to identify saga instance.
func StartSaga(ctx context.Context, id uint64) *Saga {
return DefaultSEC.StartSaga(ctx, id)
}
// StartSaga start a new saga, returns the saga was started.
// This method need execute context and UNIQUE id to identify saga instance.
func (e *ExecutionCoordinator) StartSaga(ctx context.Context, id uint64) *Saga {
s := &Saga{
id: id,
context: ctx,
sec: e,
logID: LogPrefix + strconv.FormatInt(int64(id), 10),
}
s.startSaga()
return s
}