From eaa81bd72bb91c00099c101b882209da011ff6b2 Mon Sep 17 00:00:00 2001 From: Francis Li Date: Tue, 16 Jul 2024 13:28:09 -0700 Subject: [PATCH] [PeerDAS] Parallelize data column sampling (#14105) * PeerDAS: parallelizing sample queries * PeerDAS: select sample from non custodied columns * Finish rebase * Add more test cases --- beacon-chain/core/peerdas/helpers.go | 8 + beacon-chain/sync/data_columns_sampling.go | 676 ++++++++++-------- .../sync/data_columns_sampling_test.go | 366 ++++++++-- beacon-chain/sync/service.go | 7 +- 4 files changed, 695 insertions(+), 362 deletions(-) diff --git a/beacon-chain/core/peerdas/helpers.go b/beacon-chain/core/peerdas/helpers.go index cb433532266b..8dd32ee5eb93 100644 --- a/beacon-chain/core/peerdas/helpers.go +++ b/beacon-chain/core/peerdas/helpers.go @@ -357,3 +357,11 @@ func CustodyCountFromRecord(record *enr.Record) (uint64, error) { return uint64(csc), nil } + +func CanSelfReconstruct(numCol uint64) bool { + total := params.BeaconConfig().NumberOfColumns + // if total is odd, then we need total / 2 + 1 columns to reconstruct + // if total is even, then we need total / 2 columns to reconstruct + columnsNeeded := total/2 + total%2 + return numCol >= columnsNeeded +} diff --git a/beacon-chain/sync/data_columns_sampling.go b/beacon-chain/sync/data_columns_sampling.go index d48c2f1da360..cded76c5c512 100644 --- a/beacon-chain/sync/data_columns_sampling.go +++ b/beacon-chain/sync/data_columns_sampling.go @@ -4,17 +4,21 @@ import ( "context" "fmt" "sort" + "sync" + "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/pkg/errors" - coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time" "github.com/sirupsen/logrus" + "github.com/prysmaticlabs/prysm/v5/async" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed" statefeed "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" + coreTime "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/time" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/startup" fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" @@ -23,255 +27,261 @@ import ( "github.com/prysmaticlabs/prysm/v5/runtime/version" ) +const PeerRefreshInterval = 1 * time.Minute + type roundSummary struct { RequestedColumns []uint64 MissingColumns map[uint64]bool } -// randomizeColumns returns a slice containing all columns in a random order. -func randomizeColumns(columns map[uint64]bool) []uint64 { - // Create a slice from columns. - randomized := make([]uint64, 0, len(columns)) - for column := range columns { - randomized = append(randomized, column) - } - - // Shuffle the slice. - rand.NewGenerator().Shuffle(len(randomized), func(i, j int) { - randomized[i], randomized[j] = randomized[j], randomized[i] - }) - - return randomized +// DataColumnSampler defines the interface for sampling data columns from peers for requested block root and samples count. +type DataColumnSampler interface { + // Run starts the data column sampling service. + Run(ctx context.Context) } -// sortedSliceFromMap returns a sorted slices of keys from a map. 
-func sortedSliceFromMap(m map[uint64]bool) []uint64 { - result := make([]uint64, 0, len(m)) - for k := range m { - result = append(result, k) - } +var _ DataColumnSampler = (*dataColumnSampler1D)(nil) - sort.Slice(result, func(i, j int) bool { - return result[i] < result[j] - }) +// dataColumnSampler1D implements the DataColumnSampler interface for PeerDAS 1D. +type dataColumnSampler1D struct { + sync.RWMutex - return result -} + p2p p2p.P2P + clock *startup.Clock + ctxMap ContextByteVersions + stateNotifier statefeed.Notifier -// custodyColumnsFromPeer returns the columns the peer should custody. -func (s *Service) custodyColumnsFromPeer(pid peer.ID) (map[uint64]bool, error) { - // Retrieve the custody count of the peer. - custodySubnetCount := s.cfg.p2p.CustodyCountFromRemotePeer(pid) + // nonCustodyColumns is a set of columns that are not custodied by the node. + nonCustodyColumns map[uint64]bool + // columnFromPeer maps a peer to the columns it is responsible for custody. + columnFromPeer map[peer.ID]map[uint64]bool + // peerFromColumn maps a column to the peer responsible for custody. + peerFromColumn map[uint64]map[peer.ID]bool +} - // Extract the node ID from the peer ID. - nodeID, err := p2p.ConvertPeerIDToNodeID(pid) - if err != nil { - return nil, errors.Wrap(err, "extract node ID") +// newDataColumnSampler1D creates a new 1D data column sampler. +func newDataColumnSampler1D( + p2p p2p.P2P, + clock *startup.Clock, + ctxMap ContextByteVersions, + stateNotifier statefeed.Notifier, +) *dataColumnSampler1D { + numColumns := params.BeaconConfig().NumberOfColumns + peerFromColumn := make(map[uint64]map[peer.ID]bool, numColumns) + for i := uint64(0); i < numColumns; i++ { + peerFromColumn[i] = make(map[peer.ID]bool) } - // Determine which columns the peer should custody. - custodyColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount) - if err != nil { - return nil, errors.Wrap(err, "custody columns") + return &dataColumnSampler1D{ + p2p: p2p, + clock: clock, + ctxMap: ctxMap, + stateNotifier: stateNotifier, + columnFromPeer: make(map[peer.ID]map[uint64]bool), + peerFromColumn: peerFromColumn, } - - return custodyColumns, nil } -// verifyColumn verifies the retrieved column against the root, the index, -// the KZG inclusion and the KZG proof. -func verifyColumn( - roDataColumn blocks.RODataColumn, - root [32]byte, - pid peer.ID, - requestedColumns map[uint64]bool, -) bool { - retrievedColumn := roDataColumn.ColumnIndex +// Run implements DataColumnSampler. +func (d *dataColumnSampler1D) Run(ctx context.Context) { + // verify if we need to run sampling or not, if not, return directly + csc := peerdas.CustodySubnetCount() + columns, err := peerdas.CustodyColumns(d.p2p.NodeID(), csc) + if err != nil { + log.WithError(err).Error("Failed to determine local custody columns") + return + } - // Filter out columns with incorrect root. - actualRoot := roDataColumn.BlockRoot() - if actualRoot != root { + custodyColumnsCount := uint64(len(columns)) + if peerdas.CanSelfReconstruct(custodyColumnsCount) { log.WithFields(logrus.Fields{ - "peerID": pid, - "requestedRoot": fmt.Sprintf("%#x", root), - "actualRoot": fmt.Sprintf("%#x", actualRoot), - }).Debug("Retrieved root does not match requested root") + "custodyColumnsCount": custodyColumnsCount, + "totalColumns": params.BeaconConfig().NumberOfColumns, + }).Debug("The node custodies at least the half the data columns, no need to sample") + return + } - return false + // initialize non custody columns. 
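The early return above uses peerdas.CanSelfReconstruct, added in the helpers.go hunk of this patch: a node that custodies at least ceil(NumberOfColumns/2) columns can rebuild every other column locally, so sampling is pointless for it. Below is a minimal standalone sketch of that threshold (editor's illustration, assuming the preset value NumberOfColumns = 128; the helper is re-implemented here rather than imported from the peerdas package):

    package main

    import "fmt"

    // canSelfReconstruct mirrors peerdas.CanSelfReconstruct: ceil(total/2)
    // columns are enough to rebuild the full extended column set.
    func canSelfReconstruct(numCol, total uint64) bool {
        return numCol >= total/2+total%2
    }

    func main() {
        const totalColumns uint64 = 128 // assumed preset value
        for _, custody := range []uint64{4, 63, 64, 128} {
            fmt.Printf("custody=%d -> canSelfReconstruct=%v\n", custody, canSelfReconstruct(custody, totalColumns))
        }
    }

For nodes below that threshold, the initialization that follows collects the complement of the local custody set, since samples are drawn exclusively from non-custody columns.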
+ d.nonCustodyColumns = make(map[uint64]bool) + for i := uint64(0); i < params.BeaconConfig().NumberOfColumns; i++ { + if exists := columns[i]; !exists { + d.nonCustodyColumns[i] = true + } } - // Filter out columns that were not requested. - if !requestedColumns[retrievedColumn] { - columnsToSampleList := sortedSliceFromMap(requestedColumns) + // initialize peer info first. + d.refreshPeerInfo() - log.WithFields(logrus.Fields{ - "peerID": pid, - "requestedColumns": columnsToSampleList, - "retrievedColumn": retrievedColumn, - }).Debug("Retrieved column was not requested") + // periodically refresh peer info to keep peer <-> column mapping up to date. + async.RunEvery(ctx, PeerRefreshInterval, d.refreshPeerInfo) - return false - } + // start the sampling loop. + d.samplingRoutine(ctx) +} - // Filter out columns which did not pass the KZG inclusion proof verification. - if err := blocks.VerifyKZGInclusionProofColumn(roDataColumn.DataColumnSidecar); err != nil { - log.WithFields(logrus.Fields{ - "peerID": pid, - "root": fmt.Sprintf("%#x", root), - "index": retrievedColumn, - }).Debug("Failed to verify KZG inclusion proof for retrieved column") +func (d *dataColumnSampler1D) samplingRoutine(ctx context.Context) { + stateCh := make(chan *feed.Event, 1) + stateSub := d.stateNotifier.StateFeed().Subscribe(stateCh) + defer stateSub.Unsubscribe() - return false + for { + select { + case evt := <-stateCh: + d.handleStateNotification(ctx, evt) + case err := <-stateSub.Err(): + log.WithError(err).Error("DataColumnSampler1D subscription to state feed failed") + case <-ctx.Done(): + log.Debug("Context canceled, exiting data column sampling loop.") + return + } } +} - // Filter out columns which did not pass the KZG proof verification. - verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roDataColumn.DataColumnSidecar) - if err != nil { - log.WithFields(logrus.Fields{ - "peerID": pid, - "root": fmt.Sprintf("%#x", root), - "index": retrievedColumn, - }).Debug("Error when verifying KZG proof for retrieved column") - - return false - } +// Refresh peer information. +func (d *dataColumnSampler1D) refreshPeerInfo() { + d.Lock() + defer d.Unlock() - if !verified { - log.WithFields(logrus.Fields{ - "peerID": pid, - "root": fmt.Sprintf("%#x", root), - "index": retrievedColumn, - }).Debug("Failed to verify KZG proof for retrieved column") + activePeers := d.p2p.Peers().Active() + d.prunePeerInfo(activePeers) - return false - } + for _, pid := range activePeers { + if _, ok := d.columnFromPeer[pid]; ok { + // TODO: need to update peer info here after validator custody. + continue + } - return true -} + csc := d.p2p.CustodyCountFromRemotePeer(pid) + nid, err := p2p.ConvertPeerIDToNodeID(pid) + if err != nil { + log.WithError(err).WithField("peerID", pid).Error("Failed to convert peer ID to node ID") + continue + } -// sampleDataColumnsFromPeer samples data columns from a peer. -// It returns the retrieved columns. -func (s *Service) sampleDataColumnsFromPeer( - pid peer.ID, - requestedColumns map[uint64]bool, - root [fieldparams.RootLength]byte, -) (map[uint64]bool, error) { - // Build the data column identifiers. 
- dataColumnIdentifiers := make(types.DataColumnSidecarsByRootReq, 0, len(requestedColumns)) - for index := range requestedColumns { - dataColumnIdentifiers = append(dataColumnIdentifiers, ð.DataColumnIdentifier{ - BlockRoot: root[:], - ColumnIndex: index, - }) - } + columns, err := peerdas.CustodyColumns(nid, csc) + if err != nil { + log.WithError(err).WithField("peerID", pid).Error("Failed to determine peer custody columns") + continue + } - // Send the request. - roDataColumns, err := SendDataColumnSidecarByRoot(s.ctx, s.cfg.clock, s.cfg.p2p, pid, s.ctxMap, &dataColumnIdentifiers) - if err != nil { - return nil, errors.Wrap(err, "send data column sidecar by root") + d.columnFromPeer[pid] = columns + for column := range columns { + d.peerFromColumn[column][pid] = true + } } - retrievedColumns := make(map[uint64]bool, len(roDataColumns)) + log.WithField("columnFromPeer", d.columnFromPeer).Debug("Peer info refreshed") - for _, roDataColumn := range roDataColumns { - if verifyColumn(roDataColumn, root, pid, requestedColumns) { - retrievedColumns[roDataColumn.ColumnIndex] = true + columnWithNoPeers := make([]uint64, 0) + for column, peers := range d.peerFromColumn { + if len(peers) == 0 { + columnWithNoPeers = append(columnWithNoPeers, column) } } - - if len(retrievedColumns) == len(requestedColumns) { - // This is the happy path. - log.WithFields(logrus.Fields{ - "peerID": pid, - "root": fmt.Sprintf("%#x", root), - "requestedColumns": sortedSliceFromMap(requestedColumns), - }).Debug("All requested columns were successfully sampled from peer") - - return retrievedColumns, nil + if len(columnWithNoPeers) > 0 { + log.WithField("columnWithNoPeers", columnWithNoPeers).Warn("Some columns have no peers responsible for custody") } +} - // Some columns are missing. - log.WithFields(logrus.Fields{ - "peerID": pid, - "root": fmt.Sprintf("%#x", root), - "requestedColumns": sortedSliceFromMap(requestedColumns), - "retrievedColumns": sortedSliceFromMap(retrievedColumns), - }).Warning("Some requested columns were not sampled from peer") +// prunePeerInfo prunes inactive peers from peerFromColumn and columnFromPeer. +// This should not be called outside of refreshPeerInfo without being locked. +func (d *dataColumnSampler1D) prunePeerInfo(activePeers []peer.ID) { + active := make(map[peer.ID]bool) + for _, pid := range activePeers { + active[pid] = true + } - return retrievedColumns, nil + for pid := range d.columnFromPeer { + if !active[pid] { + d.prunePeer(pid) + } + } } -// sampleDataColumnsFromPeers samples data columns from active peers. -// It returns the retrieved columns count. -// If one peer fails to return a column it should custody, the column is considered as missing. -func (s *Service) sampleDataColumnsFromPeers( - columnsToSample []uint64, - root [fieldparams.RootLength]byte, -) (map[uint64]bool, error) { - // Build all remaining columns to sample. - remainingColumnsToSample := make(map[uint64]bool, len(columnsToSample)) - for _, column := range columnsToSample { - remainingColumnsToSample[column] = true +// prunePeer removes a peer from stored peer info map, it should be called with lock held. +func (d *dataColumnSampler1D) prunePeer(pid peer.ID) { + delete(d.columnFromPeer, pid) + for _, peers := range d.peerFromColumn { + delete(peers, pid) } +} - // Get the active peers from the p2p service. 
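refreshPeerInfo and prunePeer above maintain a two-way index between peers and the columns they custody, so that later sample distribution is a per-column lookup rather than a scan over all peers. Here is a minimal sketch of how pruning keeps both directions consistent (editor's illustration; peer IDs are plain strings and the custody sets are hard-coded, whereas the real code derives them with peerdas.CustodyColumns):

    package main

    import "fmt"

    func main() {
        columnFromPeer := map[string]map[uint64]bool{
            "peerA": {6: true, 38: true},
            "peerB": {3: true, 38: true},
        }
        peerFromColumn := map[uint64]map[string]bool{
            3:  {"peerB": true},
            6:  {"peerA": true},
            38: {"peerA": true, "peerB": true},
        }

        // Pruning peerA removes it from both directions of the index,
        // mirroring dataColumnSampler1D.prunePeer.
        delete(columnFromPeer, "peerA")
        for _, peers := range peerFromColumn {
            delete(peers, "peerA")
        }

        fmt.Println(columnFromPeer) // map[peerB:map[3:true 38:true]]
        fmt.Println(peerFromColumn) // map[3:map[peerB:true] 6:map[] 38:map[peerB:true]]
    }

Columns whose peer set becomes empty are the ones refreshPeerInfo warns about as having no custodians.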
- activePids := s.cfg.p2p.Peers().Active() +func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event *feed.Event) { + if event.Type != statefeed.BlockProcessed { + return + } - retrievedColumns := make(map[uint64]bool, len(columnsToSample)) + data, ok := event.Data.(*statefeed.BlockProcessedData) + if !ok { + log.Error("Event feed data is not of type *statefeed.BlockProcessedData") + return + } - // Query all peers until either all columns to request are retrieved or all active peers are queried (whichever comes first). - for i := 0; len(remainingColumnsToSample) > 0 && i < len(activePids); i++ { - // Get the peer ID. - pid := activePids[i] + if !data.Verified { + // We only process blocks that have been verified + log.Error("Data is not verified") + return + } - // Get the custody columns of the peer. - peerCustodyColumns, err := s.custodyColumnsFromPeer(pid) - if err != nil { - return nil, errors.Wrap(err, "custody columns from peer") - } + if data.SignedBlock.Version() < version.Deneb { + log.Debug("Pre Deneb block, skipping data column sampling") + return + } - // Compute the intersection of the peer custody columns and the remaining columns to request. - peerRequestedColumns := make(map[uint64]bool, len(peerCustodyColumns)) - for column := range remainingColumnsToSample { - if peerCustodyColumns[column] { - peerRequestedColumns[column] = true - } - } + if coreTime.PeerDASIsActive(data.Slot) { + // We do not trigger sampling if peerDAS is not active yet. + return + } - // Remove the newsly requested columns from the remaining columns to request. - for column := range peerRequestedColumns { - delete(remainingColumnsToSample, column) - } + // Get the commitments for this block. + commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments() + if err != nil { + log.WithError(err).Error("Failed to get blob KZG commitments") + return + } - // Sample data columns from the peer. - peerRetrievedColumns, err := s.sampleDataColumnsFromPeer(pid, peerRequestedColumns, root) - if err != nil { - return nil, errors.Wrap(err, "sample data columns from peer") - } + // Skip if there are no commitments. + if len(commitments) == 0 { + log.Debug("No commitments in block, skipping data column sampling") + return + } - // Update the retrieved columns. - for column := range peerRetrievedColumns { - retrievedColumns[column] = true - } + // Randomize columns for sample selection. + randomizedColumns := randomizeColumns(d.nonCustodyColumns) + samplesCount := min(params.BeaconConfig().SamplesPerSlot, uint64(len(d.nonCustodyColumns))-params.BeaconConfig().NumberOfColumns/2) + ok, _, err = d.incrementalDAS(ctx, data.BlockRoot, randomizedColumns, samplesCount) + if err != nil { + log.WithError(err).Error("Failed to run incremental DAS") } - return retrievedColumns, nil + if ok { + log.WithFields(logrus.Fields{ + "root": fmt.Sprintf("%#x", data.BlockRoot), + "columns": randomizedColumns, + }).Debug("Data column sampling successful") + } else { + log.WithFields(logrus.Fields{ + "root": fmt.Sprintf("%#x", data.BlockRoot), + "columns": randomizedColumns, + }).Warning("Data column sampling failed") + } } // incrementalDAS samples data columns from active peers using incremental DAS. 
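// Editor's walk-through of the round structure below: round 1 requests
// columns[0:ExtendedSampleCount(sampleCount, 0)]. If some of those queries fail,
// the number of failures is added to allowedFailures, and round 2 requests the
// next window, columns[previousEnd:ExtendedSampleCount(sampleCount, allowedFailures)],
// using the existing peerdas.ExtendedSampleCount helper. The window keeps growing
// until either a round returns every column it asked for (success) or the window
// would extend past len(columns) (failure).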
// https://ethresear.ch/t/lossydas-lossy-incremental-and-diagonal-sampling-for-data-availability/18963#incrementaldas-dynamically-increase-the-sample-size-10 -func (s *Service) incrementalDAS( +// According to https://github.com/ethereum/consensus-specs/issues/3825, we're going to select query samples exclusively from the non custody columns. +func (d *dataColumnSampler1D) incrementalDAS( + ctx context.Context, root [fieldparams.RootLength]byte, columns []uint64, sampleCount uint64, ) (bool, []roundSummary, error) { - columnsCount, missingColumnsCount := uint64(len(columns)), uint64(0) - firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, 0) - + allowedFailures := uint64(0) + firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, allowedFailures) roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary. for round := 1; ; /*No exit condition */ round++ { - if extendedSampleCount > columnsCount { + if extendedSampleCount > uint64(len(columns)) { // We already tried to sample all possible columns, this is the unhappy path. log.WithField("root", fmt.Sprintf("%#x", root)).Warning("Some columns are still missing after sampling all possible columns") return false, roundSummaries, nil @@ -281,14 +291,10 @@ func (s *Service) incrementalDAS( columnsToSample := columns[firstColumnToSample:extendedSampleCount] columnsToSampleCount := extendedSampleCount - firstColumnToSample - // Sample the data columns from the peers. - retrievedSamples, err := s.sampleDataColumnsFromPeers(columnsToSample, root) - if err != nil { - return false, nil, errors.Wrap(err, "sample data columns from peers") - } + // Sample data columns from peers in parallel. + retrievedSamples := d.sampleDataColumns(ctx, root, columnsToSample) - // Compute the missing samples. - missingSamples := make(map[uint64]bool, max(0, len(columnsToSample)-len(retrievedSamples))) + missingSamples := make(map[uint64]bool) for _, column := range columnsToSample { if !retrievedSamples[column] { missingSamples[column] = true @@ -301,7 +307,6 @@ func (s *Service) incrementalDAS( }) retrievedSampleCount := uint64(len(retrievedSamples)) - if retrievedSampleCount == columnsToSampleCount { // All columns were correctly sampled, this is the happy path. log.WithFields(logrus.Fields{ @@ -316,141 +321,238 @@ func (s *Service) incrementalDAS( return false, nil, errors.New("retrieved more columns than requested") } - // Some columns are missing, we need to extend the sample size. - missingColumnsCount += columnsToSampleCount - retrievedSampleCount - - firstColumnToSample = extendedSampleCount + // missing columns, extend the samples. + allowedFailures += columnsToSampleCount - retrievedSampleCount oldExtendedSampleCount := extendedSampleCount - extendedSampleCount = peerdas.ExtendedSampleCount(sampleCount, missingColumnsCount) + firstColumnToSample = extendedSampleCount + extendedSampleCount = peerdas.ExtendedSampleCount(sampleCount, allowedFailures) log.WithFields(logrus.Fields{ "root": fmt.Sprintf("%#x", root), "round": round, - "missingColumnsCount": missingColumnsCount, - "currentSampleCount": oldExtendedSampleCount, - "nextSampleCount": extendedSampleCount, + "missingColumnsCount": allowedFailures, + "currentSampleIndex": oldExtendedSampleCount, + "nextSampleIndex": extendedSampleCount, }).Debug("Some columns are still missing after sampling this round.") } } -// DataColumnSamplingRoutine runs incremental DAS on block when received. 
-func (s *Service) DataColumnSamplingRoutine(ctx context.Context) { - // Get the custody subnets count. - custodySubnetsCount := peerdas.CustodySubnetCount() +func (d *dataColumnSampler1D) sampleDataColumns( + ctx context.Context, + root [fieldparams.RootLength]byte, + columns []uint64, +) map[uint64]bool { + // distribute samples to peer + peerToColumns := d.distributeSamplesToPeer(columns) + + var ( + mu sync.Mutex + wg sync.WaitGroup + ) + res := make(map[uint64]bool) + sampleFromPeer := func(pid peer.ID, cols map[uint64]bool) { + defer wg.Done() + retrieved := d.sampleDataColumnsFromPeer(ctx, pid, root, cols) + + mu.Lock() + for col := range retrieved { + res[col] = true + } + mu.Unlock() + } - // Create a subscription to the state feed. - stateChannel := make(chan *feed.Event, 1) - stateSub := s.cfg.stateNotifier.StateFeed().Subscribe(stateChannel) + // sample from peers in parallel + for pid, cols := range peerToColumns { + wg.Add(1) + go sampleFromPeer(pid, cols) + } - // Unsubscribe from the state feed when the function returns. - defer stateSub.Unsubscribe() + wg.Wait() + return res +} - // Retrieve the number of columns. - columnsCount := params.BeaconConfig().NumberOfColumns +// distributeSamplesToPeer distributes samples to peers based on the columns they are responsible for. +// Currently it randomizes peer selection for a column and did not take into account whole peer distribution balance. It could be improved if needed. +func (d *dataColumnSampler1D) distributeSamplesToPeer( + columns []uint64, +) map[peer.ID]map[uint64]bool { + dist := make(map[peer.ID]map[uint64]bool) + + for _, col := range columns { + peers := d.peerFromColumn[col] + if len(peers) == 0 { + log.WithField("column", col).Warn("No peers responsible for custody of column") + continue + } - // Retrieve all columns we custody. - custodyColumns, err := peerdas.CustodyColumns(s.cfg.p2p.NodeID(), custodySubnetsCount) - if err != nil { - log.WithError(err).Error("Failed to get custody columns") - return + pid := selectRandomPeer(peers) + if _, ok := dist[pid]; !ok { + dist[pid] = make(map[uint64]bool) + } + dist[pid][col] = true } - custodyColumnsCount := uint64(len(custodyColumns)) + return dist +} - // Compute the number of columns to sample. - if custodyColumnsCount >= columnsCount/2 { - log.WithFields(logrus.Fields{ - "custodyColumnsCount": custodyColumnsCount, - "columnsCount": columnsCount, - }).Debug("The node custodies at least the half the data columns, no need to sample") - return +func (d *dataColumnSampler1D) sampleDataColumnsFromPeer( + ctx context.Context, + pid peer.ID, + root [fieldparams.RootLength]byte, + requestedColumns map[uint64]bool, +) map[uint64]bool { + retrievedColumns := make(map[uint64]bool) + + req := make(types.DataColumnSidecarsByRootReq, 0) + for col := range requestedColumns { + req = append(req, ð.DataColumnIdentifier{ + BlockRoot: root[:], + ColumnIndex: col, + }) } - samplesCount := min(params.BeaconConfig().SamplesPerSlot, columnsCount/2-custodyColumnsCount) + // Send the request to the peer. + roDataColumns, err := SendDataColumnSidecarByRoot(ctx, d.clock, d.p2p, pid, d.ctxMap, &req) + if err != nil { + log.WithError(err).Error("Failed to send data column sidecar by root") + return nil + } - // Compute all the columns we do NOT custody. 
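The sampleDataColumns / distributeSamplesToPeer pair above is the core of the parallelization in this patch and replaces the sequential per-peer loop being deleted here. Below is a standalone sketch of that fan-out (editor's illustration; peer IDs are plain strings and the network request to each peer is stubbed out as always succeeding):

    package main

    import (
        "fmt"
        "sync"
    )

    func main() {
        // Result of distributeSamplesToPeer: each peer is asked only for
        // columns it custodies.
        peerToColumns := map[string][]uint64{
            "peerA": {6, 38},
            "peerB": {3, 35},
        }

        var (
            mu  sync.Mutex
            wg  sync.WaitGroup
            res = make(map[uint64]bool)
        )

        for pid, cols := range peerToColumns {
            wg.Add(1)
            go func(pid string, cols []uint64) {
                defer wg.Done()
                // Stand-in for sampleDataColumnsFromPeer: pretend the peer
                // served every requested column.
                mu.Lock()
                defer mu.Unlock()
                for _, c := range cols {
                    res[c] = true
                }
            }(pid, cols)
        }

        wg.Wait()
        fmt.Println(res) // map[3:true 6:true 35:true 38:true]
    }

Because each peer is queried in its own goroutine and only the result map is shared, a single mutex around the merge step is enough; the real implementation follows the same shape.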
- nonCustodyColums := make(map[uint64]bool, columnsCount-custodyColumnsCount) - for i := uint64(0); i < columnsCount; i++ { - if !custodyColumns[i] { - nonCustodyColums[i] = true + for _, roDataColumn := range roDataColumns { + if verifyColumn(roDataColumn, root, pid, requestedColumns) { + retrievedColumns[roDataColumn.ColumnIndex] = true } } - for { - select { - case e := <-stateChannel: - s.processEvent(e, nonCustodyColums, samplesCount) + if len(retrievedColumns) == len(requestedColumns) { + log.WithFields(logrus.Fields{ + "peerID": pid, + "root": fmt.Sprintf("%#x", root), + "requestedColumns": sortedSliceFromMap(requestedColumns), + }).Debug("All requested columns were successfully sampled from peer") + } else { + log.WithFields(logrus.Fields{ + "peerID": pid, + "root": fmt.Sprintf("%#x", root), + "requestedColumns": sortedSliceFromMap(requestedColumns), + "retrievedColumns": sortedSliceFromMap(retrievedColumns), + }).Debug("Some requested columns were not sampled from peer") + } - case <-s.ctx.Done(): - log.Debug("Context closed, exiting goroutine") - return + return retrievedColumns +} - case err := <-stateSub.Err(): - log.WithError(err).Error("Subscription to state feed failed") - } +// randomizeColumns returns a slice containing all the numbers between 0 and colNum in a random order. +func randomizeColumns(columns map[uint64]bool) []uint64 { + // Create a slice from columns. + randomized := make([]uint64, 0, len(columns)) + for column := range columns { + randomized = append(randomized, column) } + + // Shuffle the slice. + rand.NewGenerator().Shuffle(len(randomized), func(i, j int) { + randomized[i], randomized[j] = randomized[j], randomized[i] + }) + + return randomized } -func (s *Service) processEvent(e *feed.Event, nonCustodyColums map[uint64]bool, samplesCount uint64) { - if e.Type != statefeed.BlockProcessed { - return +// sortedSliceFromMap returns a sorted list of keys from a map. +func sortedSliceFromMap(m map[uint64]bool) []uint64 { + result := make([]uint64, 0, len(m)) + for k := range m { + result = append(result, k) } - data, ok := e.Data.(*statefeed.BlockProcessedData) - if !ok { - log.Error("Event feed data is not of type *statefeed.BlockProcessedData") - return - } + sort.Slice(result, func(i, j int) bool { + return result[i] < result[j] + }) - if !data.Verified { - // We only process blocks that have been verified - log.Error("Data is not verified") - return - } + return result +} - if data.SignedBlock.Version() < version.Deneb { - log.Debug("Pre Deneb block, skipping data column sampling") - return +// selectRandomPeer returns a random peer from the given list of peers. +func selectRandomPeer(peers map[peer.ID]bool) peer.ID { + pick := rand.NewGenerator().Uint64() % uint64(len(peers)) + for k := range peers { + if pick == 0 { + return k + } + pick-- } - if coreTime.PeerDASIsActive(data.Slot) { - // We do not trigger sampling if peerDAS is not active yet. - return - } + // This should never be reached. + return peer.ID("") +} - // Get the commitments for this block. - commitments, err := data.SignedBlock.Block().Body().BlobKzgCommitments() - if err != nil { - log.WithError(err).Error("Failed to get blob KZG commitments") - return +// verifyColumn verifies the retrieved column against the root, the index, +// the KZG inclusion and the KZG proof. 
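// Editor's summary of the checks below, roughly in increasing order of cost:
// (1) the sidecar's block root must equal the requested root, (2) the returned
// column index must be one that was actually requested from this peer, (3) the
// KZG commitment inclusion proof against the block body must verify, and
// (4) the column's KZG proofs themselves must verify. A column failing any
// check is simply not counted as retrieved from that peer.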
+func verifyColumn( + roDataColumn blocks.RODataColumn, + root [32]byte, + pid peer.ID, + requestedColumns map[uint64]bool, +) bool { + retrievedColumn := roDataColumn.ColumnIndex + + // Filter out columns with incorrect root. + actualRoot := roDataColumn.BlockRoot() + if actualRoot != root { + log.WithFields(logrus.Fields{ + "peerID": pid, + "requestedRoot": fmt.Sprintf("%#x", root), + "actualRoot": fmt.Sprintf("%#x", actualRoot), + }).Debug("Retrieved root does not match requested root") + + return false } - // Skip if there are no commitments. - if len(commitments) == 0 { - log.Debug("No commitments in block, skipping data column sampling") - return + // Filter out columns that were not requested. + if !requestedColumns[retrievedColumn] { + columnsToSampleList := sortedSliceFromMap(requestedColumns) + + log.WithFields(logrus.Fields{ + "peerID": pid, + "requestedColumns": columnsToSampleList, + "retrievedColumn": retrievedColumn, + }).Debug("Retrieved column was not requested") + + return false } - // Ramdomize all columns. - randomizedColumns := randomizeColumns(nonCustodyColums) + // Filter out columns which did not pass the KZG inclusion proof verification. + if err := blocks.VerifyKZGInclusionProofColumn(roDataColumn.DataColumnSidecar); err != nil { + log.WithFields(logrus.Fields{ + "peerID": pid, + "root": fmt.Sprintf("%#x", root), + "index": retrievedColumn, + }).Debug("Failed to verify KZG inclusion proof for retrieved column") - // Sample data columns with incremental DAS. - ok, _, err = s.incrementalDAS(data.BlockRoot, randomizedColumns, samplesCount) - if err != nil { - log.WithError(err).Error("Error during incremental DAS") + return false } - if ok { + // Filter out columns which did not pass the KZG proof verification. + verified, err := peerdas.VerifyDataColumnSidecarKZGProofs(roDataColumn.DataColumnSidecar) + if err != nil { log.WithFields(logrus.Fields{ - "root": fmt.Sprintf("%#x", data.BlockRoot), - "columns": randomizedColumns, - "sampleCount": samplesCount, - }).Debug("Data column sampling successful") - } else { + "peerID": pid, + "root": fmt.Sprintf("%#x", root), + "index": retrievedColumn, + }).Debug("Error when verifying KZG proof for retrieved column") + + return false + } + + if !verified { log.WithFields(logrus.Fields{ - "root": fmt.Sprintf("%#x", data.BlockRoot), - "columns": randomizedColumns, - "sampleCount": samplesCount, - }).Warning("Data column sampling failed") + "peerID": pid, + "root": fmt.Sprintf("%#x", root), + "index": retrievedColumn, + }).Debug("Failed to verify KZG proof for retrieved column") + + return false } + + return true } diff --git a/beacon-chain/sync/data_columns_sampling_test.go b/beacon-chain/sync/data_columns_sampling_test.go index 12e5b924599f..656d36e2c584 100644 --- a/beacon-chain/sync/data_columns_sampling_test.go +++ b/beacon-chain/sync/data_columns_sampling_test.go @@ -19,6 +19,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/peers" p2ptest "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/testing" p2pTypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" + "github.com/prysmaticlabs/prysm/v5/config/params" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/runtime/version" @@ -61,10 +62,10 @@ func createAndConnectPeer( p2pService *p2ptest.TestP2P, chainService *mock.ChainService, dataColumnSidecars []*ethpb.DataColumnSidecar, - custodyCount uint64, + custodySubnetCount uint64, 
columnsNotToRespond map[uint64]bool, offset int, -) { +) *p2ptest.TestP2P { // Create the private key, depending on the offset. privateKeyBytes := make([]byte, 32) for i := 0; i < 32; i++ { @@ -104,73 +105,56 @@ func createAndConnectPeer( // Create the record and set the custody count. enr := &enr.Record{} - enr.Set(peerdas.Csc(custodyCount)) + enr.Set(peerdas.Csc(custodySubnetCount)) // Add the peer and connect it. p2pService.Peers().Add(enr, peer.PeerID(), nil, network.DirOutbound) p2pService.Peers().SetConnectionState(peer.PeerID(), peers.PeerConnected) p2pService.Connect(peer) -} - -func deterministicRandomness(seed int64) [32]byte { - // Converts an int64 to a byte slice - buf := new(bytes.Buffer) - err := binary.Write(buf, binary.BigEndian, seed) - if err != nil { - logrus.WithError(err).Error("Failed to write int64 to bytes buffer") - return [32]byte{} - } - bytes := buf.Bytes() - - return sha256.Sum256(bytes) -} - -// Returns a serialized random field element in big-endian -func getRandFieldElement(seed int64) [32]byte { - bytes := deterministicRandomness(seed) - var r fr.Element - r.SetBytes(bytes[:]) - return GoKZG.SerializeScalar(r) + return peer } -// Returns a random blob using the passed seed as entropy -func getRandBlob(seed int64) kzg.Blob { - var blob kzg.Blob - for i := 0; i < len(blob); i += 32 { - fieldElementBytes := getRandFieldElement(seed + int64(i)) - copy(blob[i:i+32], fieldElementBytes[:]) - } - return blob +type dataSamplerTest struct { + ctx context.Context + p2pSvc *p2ptest.TestP2P + peers []*p2ptest.TestP2P + ctxMap map[[4]byte]int + chainSvc *mock.ChainService + blockRoot [32]byte + blobs []kzg.Blob + kzgCommitments [][]byte + kzgProofs [][]byte + dataColumnSidecars []*ethpb.DataColumnSidecar } -func generateCommitmentAndProof(blob *kzg.Blob) (*kzg.Commitment, *kzg.Proof, error) { - commitment, err := kzg.BlobToKZGCommitment(blob) - if err != nil { - return nil, nil, err - } - proof, err := kzg.ComputeBlobKZGProof(blob, commitment) - if err != nil { - return nil, nil, err - } - return &commitment, &proof, err -} - -func TestIncrementalDAS(t *testing.T) { +func setupDefaultDataColumnSamplerTest(t *testing.T) (*dataSamplerTest, *dataColumnSampler1D) { const ( - blobCount = 3 + blobCount uint64 = 3 custodyRequirement uint64 = 1 ) - err := kzg.Start() - require.NoError(t, err) + test, sampler := setupDataColumnSamplerTest(t, blobCount) + // Custody columns: [6, 38, 70, 102] + p1 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, custodyRequirement, map[uint64]bool{}, 1) + // Custody columns: [3, 35, 67, 99] + p2 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, custodyRequirement, map[uint64]bool{}, 2) + // Custody columns: [12, 44, 76, 108] + p3 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, custodyRequirement, map[uint64]bool{}, 3) + test.peers = []*p2ptest.TestP2P{p1, p2, p3} + + return test, sampler +} + +func setupDataColumnSamplerTest(t *testing.T, blobCount uint64) (*dataSamplerTest, *dataColumnSampler1D) { + require.NoError(t, kzg.Start()) // Generate random blobs, commitments and inclusion proofs. 
blobs := make([]kzg.Blob, blobCount) kzgCommitments := make([][]byte, blobCount) kzgProofs := make([][]byte, blobCount) - for i := int64(0); i < blobCount; i++ { + for i := uint64(0); i < blobCount; i++ { blob := getRandBlob(int64(i)) kzgCommitment, kzgProof, err := generateCommitmentAndProof(&blob) @@ -192,6 +176,218 @@ func TestIncrementalDAS(t *testing.T) { blockRoot, err := dataColumnSidecars[0].GetSignedBlockHeader().Header.HashTreeRoot() require.NoError(t, err) + p2pSvc := p2ptest.NewTestP2P(t) + chainSvc, clock := defaultMockChain(t) + + test := &dataSamplerTest{ + ctx: context.Background(), + p2pSvc: p2pSvc, + peers: []*p2ptest.TestP2P{}, + ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Deneb}, + chainSvc: chainSvc, + blockRoot: blockRoot, + blobs: blobs, + kzgCommitments: kzgCommitments, + kzgProofs: kzgProofs, + dataColumnSidecars: dataColumnSidecars, + } + sampler := newDataColumnSampler1D(p2pSvc, clock, test.ctxMap, nil) + + return test, sampler +} + +func TestDataColumnSampler1D_PeerManagement(t *testing.T) { + testCases := []struct { + numPeers int + custodyRequirement uint64 + expectedColumns [][]uint64 + prunePeers map[int]bool // Peers to prune. + }{ + { + numPeers: 3, + custodyRequirement: 1, + expectedColumns: [][]uint64{ + {6, 38, 70, 102}, + {3, 35, 67, 99}, + {12, 44, 76, 108}, + }, + prunePeers: map[int]bool{ + 0: true, + }, + }, + { + numPeers: 3, + custodyRequirement: 2, + expectedColumns: [][]uint64{ + {6, 16, 38, 48, 70, 80, 102, 112}, + {3, 13, 35, 45, 67, 77, 99, 109}, + {12, 31, 44, 63, 76, 95, 108, 127}, + }, + prunePeers: map[int]bool{ + 0: true, + }, + }, + } + + for _, tc := range testCases { + test, sampler := setupDataColumnSamplerTest(t, uint64(tc.numPeers)) + for i := 0; i < tc.numPeers; i++ { + p := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, tc.custodyRequirement, nil, i+1) + test.peers = append(test.peers, p) + } + + // confirm everything works + sampler.refreshPeerInfo() + require.Equal(t, params.BeaconConfig().NumberOfColumns, uint64(len(sampler.peerFromColumn))) + + require.Equal(t, tc.numPeers, len(sampler.columnFromPeer)) + for i, peer := range test.peers { + // confirm peer has the expected columns + require.Equal(t, len(tc.expectedColumns[i]), len(sampler.columnFromPeer[peer.PeerID()])) + for _, column := range tc.expectedColumns[i] { + require.Equal(t, true, sampler.columnFromPeer[peer.PeerID()][column]) + } + + // confirm column to peer mapping are correct + for _, column := range tc.expectedColumns[i] { + require.Equal(t, true, sampler.peerFromColumn[column][peer.PeerID()]) + } + } + + // prune peers + for peer := range tc.prunePeers { + err := test.p2pSvc.Disconnect(test.peers[peer].PeerID()) + test.p2pSvc.Peers().SetConnectionState(test.peers[peer].PeerID(), peers.PeerDisconnected) + require.NoError(t, err) + } + sampler.refreshPeerInfo() + + require.Equal(t, tc.numPeers-len(tc.prunePeers), len(sampler.columnFromPeer)) + for i, peer := range test.peers { + for _, column := range tc.expectedColumns[i] { + expected := true + if tc.prunePeers[i] { + expected = false + } + require.Equal(t, expected, sampler.peerFromColumn[column][peer.PeerID()]) + } + } + } +} + +func TestDataColumnSampler1D_SampleDistribution(t *testing.T) { + testCases := []struct { + numPeers int + custodyRequirement uint64 + columnsToDistribute [][]uint64 + expectedDistribution []map[int][]uint64 + }{ + { + numPeers: 3, + custodyRequirement: 1, + // peer custody maps + // p0: {6, 38, 70, 102}, + // p1: {3, 35, 67, 99}, + // p2: {12, 
44, 76, 108}, + columnsToDistribute: [][]uint64{ + {3, 6, 12}, + {6, 3, 12, 38, 35, 44}, + {6, 38, 70}, + {11}, + }, + expectedDistribution: []map[int][]uint64{ + { + 0: {6}, // p1 + 1: {3}, // p2 + 2: {12}, // p3 + }, + { + 0: {6, 38}, // p1 + 1: {3, 35}, // p2 + 2: {12, 44}, // p3 + }, + { + 0: {6, 38, 70}, // p1 + }, + {}, + }, + }, + { + numPeers: 3, + custodyRequirement: 2, + // peer custody maps + // p0: {6, 16, 38, 48, 70, 80, 102, 112}, + // p1: {3, 13, 35, 45, 67, 77, 99, 109}, + // p2: {12, 31, 44, 63, 76, 95, 108, 127}, + columnsToDistribute: [][]uint64{ + {3, 6, 12, 109, 112, 127}, // all covered by peers + {13, 16, 31, 32}, // 32 not in covered by peers + }, + expectedDistribution: []map[int][]uint64{ + { + 0: {6, 112}, // p1 + 1: {3, 109}, // p2 + 2: {12, 127}, // p3 + }, + { + 0: {16}, // p1 + 1: {13}, // p2 + 2: {31}, // p3 + }, + }, + }, + } + + for _, tc := range testCases { + test, sampler := setupDataColumnSamplerTest(t, uint64(tc.numPeers)) + for i := 0; i < tc.numPeers; i++ { + p := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, tc.custodyRequirement, nil, i+1) + test.peers = append(test.peers, p) + } + sampler.refreshPeerInfo() + + for idx, columns := range tc.columnsToDistribute { + result := sampler.distributeSamplesToPeer(columns) + require.Equal(t, len(tc.expectedDistribution[idx]), len(result)) + + for peerIdx, dist := range tc.expectedDistribution[idx] { + for _, column := range dist { + peerID := test.peers[peerIdx].PeerID() + require.Equal(t, true, result[peerID][column]) + } + } + } + } +} + +func TestDataColumnSampler1D_SampleDataColumns(t *testing.T) { + test, sampler := setupDefaultDataColumnSamplerTest(t) + sampler.refreshPeerInfo() + + // Sample all columns. + sampleColumns := []uint64{6, 3, 12, 38, 35, 44, 70, 67, 76, 102, 99, 108} + retrieved := sampler.sampleDataColumns(test.ctx, test.blockRoot, sampleColumns) + require.Equal(t, 12, len(retrieved)) + for _, column := range sampleColumns { + require.Equal(t, true, retrieved[column]) + } + + // Sample a subset of columns. + sampleColumns = []uint64{6, 3, 12, 38, 35, 44} + retrieved = sampler.sampleDataColumns(test.ctx, test.blockRoot, sampleColumns) + require.Equal(t, 6, len(retrieved)) + for _, column := range sampleColumns { + require.Equal(t, true, retrieved[column]) + } + + // Sample a subset of columns with missing columns. + sampleColumns = []uint64{6, 3, 12, 127} + retrieved = sampler.sampleDataColumns(test.ctx, test.blockRoot, sampleColumns) + require.Equal(t, 3, len(retrieved)) + require.DeepEqual(t, map[uint64]bool{6: true, 3: true, 12: true}, retrieved) +} + +func TestDataColumnSampler1D_IncrementalDAS(t *testing.T) { testCases := []struct { name string samplesCount uint64 @@ -250,37 +446,61 @@ func TestIncrementalDAS(t *testing.T) { } for _, tc := range testCases { - // Create a context. - ctx := context.Background() + test, sampler := setupDataColumnSamplerTest(t, 3) + p1 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, 1, tc.columnsNotToRespond, 1) + p2 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, 1, tc.columnsNotToRespond, 2) + p3 := createAndConnectPeer(t, test.p2pSvc, test.chainSvc, test.dataColumnSidecars, 1, tc.columnsNotToRespond, 3) + test.peers = []*p2ptest.TestP2P{p1, p2, p3} - // Create the p2p service. - p2pService := p2ptest.NewTestP2P(t) + sampler.refreshPeerInfo() - // Create a peer custodying `custodyRequirement` subnets. 
- chainService, clock := defaultMockChain(t) + success, summaries, err := sampler.incrementalDAS(test.ctx, test.blockRoot, tc.possibleColumnsToRequest, tc.samplesCount) + require.NoError(t, err) + require.Equal(t, tc.expectedSuccess, success) + require.DeepEqual(t, tc.expectedRoundSummaries, summaries) + } +} - // Custody columns: [6, 38, 70, 102] - createAndConnectPeer(t, p2pService, chainService, dataColumnSidecars, custodyRequirement, tc.columnsNotToRespond, 1) +func deterministicRandomness(seed int64) [32]byte { + // Converts an int64 to a byte slice + buf := new(bytes.Buffer) + err := binary.Write(buf, binary.BigEndian, seed) + if err != nil { + logrus.WithError(err).Error("Failed to write int64 to bytes buffer") + return [32]byte{} + } + bytes := buf.Bytes() - // Custody columns: [3, 35, 67, 99] - createAndConnectPeer(t, p2pService, chainService, dataColumnSidecars, custodyRequirement, tc.columnsNotToRespond, 2) + return sha256.Sum256(bytes) +} - // Custody columns: [12, 44, 76, 108] - createAndConnectPeer(t, p2pService, chainService, dataColumnSidecars, custodyRequirement, tc.columnsNotToRespond, 3) +// Returns a serialized random field element in big-endian +func getRandFieldElement(seed int64) [32]byte { + bytes := deterministicRandomness(seed) + var r fr.Element + r.SetBytes(bytes[:]) - service := &Service{ - cfg: &config{ - p2p: p2pService, - clock: clock, - }, - ctx: ctx, - ctxMap: map[[4]byte]int{{245, 165, 253, 66}: version.Deneb}, - } + return GoKZG.SerializeScalar(r) +} - actualSuccess, actualRoundSummaries, err := service.incrementalDAS(blockRoot, tc.possibleColumnsToRequest, tc.samplesCount) +// Returns a random blob using the passed seed as entropy +func getRandBlob(seed int64) kzg.Blob { + var blob kzg.Blob + for i := 0; i < len(blob); i += 32 { + fieldElementBytes := getRandFieldElement(seed + int64(i)) + copy(blob[i:i+32], fieldElementBytes[:]) + } + return blob +} - require.NoError(t, err) - require.Equal(t, tc.expectedSuccess, actualSuccess) - require.DeepEqual(t, tc.expectedRoundSummaries, actualRoundSummaries) +func generateCommitmentAndProof(blob *kzg.Blob) (*kzg.Commitment, *kzg.Proof, error) { + commitment, err := kzg.BlobToKZGCommitment(blob) + if err != nil { + return nil, nil, err } + proof, err := kzg.ComputeBlobKZGProof(blob, commitment) + if err != nil { + return nil, nil, err + } + return &commitment, &proof, err } diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go index c33d79bc7995..003e11d02047 100644 --- a/beacon-chain/sync/service.go +++ b/beacon-chain/sync/service.go @@ -15,6 +15,8 @@ import ( "github.com/libp2p/go-libp2p/core/peer" gcache "github.com/patrickmn/go-cache" "github.com/pkg/errors" + "github.com/trailofbits/go-mutexasserts" + "github.com/prysmaticlabs/prysm/v5/async" "github.com/prysmaticlabs/prysm/v5/async/abool" "github.com/prysmaticlabs/prysm/v5/async/event" @@ -45,7 +47,6 @@ import ( "github.com/prysmaticlabs/prysm/v5/runtime" prysmTime "github.com/prysmaticlabs/prysm/v5/time" "github.com/prysmaticlabs/prysm/v5/time/slots" - "github.com/trailofbits/go-mutexasserts" ) var _ runtime.Service = (*Service)(nil) @@ -168,6 +169,7 @@ type Service struct { receivedDataColumnsFromRoot map[[fieldparams.RootLength]byte]map[uint64]bool receivedDataColumnsFromRootLock sync.RWMutex ctxMap ContextByteVersions + sampler DataColumnSampler } // NewService initializes new regular sync service. 
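The sync Service now holds the sampler behind the small DataColumnSampler interface rather than calling a method on itself, which keeps Start() (next hunk) decoupled from the concrete 1D implementation. A minimal sketch of that indirection (editor's illustration; noopSampler is a hypothetical stand-in, not part of the patch):

    package main

    import (
        "context"
        "fmt"
    )

    // DataColumnSampler mirrors the interface introduced by the patch.
    type DataColumnSampler interface {
        Run(ctx context.Context)
    }

    // noopSampler is a hypothetical stand-in used only for this sketch.
    type noopSampler struct{}

    func (noopSampler) Run(context.Context) { fmt.Println("sampling skipped") }

    func main() {
        var sampler DataColumnSampler = noopSampler{}
        sampler.Run(context.Background())
    }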
@@ -253,7 +255,8 @@ func (s *Service) Start() { // Run data column sampling if params.PeerDASEnabled() { - go s.DataColumnSamplingRoutine(s.ctx) + s.sampler = newDataColumnSampler1D(s.cfg.p2p, s.cfg.clock, s.ctxMap, s.cfg.stateNotifier) + go s.sampler.Run(s.ctx) } }
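Taken together, handleStateNotification sizes the per-block sample set as min(SamplesPerSlot, len(nonCustodyColumns) - NumberOfColumns/2), so nodes that custody more columns sample less, and nodes past the CanSelfReconstruct threshold sample nothing at all. A worked example of that sizing (editor's illustration, assuming the preset values NumberOfColumns = 128 and SamplesPerSlot = 8; requires Go 1.21+ for the built-in min):

    package main

    import "fmt"

    func main() {
        const (
            numberOfColumns uint64 = 128 // assumed preset value
            samplesPerSlot  uint64 = 8   // assumed preset value
        )
        // Nodes custodying >= 64 columns never reach this computation: Run
        // returns early because CanSelfReconstruct already holds.
        for _, custody := range []uint64{4, 32, 60, 63} {
            nonCustody := numberOfColumns - custody
            samples := min(samplesPerSlot, nonCustody-numberOfColumns/2)
            fmt.Printf("custody=%d nonCustody=%d samplesCount=%d\n", custody, nonCustody, samples)
        }
    }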