Skip to content

Commit

Permalink
align events implementation with FIP-0049 and FIP-0054.
Browse files Browse the repository at this point in the history
- Event keys are now t1, t2, t3, t4 for topics; and d for data.
- ref-fvm no longer stores events in the blockstore for us. It just
  returns events to the client, who is now responsible for handling
  them as it wishes / according to its configuration.
- Add a flag to VMOpts to have the events AMT be written in the blockstore.
- Add a flag to the ChainStore to advertise to the rest of the system
  if the ChainStore is storing events.
- Enable that flag if the EthRPC is enabled (can also add an explicit
  configuration flag if wanted).
  • Loading branch information
raulk committed Jan 31, 2023
1 parent dc5f865 commit 8db7fd6
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 24 deletions.
43 changes: 41 additions & 2 deletions chain/consensus/filcns/compute_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"sync/atomic"

"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
cbg "github.com/whyrusleeping/cbor-gen"
"go.opencensus.io/stats"
"go.opencensus.io/trace"
"golang.org/x/xerrors"

amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"
actorstypes "github.com/filecoin-project/go-state-types/actors"
"github.com/filecoin-project/go-state-types/big"
Expand Down Expand Up @@ -104,6 +106,7 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
LookbackState: stmgr.LookbackStateGetterForTipset(sm, ts),
TipSetGetter: stmgr.TipSetGetterForTipset(sm.ChainStore(), ts),
Tracing: vmTracing,
ReturnEvents: sm.ChainStore().IsStoringEvents(),
}

return sm.VMConstructor()(ctx, vmopt)
Expand Down Expand Up @@ -173,8 +176,13 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
return cid.Undef, cid.Undef, xerrors.Errorf("making vm: %w", err)
}

var receipts []cbg.CBORMarshaler
processedMsgs := make(map[cid.Cid]struct{})
var (
receipts []*types.MessageReceipt
storingEvents = sm.ChainStore().IsStoringEvents()
events [][]types.Event
processedMsgs = make(map[cid.Cid]struct{})
)

