Skip to content
This repository has been archived by the owner on Apr 15, 2024. It is now read-only.

Commit

Permalink
feat: implement the broadcaster
Browse files Browse the repository at this point in the history
  • Loading branch information
rach-id committed Feb 9, 2023
1 parent 7162094 commit 3fdb814
Show file tree
Hide file tree
Showing 8 changed files with 157 additions and 33 deletions.
25 changes: 21 additions & 4 deletions orchestrator/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,29 @@ package orchestrator
import (
"context"

"github.com/celestiaorg/orchestrator-relayer/p2p"

"github.com/celestiaorg/orchestrator-relayer/types"
)

type BroadcasterI interface {
// BroadcastConfirm broadcasts an attestation confirm to the P2P network.
BroadcastConfirm(ctx context.Context, confirm types.AttestationConfirm) (string, error)
type Broadcaster struct {
QgbDHT *p2p.QgbDHT
}

func NewBroadcaster(qgbDHT *p2p.QgbDHT) *Broadcaster {
return &Broadcaster{QgbDHT: qgbDHT}
}

// Note: broadcaster implementation will be done after defining the P2P interfaces.
func (b Broadcaster) BroadcastDataCommitmentConfirm(ctx context.Context, nonce uint64, confirm types.DataCommitmentConfirm) error {
if len(b.QgbDHT.RoutingTable().ListPeers()) == 0 {
return ErrEmptyPeersTable
}
return b.QgbDHT.PutDataCommitmentConfirm(ctx, p2p.GetDataCommitmentConfirmKey(nonce, confirm.EthAddress), confirm)
}

func (b Broadcaster) BroadcastValsetConfirm(ctx context.Context, nonce uint64, confirm types.ValsetConfirm) error {
if len(b.QgbDHT.RoutingTable().ListPeers()) == 0 {
return ErrEmptyPeersTable
}
return b.QgbDHT.PutValsetConfirm(ctx, p2p.GetValsetConfirmKey(nonce, confirm.EthAddress), confirm)
}
107 changes: 107 additions & 0 deletions orchestrator/broadcaster_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package orchestrator_test

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/celestiaorg/orchestrator-relayer/orchestrator"
"github.com/celestiaorg/orchestrator-relayer/p2p"
qgbtesting "github.com/celestiaorg/orchestrator-relayer/testing"
"github.com/celestiaorg/orchestrator-relayer/types"
"github.com/stretchr/testify/assert"
)

func TestBroadcastDataCommitmentConfirm(t *testing.T) {
network := qgbtesting.NewDHTNetwork(context.Background(), 4)
defer network.Stop()

// create a test DataCommitmentConfirm
expectedConfirm := types.DataCommitmentConfirm{
EthAddress: "celes1qktu8009djs6uym9uwj84ead24exkezsaqrmn5",
Commitment: "test commitment",
Signature: "test signature",
}

// generate a test key for the DataCommitmentConfirm
testKey := p2p.GetDataCommitmentConfirmKey(10, "celes1qktu8009djs6uym9uwj84ead24exkezsaqrmn5")

// Broadcast the confirm
broadcaster := orchestrator.NewBroadcaster(network.DHTs[1])
err := broadcaster.BroadcastDataCommitmentConfirm(context.Background(), 10, expectedConfirm)
assert.NoError(t, err)

// try to get the confirm from another peer
actualConfirm, err := network.DHTs[3].GetDataCommitmentConfirm(context.Background(), testKey)
assert.NoError(t, err)
assert.NotNil(t, actualConfirm)

assert.Equal(t, expectedConfirm, actualConfirm)
}

func TestBroadcastValsetConfirm(t *testing.T) {
network := qgbtesting.NewDHTNetwork(context.Background(), 4)
defer network.Stop()

// create a test DataCommitmentConfirm
expectedConfirm := types.ValsetConfirm{
EthAddress: "celes1qktu8009djs6uym9uwj84ead24exkezsaqrmn5",
Signature: "test signature",
}

// generate a test key for the ValsetConfirm
testKey := p2p.GetValsetConfirmKey(10, "celes1qktu8009djs6uym9uwj84ead24exkezsaqrmn5")

// Broadcast the confirm
broadcaster := orchestrator.NewBroadcaster(network.DHTs[1])
err := broadcaster.BroadcastValsetConfirm(context.Background(), 10, expectedConfirm)
assert.NoError(t, err)

// try to get the confirm from another peer
actualConfirm, err := network.DHTs[3].GetValsetConfirm(context.Background(), testKey)
assert.NoError(t, err)
assert.NotNil(t, actualConfirm)

assert.Equal(t, expectedConfirm, actualConfirm)
}

// TestEmptyPeersTable tests that values are not broadcasted if the DHT peers
// table is empty.
func TestEmptyPeersTable(t *testing.T) {
_, _, dht := qgbtesting.NewTestDHT(context.Background())
defer func(dht *p2p.QgbDHT) {
err := dht.Close()
if err != nil {
require.NoError(t, err)
}
}(dht)

// create a test DataCommitmentConfirm
dcConfirm := types.DataCommitmentConfirm{
EthAddress: "celes1qktu8009djs6uym9uwj84ead24exkezsaqrmn5",
Commitment: "test commitment",
Signature: "test signature",
}

// Broadcast the confirm
broadcaster := orchestrator.NewBroadcaster(dht)
err := broadcaster.BroadcastDataCommitmentConfirm(context.Background(), 10, dcConfirm)

// check if the correct error is returned
assert.Error(t, err)
assert.Equal(t, orchestrator.ErrEmptyPeersTable, err)

// try with a valset confirm
vsConfirm := types.ValsetConfirm{
EthAddress: "celes1qktu8009djs6uym9uwj84ead24exkezsaqrmn5",
Signature: "test signature",
}

// Broadcast the confirm
err = broadcaster.BroadcastValsetConfirm(context.Background(), 10, vsConfirm)

// check if the correct error is returned
assert.Error(t, err)
assert.Equal(t, orchestrator.ErrEmptyPeersTable, err)
}
5 changes: 5 additions & 0 deletions orchestrator/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package orchestrator

