Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

les, light: LES/2 protocol version #14970

Merged
merged 20 commits into from
Oct 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
e9b9945
les: stop ODR retrievals at shutdown
zsfelfoldi Sep 16, 2017
8307f21
les: fixed peer register/head announce order
zsfelfoldi Sep 17, 2017
df7a7ae
core: some changes in the chain indexer
zsfelfoldi Sep 6, 2017
bb030e0
core/bloombits, eth/filters: bubble up errors
zsfelfoldi Sep 17, 2017
6016610
light, trie: use NodeSet and NodeList for Merkle proofs
zsfelfoldi Aug 12, 2017
c76e521
les: implement ProofsV2Msg
zsfelfoldi Aug 12, 2017
976133a
les: implement PPTProofsMsg
zsfelfoldi Aug 13, 2017
b844a63
light, les: CHT and bloom trie post processors
zsfelfoldi Aug 13, 2017
163f03f
les: protocol versions and topics for clients and servers
zsfelfoldi Aug 19, 2017
4a96afc
les: add tests for protocol version 2
zsfelfoldi Sep 18, 2017
bc23b73
les: optional signed announce messages
zsfelfoldi Aug 23, 2017
780fc83
les: implement GetTxStatusMsg and SendTxV2Msg
zsfelfoldi Sep 6, 2017
ff9dee8
les: implement bloom request handler
zsfelfoldi Sep 14, 2017
4b63aaa
core/bloombits: added context to Retrieval
zsfelfoldi Sep 16, 2017
8a92e0a
les: in server mode print latest CHT/BloomTrie at startup
zsfelfoldi Sep 17, 2017
d8bf82e
les: add test for TxStatus
zsfelfoldi Sep 26, 2017
abc5400
les, light: simplify NodeSet, use readTraceDB
zsfelfoldi Oct 10, 2017
7216e44
core, light, eth: handle errors in ChainIndexerBackend.Reset
zsfelfoldi Oct 11, 2017
21983ec
les: start bloombits indexer in light mode
zsfelfoldi Oct 11, 2017
c170349
les, light: changed names and abbreviations
zsfelfoldi Oct 11, 2017
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 44 additions & 7 deletions core/bloombits/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package bloombits

