Skip to content

Commit

Permalink
batch support
Browse files Browse the repository at this point in the history
  • Loading branch information
altitude committed Jun 21, 2021
1 parent 59c33d1 commit f69c6a3
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 64 deletions.
2 changes: 1 addition & 1 deletion api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewHttpAPI(lc fx.Lifecycle, l *ledger.Ledger) *HttpAPI {
var t core.Transaction
c.ShouldBind(&t)

err := l.Commit(t)
err := l.Commit([]core.Transaction{t})

c.JSON(200, gin.H{
"ok": err == nil,
Expand Down
42 changes: 23 additions & 19 deletions ledger/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,13 @@ func (l *Ledger) Close() {
l.store.Close()
}

func (l *Ledger) Commit(t core.Transaction) error {
func (l *Ledger) Commit(ts []core.Transaction) error {
l.Lock()
defer l.Unlock()

count, _ := l.store.CountTransactions()
t.ID = count

if t.Timestamp == "" {
t.Timestamp = time.Now().Format(time.RFC3339)
}
rf := map[string]map[string]int64{}
timestamp := time.Now().Format(time.RFC3339)

if l._last == nil {
last, err := l.GetLastTransaction()
Expand All @@ -76,22 +73,29 @@ func (l *Ledger) Commit(t core.Transaction) error {
l._last = &last
}

t.Hash = core.Hash(l._last, &t)
last := l._last

rf := map[string]map[string]int64{}
for i := range ts {

for _, p := range t.Postings {
if _, ok := rf[p.Source]; !ok {
rf[p.Source] = map[string]int64{}
}
ts[i].ID = count + int64(i)
ts[i].Timestamp = timestamp

rf[p.Source][p.Asset] += p.Amount
ts[i].Hash = core.Hash(last, &ts[i])
last = &ts[i]

if _, ok := rf[p.Destination]; !ok {
rf[p.Destination] = map[string]int64{}
}
for _, p := range ts[i].Postings {
if _, ok := rf[p.Source]; !ok {
rf[p.Source] = map[string]int64{}
}

rf[p.Source][p.Asset] += p.Amount

rf[p.Destination][p.Asset] -= p.Amount
if _, ok := rf[p.Destination]; !ok {
rf[p.Destination] = map[string]int64{}
}

rf[p.Destination][p.Asset] -= p.Amount
}
}

for addr := range rf {
Expand Down Expand Up @@ -131,9 +135,9 @@ func (l *Ledger) Commit(t core.Transaction) error {
}
}

err := l.store.AppendTransaction(t)
err := l.store.SaveTransactions(ts)

l._last = &t
l._last = &ts[len(ts)-1]

return err
}
Expand Down
80 changes: 60 additions & 20 deletions ledger/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ import (

func with(f func(l *Ledger)) {
fx.New(
fx.Option(
fx.NopLogger,
),
fx.Provide(
func() config.Config {
c := config.DefaultConfig()
Expand All @@ -38,20 +41,17 @@ func TestMain(m *testing.M) {
func TestTransaction(t *testing.T) {
with(func(l *Ledger) {

testsize := 1e4
total := 0
batch := []core.Transaction{}

testsize := 1e5
for i := 0; i < int(testsize); i++ {
if i%int(testsize/10) == 0 && i > 0 {
fmt.Println(i)
}

for i := 1; i <= int(testsize); i++ {
user := fmt.Sprintf("users:%03d", 1+rand.Intn(100))
amount := 1 + rand.Intn(100)
amount = 100
total += amount

err := l.Commit(core.Transaction{
batch = append(batch, core.Transaction{
Postings: []core.Posting{
{
Source: "world",
Expand All @@ -68,10 +68,19 @@ func TestTransaction(t *testing.T) {
},
})

if i%int(1e3) != 0 {
continue
}

fmt.Println(i)

err := l.Commit(batch)

if err != nil {
fmt.Println(err)
t.Error(err)
}

batch = []core.Transaction{}
}

world, err := l.GetAccount("world")
Expand All @@ -95,13 +104,15 @@ func TestTransaction(t *testing.T) {

func TestBalance(t *testing.T) {
with(func(l *Ledger) {
err := l.Commit(core.Transaction{
Postings: []core.Posting{
{
Source: "empty_wallet",
Destination: "world",
Amount: 1,
Asset: "COIN",
err := l.Commit([]core.Transaction{
{
Postings: []core.Posting{
{
Source: "empty_wallet",
Destination: "world",
Amount: 1,
Asset: "COIN",
},
},
},
})
Expand All @@ -128,13 +139,13 @@ func TestReference(t *testing.T) {
},
}

err := l.Commit(tx)
err := l.Commit([]core.Transaction{tx})

if err != nil {
t.Error(err)
}

err = l.Commit(tx)
err = l.Commit([]core.Transaction{tx})

if err == nil {
t.Fail()
Expand All @@ -152,10 +163,12 @@ func TestLast(t *testing.T) {
})
}

func BenchmarkLedger(b *testing.B) {
func BenchmarkTransaction1(b *testing.B) {
with(func(l *Ledger) {
for i := 0; i < b.N; i++ {
l.Commit(core.Transaction{
for n := 0; n < b.N; n++ {
txs := []core.Transaction{}

txs = append(txs, core.Transaction{
Postings: []core.Posting{
{
Source: "world",
Expand All @@ -165,6 +178,33 @@ func BenchmarkLedger(b *testing.B) {
},
},
})

l.Commit(txs)
}
})
}

func BenchmarkTransaction_20_1k(b *testing.B) {
with(func(l *Ledger) {
for n := 0; n < b.N; n++ {
for i := 0; i < 20; i++ {
txs := []core.Transaction{}

for j := 0; j < 1e3; j++ {
txs = append(txs, core.Transaction{
Postings: []core.Posting{
{
Source: "world",
Destination: "benchmark",
Asset: "COIN",
Amount: 10,
},
},
})
}

l.Commit(txs)
}
}
})
}
Expand Down
48 changes: 25 additions & 23 deletions storage/sqlite/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,48 +92,50 @@ func (s *SQLiteStore) Close() {
fmt.Println("db closed")
}

func (s *SQLiteStore) AppendTransaction(t core.Transaction) error {
func (s *SQLiteStore) SaveTransactions(ts []core.Transaction) error {
tx, _ := s.db.Begin()

var ref *string
for _, t := range ts {
var ref *string

if t.Reference != "" {
ref = &t.Reference
}
if t.Reference != "" {
ref = &t.Reference
}

_, err := tx.Exec(`
_, err := tx.Exec(`
INSERT INTO "transactions"
("id", "reference", "timestamp", "hash")
VALUES
($1, $2, $3, $4)
`, t.ID, ref, t.Timestamp, t.Hash)

if err != nil {
tx.Rollback()
if err != nil {
tx.Rollback()

return err
}
return err
}

for i, p := range t.Postings {
_, err := tx.Exec(
`
for i, p := range t.Postings {
_, err := tx.Exec(
`
INSERT INTO "postings"
("id", "txid", "source", "destination", "amount", "asset")
VALUES
(:id, :txid, :source, :destination, :amount, :asset)
`,
sql.Named("id", i),
sql.Named("txid", t.ID),
sql.Named("source", p.Source),
sql.Named("destination", p.Destination),
sql.Named("amount", p.Amount),
sql.Named("asset", p.Asset),
)
sql.Named("id", i),
sql.Named("txid", t.ID),
sql.Named("source", p.Source),
sql.Named("destination", p.Destination),
sql.Named("amount", p.Amount),
sql.Named("asset", p.Asset),
)

if err != nil {
tx.Rollback()
if err != nil {
tx.Rollback()

return err
return err
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type Store interface {
AppendTransaction(core.Transaction) error
SaveTransactions([]core.Transaction) error
CountTransactions() (int64, error)
FindTransactions(query.Query) (query.Cursor, error)
AggregateBalances(string) (map[string]int64, error)
Expand Down

0 comments on commit f69c6a3

Please sign in to comment.