import "errors"

var ErrEmptyPeersTable = errors.New("empty peers table")
12 changes: 6 additions & 6 deletions orchestrator/orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type Orchestrator struct {
AppQuerier rpc.AppQuerierI
TmQuerier rpc.TmQuerierI
P2PQuerier p2p.QuerierI
Broadcaster BroadcasterI
Broadcaster *Broadcaster
Retrier RetrierI
}

Expand All @@ -55,7 +55,7 @@ func New(
appQuerier rpc.AppQuerierI,
tmQuerier rpc.TmQuerierI,
p2pQuerier p2p.QuerierI,
broadcaster BroadcasterI,
broadcaster *Broadcaster,
retrier RetrierI,
signer *blobtypes.KeyringSigner,
evmPrivateKey ecdsa.PrivateKey,
Expand Down Expand Up @@ -333,11 +333,11 @@ func (orch Orchestrator) ProcessValsetEvent(ctx context.Context, valset celestia
orch.OrchEVMAddress,
ethcmn.Bytes2Hex(signature),
)
hash, err := orch.Broadcaster.BroadcastConfirm(ctx, msg)
err = orch.Broadcaster.BroadcastValsetConfirm(ctx, valset.Nonce, *msg)
if err != nil {
return err
}
orch.Logger.Info("signed Valset", "nonce", valset.Nonce, "tx_hash", hash)
orch.Logger.Info("signed Valset", "nonce", valset.Nonce)
return nil
}

Expand All @@ -360,11 +360,11 @@ func (orch Orchestrator) ProcessDataCommitmentEvent(
}

msg := types.NewMsgDataCommitmentConfirm(commitment.String(), ethcmn.Bytes2Hex(dcSig), orch.OrchEVMAddress)
hash, err := orch.Broadcaster.BroadcastConfirm(ctx, msg)
err = orch.Broadcaster.BroadcastDataCommitmentConfirm(ctx, dc.Nonce, *msg)
if err != nil {
return err
}
orch.Logger.Info("signed commitment", "nonce", dc.Nonce, "begin_block", dc.BeginBlock, "end_block", dc.EndBlock, "commitment", commitment, "tx_hash", hash)
orch.Logger.Info("signed commitment", "nonce", dc.Nonce, "begin_block", dc.BeginBlock, "end_block", dc.EndBlock, "commitment", commitment)
return nil
}

Expand Down
30 changes: 18 additions & 12 deletions testing/dht_network.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,19 @@ type DHTNetwork struct {
// The `count` parameter specifies the number of nodes that the network will run.
// This function doesn't return any errors, and panics in case any unexpected happened.
func NewDHTNetwork(ctx context.Context, count int) *DHTNetwork {
if count <= 0 {
panic("can't create a test network with a negative nodes count")
if count <= 1 {
panic("can't create a test network with a negative nodes count or only 1 DHT node")
}
hosts := make([]host.Host, count)
stores := make([]ds.Batching, count)
dhts := make([]*p2p.QgbDHT, count)
for i := 0; i < count; i++ {
h, err := libp2p.New()
if err != nil {
panic(err)
}
h, store, dht := NewTestDHT(ctx)
hosts[i] = h
store := dssync.MutexWrap(ds.NewMapDatastore())
stores[i] = store
dht, err := p2p.NewQgbDHT(ctx, h, store)
if err != nil {
panic(err)
}
dhts[i] = dht
if i != 0 {
err = h.Connect(ctx, peer.AddrInfo{
err := h.Connect(ctx, peer.AddrInfo{
ID: hosts[0].ID(),
Addrs: hosts[0].Addrs(),
})
Expand All @@ -69,6 +61,20 @@ func NewDHTNetwork(ctx context.Context, count int) *DHTNetwork {
}
}

// NewTestDHT creates a test DHT not connected to any peers.
func NewTestDHT(ctx context.Context) (host.Host, ds.Batching, *p2p.QgbDHT) {
h, err := libp2p.New()
if err != nil {
panic(err)
}
dataStore := dssync.MutexWrap(ds.NewMapDatastore())
dht, err := p2p.NewQgbDHT(ctx, h, dataStore)
if err != nil {
panic(err)
}
return h, dataStore, dht
}

// WaitForPeerTableToUpdate waits for nodes to have updated their peers list
func WaitForPeerTableToUpdate(ctx context.Context, dhts []*p2p.QgbDHT, timeout time.Duration) error {
withTimeout, cancel := context.WithTimeout(ctx, timeout)
Expand Down
7 changes: 0 additions & 7 deletions types/attestation_confirm.go

This file was deleted.

2 changes: 0 additions & 2 deletions types/data_commitment_confirm.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
)

var _ AttestationConfirm = &DataCommitmentConfirm{}

// DataCommitmentConfirm describes a data commitment for a set of blocks.
type DataCommitmentConfirm struct {
// Signature over the commitment, the range of blocks, the validator address
Expand Down
2 changes: 0 additions & 2 deletions types/valset_confirm.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"github.com/ethereum/go-ethereum/common"
)

var _ AttestationConfirm = &ValsetConfirm{}

// ValsetConfirm
// this is the message sent by the validators when they wish to submit their
// signatures over the validator set at a given block height. A validators sign the validator set,
Expand Down

0 comments on commit 3fdb814

Please sign in to comment.