-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* feat: add sagas * fix(saga): check of compensate function * fix(saga): check of compensate function * fix(saga): check of compensate function * fix(saga): check of compensate function * feat: add sagas * fix(ots3): missing trace in ots3 * feat(saga): checkpoint * feat(saga): rewrite saga * fix(saga): fix cases * fix(saga): fix cases * fix(clihttp): data race * fix(clihttp): data race * fix(clihttp): data race * fix(sagas): build failure * doc(sagas): add more docs
- Loading branch information
Showing
25 changed files
with
1,944 additions
and
47 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
package dtx | ||
|
||
type correlationIDType string | ||
|
||
// CorrelationID is an identifier to correlate transactions in context. | ||
const CorrelationID correlationIDType = "CorrelationID" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
/* | ||
Package dtx contains common utilities in the context of distributed transaction. | ||
Context Passing | ||
It is curial for all parties in the distributed transaction to share an | ||
transaction id. This package provides utility to pass this id across services. | ||
HTTPToContext() http.RequestFunc | ||
ContextToHTTP() http.RequestFunc | ||
GRPCToContext() grpc.ServerRequestFunc | ||
ContextToGRPC() grpc.ClientRequestFunc | ||
Idempotency | ||
Certain operations will be retried by the client more than once. A middleware is | ||
provided for the server to shield against repeated request in the same | ||
transaction. | ||
func MakeIdempotence(s Oncer) endpoint.Middleware | ||
Lock | ||
Certain resource in transaction cannot be concurrently accessed. A middleware is | ||
provided to lock such resources. | ||
func MakeLock(l Locker) endpoint.Middleware | ||
Allow Null Compensation and Prevent Resource Suspension | ||
Transaction participants may receive the compensation | ||
order before performing normal operations due to network exceptions. In this | ||
case, null compensation is required. | ||
If the forward operation arrives later than the compensating operation due to | ||
network exceptions, the forward operation must be discarded. Otherwise, resource | ||
suspension occurs. | ||
func MakeAttempt(s Sequencer) endpoint.Middleware | ||
func MakeCancel(s Sequencer) endpoint.Middleware | ||
*/ | ||
package dtx |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
package dtx | ||
|
||
import ( | ||
"context" | ||
|
||
"github.com/go-kit/kit/endpoint" | ||
"github.com/pkg/errors" | ||
) | ||
|
||
// ErrNonIdempotent is returned when an endpoint is requested more than once with the same CorrelationID. | ||
var ErrNonIdempotent = errors.New("rejected repeated request") | ||
|
||
// ErrNoLock is returned when the endpoint fail to fetch the distributed lock under the same CorrelationID. | ||
var ErrNoLock = errors.New("failed to grab lock") | ||
|
||
// Oncer should return true if the key has been observed before. | ||
type Oncer interface { | ||
Once(ctx context.Context, key string) bool | ||
} | ||
|
||
// MakeIdempotence returns a middleware that ensures the next endpoint can only be executed once per CorrelationID. | ||
func MakeIdempotence(s Oncer) endpoint.Middleware { | ||
return func(e endpoint.Endpoint) endpoint.Endpoint { | ||
return func(ctx context.Context, request interface{}) (response interface{}, err error) { | ||
correlationID, ok := ctx.Value(CorrelationID).(string) | ||
if !ok { | ||
return e(ctx, request) | ||
} | ||
if s.Once(ctx, correlationID) { | ||
return nil, ErrNonIdempotent | ||
} | ||
return e(ctx, request) | ||
} | ||
} | ||
} | ||
|
||
// Locker is an interface for the distributed lock. | ||
type Locker interface { | ||
// Lock should return true only when it successfully grabs the lock. | ||
Lock(ctx context.Context, key string) bool | ||
Unlock(ctx context.Context, key string) | ||
} | ||
|
||
// MakeLock returns a middleware that ensures the next endpoint is never concurrently accessed. | ||
func MakeLock(l Locker) endpoint.Middleware { | ||
return func(e endpoint.Endpoint) endpoint.Endpoint { | ||
return func(ctx context.Context, request interface{}) (response interface{}, err error) { | ||
correlationID, ok := ctx.Value(CorrelationID).(string) | ||
if !ok { | ||
return e(ctx, request) | ||
} | ||
if l.Lock(ctx, correlationID) { | ||
defer l.Unlock(ctx, correlationID) | ||
return e(ctx, request) | ||
} | ||
return nil, ErrNoLock | ||
} | ||
} | ||
} | ||
|
||
// Sequencer is an interface that shields against the disordering of | ||
// attempt and cancel in a transactional context. | ||
type Sequencer interface { | ||
MarkCancelledCheckAttempted(context.Context, string) bool | ||
MarkAttemptedCheckCancelled(context.Context, string) bool | ||
} | ||
|
||
// MakeAttempt returns a middleware that wraps around an attempt endpoint. If the | ||
// this segment of the distributed transaction is already cancelled, the next | ||
// endpoint will never be executed. | ||
// | ||
// If the forward operation arrives later than the compensating operation due to | ||
// network exceptions, the forward operation must be discarded. Otherwise, | ||
// resource suspension occurs. | ||
func MakeAttempt(s Sequencer) endpoint.Middleware { | ||
return func(e endpoint.Endpoint) endpoint.Endpoint { | ||
return func(ctx context.Context, request interface{}) (response interface{}, err error) { | ||
correlationID, ok := ctx.Value(CorrelationID).(string) | ||
if !ok { | ||
return e(ctx, request) | ||
} | ||
if s.MarkAttemptedCheckCancelled(ctx, correlationID) { | ||
return nil, nil | ||
} | ||
return e(ctx, request) | ||
} | ||
} | ||
} | ||
|
||
// MakeCancel returns a middleware that wraps around the cancellation endpoint. | ||
// It guarantees if this segment of the distributed transaction is never attempted, | ||
// the cancellation endpoint will not be executed. | ||
// | ||
// Transaction participants may receive the compensation order before performing | ||
// normal operations due to network exceptions. In this case, null compensation | ||
// is required. | ||
func MakeCancel(s Sequencer) endpoint.Middleware { | ||
return func(e endpoint.Endpoint) endpoint.Endpoint { | ||
return func(ctx context.Context, request interface{}) (response interface{}, err error) { | ||
correlationID, ok := ctx.Value(CorrelationID).(string) | ||
if !ok { | ||
return e(ctx, request) | ||
} | ||
if !s.MarkCancelledCheckAttempted(ctx, correlationID) { | ||
return nil, nil | ||
} | ||
return e(ctx, request) | ||
} | ||
} | ||
} |
Oops, something went wrong.