Skip to content

Commit

Permalink
some cleanup in inmemory storage
Browse files Browse the repository at this point in the history
  • Loading branch information
poopoothegorilla committed Jan 9, 2024
1 parent 2535e27 commit b1bfd67
Showing 1 changed file with 22 additions and 21 deletions.
43 changes: 22 additions & 21 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
// TODO: figure out if multiple tx attempts are actually stored in the db for each tx
// TODO: need a way to get id for a tx attempt. since there are some methods where the persistent store creates a tx attempt and doesnt returns it
// TODO: should txAttempt state transitions be handled by the address state manager?
// TODO: add RLock and RUnlock to address state usage where applicable

var (
// ErrInvalidChainID is returned when the chain ID is invalid
Expand Down Expand Up @@ -808,8 +809,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT
wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
for _, tx := range as.FetchTxs(states, filterFn) {
etx := ms.deepCopyTx(tx)
txsLock.Lock()
txs = append(txs, ms.deepCopyTx(tx))
txs = append(txs, etx)
txsLock.Unlock()
}
wg.Done()
Expand Down Expand Up @@ -848,8 +850,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT
wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
for _, tx := range as.FetchTxs(states, filterFn) {
etx := ms.deepCopyTx(tx)
txsLock.Lock()
txs = append(txs, ms.deepCopyTx(tx))
txs = append(txs, etx)
txsLock.Unlock()
}
wg.Done()
Expand Down Expand Up @@ -899,8 +902,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT
wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
for _, tx := range as.FetchTxs(nil, filterFn) {
etx := ms.deepCopyTx(tx)
txsLock.Lock()
txs = append(txs, ms.deepCopyTx(tx))
txs = append(txs, etx)
txsLock.Unlock()
}
wg.Done()
Expand Down Expand Up @@ -934,8 +938,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT
wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
for _, tx := range as.FetchTxs(states, filterFn, txIDs...) {
etx := ms.deepCopyTx(tx)
txsLock.Lock()
txs = append(txs, ms.deepCopyTx(tx))
txs = append(txs, etx)
txsLock.Unlock()
}
wg.Done()
Expand Down Expand Up @@ -1207,7 +1212,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT
if attempt.State != txmgrtypes.TxAttemptBroadcast {
return false
}
if attempt.Receipts == nil || len(attempt.Receipts) == 0 {
if len(attempt.Receipts) == 0 {
return false
}
if attempt.Receipts[0].GetBlockNumber() == nil {
Expand All @@ -1225,8 +1230,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT
for _, as := range ms.addressStates {
wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
ts := as.FetchTxs(states, filter)
txsLock.Lock()
txs = append(txs, as.FetchTxs(states, filter)...)
txs = append(txs, ts...)
txsLock.Unlock()
wg.Done()
}(as)
Expand Down Expand Up @@ -1262,8 +1268,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindE
for _, as := range ms.addressStates {
wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
etxs := as.FetchTxs(states, filter)
txsLock.Lock()
txs = append(txs, as.FetchTxs(states, filter)...)
txs = append(txs, etxs...)
txsLock.Unlock()
wg.Done()
}(as)
Expand Down Expand Up @@ -1302,8 +1309,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindE
for _, as := range ms.addressStates {
wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
etxs := as.FetchTxs(states, filter)
txsLock.Lock()
txs = append(txs, as.FetchTxs(states, filter)...)
txs = append(txs, etxs...)
txsLock.Unlock()
wg.Done()
}(as)
Expand Down Expand Up @@ -1367,8 +1375,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetNo
for _, as := range ms.addressStates {
wg.Add(1)
go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) {
etxs := as.FetchTxs(nil, filter)
txsLock.Lock()
txs = append(txs, as.FetchTxs(nil, filter)...)
txs = append(txs, etxs...)
txsLock.Unlock()
wg.Done()
}(as)
Expand Down Expand Up @@ -1485,11 +1494,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveC
}

// Update in memory store
if err := as.MoveInProgressToConfirmedMissingReceipt(*attempt, broadcastAt); err != nil {
return err
}

return nil
return as.MoveInProgressToConfirmedMissingReceipt(*attempt, broadcastAt)
}
func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error {
ms.addressStatesLock.Lock()
Expand Down Expand Up @@ -1610,11 +1615,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat
}

// Update in memory store
if err := as.MoveConfirmedToUnconfirmed(etxAttempt); err != nil {
return err
}

return nil
return as.MoveConfirmedToUnconfirmed(etxAttempt)
}
func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) {
if ms.chainID.String() != chainID.String() {
Expand Down Expand Up @@ -1814,15 +1815,15 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkO
}
}
}
result := result{
rr := result{
ID: tx.ID,
Sequence: *tx.Sequence,
FromAddress: tx.FromAddress,
MaxBroadcastBeforeBlockNum: maxBroadcastBeforeBlockNum,
TxHashes: hashes,
}
resultsLock.Lock()
results = append(results, result)
results = append(results, rr)
resultsLock.Unlock()
}
wg.Done()
Expand Down

0 comments on commit b1bfd67

Please sign in to comment.