Skip to content

Commit

Permalink
Add external subscriber unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronbuchwald committed Dec 18, 2024
1 parent 156ded0 commit 46b1bfa
Show file tree
Hide file tree
Showing 3 changed files with 143 additions and 22 deletions.
3 changes: 2 additions & 1 deletion extension/externalsubscriber/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package externalsubscriber

import (
"context"
"fmt"

"github.com/ava-labs/hypersdk/api"
"github.com/ava-labs/hypersdk/chain"
Expand Down Expand Up @@ -40,7 +41,7 @@ func OptionFunc(v api.VM, config Config) (vm.Opt, error) {
v.GetGenesisBytes(),
)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to create external subscriber client: %w", err)
}

blockSubscription := event.SubscriptionFuncFactory[*chain.ExecutedBlock]{
Expand Down
19 changes: 16 additions & 3 deletions snow/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,10 @@ func (b *StatefulBlock[I, O, A]) markAccepted(ctx context.Context) error {
b.vm.acceptedBlocksByHeight.Put(b.Height(), b.ID())
b.vm.acceptedBlocksByID.Put(b.ID(), b)

return nil
}

func (b *StatefulBlock[I, O, A]) notifyAccepted(ctx context.Context) error {
// If I was not actually marked accepted, notify pre ready subs
if !b.accepted {
return event.NotifyAll(ctx, b.Input, b.vm.app.PreReadyAcceptedSubs...)
Expand Down Expand Up @@ -253,13 +257,19 @@ func (b *StatefulBlock[I, O, A]) Accept(ctx context.Context) error {
if err := b.markAccepted(ctx); err != nil {
return err
}
return b.accept(ctx, parent.Accepted)
if err := b.accept(ctx, parent.Accepted); err != nil {
return err
}
return b.notifyAccepted(ctx)
}

// If I'm not ready yet, mark myself as accepted, and return early.
isReady := b.vm.app.Ready.Ready()
if !isReady {
return b.markAccepted(ctx)
if err := b.markAccepted(ctx); err != nil {
return err
}
return b.notifyAccepted(ctx)
}

// If I haven't verified myself, then I need to verify myself before before
Expand All @@ -278,7 +288,10 @@ func (b *StatefulBlock[I, O, A]) Accept(ctx context.Context) error {
if err := b.markAccepted(ctx); err != nil {
return err
}
return b.accept(ctx, parent.Accepted)
if err := b.accept(ctx, parent.Accepted); err != nil {
return err
}
return b.notifyAccepted(ctx)
}

// implements "statesync.StateSummaryBlock"
Expand Down
143 changes: 125 additions & 18 deletions vm/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/common"
"github.com/ava-labs/avalanchego/snow/engine/enginetest"
"github.com/ava-labs/avalanchego/snow/snowtest"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
"github.com/ava-labs/avalanchego/vms/rpcchainvm/grpcutils"
"github.com/ava-labs/avalanchego/x/merkledb"
"github.com/ava-labs/hypersdk/api/jsonrpc"
"github.com/ava-labs/hypersdk/auth"
Expand All @@ -27,8 +29,11 @@ import (
"github.com/ava-labs/hypersdk/codec"
"github.com/ava-labs/hypersdk/crypto"
"github.com/ava-labs/hypersdk/crypto/ed25519"
"github.com/ava-labs/hypersdk/event"
"github.com/ava-labs/hypersdk/extension/externalsubscriber"
"github.com/ava-labs/hypersdk/genesis"
"github.com/ava-labs/hypersdk/internal/mempool"
pb "github.com/ava-labs/hypersdk/proto/pb/externalsubscriber"
"github.com/ava-labs/hypersdk/snow"
"github.com/ava-labs/hypersdk/state/balance"
"github.com/ava-labs/hypersdk/state/metadata"
Expand Down Expand Up @@ -56,6 +61,7 @@ func NewTestVM(
ctx context.Context,
t *testing.T,
chainID ids.ID,
configBytes []byte,
allocations []*genesis.CustomAllocation,
) *VM {
r := require.New(t)
Expand Down Expand Up @@ -99,7 +105,7 @@ func NewTestVM(
snowCtx := snowtest.Context(t, chainID)
snowCtx.ChainDataDir = t.TempDir()
appSender := &enginetest.Sender{T: t}
r.NoError(snowVM.Initialize(ctx, snowCtx, nil, genesisBytes, nil, nil, toEngine, nil, appSender))
r.NoError(snowVM.Initialize(ctx, snowCtx, nil, genesisBytes, nil, configBytes, toEngine, nil, appSender))

router := http.NewServeMux()
handlers, err := snowVM.CreateHandlers(ctx)
Expand Down Expand Up @@ -133,13 +139,15 @@ func NewTestNetwork(
ctx context.Context,
t *testing.T,
chainID ids.ID,
numVMs int,
configBytes []byte,
) *TestNetwork {
r := require.New(t)
privKey, err := ed25519.GeneratePrivateKey()
r.NoError(err)
authFactory := auth.NewED25519Factory(privKey)
funds := uint64(1_000_000_000)
vms := make([]*VM, 2)
vms := make([]*VM, numVMs)
allocations := []*genesis.CustomAllocation{
{
Address: authFactory.Address(),
Expand All @@ -149,7 +157,7 @@ func NewTestNetwork(
nodeIDToVM := make(map[ids.NodeID]*VM)
uris := make([]string, len(vms))
for i := range vms {
vm := NewTestVM(ctx, t, chainID, allocations)
vm := NewTestVM(ctx, t, chainID, configBytes, allocations)
vms[i] = vm
uris[i] = vm.server.URL
nodeIDToVM[vm.nodeID] = vm
Expand Down Expand Up @@ -277,12 +285,17 @@ func (n *TestNetwork) BuildBlockAndUpdateHead(ctx context.Context, txs []*chain.
n.require.NoError(blk.Verify(ctx))
n.require.NoError(n.VMs[0].SnowVM.SetPreference(ctx, blk.ID()))

parsedBlk, err := n.VMs[1].SnowVM.ParseBlock(ctx, blk.Bytes())
n.require.NoError(err)
n.require.Equal(blk.ID(), parsedBlk.ID())
blks := make([]snowman.Block, len(n.VMs))
blks[0] = blk
for i, otherVM := range n.VMs[1:] {
parsedBlk, err := otherVM.SnowVM.ParseBlock(ctx, blk.Bytes())
n.require.NoError(err)
n.require.Equal(blk.ID(), parsedBlk.ID())

n.require.NoError(parsedBlk.Verify(ctx))
n.require.NoError(n.VMs[1].SnowVM.SetPreference(ctx, blk.ID()))
n.require.NoError(parsedBlk.Verify(ctx))
n.require.NoError(otherVM.SnowVM.SetPreference(ctx, blk.ID()))
blks[i+1] = parsedBlk
}

outputBlk, err := n.VMs[0].SnowVM.GetChainIndex().GetPreferredBlock(ctx)
n.require.NoError(err)
Expand All @@ -296,7 +309,7 @@ func (n *TestNetwork) BuildBlockAndUpdateHead(ctx context.Context, txs []*chain.
n.require.True(txsSet.Contains(tx.GetID()), "missing tx %s at index %d", tx, i)
}

return []snowman.Block{blk, parsedBlk}, nil
return blks, nil
}

func (n *TestNetwork) ConfirmTxs(ctx context.Context, txs []*chain.Transaction) error {
Expand Down Expand Up @@ -327,7 +340,7 @@ func TestEmptyBlock(t *testing.T) {
ctx := context.Background()
r := require.New(t)
chainID := ids.GenerateTestID()
network := NewTestNetwork(ctx, t, chainID)
network := NewTestNetwork(ctx, t, chainID, 2, nil)

r.NoError(network.ConfirmTxs(ctx, []*chain.Transaction{}))
}
Expand All @@ -336,7 +349,7 @@ func TestValidBlocks(t *testing.T) {
ctx := context.Background()
r := require.New(t)
chainID := ids.GenerateTestID()
network := NewTestNetwork(ctx, t, chainID)
network := NewTestNetwork(ctx, t, chainID, 2, nil)

tx, err := network.GenerateTx(ctx, []chain.Action{&chaintest.TestAction{
NumComputeUnits: 1,
Expand Down Expand Up @@ -452,7 +465,7 @@ func TestSubmitTx(t *testing.T) {
ctx := context.Background()
r := require.New(t)
chainID := ids.GenerateTestID()
network := NewTestNetwork(ctx, t, chainID)
network := NewTestNetwork(ctx, t, chainID, 2, nil)

invalidTx := test.makeTx(r, network)
network.ConfirmInvalidTx(ctx, invalidTx, test.targetErr)
Expand All @@ -464,7 +477,7 @@ func TestValidityWindowDuplicateAcceptedBlock(t *testing.T) {
ctx := context.Background()
r := require.New(t)
chainID := ids.GenerateTestID()
network := NewTestNetwork(ctx, t, chainID)
network := NewTestNetwork(ctx, t, chainID, 2, nil)

tx0, err := network.GenerateTx(ctx, []chain.Action{&chaintest.TestAction{
NumComputeUnits: 1,
Expand Down Expand Up @@ -492,7 +505,7 @@ func TestValidityWindowDuplicateProcessingAncestor(t *testing.T) {
ctx := context.Background()
r := require.New(t)
chainID := ids.GenerateTestID()
network := NewTestNetwork(ctx, t, chainID)
network := NewTestNetwork(ctx, t, chainID, 2, nil)

tx0, err := network.GenerateTx(ctx, []chain.Action{&chaintest.TestAction{
NumComputeUnits: 1,
Expand Down Expand Up @@ -530,7 +543,7 @@ func TestIssueDuplicateInMempool(t *testing.T) {
ctx := context.Background()
r := require.New(t)
chainID := ids.GenerateTestID()
network := NewTestNetwork(ctx, t, chainID)
network := NewTestNetwork(ctx, t, chainID, 2, nil)

tx0, err := network.GenerateTx(ctx, []chain.Action{&chaintest.TestAction{
NumComputeUnits: 1,
Expand All @@ -549,7 +562,7 @@ func TestForceGossip(t *testing.T) {
ctx := context.Background()
r := require.New(t)
chainID := ids.GenerateTestID()
network := NewTestNetwork(ctx, t, chainID)
network := NewTestNetwork(ctx, t, chainID, 2, nil)
network.SetState(ctx, avasnow.NormalOp)

tx0, err := network.GenerateTx(ctx, []chain.Action{&chaintest.TestAction{
Expand All @@ -567,9 +580,103 @@ func TestForceGossip(t *testing.T) {
r.True(mempool.Has(ctx, tx0.GetID()))
}

func TestAccepted(t *testing.T) {
ctx := context.Background()
r := require.New(t)
chainID := ids.GenerateTestID()
network := NewTestNetwork(ctx, t, chainID, 2, nil)

client := jsonrpc.NewJSONRPCClient(network.VMs[0].server.URL)
blockID, blockHeight, timestamp, err := client.Accepted(ctx)
r.NoError(err)
genesisBlock := network.VMs[0].SnowVM.GetCovariantVM().LastAcceptedBlock(ctx)
r.NoError(err)
r.Equal(genesisBlock.ID(), blockID)
r.Equal(uint64(0), blockHeight)
r.Equal(genesisBlock.Timestamp().UnixMilli(), timestamp)

tx, err := network.GenerateTx(ctx, []chain.Action{&chaintest.TestAction{
NumComputeUnits: 1,
}}, network.authFactory)
r.NoError(err)
r.NoError(network.ConfirmTxs(ctx, []*chain.Transaction{tx}))

blockID, blockHeight, timestamp, err = client.Accepted(ctx)
r.NoError(err)
blk1 := network.VMs[0].SnowVM.GetCovariantVM().LastAcceptedBlock(ctx)
r.NoError(err)
r.Equal(blk1.ID(), blockID)
r.Equal(uint64(1), blockHeight)
r.Equal(blk1.Timestamp().UnixMilli(), timestamp)
}

func TestExternalSubscriber(t *testing.T) {
ctx := context.Background()
r := require.New(t)
chainID := ids.GenerateTestID()

throwawayNetwork := NewTestNetwork(ctx, t, chainID, 1, nil)
createParserFromBytes := func(_ []byte) (chain.Parser, error) {
return throwawayNetwork.Configuration().Parser(), nil
}

listener, err := grpcutils.NewListener()
r.NoError(err)
serverCloser := grpcutils.ServerCloser{}

externalSubscriberAcceptedBlocksCh := make(chan ids.ID, 1)
externalSubscriber0 := externalsubscriber.NewExternalSubscriberServer(logging.NoLog{}, createParserFromBytes, []event.Subscription[*chain.ExecutedBlock]{
event.SubscriptionFunc[*chain.ExecutedBlock]{
NotifyF: func(_ context.Context, blk *chain.ExecutedBlock) error {
externalSubscriberAcceptedBlocksCh <- blk.Block.ID()
return nil
},
},
})

server := grpcutils.NewServer()
pb.RegisterExternalSubscriberServer(server, externalSubscriber0)
serverCloser.Add(server)

go grpcutils.Serve(listener, server)

t.Cleanup(func() {
serverCloser.Stop()
_ = listener.Close()
})

subscriberConfig := externalsubscriber.Config{
Enabled: true,
ServerAddress: listener.Addr().String(),
}
subscriberConfigBytes, err := json.Marshal(subscriberConfig)
r.NoError(err)
vmConfig := vm.NewConfig()
namespacedConfig := map[string]json.RawMessage{
externalsubscriber.Namespace: subscriberConfigBytes,
}
vmConfig.ServiceConfig = namespacedConfig
configBytes, err := json.Marshal(vmConfig)
r.NoError(err)

network := NewTestNetwork(ctx, t, chainID, 1, configBytes)

tx, err := network.GenerateTx(ctx, []chain.Action{&chaintest.TestAction{
NumComputeUnits: 1,
}}, network.authFactory)
r.NoError(err)
r.NoError(network.ConfirmTxs(ctx, []*chain.Transaction{tx}))

var acceptedBlkID ids.ID
select {
case acceptedBlkID = <-externalSubscriberAcceptedBlocksCh:
case <-time.After(time.Second):
r.Fail("timeout waiting for external subscriber to receive accepted block")
}
r.Equal(network.VMs[0].SnowVM.GetCovariantVM().LastAcceptedBlock(ctx).ID(), acceptedBlkID)
}

// APIs
// - external subscriber
// - accepted
// - chain index
// - websocket
// - staterpc (before / after tx execution)
Expand Down

0 comments on commit 46b1bfa

Please sign in to comment.