import (
"bytes"
"context"
"errors"
"math"
"sort"
Expand Down Expand Up @@ -60,6 +61,8 @@ type Retrieval struct {
Bit uint
Sections []uint64
Bitsets [][]byte
Error error
Context context.Context
}

// Matcher is a pipelined system of schedulers and logic matchers which perform
Expand Down Expand Up @@ -137,7 +140,7 @@ func (m *Matcher) addScheduler(idx uint) {
// Start starts the matching process and returns a stream of bloom matches in
// a given range of blocks. If there are no more matches in the range, the result
// channel is closed.
func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession, error) {
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions
if atomic.SwapUint32(&m.running, 1) == 1 {
return nil, errors.New("matcher already running")
Expand All @@ -149,6 +152,7 @@ func (m *Matcher) Start(begin, end uint64, results chan uint64) (*MatcherSession
matcher: m,
quit: make(chan struct{}),
kill: make(chan struct{}),
ctx: ctx,
}
for _, scheduler := range m.schedulers {
scheduler.reset()
Expand Down Expand Up @@ -502,15 +506,28 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
type MatcherSession struct {
matcher *Matcher

quit chan struct{} // Quit channel to request pipeline termination
kill chan struct{} // Term channel to signal non-graceful forced shutdown
pend sync.WaitGroup
quit chan struct{} // Quit channel to request pipeline termination
kill chan struct{} // Term channel to signal non-graceful forced shutdown
ctx context.Context
err error
stopping bool
lock sync.Mutex
pend sync.WaitGroup
}

// Close stops the matching process and waits for all subprocesses to terminate
// before returning. The timeout may be used for graceful shutdown, allowing the
// currently running retrievals to complete before this time.
func (s *MatcherSession) Close(timeout time.Duration) {
func (s *MatcherSession) Close() {
s.lock.Lock()
stopping := s.stopping
s.stopping = true
s.lock.Unlock()
// ensure that we only close the session once
if stopping {
return
}

// Bail out if the matcher is not running
select {
case <-s.quit:
Expand All @@ -519,10 +536,26 @@ func (s *MatcherSession) Close(timeout time.Duration) {
}
// Signal termination and wait for all goroutines to tear down
close(s.quit)
time.AfterFunc(timeout, func() { close(s.kill) })
time.AfterFunc(time.Second, func() { close(s.kill) })
s.pend.Wait()
}

// setError sets an error and stops the session
func (s *MatcherSession) setError(err error) {
s.lock.Lock()
s.err = err
s.lock.Unlock()
s.Close()
}

// Error returns an error if one has happened during the session
func (s *MatcherSession) Error() error {
s.lock.Lock()
defer s.lock.Unlock()

return s.err
}

// AllocateRetrieval assigns a bloom bit index to a client process that can either
// immediately reuest and fetch the section contents assigned to this bit or wait
// a little while for more sections to be requested.
Expand Down Expand Up @@ -618,9 +651,13 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan

case mux <- request:
// Retrieval accepted, something must arrive before we're aborting
request <- &Retrieval{Bit: bit, Sections: sections}
request <- &Retrieval{Bit: bit, Sections: sections, Context: s.ctx}

result := <-request
if result.Error != nil {
s.setError(result.Error)
}

s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
}
}
Expand Down
9 changes: 5 additions & 4 deletions core/bloombits/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package bloombits

import (
"context"
"math/rand"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -144,7 +145,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
quit := make(chan struct{})
matches := make(chan uint64, 16)

session, err := matcher.Start(0, blocks-1, matches)
session, err := matcher.Start(context.Background(), 0, blocks-1, matches)
if err != nil {
t.Fatalf("failed to stat matcher session: %v", err)
}
Expand All @@ -163,13 +164,13 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
}
// If we're testing intermittent mode, abort and restart the pipeline
if intermittent {
session.Close(time.Second)
session.Close()
close(quit)

quit = make(chan struct{})
matches = make(chan uint64, 16)

session, err = matcher.Start(i+1, blocks-1, matches)
session, err = matcher.Start(context.Background(), i+1, blocks-1, matches)
if err != nil {
t.Fatalf("failed to stat matcher session: %v", err)
}
Expand All @@ -183,7 +184,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, blocks uint64, intermitt
t.Errorf("filter = %v blocks = %v intermittent = %v: expected closed channel, got #%v", filter, blocks, intermittent, match)
}
// Clean up the session and ensure we match the expected retrieval count
session.Close(time.Second)
session.Close()
close(quit)

if retrievals != 0 && requested != retrievals {
Expand Down
69 changes: 50 additions & 19 deletions core/chain_indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ import (
type ChainIndexerBackend interface {
// Reset initiates the processing of a new chain segment, potentially terminating
// any partially completed operations (in case of a reorg).
Reset(section uint64)
Reset(section uint64, lastSectionHead common.Hash) error

// Process crunches through the next header in the chain segment. The caller
// will ensure a sequential order of headers.
Process(header *types.Header)

// Commit finalizes the section metadata and stores it into the database.
// Commit finalizes the section metadata and stores it into the database. This
// interface will usually be a batch writer.
Commit() error
}

Expand Down Expand Up @@ -100,11 +101,34 @@ func NewChainIndexer(chainDb, indexDb ethdb.Database, backend ChainIndexerBacken
return c
}

// AddKnownSectionHead marks a new section head as known/processed if it is newer
// than the already known best section head
func (c *ChainIndexer) AddKnownSectionHead(section uint64, shead common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()

if section < c.storedSections {
return
}
c.setSectionHead(section, shead)
c.setValidSections(section + 1)
}

// IndexerChain interface is used for connecting the indexer to a blockchain
type IndexerChain interface {
CurrentHeader() *types.Header
SubscribeChainEvent(ch chan<- ChainEvent) event.Subscription
}

// Start creates a goroutine to feed chain head events into the indexer for
// cascading background processing. Children do not need to be started, they
// are notified about new events by their parents.
func (c *ChainIndexer) Start(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
go c.eventLoop(currentHeader, chainEventer)
func (c *ChainIndexer) Start(chain IndexerChain) {
ch := make(chan ChainEvent, 10)
sub := chain.SubscribeChainEvent(ch)
currentHeader := chain.CurrentHeader()

go c.eventLoop(currentHeader, ch, sub)
}

// Close tears down all goroutines belonging to the indexer and returns any error
Expand All @@ -125,12 +149,14 @@ func (c *ChainIndexer) Close() error {
errs = append(errs, err)
}
}

// Close all children
for _, child := range c.children {
if err := child.Close(); err != nil {
errs = append(errs, err)
}
}

// Return any failures
switch {
case len(errs) == 0:
Expand All @@ -147,12 +173,10 @@ func (c *ChainIndexer) Close() error {
// eventLoop is a secondary - optional - event loop of the indexer which is only
// started for the outermost indexer to push chain head events into a processing
// queue.
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(ch chan<- ChainEvent) event.Subscription) {
func (c *ChainIndexer) eventLoop(currentHeader *types.Header, ch chan ChainEvent, sub event.Subscription) {
// Mark the chain indexer as active, requiring an additional teardown
atomic.StoreUint32(&c.active, 1)

events := make(chan ChainEvent, 10)
sub := chainEventer(events)
defer sub.Unsubscribe()

// Fire the initial new head event to start any outstanding processing
Expand All @@ -169,7 +193,7 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(
errc <- nil
return

case ev, ok := <-events:
case ev, ok := <-ch:
// Received a new event, ensure it's not nil (closing) and update
if !ok {
errc := <-c.quit
Expand All @@ -178,7 +202,9 @@ func (c *ChainIndexer) eventLoop(currentHeader *types.Header, chainEventer func(
}
header := ev.Block.Header()
if header.ParentHash != prevHash {
c.newHead(FindCommonAncestor(c.chainDb, prevHeader, header).Number.Uint64(), true)
if h := FindCommonAncestor(c.chainDb, prevHeader, header); h != nil {
c.newHead(h.Number.Uint64(), true)
}
}
c.newHead(header.Number.Uint64(), false)

Expand Down Expand Up @@ -233,9 +259,10 @@ func (c *ChainIndexer) newHead(head uint64, reorg bool) {
// down into the processing backend.
func (c *ChainIndexer) updateLoop() {
var (
updating bool
updated time.Time
updated time.Time
updateMsg bool
)

for {
select {
case errc := <-c.quit:
Expand All @@ -250,7 +277,7 @@ func (c *ChainIndexer) updateLoop() {
// Periodically print an upgrade log message to the user
if time.Since(updated) > 8*time.Second {
if c.knownSections > c.storedSections+1 {
updating = true
updateMsg = true
c.log.Info("Upgrading chain index", "percentage", c.storedSections*100/c.knownSections)
}
updated = time.Now()
Expand All @@ -259,7 +286,7 @@ func (c *ChainIndexer) updateLoop() {
section := c.storedSections
var oldHead common.Hash
if section > 0 {
oldHead = c.sectionHead(section - 1)
oldHead = c.SectionHead(section - 1)
}
// Process the newly defined section in the background
c.lock.Unlock()
Expand All @@ -270,11 +297,11 @@ func (c *ChainIndexer) updateLoop() {
c.lock.Lock()

// If processing succeeded and no reorgs occcurred, mark the section completed
if err == nil && oldHead == c.sectionHead(section-1) {
if err == nil && oldHead == c.SectionHead(section-1) {
c.setSectionHead(section, newHead)
c.setValidSections(section + 1)
if c.storedSections == c.knownSections && updating {
updating = false
if c.storedSections == c.knownSections && updateMsg {
updateMsg = false
c.log.Info("Finished upgrading chain index")
}

Expand Down Expand Up @@ -311,7 +338,11 @@ func (c *ChainIndexer) processSection(section uint64, lastHead common.Hash) (com
c.log.Trace("Processing new chain section", "section", section)

// Reset and partial processing
c.backend.Reset(section)

if err := c.backend.Reset(section, lastHead); err != nil {
c.setValidSections(0)
return common.Hash{}, err
}

for number := section * c.sectionSize; number < (section+1)*c.sectionSize; number++ {
hash := GetCanonicalHash(c.chainDb, number)
Expand Down Expand Up @@ -341,7 +372,7 @@ func (c *ChainIndexer) Sections() (uint64, uint64, common.Hash) {
c.lock.Lock()
defer c.lock.Unlock()

return c.storedSections, c.storedSections*c.sectionSize - 1, c.sectionHead(c.storedSections - 1)
return c.storedSections, c.storedSections*c.sectionSize - 1, c.SectionHead(c.storedSections - 1)
}

// AddChildIndexer adds a child ChainIndexer that can use the output of this one
Expand Down Expand Up @@ -383,7 +414,7 @@ func (c *ChainIndexer) setValidSections(sections uint64) {

// sectionHead retrieves the last block hash of a processed section from the
// index database.
func (c *ChainIndexer) sectionHead(section uint64) common.Hash {
func (c *ChainIndexer) SectionHead(section uint64) common.Hash {
var data [8]byte
binary.BigEndian.PutUint64(data[:], section)

Expand Down
4 changes: 3 additions & 1 deletion core/chain_indexer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
)
Expand Down Expand Up @@ -208,9 +209,10 @@ func (b *testChainIndexBackend) reorg(headNum uint64) uint64 {
return b.stored * b.indexer.sectionSize
}

func (b *testChainIndexBackend) Reset(section uint64) {
func (b *testChainIndexBackend) Reset(section uint64, lastSectionHead common.Hash) error {
b.section = section
b.headerCnt = 0
return nil
}

func (b *testChainIndexBackend) Process(header *types.Header) {
Expand Down
Loading