Skip to content

Commit

Permalink
Self review.
Browse files Browse the repository at this point in the history
  • Loading branch information
arijitAD committed Aug 31, 2021
1 parent d0c3241 commit 71837c4
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 122 deletions.
3 changes: 1 addition & 2 deletions pkg/eth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log
start := startingBlock.Int64()
end := endingBlock.Int64()
var logs []*types.Log
for i := start; i <= end; {
for i := start; i <= end; i++ {
filteredLog, err := pea.B.Retriever.RetrieveFilteredLog(tx, filter, i, nil)
if err != nil {
return nil, err
Expand All @@ -662,7 +662,6 @@ func (pea *PublicEthAPI) localGetLogs(crit filters.FilterCriteria) ([]*types.Log
}

logs = append(logs, logCIDs...)
i++
}

if err := tx.Commit(); err != nil {
Expand Down
5 changes: 2 additions & 3 deletions pkg/eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,11 @@ import (
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
"github.com/ethereum/go-ethereum/trie"
pgipfsethdb "github.com/vulcanize/ipfs-ethdb/postgres"

"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
)

var (
Expand Down
3 changes: 1 addition & 2 deletions pkg/eth/backend_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ import (
// This function is eth/internal so we have to make our own version here...
func RPCMarshalHeader(head *types.Header, extractMiner bool) map[string]interface{} {
if extractMiner {
err := recoverMiner(head)
if err != nil {
if err := recoverMiner(head); err != nil {
return nil
}
}
Expand Down
72 changes: 30 additions & 42 deletions pkg/eth/cid_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ func (ecr *CIDRetriever) RetrieveTxCIDs(tx *sqlx.Tx, txFilter TxFilter, headerID
return results, tx.Select(&results, pgStr, args...)
}

func topicFilterCondition(id int, topics [][]string, args []interface{}, pgStr string, first bool) (string, []interface{}, int) {
func topicFilterCondition(id *int, topics [][]string, args []interface{}, pgStr string, first bool) (string, []interface{}) {
for i, topicSet := range topics {
if len(topicSet) == 0 {
continue
Expand All @@ -220,73 +220,71 @@ func topicFilterCondition(id int, topics [][]string, args []interface{}, pgStr s
} else {
first = false
}
pgStr += fmt.Sprintf(` eth.log_cids.topic%d = ANY ($%d)`, i, id)
pgStr += fmt.Sprintf(` eth.log_cids.topic%d = ANY ($%d)`, i, *id)
args = append(args, pq.Array(topicSet))
id++
*id++
}
return pgStr, args, id
return pgStr, args
}

func logFilterCondition(id int, pgStr string, args []interface{}, rctFilter ReceiptFilter) (string, []interface{}, int) {
func logFilterCondition(id *int, pgStr string, args []interface{}, rctFilter ReceiptFilter) (string, []interface{}) {
if len(rctFilter.LogAddresses) > 0 {
pgStr += fmt.Sprintf(` AND eth.log_cids.address = ANY ($%d)`, id)
pgStr += fmt.Sprintf(` AND eth.log_cids.address = ANY ($%d)`, *id)
args = append(args, pq.Array(rctFilter.LogAddresses))
id++
*id++
}

// Filter on topics if there are any
if hasTopics(rctFilter.Topics) {
pgStr, args, id = topicFilterCondition(id, rctFilter.Topics, args, pgStr, false)
}
} else if hasTopics(rctFilter.Topics) {
pgStr, args, id = topicFilterCondition(id, rctFilter.Topics, args, pgStr, false)
// Filter on topics if there are any
if hasTopics(rctFilter.Topics) {
pgStr, args = topicFilterCondition(id, rctFilter.Topics, args, pgStr, false)
}

return pgStr, args, id
return pgStr, args
}

func receiptFilterConditions(id int, pgStr string, args []interface{}, rctFilter ReceiptFilter, trxIds []int64) (string, []interface{}, int) {
func receiptFilterConditions(id *int, pgStr string, args []interface{}, rctFilter ReceiptFilter, trxIds []int64) (string, []interface{}) {
rctCond := " AND (receipt_cids.id = ANY ( "
logQuery := "SELECT receipt_id FROM eth.log_cids WHERE"
if len(rctFilter.LogAddresses) > 0 {
// Filter on log contract addresses if there are any
pgStr += fmt.Sprintf(`%s %s eth.log_cids.address = ANY ($%d)`, rctCond, logQuery, id)
pgStr += fmt.Sprintf(`%s %s eth.log_cids.address = ANY ($%d)`, rctCond, logQuery, *id)
args = append(args, pq.Array(rctFilter.LogAddresses))
id++
*id++

// Filter on topics if there are any
if hasTopics(rctFilter.Topics) {
pgStr, args, id = topicFilterCondition(id, rctFilter.Topics, args, pgStr, false)
pgStr, args = topicFilterCondition(id, rctFilter.Topics, args, pgStr, false)
}

pgStr += ")"

// Filter on txIDs if there are any and we are matching txs
if rctFilter.MatchTxs && len(trxIds) > 0 {
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id)
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, *id)
args = append(args, pq.Array(trxIds))
}
pgStr += ")"
} else { // If there are no contract addresses to filter on
// Filter on topics if there are any
if hasTopics(rctFilter.Topics) {
pgStr += rctCond + logQuery
pgStr, args, id = topicFilterCondition(id, rctFilter.Topics, args, pgStr, true)
pgStr, args = topicFilterCondition(id, rctFilter.Topics, args, pgStr, true)
pgStr += ")"
// Filter on txIDs if there are any and we are matching txs
if rctFilter.MatchTxs && len(trxIds) > 0 {
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, id)
pgStr += fmt.Sprintf(` OR receipt_cids.tx_id = ANY($%d::INTEGER[])`, *id)
args = append(args, pq.Array(trxIds))
}
pgStr += ")"
} else if rctFilter.MatchTxs && len(trxIds) > 0 {
// If there are no contract addresses or topics to filter on,
// Filter on txIDs if there are any and we are matching txs
pgStr += fmt.Sprintf(` AND receipt_cids.tx_id = ANY($%d::INTEGER[])`, id)
pgStr += fmt.Sprintf(` AND receipt_cids.tx_id = ANY($%d::INTEGER[])`, *id)
args = append(args, pq.Array(trxIds))
}
}

return pgStr, args, id
return pgStr, args
}

// RetrieveRctCIDsByHeaderID retrieves and returns all of the rct cids at the provided header ID that conform to the provided
Expand All @@ -302,7 +300,8 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip
AND header_cids.id = $1`
id := 2
args = append(args, headerID)
pgStr, args, id = receiptFilterConditions(id, pgStr, args, rctFilter, trxIds)

pgStr, args = receiptFilterConditions(&id, pgStr, args, rctFilter, trxIds)

pgStr += ` ORDER BY transaction_cids.index`
receiptCids := make([]models.ReceiptModel, 0)
Expand All @@ -311,7 +310,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDsByHeaderID(tx *sqlx.Tx, rctFilter Receip

// RetrieveFilteredGQLLogs retrieves and returns all the log cIDs provided blockHash that conform to the provided
// filter parameters.
func (ecr *CIDRetriever) RetrieveFilteredGQLLogs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockHash *common.Hash) ([]customLog, error) {
func (ecr *CIDRetriever) RetrieveFilteredGQLLogs(tx *sqlx.Tx, rctFilter ReceiptFilter, blockHash *common.Hash) ([]logResult, error) {
log.Debug("retrieving log cids for receipt ids")
args := make([]interface{}, 0, 4)
id := 1
Expand All @@ -327,10 +326,10 @@ func (ecr *CIDRetriever) RetrieveFilteredGQLLogs(tx *sqlx.Tx, rctFilter ReceiptF
args = append(args, blockHash.String())
id++

pgStr, args, id = logFilterCondition(id, pgStr, args, rctFilter)
pgStr, args = logFilterCondition(&id, pgStr, args, rctFilter)
pgStr += ` ORDER BY log_cids.index`

logCIDs := make([]customLog, 0)
logCIDs := make([]logResult, 0)
err := tx.Select(&logCIDs, pgStr, args...)
if err != nil {
return nil, err
Expand All @@ -341,7 +340,7 @@ func (ecr *CIDRetriever) RetrieveFilteredGQLLogs(tx *sqlx.Tx, rctFilter ReceiptF

// RetrieveFilteredLog retrieves and returns all the log cIDs provided blockHeight or blockHash that conform to the provided
// filter parameters.
func (ecr *CIDRetriever) RetrieveFilteredLog(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash) ([]customLog, error) {
func (ecr *CIDRetriever) RetrieveFilteredLog(tx *sqlx.Tx, rctFilter ReceiptFilter, blockNumber int64, blockHash *common.Hash) ([]logResult, error) {
log.Debug("retrieving log cids for receipt ids")
args := make([]interface{}, 0, 4)
pgStr := `SELECT eth.log_cids.id,eth.log_cids.leaf_cid, eth.log_cids.index, eth.log_cids.receipt_id,
Expand All @@ -364,10 +363,10 @@ func (ecr *CIDRetriever) RetrieveFilteredLog(tx *sqlx.Tx, rctFilter ReceiptFilte
id++
}

pgStr, args, id = logFilterCondition(id, pgStr, args, rctFilter)
pgStr, args = logFilterCondition(&id, pgStr, args, rctFilter)
pgStr += ` ORDER BY log_cids.index`

logCIDs := make([]customLog, 0)
logCIDs := make([]logResult, 0)
err := tx.Select(&logCIDs, pgStr, args...)
if err != nil {
return nil, err
Expand Down Expand Up @@ -397,7 +396,7 @@ func (ecr *CIDRetriever) RetrieveRctCIDs(tx *sqlx.Tx, rctFilter ReceiptFilter, b
id++
}

pgStr, args, id = receiptFilterConditions(id, pgStr, args, rctFilter, trxIds)
pgStr, args = receiptFilterConditions(&id, pgStr, args, rctFilter, trxIds)

pgStr += ` ORDER BY transaction_cids.index`
receiptCids := make([]models.ReceiptModel, 0)
Expand Down Expand Up @@ -605,14 +604,3 @@ func (ecr *CIDRetriever) RetrieveReceiptCIDsByTxIDs(tx *sqlx.Tx, txIDs []int64)
var rctCIDs []models.ReceiptModel
return rctCIDs, tx.Select(&rctCIDs, pgStr, pq.Array(txIDs))
}

func (ecr *CIDRetriever) RetrieveTxCIDsByReceipt(tx *sqlx.Tx, txIDs []int64) ([]models.TxModel, error) {
log.Debugf("retrieving receipt cids for tx ids %v", txIDs)
pgStr := `SELECT transaction_cids.id,transaction_cids.mh_key,transaction_cids.cid,
transaction_cids.tx_hash,transaction_cids.index,transaction_cids.tx_type
FROM eth.transaction_cids WHERE eth.transaction_cids.id = ANY ( $1 )
ORDER BY transaction_cids.index`

var txnCIDs []models.TxModel
return txnCIDs, tx.Select(&txnCIDs, pgStr, pq.Array(txIDs))
}
6 changes: 3 additions & 3 deletions pkg/eth/filterer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@ package eth
import (
"bytes"

"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"github.com/multiformats/go-multihash"

"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
Expand Down Expand Up @@ -176,6 +175,7 @@ func (s *ResponseFilterer) filerReceipts(receiptFilter ReceiptFilter, response *
}
}

// TODO: Verify this filter logic.
if checkReceipts(receipt, receiptFilter.Topics, topics, receiptFilter.LogAddresses, contracts, trxHashes) {
receiptBuffer := new(bytes.Buffer)
if err := receipt.EncodeRLP(receiptBuffer); err != nil {
Expand Down
7 changes: 3 additions & 4 deletions pkg/eth/ipld_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,11 @@ import (

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/postgres"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"

"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/vulcanize/ipld-eth-server/pkg/shared"
)

Expand Down Expand Up @@ -169,7 +168,7 @@ func (f *IPLDFetcher) FetchRcts(tx *sqlx.Tx, cids []models.ReceiptModel) ([]ipfs
}

// FetchLogs fetches logs.
func (f *IPLDFetcher) FetchLogs(logCIDs []customLog) ([]*types.Log, error) {
func (f *IPLDFetcher) FetchLogs(logCIDs []logResult) ([]*types.Log, error) {
log.Debug("fetching logs")

logs := make([]*types.Log, len(logCIDs))
Expand Down Expand Up @@ -218,7 +217,7 @@ type logsCID struct {
}

// FetchGQLLogs fetches logs for graphql.
func (f *IPLDFetcher) FetchGQLLogs(logCIDs []customLog) ([]logsCID, error) {
func (f *IPLDFetcher) FetchGQLLogs(logCIDs []logResult) ([]logsCID, error) {
log.Debug("fetching logs")

logs := make([]logsCID, len(logCIDs))
Expand Down
15 changes: 7 additions & 8 deletions pkg/eth/test_helpers/test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,20 @@ import (
"crypto/rand"
"math/big"

"github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/trie"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/state"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/rlp"
"github.com/ethereum/go-ethereum/statediff/indexer"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/ipfs/ipld"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
"github.com/ethereum/go-ethereum/statediff/indexer/shared"
"github.com/ethereum/go-ethereum/statediff/testhelpers"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
"github.com/ethereum/go-ethereum/trie"
blocks "github.com/ipfs/go-block-format"
"github.com/multiformats/go-multihash"
log "github.com/sirupsen/logrus"
Expand Down
5 changes: 2 additions & 3 deletions pkg/eth/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/core/types"

"github.com/ethereum/go-ethereum/statediff/indexer/ipfs"
"github.com/ethereum/go-ethereum/statediff/indexer/models"
sdtypes "github.com/ethereum/go-ethereum/statediff/types"
Expand Down Expand Up @@ -168,8 +167,8 @@ type ConvertedPayload struct {
StorageNodes map[string][]sdtypes.StorageNode
}

// customLog represent a log.
type customLog struct {
// logResult represent a log.
type logResult struct {
ID int64 `db:"id"`
LeafCID string `db:"leaf_cid"`
LeafMhKey string `db:"leaf_mh_key"`
Expand Down
4 changes: 4 additions & 0 deletions pkg/graphql/graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,10 @@ func (r *Resolver) GetLogs(ctx context.Context, args struct {
}

filteredLogs, err := r.backend.Retriever.RetrieveFilteredGQLLogs(tx, filter, &args.BlockHash)
if err != nil {
return nil, err
}

if err = tx.Commit(); err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 71837c4

Please sign in to comment.