Skip to content

Commit

Permalink
fix: correctly export CorrelationID field (#87)
Browse files Browse the repository at this point in the history
* fix: correctly export CorrelationID field

* fix: build
  • Loading branch information
Reasno authored Mar 15, 2021
1 parent acd9797 commit dde8e94
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 31 deletions.
2 changes: 1 addition & 1 deletion dtx/sagas/in_process_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (i *InProcessStore) Log(ctx context.Context, log Log) error {
if i.transactions == nil {
i.transactions = make(map[string][]Log)
}
i.transactions[log.correlationID] = append(i.transactions[log.correlationID], log)
i.transactions[log.CorrelationID] = append(i.transactions[log.CorrelationID], log)
return nil
}

Expand Down
28 changes: 14 additions & 14 deletions dtx/sagas/in_process_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestInProcessStore_Ack(t *testing.T) {
"session without error",
Log{
ID: "1",
correlationID: "2",
CorrelationID: "2",
LogType: Session,
StartedAt: time.Now(),
},
Expand All @@ -34,44 +34,44 @@ func TestInProcessStore_Ack(t *testing.T) {
"session with error",
Log{
ID: "1",
correlationID: "2",
CorrelationID: "2",
LogType: Session,
StartedAt: time.Now(),
},
errors.New("foo"),
func(t *testing.T, log Log, s *InProcessStore) {
assert.Len(t, s.transactions, 1)
assert.Error(t, s.transactions[log.correlationID][0].StepError)
assert.Error(t, s.transactions[log.CorrelationID][0].StepError)
},
},
{
"do without error",
Log{
ID: "1",
correlationID: "2",
CorrelationID: "2",
LogType: Do,
StartedAt: time.Now(),
},
nil,
func(t *testing.T, log Log, s *InProcessStore) {
assert.Len(t, s.transactions, 1)
assert.False(t, s.transactions[log.correlationID][0].FinishedAt.IsZero())
assert.NoError(t, s.transactions[log.correlationID][0].StepError)
assert.False(t, s.transactions[log.CorrelationID][0].FinishedAt.IsZero())
assert.NoError(t, s.transactions[log.CorrelationID][0].StepError)
},
},
{
"do with error",
Log{
ID: "1",
correlationID: "2",
CorrelationID: "2",
LogType: Do,
StartedAt: time.Now(),
},
errors.New("foo"),
func(t *testing.T, log Log, s *InProcessStore) {
assert.Len(t, s.transactions, 1)
assert.False(t, s.transactions[log.correlationID][0].FinishedAt.IsZero())
assert.Error(t, s.transactions[log.correlationID][0].StepError)
assert.False(t, s.transactions[log.CorrelationID][0].FinishedAt.IsZero())
assert.Error(t, s.transactions[log.CorrelationID][0].StepError)
},
},
}
Expand All @@ -81,7 +81,7 @@ func TestInProcessStore_Ack(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
t.Parallel()
store := NewInProcessStore()
ctx := context.WithValue(context.Background(), dtx.CorrelationID, c.log.correlationID)
ctx := context.WithValue(context.Background(), dtx.CorrelationID, c.log.CorrelationID)
store.Log(ctx, c.log)
store.Ack(ctx, c.log.ID, c.err)
c.asserts(t, c.log, store)
Expand All @@ -94,13 +94,13 @@ func TestInProcessStore_UncommittedSteps(t *testing.T) {
ctx := context.WithValue(context.Background(), dtx.CorrelationID, "2")
store.Log(ctx, Log{
ID: "1",
correlationID: "2",
CorrelationID: "2",
StartedAt: time.Now(),
LogType: Session,
})
store.Log(ctx, Log{
ID: "2",
correlationID: "2",
CorrelationID: "2",
StartedAt: time.Now(),
LogType: Do,
})
Expand All @@ -110,7 +110,7 @@ func TestInProcessStore_UncommittedSteps(t *testing.T) {

store.Log(ctx, Log{
ID: "2",
correlationID: "2",
CorrelationID: "2",
StartedAt: time.Now(),
LogType: Undo,
})
Expand All @@ -129,7 +129,7 @@ func TestInProcessStore_UncommittedSagas(t *testing.T) {
store := NewInProcessStore()
store.transactions["test"] = []Log{{
ID: "1",
correlationID: "test",
CorrelationID: "test",
FinishedAt: time.Now(),
LogType: Session,
StepError: nil,
Expand Down
3 changes: 1 addition & 2 deletions dtx/sagas/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ const (
// Log is the structural Log type of the distributed saga.
type Log struct {
ID string
correlationID string
CorrelationID string
StartedAt time.Time
FinishedAt time.Time
LogType LogType
StepNumber int
StepParam interface{}
StepName string
StepError error
Expand Down
8 changes: 4 additions & 4 deletions dtx/sagas/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (r *Registry) StartTX(ctx context.Context) (*TX, context.Context) {
tx := &TX{
session: Log{
ID: xid.New().String(),
correlationID: cid,
CorrelationID: cid,
StartedAt: time.Now(),
LogType: Session,
},
Expand All @@ -94,7 +94,7 @@ func (r *Registry) AddStep(step *Step) func(context.Context, interface{}) (inter

compensateLog := Log{
ID: logID,
correlationID: tx.correlationID,
CorrelationID: tx.correlationID,
StartedAt: time.Now(),
LogType: Undo,
StepName: step.Name,
Expand Down Expand Up @@ -122,7 +122,7 @@ func (r *Registry) AddStep(step *Step) func(context.Context, interface{}) (inter
}
stepLog := Log{
ID: logID,
correlationID: tx.correlationID,
CorrelationID: tx.correlationID,
StartedAt: time.Now(),
LogType: Do,
StepName: step.Name,
Expand Down Expand Up @@ -155,7 +155,7 @@ func (r *Registry) Recover(ctx context.Context) {
continue
}
tx := TX{
correlationID: log.correlationID,
correlationID: log.CorrelationID,
store: r.Store,
}
ctx = context.WithValue(ctx, dtx.CorrelationID, tx.correlationID)
Expand Down
16 changes: 6 additions & 10 deletions dtx/sagas/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,14 @@ func TestRegistry_Recover(t *testing.T) {
store := NewInProcessStore()
store.transactions["test"] = []Log{{
ID: "0",
correlationID: "2",
CorrelationID: "2",
StartedAt: time.Now(),
LogType: Session,
StepNumber: 0,
}, {
ID: "1",
correlationID: "2",
CorrelationID: "2",
StartedAt: time.Now(),
FinishedAt: time.Time{},
StepNumber: 1,
LogType: Do,
StepError: nil,
}}
Expand All @@ -35,15 +33,14 @@ func TestRegistry_RecoverWithTimeout(t *testing.T) {
store := NewInProcessStore()
store.transactions["test"] = []Log{{
ID: "0",
correlationID: "2",
CorrelationID: "2",
StartedAt: time.Now(),
LogType: Session,
}, {
ID: "1",
correlationID: "2",
CorrelationID: "2",
StartedAt: time.Now(),
FinishedAt: time.Time{},
StepNumber: 0,
LogType: Do,
StepError: nil,
StepName: "foo",
Expand All @@ -67,15 +64,14 @@ func TestRegistry_RecoverSerialized(t *testing.T) {
store := NewInProcessStore()
store.transactions["test"] = []Log{{
ID: "0",
correlationID: "2",
CorrelationID: "2",
StartedAt: time.Now().Add(-time.Hour),
LogType: Session,
}, {
ID: "1",
correlationID: "2",
CorrelationID: "2",
StartedAt: time.Now().Add(-time.Hour),
FinishedAt: time.Time{},
StepNumber: 0,
LogType: Do,
StepError: nil,
StepName: "foo",
Expand Down

0 comments on commit dde8e94

Please sign in to comment.