Skip to content

Commit

Permalink
create AllTransactions db query
Browse files Browse the repository at this point in the history
  • Loading branch information
poopoothegorilla committed Jan 17, 2024
1 parent 90a4c9d commit 3a66a05
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 165 deletions.
110 changes: 18 additions & 92 deletions common/txmgr/address_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,106 +63,32 @@ func NewAddressState[
allTransactions: map[int64]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{},
}

// Load all unstarted transactions from persistent storage
offset := 0
limit := 50
for {
txs, count, err := txStore.UnstartedTransactions(offset, limit, as.fromAddress, as.chainID)
if err != nil {
return nil, fmt.Errorf("address_state: initialization: %w", err)
}
for i := 0; i < len(txs); i++ {
tx := txs[i]
as.unstarted.AddTx(&tx)
as.allTransactions[tx.ID] = &tx
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx
}
}
if count <= offset+limit {
break
}
offset += limit
}

// Load all in progress transactions from persistent storage
// Load all transactions from persistent storage
ctx := context.Background()
tx, err := txStore.GetTxInProgress(ctx, as.fromAddress)
txs, err := txStore.AllTransactions(ctx, as.fromAddress, as.chainID)
if err != nil {
return nil, fmt.Errorf("address_state: initialization: %w", err)
}
as.inprogress = tx
if tx != nil {
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = tx
}
as.allTransactions[tx.ID] = tx
}

// Load all unconfirmed transactions from persistent storage
offset = 0
limit = 50
for {
txs, count, err := txStore.UnconfirmedTransactions(offset, limit, as.fromAddress, as.chainID)
if err != nil {
return nil, fmt.Errorf("address_state: initialization: %w", err)
}
for i := 0; i < len(txs); i++ {
tx := txs[i]
for i := 0; i < len(txs); i++ {
tx := txs[i]
switch tx.State {
case TxUnstarted:
as.unstarted.AddTx(&tx)
case TxInProgress:
as.inprogress = &tx
case TxUnconfirmed:
as.unconfirmed[tx.ID] = &tx
as.allTransactions[tx.ID] = &tx
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx
}
}
if count <= offset+limit {
break
}
offset += limit
}

// Load all confirmed transactions from persistent storage
offset = 0
limit = 50
for {
txs, count, err := txStore.ConfirmedTransactions(offset, limit, as.fromAddress, as.chainID)
if err != nil {
return nil, fmt.Errorf("address_state: initialization: %w", err)
}
for i := 0; i < len(txs); i++ {
tx := txs[i]
as.confirmed[tx.ID] = &tx
as.allTransactions[tx.ID] = &tx
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx
}
}
if count <= offset+limit {
break
}
offset += limit
}

// Load all unconfirmed transactions from persistent storage
offset = 0
limit = 50
for {
txs, count, err := txStore.ConfirmedMissingReceiptTransactions(offset, limit, as.fromAddress, as.chainID)
if err != nil {
return nil, fmt.Errorf("address_state: initialization: %w", err)
}
for i := 0; i < len(txs); i++ {
tx := txs[i]
case TxConfirmedMissingReceipt:
as.confirmedMissingReceipt[tx.ID] = &tx
as.allTransactions[tx.ID] = &tx
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx
}
case TxConfirmed:
as.confirmed[tx.ID] = &tx
case TxFatalError:
as.fatalErrored[tx.ID] = &tx
}
if count <= offset+limit {
break
as.allTransactions[tx.ID] = &tx
if tx.IdempotencyKey != nil {
as.idempotencyKeyToTx[*tx.IdempotencyKey] = &tx
}
offset += limit
}

return &as, nil
Expand Down
5 changes: 1 addition & 4 deletions common/txmgr/inmemory_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,7 @@ type PersistentTxStore[
] interface {
txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]

UnstartedTransactions(limit, offset int, fromAddress ADDR, chainID CHAIN_ID) ([]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, error)
UnconfirmedTransactions(limit, offset int, fromAddress ADDR, chainID CHAIN_ID) ([]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, error)
ConfirmedTransactions(limit, offset int, fromAddress ADDR, chainID CHAIN_ID) ([]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, error)
ConfirmedMissingReceiptTransactions(limit, offset int, fromAddress ADDR, chainID CHAIN_ID) ([]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, error)
AllTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) ([]txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)
}

type InMemoryStore[
Expand Down
5 changes: 1 addition & 4 deletions common/txmgr/types/tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,7 @@ type InMemoryInitializer[
SEQ types.Sequence,
FEE feetypes.Fee,
] interface {
UnstartedTransactions(offset, limit int, fromAddress ADDR, chainID CHAIN_ID) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, error)
UnconfirmedTransactions(offset, limit int, fromAddress ADDR, chainID CHAIN_ID) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, error)
ConfirmedTransactions(offset, limit int, fromAddress ADDR, chainID CHAIN_ID) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, error)
ConfirmedMissingReceiptTransactions(offset, limit int, fromAddress ADDR, chainID CHAIN_ID) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], int, error)
AllTransactions(ctx context.Context, fromAddress ADDR, chainID CHAIN_ID) ([]Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)
}

