diff --git a/common/txmgr/inmemory_store.go b/common/txmgr/inmemory_store.go index 0433948e9fb..43e5b691338 100644 --- a/common/txmgr/inmemory_store.go +++ b/common/txmgr/inmemory_store.go @@ -23,6 +23,7 @@ import ( // TODO: figure out if multiple tx attempts are actually stored in the db for each tx // TODO: need a way to get id for a tx attempt. since there are some methods where the persistent store creates a tx attempt and doesnt returns it // TODO: should txAttempt state transitions be handled by the address state manager? +// TODO: add RLock and RUnlock to address state usage where applicable var ( // ErrInvalidChainID is returned when the chain ID is invalid @@ -808,8 +809,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { for _, tx := range as.FetchTxs(states, filterFn) { + etx := ms.deepCopyTx(tx) txsLock.Lock() - txs = append(txs, ms.deepCopyTx(tx)) + txs = append(txs, etx) txsLock.Unlock() } wg.Done() @@ -848,8 +850,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { for _, tx := range as.FetchTxs(states, filterFn) { + etx := ms.deepCopyTx(tx) txsLock.Lock() - txs = append(txs, ms.deepCopyTx(tx)) + txs = append(txs, etx) txsLock.Unlock() } wg.Done() @@ -899,8 +902,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { for _, tx := range as.FetchTxs(nil, filterFn) { + etx := ms.deepCopyTx(tx) txsLock.Lock() - txs = append(txs, ms.deepCopyTx(tx)) + txs = append(txs, etx) txsLock.Unlock() } wg.Done() @@ -934,8 +938,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { for _, tx := range as.FetchTxs(states, filterFn, txIDs...) { + etx := ms.deepCopyTx(tx) txsLock.Lock() - txs = append(txs, ms.deepCopyTx(tx)) + txs = append(txs, etx) txsLock.Unlock() } wg.Done() @@ -1207,7 +1212,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT if attempt.State != txmgrtypes.TxAttemptBroadcast { return false } - if attempt.Receipts == nil || len(attempt.Receipts) == 0 { + if len(attempt.Receipts) == 0 { return false } if attempt.Receipts[0].GetBlockNumber() == nil { @@ -1225,8 +1230,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindT for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + ts := as.FetchTxs(states, filter) txsLock.Lock() - txs = append(txs, as.FetchTxs(states, filter)...) + txs = append(txs, ts...) txsLock.Unlock() wg.Done() }(as) @@ -1262,8 +1268,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindE for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + etxs := as.FetchTxs(states, filter) txsLock.Lock() - txs = append(txs, as.FetchTxs(states, filter)...) + txs = append(txs, etxs...) txsLock.Unlock() wg.Done() }(as) @@ -1302,8 +1309,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindE for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + etxs := as.FetchTxs(states, filter) txsLock.Lock() - txs = append(txs, as.FetchTxs(states, filter)...) + txs = append(txs, etxs...) txsLock.Unlock() wg.Done() }(as) @@ -1367,8 +1375,9 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetNo for _, as := range ms.addressStates { wg.Add(1) go func(as *AddressState[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) { + etxs := as.FetchTxs(nil, filter) txsLock.Lock() - txs = append(txs, as.FetchTxs(nil, filter)...) + txs = append(txs, etxs...) txsLock.Unlock() wg.Done() }(as) @@ -1485,11 +1494,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveC } // Update in memory store - if err := as.MoveInProgressToConfirmedMissingReceipt(*attempt, broadcastAt); err != nil { - return err - } - - return nil + return as.MoveInProgressToConfirmedMissingReceipt(*attempt, broadcastAt) } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) SaveInProgressAttempt(ctx context.Context, attempt *txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error { ms.addressStatesLock.Lock() @@ -1610,11 +1615,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Updat } // Update in memory store - if err := as.MoveConfirmedToUnconfirmed(etxAttempt); err != nil { - return err - } - - return nil + return as.MoveConfirmedToUnconfirmed(etxAttempt) } func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsTxFinalized(ctx context.Context, blockHeight int64, txID int64, chainID CHAIN_ID) (bool, error) { if ms.chainID.String() != chainID.String() { @@ -1814,7 +1815,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkO } } } - result := result{ + rr := result{ ID: tx.ID, Sequence: *tx.Sequence, FromAddress: tx.FromAddress, @@ -1822,7 +1823,7 @@ func (ms *InMemoryStore[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkO TxHashes: hashes, } resultsLock.Lock() - results = append(results, result) + results = append(results, rr) resultsLock.Unlock() } wg.Done()