From 46937aba8ed7a55ad7b1e845b279f75ace791b7a Mon Sep 17 00:00:00 2001 From: Konst Date: Thu, 19 Dec 2024 17:56:05 +0300 Subject: [PATCH] Added Adapter module --- pkg/indexer/indexer.go | 6 +-- pkg/indexer/subsquid/adapter/adapter.go | 40 +++++++++++++++++++ pkg/indexer/subsquid/adapter/listen.go | 36 +++++++++++++++++ .../receiver}/api/api.go | 0 .../receiver}/api/request.go | 12 +++++- .../receiver}/api/response.go | 0 .../receiver}/receiver.go | 14 +++---- .../receiver}/sequencer.go | 4 +- .../receiver}/sync.go | 2 +- .../receiver}/worker.go | 2 +- 10 files changed, 100 insertions(+), 16 deletions(-) create mode 100644 pkg/indexer/subsquid/adapter/adapter.go create mode 100644 pkg/indexer/subsquid/adapter/listen.go rename pkg/indexer/{sqd_receiver => subsquid/receiver}/api/api.go (100%) rename pkg/indexer/{sqd_receiver => subsquid/receiver}/api/request.go (92%) rename pkg/indexer/{sqd_receiver => subsquid/receiver}/api/response.go (100%) rename pkg/indexer/{sqd_receiver => subsquid/receiver}/receiver.go (92%) rename pkg/indexer/{sqd_receiver => subsquid/receiver}/sequencer.go (88%) rename pkg/indexer/{sqd_receiver => subsquid/receiver}/sync.go (97%) rename pkg/indexer/{sqd_receiver => subsquid/receiver}/worker.go (97%) diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 77897b2..397abba 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -3,7 +3,7 @@ package indexer import ( "bytes" "context" - "github.com/dipdup-io/starknet-indexer/pkg/indexer/sqd_receiver" + sqdRcvr "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver" "runtime" "sync" "time" @@ -54,7 +54,7 @@ type Indexer struct { state *state idGenerator *generator.IdGenerator receiver *receiver.Receiver - sqdReceiver *sqd_receiver.Receiver + sqdReceiver *sqdRcvr.Receiver statusChecker *statusChecker rollbackManager models.Rollback @@ -98,7 +98,7 @@ func New( switch cfg.Datasource { case "subsquid": - sqdReceiver, err := sqd_receiver.New( + sqdReceiver, err := sqdRcvr.New( cfg, datasource, cfg.StartLevel, diff --git a/pkg/indexer/subsquid/adapter/adapter.go b/pkg/indexer/subsquid/adapter/adapter.go new file mode 100644 index 0000000..0910c7c --- /dev/null +++ b/pkg/indexer/subsquid/adapter/adapter.go @@ -0,0 +1,40 @@ +package adapter + +import ( + "context" + "github.com/dipdup-net/indexer-sdk/pkg/modules" +) + +type Adapter struct { + modules.BaseModule +} + +var _ modules.Module = (*Adapter)(nil) + +const ( + InputName = "blocks" + OutputName = "parsed_blocks" + StopOutput = "stop" +) + +func New() Adapter { + m := Adapter{ + BaseModule: modules.New("sqd adapter"), + } + m.CreateInputWithCapacity(InputName, 128) + m.CreateOutput(OutputName) + m.CreateOutput(StopOutput) + + return m +} + +func (a *Adapter) Start(ctx context.Context) { + a.Log.Info().Msg("starting...") + a.G.GoCtx(ctx, a.listen) +} + +func (a *Adapter) Close() error { + a.Log.Info().Msg("closing...") + a.G.Wait() + return nil +} diff --git a/pkg/indexer/subsquid/adapter/listen.go b/pkg/indexer/subsquid/adapter/listen.go new file mode 100644 index 0000000..81f88bc --- /dev/null +++ b/pkg/indexer/subsquid/adapter/listen.go @@ -0,0 +1,36 @@ +package adapter + +import ( + "context" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api" +) + +func (a *Adapter) listen(ctx context.Context) { + a.Log.Info().Msg("module started") + + input := a.MustInput(InputName) + + for { + select { + case <-ctx.Done(): + return + case msg, ok := <-input.Listen(): + if !ok { + a.Log.Warn().Msg("can't read message from input, it was drained and closed") + a.MustOutput(StopOutput).Push(struct{}{}) + return + } + + block, ok := msg.(*api.SqdBlockResponse) + + if !ok { + a.Log.Warn().Msgf("invalid message type: %T", msg) + continue + } + + a.Log.Info(). + Uint64("level", block.Header.Number). + Msg("received block") + } + } +} diff --git a/pkg/indexer/sqd_receiver/api/api.go b/pkg/indexer/subsquid/receiver/api/api.go similarity index 100% rename from pkg/indexer/sqd_receiver/api/api.go rename to pkg/indexer/subsquid/receiver/api/api.go diff --git a/pkg/indexer/sqd_receiver/api/request.go b/pkg/indexer/subsquid/receiver/api/request.go similarity index 92% rename from pkg/indexer/sqd_receiver/api/request.go rename to pkg/indexer/subsquid/receiver/api/request.go index 5d3dfef..f20bf8c 100644 --- a/pkg/indexer/sqd_receiver/api/request.go +++ b/pkg/indexer/subsquid/receiver/api/request.go @@ -24,7 +24,11 @@ type Fields struct { } type BlockField struct { - Timestamp bool `json:"timestamp"` + Timestamp bool `json:"timestamp"` + ParentHash bool `json:"parentHash,omitempty"` + Status bool `json:"status,omitempty"` + NewRoot bool `json:"newRoot,omitempty"` + SequencerAddress bool `json:"sequencerAddress,omitempty"` } type StateUpdateField struct { @@ -98,7 +102,11 @@ func NewRequest(fromLevel uint64, toLevel uint64) *Request { IncludeAllBlocks: true, Fields: Fields{ Block: BlockField{ - Timestamp: true, + ParentHash: true, + Status: true, + NewRoot: true, + Timestamp: true, + SequencerAddress: true, }, StateUpdate: StateUpdateField{ NewRoot: true, diff --git a/pkg/indexer/sqd_receiver/api/response.go b/pkg/indexer/subsquid/receiver/api/response.go similarity index 100% rename from pkg/indexer/sqd_receiver/api/response.go rename to pkg/indexer/subsquid/receiver/api/response.go diff --git a/pkg/indexer/sqd_receiver/receiver.go b/pkg/indexer/subsquid/receiver/receiver.go similarity index 92% rename from pkg/indexer/sqd_receiver/receiver.go rename to pkg/indexer/subsquid/receiver/receiver.go index 9b34bdd..8256bfd 100644 --- a/pkg/indexer/sqd_receiver/receiver.go +++ b/pkg/indexer/subsquid/receiver/receiver.go @@ -1,9 +1,9 @@ -package sqd_receiver +package receiver import ( "context" rcvr "github.com/dipdup-io/starknet-indexer/pkg/indexer/receiver" - "github.com/dipdup-io/starknet-indexer/pkg/indexer/sqd_receiver/api" + api2 "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api" "github.com/dipdup-io/workerpool" "github.com/dipdup-net/indexer-sdk/pkg/modules" "github.com/pkg/errors" @@ -29,11 +29,11 @@ const ( type Receiver struct { modules.BaseModule - api *api.Subsquid + api *api2.Subsquid startLevel uint64 level uint64 threadsCount int - blocks chan *api.SqdBlockResponse + blocks chan *api2.SqdBlockResponse getIndexerHeight GetIndexerHeight pool *workerpool.Pool[BlocksToWorker] processing map[uint64]struct{} @@ -57,12 +57,12 @@ func New(cfg config.Config, } receiver := &Receiver{ - BaseModule: modules.New("subsquid receiver"), + BaseModule: modules.New("sqd receiver"), startLevel: startLevel, getIndexerHeight: getIndexerHeight, threadsCount: threadsCount, - api: api.NewSubsquid(dsCfg), - blocks: make(chan *api.SqdBlockResponse, cfg.ThreadsCount*10), + api: api2.NewSubsquid(dsCfg), + blocks: make(chan *api2.SqdBlockResponse, cfg.ThreadsCount*10), processing: make(map[uint64]struct{}), processingMx: new(sync.Mutex), timeout: time.Duration(cfg.Timeout) * time.Second, diff --git a/pkg/indexer/sqd_receiver/sequencer.go b/pkg/indexer/subsquid/receiver/sequencer.go similarity index 88% rename from pkg/indexer/sqd_receiver/sequencer.go rename to pkg/indexer/subsquid/receiver/sequencer.go index cca3926..6ab17a9 100644 --- a/pkg/indexer/sqd_receiver/sequencer.go +++ b/pkg/indexer/subsquid/receiver/sequencer.go @@ -1,8 +1,8 @@ -package sqd_receiver +package receiver import ( "context" - "github.com/dipdup-io/starknet-indexer/pkg/indexer/sqd_receiver/api" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/subsquid/receiver/api" ) func (r *Receiver) sequencer(ctx context.Context) { diff --git a/pkg/indexer/sqd_receiver/sync.go b/pkg/indexer/subsquid/receiver/sync.go similarity index 97% rename from pkg/indexer/sqd_receiver/sync.go rename to pkg/indexer/subsquid/receiver/sync.go index c59edab..bfeae6c 100644 --- a/pkg/indexer/sqd_receiver/sync.go +++ b/pkg/indexer/subsquid/receiver/sync.go @@ -1,4 +1,4 @@ -package sqd_receiver +package receiver import ( "context" diff --git a/pkg/indexer/sqd_receiver/worker.go b/pkg/indexer/subsquid/receiver/worker.go similarity index 97% rename from pkg/indexer/sqd_receiver/worker.go rename to pkg/indexer/subsquid/receiver/worker.go index f81f54d..f294203 100644 --- a/pkg/indexer/sqd_receiver/worker.go +++ b/pkg/indexer/subsquid/receiver/worker.go @@ -1,4 +1,4 @@ -package sqd_receiver +package receiver import ( "context"