// TransactionStore contains the persistence layer methods needed to manage Txs and TxAttempts
Expand Down
76 changes: 11 additions & 65 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@ type TxStoreWebApi interface {

// TxStoreInMemory encapsulates the methods that are used by the txmgr to initialize the in memory tx store.
type TxStoreInMemory interface {
UnstartedTransactions(offset, limit int, fromAddress common.Address, chainID *big.Int) (txs []Tx, count int, err error)
UnconfirmedTransactions(offset, limit int, fromAddress common.Address, chainID *big.Int) (txs []Tx, count int, err error)
ConfirmedTransactions(offset, limit int, fromAddress common.Address, chainID *big.Int) (txs []Tx, count int, err error)
ConfirmedMissingReceiptTransactions(offset, limit int, fromAddress common.Address, chainID *big.Int) (txs []Tx, count int, err error)
AllTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (txs []Tx, err error)
}

type TestEvmTxStore interface {
Expand Down Expand Up @@ -469,69 +466,18 @@ func (o *evmTxStore) TransactionsWithAttempts(offset, limit int) (txs []Tx, coun
return
}

// UnstartedTransactions returns all eth transactions that have no attempts.
func (o *evmTxStore) UnstartedTransactions(offset, limit int, fromAddress common.Address, chainID *big.Int) (txs []Tx, count int, err error) {
sql := `SELECT count(*) FROM evm.txes WHERE state = 'unstarted' AND from_address = $1 AND evm_chain_id = $2`
if err = o.q.Get(&count, sql, fromAddress, chainID.String()); err != nil {
return
}

sql = `SELECT * FROM evm.txes WHERE state = 'unstarted' AND from_address = $1 AND evm_chain_id = $2 ORDER BY id desc LIMIT $3 OFFSET $4`
var dbTxs []DbEthTx
if err = o.q.Select(&dbTxs, sql, fromAddress, chainID.String(), limit, offset); err != nil {
return
}
txs = dbEthTxsToEvmEthTxs(dbTxs)
return
}

// UnconfirmedTransactions returns all eth transactions that have at least one attempt and in the unconfirmed state.
func (o *evmTxStore) UnconfirmedTransactions(offset, limit int, fromAddress common.Address, chainID *big.Int) (txs []Tx, count int, err error) {
sql := `SELECT count(*) FROM evm.txes WHERE id IN (SELECT DISTINCT eth_tx_id FROM evm.tx_attempts) AND state = 'unconfirmed' AND from_address = $1 AND evm_chain_id = $2`
if err = o.q.Get(&count, sql, fromAddress, chainID.String()); err != nil {
return
}

sql = `SELECT * FROM evm.txes WHERE id IN (SELECT DISTINCT eth_tx_id FROM evm.tx_attempts) AND state = 'unconfirmed' AND from_address = $1 AND evm_chain_id = $2 ORDER BY id desc LIMIT $3 OFFSET $4`
var dbTxs []DbEthTx
if err = o.q.Select(&dbTxs, sql, fromAddress, chainID.String(), limit, offset); err != nil {
return
}
txs = dbEthTxsToEvmEthTxs(dbTxs)
err = o.preloadTxAttempts(txs)
return
}

// ConfirmedTransactions returns all eth transactions that have at least one attempt and in the confirmed state.
func (o *evmTxStore) ConfirmedTransactions(offset, limit int, fromAddress common.Address, chainID *big.Int) (txs []Tx, count int, err error) {
sql := `SELECT count(*) FROM evm.txes WHERE id IN (SELECT DISTINCT eth_tx_id FROM evm.tx_attempts) AND state = 'confirmed' AND from_address = $1 AND evm_chain_id = $2`
if err = o.q.Get(&count, sql, fromAddress, chainID.String()); err != nil {
return
}

sql = `SELECT * FROM evm.txes WHERE id IN (SELECT DISTINCT eth_tx_id FROM evm.tx_attempts) AND state = 'confirmed' AND from_address = $1 AND evm_chain_id = $2 ORDER BY id desc LIMIT $3 OFFSET $4`
var dbTxs []DbEthTx
if err = o.q.Select(&dbTxs, sql, fromAddress, chainID.String(), limit, offset); err != nil {
return
}
txs = dbEthTxsToEvmEthTxs(dbTxs)
err = o.preloadTxAttempts(txs)
return
}

// ConfirmedMissingReceiptTransactions returns all eth transactions that have at least one attempt and in the confirmed_missing_receipt state.
func (o *evmTxStore) ConfirmedMissingReceiptTransactions(offset, limit int, fromAddress common.Address, chainID *big.Int) (txs []Tx, count int, err error) {
sql := `SELECT count(*) FROM evm.txes WHERE id IN (SELECT DISTINCT eth_tx_id FROM evm.tx_attempts) AND state = 'confirmed_missing_receipt' AND from_address = $1 AND evm_chain_id = $2`
if err = o.q.Get(&count, sql, fromAddress, chainID.String()); err != nil {
return
}

sql = `SELECT * FROM evm.txes WHERE id IN (SELECT DISTINCT eth_tx_id FROM evm.tx_attempts) AND state = 'confirmed_missing_receipt' AND from_address = $1 AND evm_chain_id = $2 ORDER BY id desc LIMIT $3 OFFSET $4`
var dbTxs []DbEthTx
if err = o.q.Select(&dbTxs, sql, fromAddress, chainID.String(), limit, offset); err != nil {
// AllTransactions returns all eth transactions
func (o *evmTxStore) AllTransactions(ctx context.Context, fromAddress common.Address, chainID *big.Int) (txs []Tx, err error) {
var cancel context.CancelFunc
ctx, cancel = o.mergeContexts(ctx)
defer cancel()
qq := o.q.WithOpts(pg.WithParentCtx(ctx))
var dbEtxs []DbEthTx
sql := `SELECT * FROM evm.txes WHERE from_address = $1 AND evm_chain_id = $2 ORDER BY id desc`
if err = qq.Select(&dbEtxs, sql, fromAddress, chainID.String()); err != nil {
return
}
txs = dbEthTxsToEvmEthTxs(dbTxs)
txs = dbEthTxsToEvmEthTxs(dbEtxs)
err = o.preloadTxAttempts(txs)
return
}
Expand Down

0 comments on commit 3a66a05

Please sign in to comment.