Skip to content

Commit

Permalink
Merge pull request #18 from jdextraze/transaction
Browse files Browse the repository at this point in the history
Transaction
  • Loading branch information
jdextraze authored Nov 6, 2017
2 parents 2262bd3 + ef50b3b commit c6773f6
Show file tree
Hide file tree
Showing 9 changed files with 532 additions and 4 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ MIT. See [LICENSE](https://github.com/jdextraze/go-gesclient/blob/master/LICENSE
* Global authentication
* Get/Set stream metadata
* Set system settings
* Transaction

## TODO

* Complete unit and integration tests
* Benchmarks
* Transaction
* Projections Management

## External tools

Expand Down
7 changes: 4 additions & 3 deletions client/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ type Connection interface {
AppendToStreamAsync(stream string, expectedVersion int, events []*EventData, userCredentials *UserCredentials) (
*tasks.Task, error)

//StartTransactionAsync(stream string, expectedVersion int, userCredentials *UserCredentials) (
// <-chan Transaction, error)
// Task.Result() returns *client.Transaction
StartTransactionAsync(stream string, expectedVersion int, userCredentials *UserCredentials) (
*tasks.Task, error)

//ContinueTransaction(transactionId int64, userCredentials *UserCredentials) (Transaction, error)
ContinueTransaction(transactionId int64, userCredentials *UserCredentials) *Transaction

// Task.Result() returns *client.EventReadResult
ReadEventAsync(stream string, eventNumber int, resolveTos bool, userCredentials *UserCredentials) (
Expand Down
1 change: 1 addition & 0 deletions client/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
)

// TODO review those
var (
WrongExpectedVersion = errors.New("Wrong expected version")
StreamDeleted = errors.New("Stream deleted")
Expand Down
72 changes: 72 additions & 0 deletions client/transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package client

import (
"errors"
"github.com/jdextraze/go-gesclient/tasks"
)

type Transaction struct {
transactionId int64
userCredentials *UserCredentials
connection TransactionConnection
isRolledBack bool
isCommitted bool
}

type TransactionConnection interface {
TransactionalWriteAsync(*Transaction, []*EventData, *UserCredentials) (*tasks.Task, error)
CommitTransactionAsync(*Transaction, *UserCredentials) (*tasks.Task, error)
}

var (
CannotCommitRolledBackTransaction = errors.New("cannot commit a rolled back transaction")
TransactionIsAlreadyCommitted = errors.New("transaction is already committed")
)

func NewTransaction(
transactionId int64,
userCredentials *UserCredentials,
connection TransactionConnection,
) *Transaction {
if transactionId < 0 {
panic("transactionId must be positive")
}

return &Transaction{
transactionId: transactionId,
userCredentials: userCredentials,
connection: connection,
}
}

func (t *Transaction) TransactionId() int64 { return t.transactionId }

// Task.Result() returns *client.WriteResult
func (t *Transaction) CommitAsync() (*tasks.Task, error) {
if t.isRolledBack {
return nil, CannotCommitRolledBackTransaction
}
if t.isCommitted {
return nil, TransactionIsAlreadyCommitted
}
t.isCommitted = true
return t.connection.CommitTransactionAsync(t, t.userCredentials)
}

func (t *Transaction) WriteAsync(events []*EventData) (*tasks.Task, error) {
if t.isRolledBack {
return nil, CannotCommitRolledBackTransaction
}
if t.isCommitted {
return nil, TransactionIsAlreadyCommitted
}
return t.connection.TransactionalWriteAsync(t, events, t.userCredentials)
}

func (t *Transaction) Rollback() error {
if t.isCommitted {
return TransactionIsAlreadyCommitted
}
t.isRolledBack = true
return nil
}
145 changes: 145 additions & 0 deletions examples/transaction/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package main

import (
"encoding/json"
"flag"
"github.com/jdextraze/go-gesclient"
"github.com/jdextraze/go-gesclient/client"
"github.com/satori/go.uuid"
"log"
"net"
"net/url"
"strings"
)

func main() {
var debug bool
var addr string
var stream string
var verbose bool
var expectedVersion int
var transactionId int64

flag.BoolVar(&debug, "debug", false, "Debug")
flag.StringVar(&addr, "endpoint", "tcp://127.0.0.1:1113", "EventStore address")
flag.StringVar(&stream, "stream", "Default", "Stream ID")
flag.BoolVar(&verbose, "verbose", false, "Verbose logging (Requires debug)")
flag.IntVar(&expectedVersion, "expected-version", client.ExpectedVersion_Any, "expected version")
flag.Int64Var(&transactionId, "continue", -1, "Continue transaction with id")
flag.Parse()

if debug {
gesclient.Debug()
}

c := getConnection(addr, verbose)
if err := c.ConnectAsync().Wait(); err != nil {
log.Fatalf("Error connecting: %v", err)
}

var t *client.Transaction
if transactionId < 0 {
task, err := c.StartTransactionAsync(stream, expectedVersion, nil)
if err != nil {
log.Fatalf("Failed starting an async transaction: %v", err)
} else if err := task.Error(); err != nil {
log.Fatalf("Failed waiting to start an async transaction: %v", err)
} else {
t = task.Result().(*client.Transaction)
}
} else {
t = c.ContinueTransaction(transactionId, nil)
}

switch flag.Arg(0) {
case "write":
write(t)
case "commit":
commit(t)
case "rollback":
rollback(t)
default:
log.Fatalf("Unknown action. Use write, commit or rollback.")
}
}

func getConnection(addr string, verbose bool) client.Connection {
settingsBuilder := client.CreateConnectionSettings()

var uri *url.URL
var err error
if !strings.Contains(addr, "://") {
gossipSeeds := strings.Split(addr, ",")
endpoints := make([]*net.TCPAddr, len(gossipSeeds))
for i, gossipSeed := range gossipSeeds {
endpoints[i], err = net.ResolveTCPAddr("tcp", gossipSeed)
if err != nil {
log.Fatalf("Error resolving: %v", gossipSeed)
}
}
settingsBuilder.SetGossipSeedEndPoints(endpoints)
} else {
uri, err = url.Parse(addr)
if err != nil {
log.Fatalf("Error parsing address: %v", err)
}

if uri.User != nil {
username := uri.User.Username()
password, _ := uri.User.Password()
settingsBuilder.SetDefaultUserCredentials(client.NewUserCredentials(username, password))
}
}

if verbose {
settingsBuilder.EnableVerboseLogging()
}

c, err := gesclient.Create(settingsBuilder.Build(), uri, "AllCatchUpSubscriber")
if err != nil {
log.Fatalf("Error creating connection: %v", err)
}

c.Connected().Add(func(evt client.Event) error { log.Printf("Connected: %+v", evt); return nil })
c.Disconnected().Add(func(evt client.Event) error { log.Printf("Disconnected: %+v", evt); return nil })
c.Reconnecting().Add(func(evt client.Event) error { log.Printf("Reconnecting: %+v", evt); return nil })
c.Closed().Add(func(evt client.Event) error { log.Fatalf("Connection closed: %+v", evt); return nil })
c.ErrorOccurred().Add(func(evt client.Event) error { log.Printf("Error: %+v", evt); return nil })
c.AuthenticationFailed().Add(func(evt client.Event) error { log.Printf("Auth failed: %+v", evt); return nil })

return c
}

type TestEvent struct{}

func write(t *client.Transaction) {
log.Printf("Writing to transaction #%d", t.TransactionId())
data, _ := json.Marshal(&TestEvent{})
evt := client.NewEventData(uuid.NewV4(), "TestEvent", true, data, nil)
task, err := t.WriteAsync([]*client.EventData{evt})
if err != nil {
log.Printf("Error occured while writing to transaction: %v", err)
} else if err := task.Error(); err != nil {
log.Printf("Error occured while waiting for result of writing to transaction: %v", err)
}
}

func commit(t *client.Transaction) {
log.Printf("Committing transaction #%d", t.TransactionId())
task, err := t.CommitAsync()
if err != nil {
log.Printf("Error occured while committing transaction: %v", err)
} else if err := task.Error(); err != nil {
log.Printf("Error occured while waiting for result of committing transaction: %v", err)
} else {
result := task.Result().(*client.WriteResult)
log.Printf("<- %+v", result)
}
}

func rollback(t *client.Transaction) {
log.Printf("Rollbacking transaction #%d", t.TransactionId())
if err := t.Rollback(); err != nil {
log.Printf("Error occured while rollbacking transaction: %v", err)
}
}
51 changes: 51 additions & 0 deletions internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,57 @@ func (c *connection) AppendToStreamAsync(
return source.Task(), c.enqueueOperation(op)
}

func (c *connection) StartTransactionAsync(
stream string,
expectedVersion int,
userCredentials *client.UserCredentials,
) (*tasks.Task, error) {
if stream == "" {
panic("stream is empty")
}
source := tasks.NewCompletionSource()
op := operations.NewStartTransaction(source, c.connectionSettings.RequireMaster(), stream, expectedVersion, c,
userCredentials)
return source.Task(), c.enqueueOperation(op)
}

func (c *connection) ContinueTransaction(
transactionId int64,
userCredentials *client.UserCredentials,
) *client.Transaction {
return client.NewTransaction(transactionId, userCredentials, c)
}

func (c *connection) TransactionalWriteAsync(
transaction *client.Transaction,
events []*client.EventData,
userCredentials *client.UserCredentials,
) (*tasks.Task, error) {
if transaction == nil {
panic("transaction is nil")
}
if events == nil {
panic("events is nil")
}
source := tasks.NewCompletionSource()
op := operations.NewTransactionalWrite(source, c.connectionSettings.RequireMaster(), transaction.TransactionId(),
events, userCredentials)
return source.Task(), c.enqueueOperation(op)
}

func (c *connection) CommitTransactionAsync(
transaction *client.Transaction,
userCredentials *client.UserCredentials,
) (*tasks.Task, error) {
if transaction == nil {
panic("transaction is nil")
}
source := tasks.NewCompletionSource()
op := operations.NewCommitTransaction(source, c.connectionSettings.RequireMaster(), transaction.TransactionId(),
userCredentials)
return source.Task(), c.enqueueOperation(op)
}

func (c *connection) ReadEventAsync(
stream string,
eventNumber int,
Expand Down
83 changes: 83 additions & 0 deletions operations/commit_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package operations

import (
"fmt"
"github.com/golang/protobuf/proto"
"github.com/jdextraze/go-gesclient/client"
"github.com/jdextraze/go-gesclient/messages"
"github.com/jdextraze/go-gesclient/tasks"
)

type CommitTransaction struct {
*baseOperation
requireMaster bool
transactionId int64
}

func NewCommitTransaction(
source *tasks.CompletionSource,
requireMaster bool,
transactionId int64,
userCredentials *client.UserCredentials,
) *CommitTransaction {
obj := &CommitTransaction{
requireMaster: requireMaster,
transactionId: transactionId,
}
obj.baseOperation = newBaseOperation(client.Command_TransactionCommit, client.Command_TransactionCommitCompleted,
userCredentials, source, obj.createRequestDto, obj.inspectResponse, obj.transformResponse, obj.createResponse)
return obj
}

func (o *CommitTransaction) createRequestDto() proto.Message {
return &messages.TransactionCommit{
TransactionId: &o.transactionId,
RequireMaster: &o.requireMaster,
}
}

func (o *CommitTransaction) inspectResponse(message proto.Message) (res *client.InspectionResult, err error) {
msg := message.(*messages.TransactionCommitCompleted)
switch msg.GetResult() {
case messages.OperationResult_Success:
o.succeed()
case messages.OperationResult_PrepareTimeout, messages.OperationResult_CommitTimeout,
messages.OperationResult_ForwardTimeout:
res = client.NewInspectionResult(client.InspectionDecision_Retry, msg.GetResult().String(), nil, nil)
case messages.OperationResult_WrongExpectedVersion:
err = o.Fail(client.WrongExpectedVersion)
case messages.OperationResult_StreamDeleted:
err = o.Fail(client.StreamDeleted)
case messages.OperationResult_InvalidTransaction:
err = o.Fail(client.InvalidTransaction)
case messages.OperationResult_AccessDenied:
err = o.Fail(client.AccessDenied)
default:
err = fmt.Errorf("unexpected OperationResult: %s", msg.GetResult())
}
if res == nil && err == nil {
res = client.NewInspectionResult(client.InspectionDecision_EndOperation, msg.GetResult().String(), nil, nil)
}
return
}

func (o *CommitTransaction) transformResponse(message proto.Message) (interface{}, error) {
msg := message.(*messages.TransactionCommitCompleted)
preparePosition := int64(-1)
if msg.PreparePosition != nil {
preparePosition = *msg.PreparePosition
}
commitPosition := int64(-1)
if msg.CommitPosition != nil {
commitPosition = *msg.CommitPosition
}
return client.NewWriteResult(int(msg.GetLastEventNumber()), client.NewPosition(preparePosition, commitPosition)), nil
}

func (o *CommitTransaction) createResponse() proto.Message {
return &messages.TransactionCommitCompleted{}
}

func (o *CommitTransaction) String() string {
return fmt.Sprintf("Commit transaction #%d", o.transactionId)
}
Loading

0 comments on commit c6773f6

Please sign in to comment.