for _, b := range bms {
penalty := types.NewInt(0)
gasReward := big.Zero()
Expand All @@ -193,6 +201,11 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
gasReward = big.Add(gasReward, r.GasCosts.MinerTip)
penalty = big.Add(penalty, r.GasCosts.MinerPenalty)

if storingEvents {
// Appends nil when no events are returned to preserve positional alignment.
events = append(events, r.Events)
}

if em != nil {
if err := em.MessageApplied(ctx, ts, cm.Cid(), m, r, false); err != nil {
return cid.Undef, cid.Undef, err
Expand Down Expand Up @@ -258,6 +271,23 @@ func (t *TipSetExecutor) ApplyBlocks(ctx context.Context,
return cid.Undef, cid.Undef, xerrors.Errorf("failed to build receipts amt: %w", err)
}

// Slice will be empty if not storing events.
for i, evs := range events {
if len(evs) == 0 {
continue
}
switch root, err := t.StoreEventsAMT(ctx, sm.ChainStore(), evs); {
case err != nil:
return cid.Undef, cid.Undef, xerrors.Errorf("failed to store events amt: %w", err)
case i >= len(receipts):
return cid.Undef, cid.Undef, xerrors.Errorf("assertion failed: receipt and events array lengths inconsistent")
case receipts[i].EventsRoot == nil:
return cid.Undef, cid.Undef, xerrors.Errorf("assertion failed: VM returned events with no events root")
case root != *receipts[i].EventsRoot:
return cid.Undef, cid.Undef, xerrors.Errorf("assertion failed: returned events AMT root does not match derived")
}
}

st, err := vmi.Flush(ctx)
if err != nil {
return cid.Undef, cid.Undef, xerrors.Errorf("vm flush failed: %w", err)
Expand Down Expand Up @@ -316,4 +346,13 @@ func (t *TipSetExecutor) ExecuteTipSet(ctx context.Context,
return t.ApplyBlocks(ctx, sm, parentEpoch, pstate, fbmsgs, blks[0].Height, r, em, vmTracing, baseFee, ts)
}

func (t *TipSetExecutor) StoreEventsAMT(ctx context.Context, cs *store.ChainStore, events []types.Event) (cid.Cid, error) {
cst := cbor.NewCborStore(cs.ChainBlockstore())
objs := make([]cbg.CBORMarshaler, len(events))
for i := 0; i < len(events); i++ {
objs[i] = &events[i]
}
return amt4.FromArray(ctx, cst, objs, amt4.UseTreeBitWidth(types.EventAMTBitwidth))
}

var _ stmgr.Executor = &TipSetExecutor{}
14 changes: 13 additions & 1 deletion chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ type ChainStore struct {
evtTypes [1]journal.EventType
journal journal.Journal

storeEvents bool

cancelFn context.CancelFunc
wg sync.WaitGroup
}
Expand Down Expand Up @@ -680,7 +682,7 @@ func FlushValidationCache(ctx context.Context, ds dstore.Batching) error {
// If this is addressed (blockcache goes into its own sub-namespace) then
// strings.HasPrefix(...) below can be skipped
//
//Prefix: blockValidationCacheKeyPrefix.String()
// Prefix: blockValidationCacheKeyPrefix.String()
KeysOnly: true,
})
if err != nil {
Expand Down Expand Up @@ -1202,6 +1204,16 @@ func (cs *ChainStore) Weight(ctx context.Context, hts *types.TipSet) (types.BigI
return cs.weight(ctx, cs.StateBlockstore(), hts)
}

// StoreEvents marks this ChainStore as storing events.
func (cs *ChainStore) StoreEvents(store bool) {
cs.storeEvents = store
}

// IsStoringEvents indicates if this ChainStore is storing events.
func (cs *ChainStore) IsStoringEvents() bool {
return cs.storeEvents
}

// true if ts1 wins according to the filecoin tie-break rule
func breakWeightTie(ts1, ts2 *types.TipSet) bool {
s := len(ts1.Blocks())
Expand Down
8 changes: 4 additions & 4 deletions chain/types/ethtypes/eth_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import (
)

var (
EthTopic1 = "topic1"
EthTopic2 = "topic2"
EthTopic3 = "topic3"
EthTopic4 = "topic4"
EthTopic1 = "t1"
EthTopic2 = "t2"
EthTopic3 = "t3"
EthTopic4 = "t4"
)

var ErrInvalidAddress = errors.New("invalid Filecoin Eth address")
Expand Down
36 changes: 31 additions & 5 deletions chain/types/event.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,20 @@
package types

import (
"bytes"
"fmt"

cbg "github.com/whyrusleeping/cbor-gen"

"github.com/filecoin-project/go-state-types/abi"
)

// EventEntry flags defined in fvm_shared
const (
EventFlagIndexedKey = 0b00000001
EventFlagIndexedValue = 0b00000010
)

type Event struct {
// The ID of the actor that emitted this event.
Emitter abi.ActorID
Expand All @@ -25,8 +36,23 @@ type EventEntry struct {

type FilterID [32]byte // compatible with EthHash

// EventEntry flags defined in fvm_shared
const (
EventFlagIndexedKey = 0b00000001
EventFlagIndexedValue = 0b00000010
)
// DecodeEvents decodes a CBOR list of CBOR-encoded events.
func DecodeEvents(input []byte) ([]Event, error) {
r := bytes.NewReader(input)
typ, len, err := cbg.NewCborReader(r).ReadHeader()
if err != nil {
return nil, fmt.Errorf("failed to read events: %w", err)
}
if typ != cbg.MajArray {
return nil, fmt.Errorf("expected a CBOR list, was major type %d", typ)
}
events := make([]Event, 0, len)
for i := 0; i < int(len); i++ {
var evt Event
if err := evt.UnmarshalCBOR(r); err != nil {
return nil, fmt.Errorf("failed to parse event: %w", err)
}
events = append(events, evt)
}
return events, nil
}
51 changes: 40 additions & 11 deletions chain/vm/fvm.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,9 @@ func (x *FvmExtern) workerKeyAtLookback(ctx context.Context, minerId address.Add
type FVM struct {
fvm *ffi.FVM
nv network.Version

// returnEvents specifies whether to parse and return events when applying messages.
returnEvents bool
}

func defaultFVMOpts(ctx context.Context, opts *VMOpts) (*ffi.FVMOpts, error) {
Expand Down Expand Up @@ -335,10 +338,13 @@ func NewFVM(ctx context.Context, opts *VMOpts) (*FVM, error) {
return nil, xerrors.Errorf("failed to create FVM: %w", err)
}

return &FVM{
fvm: fvm,
nv: opts.NetworkVersion,
}, nil
ret := &FVM{
fvm: fvm,
nv: opts.NetworkVersion,
returnEvents: opts.ReturnEvents,
}

return ret, nil
}

func NewDebugFVM(ctx context.Context, opts *VMOpts) (*FVM, error) {
Expand Down Expand Up @@ -438,10 +444,13 @@ func NewDebugFVM(ctx context.Context, opts *VMOpts) (*FVM, error) {
return nil, err
}

return &FVM{
fvm: fvm,
nv: opts.NetworkVersion,
}, nil
ret := &FVM{
fvm: fvm,
nv: opts.NetworkVersion,
returnEvents: opts.ReturnEvents,
}

return ret, nil
}

func (vm *FVM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet, error) {
Expand Down Expand Up @@ -493,7 +502,7 @@ func (vm *FVM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet
et.Error = aerr.Error()
}

return &ApplyRet{
applyRet := &ApplyRet{
MessageReceipt: receipt,
GasCosts: &GasOutputs{
BaseFeeBurn: ret.BaseFeeBurn,
Expand All @@ -507,7 +516,16 @@ func (vm *FVM) ApplyMessage(ctx context.Context, cmsg types.ChainMsg) (*ApplyRet
ActorErr: aerr,
ExecutionTrace: et,
Duration: duration,
}, nil
}

if vm.returnEvents && len(ret.EventsBytes) > 0 {
applyRet.Events, err = types.DecodeEvents(ret.EventsBytes)
if err != nil {
return nil, fmt.Errorf("failed to decode events returned by the FVM: %w", err)
}
}

return applyRet, nil
}

func (vm *FVM) ApplyImplicitMessage(ctx context.Context, cmsg *types.Message) (*ApplyRet, error) {
Expand Down Expand Up @@ -565,6 +583,13 @@ func (vm *FVM) ApplyImplicitMessage(ctx context.Context, cmsg *types.Message) (*
Duration: duration,
}

if len(ret.EventsBytes) > 0 && vm.returnEvents {
applyRet.Events, err = types.DecodeEvents(ret.EventsBytes)
if err != nil {
return nil, fmt.Errorf("failed to decode events returned by the FVM: %w", err)
}
}

if ret.ExitCode != 0 {
return applyRet, fmt.Errorf("implicit message failed with exit code: %d and error: %w", ret.ExitCode, applyRet.ActorErr)
}
Expand All @@ -573,7 +598,11 @@ func (vm *FVM) ApplyImplicitMessage(ctx context.Context, cmsg *types.Message) (*
}

func (vm *FVM) Flush(ctx context.Context) (cid.Cid, error) {
return vm.fvm.Flush()
root, err := vm.fvm.Flush()
if err != nil {
return root, err
}
return root, err
}

type dualExecutionFVM struct {
Expand Down
3 changes: 3 additions & 0 deletions chain/vm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,8 @@ type VMOpts struct {
LookbackState LookbackStateGetter
TipSetGetter TipSetGetter
Tracing bool
// ReturnEvents decodes and returns emitted events.
ReturnEvents bool
}

func NewLegacyVM(ctx context.Context, opts *VMOpts) (*LegacyVM, error) {
Expand Down Expand Up @@ -282,6 +284,7 @@ type ApplyRet struct {
ExecutionTrace types.ExecutionTrace
Duration time.Duration
GasCosts *GasOutputs
Events []types.Event
}

func (vm *LegacyVM) send(ctx context.Context, msg *types.Message, parent *Runtime,
Expand Down
2 changes: 2 additions & 0 deletions node/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ const (

SetApiEndpointKey

StoreEventsKey

_nInvokes // keep this last
)

Expand Down
5 changes: 5 additions & 0 deletions node/builder_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,11 @@ func ConfigFullNode(c interface{}) Option {
Override(SetupFallbackBlockstoresKey, modules.InitFallbackBlockstores),
),

// If the Eth JSON-RPC is enabled, enable storing events at the ChainStore.
// This is the case even if real-time and historic filtering are disabled,
// as it enables us to serve logs in eth_getTransactionReceipt.
If(cfg.Fevm.EnableEthRPC, Override(StoreEventsKey, modules.EnableStoringEvents)),

Override(new(dtypes.ClientImportMgr), modules.ClientImportMgr),

Override(new(dtypes.ClientBlockstore), modules.ClientBlockstore),
Expand Down
2 changes: 1 addition & 1 deletion node/impl/full/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -1909,7 +1909,7 @@ func parseEthTopics(topics ethtypes.EthTopicSpec) (map[string][][]byte, error) {
continue
}
// Ethereum topics are emitted using `LOG{0..4}` opcodes resulting in topics1..4
key := fmt.Sprintf("topic%d", idx+1)
key := fmt.Sprintf("t%d", idx+1)
for _, v := range vals {
encodedVal, err := cborEncodeTopicValue(v[:])
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions node/modules/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,7 @@ func NewSlashFilter(ds dtypes.MetadataDS) *slashfilter.SlashFilter {
func UpgradeSchedule() stmgr.UpgradeSchedule {
return filcns.DefaultUpgradeSchedule()
}

func EnableStoringEvents(cs *store.ChainStore) {
cs.StoreEvents(true)
}

0 comments on commit 8db7fd6

Please sign in to comment.