Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

p2p receipts #11010

Merged
merged 33 commits into from
Jul 16, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions eth/protocols/eth/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ package eth
import (
"context"
"fmt"

libcommon "github.com/ledgerwatch/erigon-lib/common"
"github.com/ledgerwatch/erigon-lib/kv"
"github.com/ledgerwatch/erigon-lib/kv/rawdbv3"
"github.com/ledgerwatch/erigon-lib/log/v3"
"github.com/ledgerwatch/erigon/cmd/state/exec3"

"github.com/ledgerwatch/erigon/common"
"github.com/ledgerwatch/erigon/core/rawdb"
Expand Down Expand Up @@ -157,7 +158,7 @@ func AnswerGetBlockBodiesQuery(db kv.Tx, query GetBlockBodiesPacket, blockReader
return bodies
}

func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket) ([]rlp.RawValue, error) { //nolint:unparam
func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetReceiptsPacket, exec *exec3.TraceWorker) ([]rlp.RawValue, error) { //nolint:unparam
// Gather state data until the fetch or network limits is reached
var (
bytes int
Expand Down Expand Up @@ -190,6 +191,11 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece
continue
}
}
err = AddLogsToReceipts(db, results, b, exec)
if err != nil {
return nil, err
}

// If known, encode and queue for response packet
if encoded, err := rlp.EncodeToBytes(results); err != nil {
return nil, fmt.Errorf("failed to encode receipt: %w", err)
Expand All @@ -200,3 +206,20 @@ func AnswerGetReceiptsQuery(br services.FullBlockReader, db kv.Tx, query GetRece
}
return receipts, nil
}

func AddLogsToReceipts(db kv.Tx, receipts types.Receipts, block *types.Block, exec *exec3.TraceWorker) error {
txs := block.Transactions()
txNum, err := rawdbv3.TxNums.Min(db, block.NumberU64())
if err != nil {
return err
}
for i := range receipts {
_, err = exec.ExecTxn(txNum, i, txs[i])
if err != nil {
return err
}
logs := exec.GetLogs(i, txs[i])
receipts[i].Logs = logs
}
return nil
}
89 changes: 50 additions & 39 deletions p2p/sentry/sentry_multi_client/sentry_multi_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/ledgerwatch/erigon/cmd/state/exec3"
"golang.org/x/sync/semaphore"
"math/rand"
"sort"
"sync"
Expand Down Expand Up @@ -275,7 +277,8 @@ type MultiClient struct {
// decouple sentry multi client from header and body downloading logic is done
disableBlockDownload bool

logger log.Logger
logger log.Logger
semaphore *semaphore.Weighted
}

func NewMultiClient(
Expand Down Expand Up @@ -340,6 +343,7 @@ func NewMultiClient(
maxBlockBroadcastPeers: maxBlockBroadcastPeers,
disableBlockDownload: disableBlockDownload,
logger: logger,
semaphore: semaphore.NewWeighted(1),
}

return cs, nil
Expand Down Expand Up @@ -681,44 +685,51 @@ func (cs *MultiClient) getBlockBodies66(ctx context.Context, inreq *proto_sentry
}

func (cs *MultiClient) getReceipts66(ctx context.Context, inreq *proto_sentry.InboundMessage, sentry direct.SentryClient) error {
return nil //TODO: https://github.com/ledgerwatch/erigon/issues/10320
//var query eth.GetReceiptsPacket66
//if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
// return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data)
//}
//tx, err := cs.db.BeginRo(ctx)
//if err != nil {
// return err
//}
//defer tx.Rollback()
//receipts, err := eth.AnswerGetReceiptsQuery(cs.blockReader, tx, query.GetReceiptsPacket)
//if err != nil {
// return err
//}
//tx.Rollback()
//b, err := rlp.EncodeToBytes(&eth.ReceiptsRLPPacket66{
// RequestId: query.RequestId,
// ReceiptsRLPPacket: receipts,
//})
//if err != nil {
// return fmt.Errorf("encode header response: %w", err)
//}
//outreq := proto_sentry.SendMessageByIdRequest{
// PeerId: inreq.PeerId,
// Data: &proto_sentry.OutboundMessageData{
// Id: proto_sentry.MessageId_RECEIPTS_66,
// Data: b,
// },
//}
//_, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
//if err != nil {
// if isPeerNotFoundErr(err) {
// return nil
// }
// return fmt.Errorf("send bodies response: %w", err)
//}
////cs.logger.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", ConvertH512ToPeerID(inreq.PeerId), len(b)))
//return nil
err := cs.semaphore.Acquire(ctx, 1)
if err != nil {
return err
}
defer cs.semaphore.Release(1)
var query eth.GetReceiptsPacket66
if err := rlp.DecodeBytes(inreq.Data, &query); err != nil {
return fmt.Errorf("decoding getReceipts66: %w, data: %x", err, inreq.Data)
}

tx, err := cs.db.BeginRo(ctx)
if err != nil {
return err
}
defer tx.Rollback()
ttx := tx.(kv.TemporalTx)
exec := exec3.NewTraceWorker(ttx, cs.ChainConfig, cs.Engine, cs.blockReader, nil)
receipts, err := eth.AnswerGetReceiptsQuery(cs.blockReader, tx, query.GetReceiptsPacket, exec)
if err != nil {
return err
}
tx.Rollback()
b, err := rlp.EncodeToBytes(&eth.ReceiptsRLPPacket66{
RequestId: query.RequestId,
ReceiptsRLPPacket: receipts,
})
if err != nil {
return fmt.Errorf("encode header response: %w", err)
}
outreq := proto_sentry.SendMessageByIdRequest{
PeerId: inreq.PeerId,
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_RECEIPTS_66,
Data: b,
},
}
_, err = sentry.SendMessageById(ctx, &outreq, &grpc.EmptyCallOption{})
if err != nil {
if isPeerNotFoundErr(err) {
return nil
}
return fmt.Errorf("send bodies response: %w", err)
}
//cs.logger.Info(fmt.Sprintf("[%s] GetReceipts responseLen %d", ConvertH512ToPeerID(inreq.PeerId), len(b)))
return nil
}

func MakeInboundMessage() *proto_sentry.InboundMessage {
Expand Down
Loading