Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(dispatcher): register event instead of broker #1949

Merged
merged 9 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions beacond/cmd/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,16 @@ func DefaultComponents() []any {
*BeaconBlock, *BeaconBlockBody, *BeaconBlockHeader, *Logger,
],
components.ProvideDepositStore,
components.ProvideDispatcher[*Logger],
components.ProvideDispatcher[
*BeaconBlock, *BlobSidecars, *Logger,
],
components.ProvideEngineClient[*Logger],
components.ProvideExecutionEngine[*Logger],
components.ProvideJWTSecret,
components.ProvideLocalBuilder[
*BeaconBlockHeader, *BeaconState, *BeaconStateMarshallable,
*KVStore, *Logger,
],
components.ProvidePublishers[*BeaconBlock, *BlobSidecars],
components.ProvideReportingService[*Logger],
components.ProvideServiceRegistry[
*AvailabilityStore, *BeaconBlock, *BeaconBlockBody,
Expand Down
11 changes: 9 additions & 2 deletions mod/async/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,18 @@ type Dispatcher struct {
// NewDispatcher creates a new event server.
func New(
logger log.Logger[any],
) *Dispatcher {
return &Dispatcher{
options ...Option,
) (*Dispatcher, error) {
d := &Dispatcher{
brokers: make(map[async.EventID]types.Broker),
logger: logger,
}
for _, option := range options {
if err := option(d); err != nil {
return nil, err
}
}
return d, nil
}

// Publish dispatches the given event to the broker with the given eventID.
Expand Down
38 changes: 38 additions & 0 deletions mod/async/pkg/dispatcher/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// SPDX-License-Identifier: BUSL-1.1
//
// Copyright (C) 2024, Berachain Foundation. All rights reserved.
// Use of this software is governed by the Business Source License included
// in the LICENSE file of this repository and at www.mariadb.com/bsl11.
//
// ANY USE OF THE LICENSED WORK IN VIOLATION OF THIS LICENSE WILL AUTOMATICALLY
// TERMINATE YOUR RIGHTS UNDER THIS LICENSE FOR THE CURRENT AND ALL OTHER
// VERSIONS OF THE LICENSED WORK.
//
// THIS LICENSE DOES NOT GRANT YOU ANY RIGHT IN ANY TRADEMARK OR LOGO OF
// LICENSOR OR ITS AFFILIATES (PROVIDED THAT YOU MAY USE A TRADEMARK OR LOGO OF
// LICENSOR AS EXPRESSLY REQUIRED BY THIS LICENSE).
//
// TO THE EXTENT PERMITTED BY APPLICABLE LAW, THE LICENSED WORK IS PROVIDED ON
// AN “AS IS” BASIS. LICENSOR HEREBY DISCLAIMS ALL WARRANTIES AND CONDITIONS,
// EXPRESS OR IMPLIED, INCLUDING (WITHOUT LIMITATION) WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, NON-INFRINGEMENT, AND
// TITLE.

package dispatcher

import (
"github.com/berachain/beacon-kit/mod/async/pkg/broker"
"github.com/berachain/beacon-kit/mod/async/pkg/types"
"github.com/berachain/beacon-kit/mod/primitives/pkg/async"
)

// Opt is a type that defines a function that modifies NodeBuilder.
type Option func(dispatcher types.Dispatcher) error

func WithEvent[
EventT async.BaseEvent,
](eventID string) Option {
return func(dispatcher types.Dispatcher) error {
return dispatcher.RegisterBrokers(broker.New[EventT](eventID))
}
}
2 changes: 1 addition & 1 deletion mod/beacon/block_store/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *Service[BeaconBlockT, _]) Start(ctx context.Context) error {

// subscribe a channel to the finalized block events.
if err := s.dispatcher.Subscribe(
async.BeaconBlockFinalizedEvent, s.subFinalizedBlkEvents,
async.BeaconBlockFinalized, s.subFinalizedBlkEvents,
); err != nil {
s.logger.Error("failed to subscribe to block events", "error", err)
return err
Expand Down
2 changes: 1 addition & 1 deletion mod/beacon/blockchain/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (s *Service[
// via ticker later.
if err = s.dispatcher.Publish(
async.NewEvent(
ctx, async.BeaconBlockFinalizedEvent, blk,
ctx, async.BeaconBlockFinalized, blk,
),
); err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions mod/execution/pkg/deposit/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,10 +95,10 @@ func (s *Service[
_, _, _, _, _,
]) Start(ctx context.Context) error {
if err := s.dispatcher.Subscribe(
async.BeaconBlockFinalizedEvent, s.subFinalizedBlockEvents,
async.BeaconBlockFinalized, s.subFinalizedBlockEvents,
); err != nil {
s.logger.Error("failed to subscribe to event", "event",
async.BeaconBlockFinalizedEvent, "err", err)
async.BeaconBlockFinalized, "err", err)
return err
}

Expand Down
4 changes: 2 additions & 2 deletions mod/node-core/pkg/components/availability_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ func ProvideAvailabilityPruner[
// create new subscription for finalized blocks.
subFinalizedBlocks := make(chan async.Event[BeaconBlockT])
if err := in.Dispatcher.Subscribe(
async.BeaconBlockFinalizedEvent, subFinalizedBlocks,
async.BeaconBlockFinalized, subFinalizedBlocks,
); err != nil {
in.Logger.Error("failed to subscribe to event", "event",
async.BeaconBlockFinalizedEvent, "err", err)
async.BeaconBlockFinalized, "err", err)
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions mod/node-core/pkg/components/block_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,10 @@ func ProvideBlockStorePruner[
// create new subscription for finalized blocks.
subFinalizedBlocks := make(chan async.Event[BeaconBlockT])
if err := in.Dispatcher.Subscribe(
async.BeaconBlockFinalizedEvent, subFinalizedBlocks,
async.BeaconBlockFinalized, subFinalizedBlocks,
); err != nil {
in.Logger.Error("failed to subscribe to event", "event",
async.BeaconBlockFinalizedEvent, "err", err)
async.BeaconBlockFinalized, "err", err)
return nil, err
}

Expand Down
4 changes: 2 additions & 2 deletions mod/node-core/pkg/components/deposit_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ func ProvideDepositPruner[
// initialize a subscription for finalized blocks.
subFinalizedBlocks := make(chan async.Event[BeaconBlockT])
if err := in.Dispatcher.Subscribe(
async.BeaconBlockFinalizedEvent, subFinalizedBlocks,
async.BeaconBlockFinalized, subFinalizedBlocks,
); err != nil {
in.Logger.Error("failed to subscribe to event", "event",
async.BeaconBlockFinalizedEvent, "err", err)
async.BeaconBlockFinalized, "err", err)
return nil, err
}

Expand Down
27 changes: 18 additions & 9 deletions mod/node-core/pkg/components/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,40 @@ package components

import (
"cosmossdk.io/depinject"
"github.com/berachain/beacon-kit/mod/async/pkg/dispatcher"
asynctypes "github.com/berachain/beacon-kit/mod/async/pkg/types"
dp "github.com/berachain/beacon-kit/mod/async/pkg/dispatcher"
"github.com/berachain/beacon-kit/mod/log"
"github.com/berachain/beacon-kit/mod/primitives/pkg/async"
)

// DispatcherInput is the input for the Dispatcher.
type DispatcherInput[
LoggerT any,
] struct {
depinject.In
Logger LoggerT
Publishers []asynctypes.Broker
Logger LoggerT
}

// ProvideDispatcher provides a new Dispatcher.
func ProvideDispatcher[
BeaconBlockT any,
BlobSidecarsT any,
LoggerT log.AdvancedLogger[any, LoggerT],
](
in DispatcherInput[LoggerT],
) (*Dispatcher, error) {
d := dispatcher.New(
return dp.New(
in.Logger.With("service", "dispatcher"),
ocnc2 marked this conversation as resolved.
Show resolved Hide resolved
dp.WithEvent[GenesisEvent](async.GenesisDataReceived),
dp.WithEvent[ValidatorUpdateEvent](async.GenesisDataProcessed),
dp.WithEvent[async.Event[BeaconBlockT]](async.BuiltBeaconBlock),
dp.WithEvent[async.Event[BlobSidecarsT]](async.BuiltSidecars),
dp.WithEvent[async.Event[BeaconBlockT]](async.BeaconBlockReceived),
dp.WithEvent[async.Event[BlobSidecarsT]](async.SidecarsReceived),
dp.WithEvent[async.Event[BeaconBlockT]](async.BeaconBlockVerified),
dp.WithEvent[async.Event[BlobSidecarsT]](async.SidecarsVerified),
dp.WithEvent[async.Event[BeaconBlockT]](async.FinalBeaconBlockReceived),
dp.WithEvent[async.Event[BlobSidecarsT]](async.FinalSidecarsReceived),
dp.WithEvent[ValidatorUpdateEvent](async.FinalValidatorUpdatesProcessed),
dp.WithEvent[async.Event[BeaconBlockT]](async.BeaconBlockFinalized),
)
if err := d.RegisterBrokers(in.Publishers...); err != nil {
return nil, err
}
return d, nil
}
76 changes: 0 additions & 76 deletions mod/node-core/pkg/components/events.go

This file was deleted.

10 changes: 3 additions & 7 deletions mod/node-core/pkg/components/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,18 +176,14 @@ type (

// GenesisDataReceivedEvent is a type alias for the genesis data received
// event.
GenesisDataReceivedEvent = async.Event[*Genesis]

// GenesisDataProcessedEvent is a type alias for the genesis data processed
// event.
GenesisDataProcessedEvent = async.Event[transition.ValidatorUpdates]
GenesisEvent = async.Event[*Genesis]

// NewSlotEvent is a type alias for the new slot event.
NewSlotEvent = async.Event[*SlotData]
SlotEvent = async.Event[*SlotData]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tip

Codebase Verification

Update comment to reflect alias renaming.

The alias NewSlotEvent has been successfully renamed to SlotEvent in the code. However, the comment in types.go still refers to NewSlotEvent. Please update the comment to reflect the new alias name.

  • File: mod/node-core/pkg/components/types.go
  • Line: 141
Analysis chain

Verify the consistency of alias renaming.

The alias NewSlotEvent has been renamed to SlotEvent. Ensure that this new alias is used consistently throughout the codebase.

Run the following script to check the usage of SlotEvent:

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the usage of SlotEvent alias.

# Test: Search for the alias usage. Expect: Consistent usage of SlotEvent.
rg --type go -A 5 $'SlotEvent'

Length of output: 1208


// FinalValidatorUpdatesProcessedEvent is a type alias for the final
// validator updates processed event.
FinalValidatorUpdatesProcessedEvent = async.Event[transition.ValidatorUpdates]
ValidatorUpdateEvent = async.Event[transition.ValidatorUpdates]
)

// Messages.
Expand Down
2 changes: 1 addition & 1 deletion mod/primitives/pkg/async/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ const (
FinalBeaconBlockReceived = "final-beacon-block-received"
FinalSidecarsReceived = "final-blob-sidecars-received"
FinalValidatorUpdatesProcessed = "final-validator-updates"
BeaconBlockFinalizedEvent = "beacon-block-finalized"
BeaconBlockFinalized = "beacon-block-finalized"
)
2 changes: 1 addition & 1 deletion mod/storage/pkg/pruner/pruner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestPruner(t *testing.T) {
block.On("GetSlot").Return(math.U64(index))
event := async.NewEvent[pruner.BeaconBlock](
context.Background(),
async.BeaconBlockFinalizedEvent,
async.BeaconBlockFinalized,
&block,
)
ch <- event
Expand Down
Loading