diff --git a/.env.example b/.env.example index fdd7a67..26bb542 100644 --- a/.env.example +++ b/.env.example @@ -10,4 +10,5 @@ POSTGRES_DB=starknet POSTGRES_PASSWORD= # REQUIRED STARKNET_NODE_URL= # REQUIRED if INDEXER_DATASOURCE=node NODE_APIKEY= # REQUIRED if your node provider has api key. It's api key. -NODE_HEADER_APIKEY= # REQUIRED if your node provider has api key. It's header name. \ No newline at end of file +NODE_HEADER_APIKEY= # REQUIRED if your node provider has api key. It's header name. +STARKNET_SUBSQUID_URL= # REQUIRED if INDEXER_DATASOURCE=subsquid \ No newline at end of file diff --git a/build/dipdup.yml b/build/dipdup.yml index 44b5a12..49f6ee8 100644 --- a/build/dipdup.yml +++ b/build/dipdup.yml @@ -22,6 +22,9 @@ datasources: fallback: url: ${STARKNET_FALLBACK_NODE_URL} rps: ${STARKNET_FALLBACK_NODE_RPS:-1} + subsquid: + url: ${STARKNET_SUBSQUID_URL} + rps: ${STARKNET_SUBSQUID_RPS:-5} database: kind: postgres diff --git a/go.mod b/go.mod index fc3b18f..d1d94af 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/rs/zerolog v1.30.0 github.com/shopspring/decimal v1.3.1 github.com/spf13/cobra v1.7.0 - github.com/stretchr/testify v1.8.4 + github.com/stretchr/testify v1.9.0 github.com/uptrace/bun v1.1.14 go.uber.org/mock v0.2.0 google.golang.org/grpc v1.58.3 @@ -31,6 +31,7 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/jackc/puddle/v2 v2.2.1 // indirect github.com/moby/sys/user v0.1.0 // indirect + github.com/opus-domini/fast-shot v1.1.4 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 // indirect go.opentelemetry.io/otel/metric v1.19.0 // indirect golang.org/x/sync v0.3.0 // indirect diff --git a/go.sum b/go.sum index 1a0cedb..a532ff8 100644 --- a/go.sum +++ b/go.sum @@ -235,6 +235,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0-rc4 h1:oOxKUJWnFC4YGHCCMNql1x4YaDfYBTS5Y4x/Cgeo1E0= github.com/opencontainers/image-spec v1.1.0-rc4/go.mod h1:X4pATf0uXsnn3g5aiGIsVnJBR4mxhKzfwmvK/B2NTm8= +github.com/opus-domini/fast-shot v1.1.4 h1:xWTO/4JEILjZM/rP6mwiWe/jZyE9+L1G9sC4BsoynAk= +github.com/opus-domini/fast-shot v1.1.4/go.mod h1:BOr2JXHQJhOnYsxyCvFbgBP3BuYCjgh2YfzWKweEL0A= github.com/paulmach/orb v0.10.0 h1:guVYVqzxHE/CQ1KpfGO077TR0ATHSNjp4s6XGLn3W9s= github.com/paulmach/orb v0.10.0/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/EnuLaLU= github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= @@ -281,6 +283,7 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/testcontainers/testcontainers-go v0.22.0 h1:hOK4NzNu82VZcKEB1aP9LO1xYssVFMvlfeuDW9JMmV0= github.com/testcontainers/testcontainers-go v0.22.0/go.mod h1:k0YiPa26xJCRUbUkYqy5rY6NGvSbVCeUBXCvucscBR4= github.com/testcontainers/testcontainers-go/modules/postgres v0.22.0 h1:OHVaqu9MRGMSlro9AD5UCfj8XiHwQdhB9thE4vINq+E= diff --git a/pkg/indexer/config/config.go b/pkg/indexer/config/config.go index 1364cc5..8cc8708 100644 --- a/pkg/indexer/config/config.go +++ b/pkg/indexer/config/config.go @@ -2,11 +2,11 @@ package config // Config - configuration structure for indexer type Config struct { - Name string `yaml:"name" validate:"omitempty"` - StartLevel uint64 `yaml:"start_level" validate:"omitempty"` - ThreadsCount int `yaml:"threads_count" validate:"omitempty,min=1"` - Timeout uint64 `yaml:"timeout" validate:"omitempty"` + Name string `yaml:"name" validate:"omitempty"` + StartLevel uint64 `yaml:"start_level" validate:"omitempty"` + ThreadsCount int `yaml:"threads_count" validate:"omitempty,min=1"` + Timeout uint64 `yaml:"timeout" validate:"omitempty"` ClassInterfacesDir string `yaml:"class_interfaces_dir" validate:"required,dir"` - BridgedTokensFile string `yaml:"bridged_tokens_file" validate:"required,file"` - Datasource string `yaml:"datasource" validate:"required,oneof=sequencer node"` + BridgedTokensFile string `yaml:"bridged_tokens_file" validate:"required,file"` + Datasource string `yaml:"datasource" validate:"required,oneof=sequencer node subsquid"` } diff --git a/pkg/indexer/indexer.go b/pkg/indexer/indexer.go index 9d01558..3cf6918 100644 --- a/pkg/indexer/indexer.go +++ b/pkg/indexer/indexer.go @@ -56,6 +56,8 @@ type Indexer struct { statusChecker *statusChecker rollbackManager models.Rollback + blocksFetcher IBlocksFetcher + rollback chan struct{} rollbackRerun chan struct{} rollbackWait *sync.WaitGroup @@ -63,6 +65,116 @@ type Indexer struct { txWriteMutex *sync.Mutex } +type IBlocksFetcher interface { + getNew(ctx context.Context) error +} + +type BlocksFetcher struct { + indexer *Indexer +} +type SqdBlocksFetcher struct { + indexer *Indexer +} + +func (f *BlocksFetcher) getNew(ctx context.Context) error { + head, err := f.indexer.receiver.Head(ctx) + if err != nil { + return err + } + + if head < f.indexer.state.Height() { + log.Warn(). + Uint64("indexer_height", f.indexer.state.Height()). + Uint64("node_height", head). + Msg("rollback detected by block height") + if err := f.indexer.makeRollback(ctx, head); err != nil { + return errors.Wrap(err, "makeRollback") + } + } + + for head > f.indexer.state.Height() { + f.indexer.Log.Info(). + Uint64("indexer_block", f.indexer.state.Height()). + Uint64("node_block", head). + Msg("syncing...") + + startLevel := f.indexer.cfg.StartLevel + if startLevel < f.indexer.state.Height() { + startLevel = f.indexer.state.Height() + if f.indexer.state.Height() > 0 { + startLevel += 1 + } + } + + for height := startLevel; height <= head; height++ { + select { + case <-ctx.Done(): + return nil + case <-f.indexer.rollback: + log.Info().Msg("stop receiving blocks") + return nil + default: + if f.indexer.checkQueue(ctx) { + return nil + } + f.indexer.receiver.AddTask(height) + } + } + + time.Sleep(5 * time.Second) + + for head, err = f.indexer.receiver.Head(ctx); err != nil; { + select { + case <-ctx.Done(): + return nil + case <-f.indexer.rollback: + log.Info().Msg("stop receiving blocks") + return nil + default: + log.Err(err).Msg("receive head error") + return err + } + } + } + + f.indexer.Log.Info().Uint64("height", f.indexer.state.Height()).Msg("synced") + return nil +} + +func (f *SqdBlocksFetcher) getNew(ctx context.Context) error { + head, err := f.indexer.receiver.Head(ctx) + if err != nil { + return err + } + + if head < f.indexer.state.Height() { + log.Warn(). + Uint64("indexer_height", f.indexer.state.Height()). + Uint64("node_height", head). + Msg("rollback detected by block height") + if err := f.indexer.makeRollback(ctx, head); err != nil { + return errors.Wrap(err, "makeRollback") + } + } + + f.indexer.Log.Info(). + Uint64("indexer_block", f.indexer.state.Height()). + Uint64("node_block", head). + Msg("syncing...") + + startLevel := f.indexer.cfg.StartLevel + if startLevel < f.indexer.state.Height() { + startLevel = f.indexer.state.Height() + if f.indexer.state.Height() > 0 { + startLevel += 1 + } + } + f.indexer.receiver.GetSqdData(ctx, startLevel) + + f.indexer.Log.Info().Uint64("height", f.indexer.state.Height()).Msg("synced") + return nil +} + // New - creates new indexer entity func New( cfg config.Config, @@ -93,6 +205,18 @@ func New( txWriteMutex: new(sync.Mutex), rollbackWait: new(sync.WaitGroup), } + + switch cfg.Datasource { + case "subsquid": + indexer.blocksFetcher = &SqdBlocksFetcher{ + indexer: indexer, + } + default: + indexer.blocksFetcher = &BlocksFetcher{ + indexer: indexer, + } + } + rcvr, err := receiver.NewReceiver(cfg, datasource) if err != nil { return nil, err @@ -201,73 +325,8 @@ func (indexer *Indexer) checkQueue(ctx context.Context) bool { return false } -func (indexer *Indexer) getNewBlocks(ctx context.Context) error { - head, err := indexer.receiver.Head(ctx) - if err != nil { - return err - } - - if head < indexer.state.Height() { - log.Warn(). - Uint64("indexer_height", indexer.state.Height()). - Uint64("node_height", head). - Msg("rollback detected by block height") - if err := indexer.makeRollback(ctx, head); err != nil { - return errors.Wrap(err, "makeRollback") - } - } - - for head > indexer.state.Height() { - indexer.Log.Info(). - Uint64("indexer_block", indexer.state.Height()). - Uint64("node_block", head). - Msg("syncing...") - - startLevel := indexer.cfg.StartLevel - if startLevel < indexer.state.Height() { - startLevel = indexer.state.Height() - if indexer.state.Height() > 0 { - startLevel += 1 - } - } - - for height := startLevel; height <= head; height++ { - select { - case <-ctx.Done(): - return nil - case <-indexer.rollback: - log.Info().Msg("stop receiving blocks") - return nil - default: - if indexer.checkQueue(ctx) { - return nil - } - indexer.receiver.AddTask(height) - } - } - - time.Sleep(5 * time.Second) - - for head, err = indexer.receiver.Head(ctx); err != nil; { - select { - case <-ctx.Done(): - return nil - case <-indexer.rollback: - log.Info().Msg("stop receiving blocks") - return nil - default: - log.Err(err).Msg("receive head error") - return err - } - } - } - - indexer.Log.Info().Uint64("height", indexer.state.Height()).Msg("synced") - return nil -} - func (indexer *Indexer) sync(ctx context.Context) { - if err := indexer.getNewBlocks(ctx); err != nil { + if err := indexer.blocksFetcher.getNew(ctx); err != nil { indexer.Log.Err(err).Msg("getNewBlocks") } @@ -281,11 +340,11 @@ func (indexer *Indexer) sync(ctx context.Context) { case <-ctx.Done(): return case <-ticker.C: - if err := indexer.getNewBlocks(ctx); err != nil { + if err := indexer.blocksFetcher.getNew(ctx); err != nil { indexer.Log.Err(err).Msg("getNewBlocks") } case <-indexer.rollbackRerun: - if err := indexer.getNewBlocks(ctx); err != nil { + if err := indexer.blocksFetcher.getNew(ctx); err != nil { indexer.Log.Err(err).Msg("getNewBlocks") } } diff --git a/pkg/indexer/receiver/receiver.go b/pkg/indexer/receiver/receiver.go index c97514b..83b7009 100644 --- a/pkg/indexer/receiver/receiver.go +++ b/pkg/indexer/receiver/receiver.go @@ -2,6 +2,7 @@ package receiver import ( "context" + "github.com/dipdup-io/starknet-indexer/pkg/indexer/receiver/subsquid" "sync" "time" @@ -59,6 +60,7 @@ func (r *Result) setStateUpdates(stateUpdate starknetData.StateUpdate) { type Receiver struct { api API fallbackAPI API + sqdAPI *subsquid.Subsquid result chan Result pool *workerpool.Pool[uint64] processing map[uint64]struct{} @@ -75,16 +77,7 @@ func NewReceiver(cfg config.Config, ds map[string]ddConfig.DataSource) (*Receive return nil, errors.Errorf("unknown datasource name: %s", cfg.Datasource) } - var api API - switch cfg.Datasource { - case "node": - api = NewNode(dsCfg) - default: - return nil, errors.Errorf("usupported datasource type: %s", cfg.Datasource) - } - receiver := &Receiver{ - api: api, result: make(chan Result, cfg.ThreadsCount*2), processing: make(map[uint64]struct{}), processingMx: new(sync.Mutex), @@ -93,6 +86,15 @@ func NewReceiver(cfg config.Config, ds map[string]ddConfig.DataSource) (*Receive wg: new(sync.WaitGroup), } + switch cfg.Datasource { + case "node": + receiver.api = NewNode(dsCfg) + case "subsquid": + receiver.sqdAPI = subsquid.NewSubsquid(dsCfg) + default: + return nil, errors.Errorf("usupported datasource type: %s", cfg.Datasource) + } + if fallbackDs, ok := ds["fallback"]; ok && fallbackDs.URL != "" { receiver.fallbackAPI = NewNode(fallbackDs) } @@ -172,6 +174,10 @@ func (r *Receiver) Head(ctx context.Context) (uint64, error) { requestCtx, cancel := context.WithTimeout(ctx, r.timeout) defer cancel() + if r.usingSqd() { + return r.sqdAPI.GetHead(requestCtx) + } + return r.api.Head(requestCtx) } @@ -248,6 +254,10 @@ func (r *Receiver) getBlock(ctx context.Context, blockId starknetData.BlockID, r } } +func (r *Receiver) usingSqd() bool { + return r.sqdAPI != nil +} + func (r *Receiver) traceBlock(ctx context.Context, blockId starknetData.BlockID, result *Result, wg *sync.WaitGroup) { defer wg.Done() @@ -305,3 +315,30 @@ func (r *Receiver) receiveStateUpdate(ctx context.Context, blockId starknetData. break } } + +func (r *Receiver) GetSqdData(ctx context.Context, startLevel uint64) { + r.sqdAPI.GetData(ctx, startLevel) + //for { + // select { + // case <-ctx.Done(): + // return + // default: + // } + // + // response, err := api.GetData(ctx, blockId) + // if err != nil { + // if errors.Is(err, context.Canceled) { + // return + // } + // r.log.Err(err).Uint64("height", *blockId.Number).Msg("get block request") + // if r.fallbackAPI != nil { + // r.log.Warn().Msg("trying fallback node...") + // api = r.fallbackAPI + // } + // time.Sleep(time.Second) + // continue + // } + // result.setBlock(response) + // break + //} +} diff --git a/pkg/indexer/receiver/subsquid/request.go b/pkg/indexer/receiver/subsquid/request.go new file mode 100644 index 0000000..7730b67 --- /dev/null +++ b/pkg/indexer/receiver/subsquid/request.go @@ -0,0 +1,166 @@ +package subsquid + +type Request struct { + Type string `json:"type"` + FromBlock uint64 `json:"fromBlock"` + IncludeAllBlocks bool `json:"includeAllBlocks"` + Fields Fields `json:"fields"` + StateUpdates []map[string]any `json:"stateUpdates"` + StorageDiffs []map[string]any `json:"storageDiffs"` + Traces []Trace `json:"traces"` + Messages []map[string]any `json:"messages"` + Transactions []TransactionWithTrace `json:"transactions"` +} + +type Fields struct { + Block BlockField `json:"block"` + StateUpdate StateUpdateField `json:"stateUpdate"` + StorageDiff StorageDiffField `json:"storageDiff"` + Trace TraceField `json:"trace"` + Transaction TransactionField `json:"transaction"` + Event EventField `json:"event"` + Message MessageField `json:"message"` +} + +type BlockField struct { + Timestamp bool `json:"timestamp"` +} + +type StateUpdateField struct { + NewRoot bool `json:"newRoot"` + OldRoot bool `json:"oldRoot"` + DeprecatedDeclaredClasses bool `json:"deprecatedDeclaredClasses"` + DeclaredClasses bool `json:"declaredClasses"` + DeployedContracts bool `json:"deployedContracts"` + ReplacedClasses bool `json:"replacedClasses"` + Nonces bool `json:"nonces"` +} + +type StorageDiffField struct { + Value bool `json:"value"` +} + +type TraceField struct { + TraceType bool `json:"traceType"` + InvocationType bool `json:"invocationType"` + CallerAddress bool `json:"callerAddress"` + ContractAddress bool `json:"contractAddress"` + CallType bool `json:"callType"` + ClassHash bool `json:"classHash"` + EntryPointSelector bool `json:"entryPointSelector"` + EntryPointType bool `json:"entryPointType"` + Calldata bool `json:"calldata"` + Result bool `json:"result"` +} + +type TransactionField struct { + TransactionHash bool `json:"transactionHash"` + ContractAddress bool `json:"contractAddress"` + EntryPointSelector bool `json:"entryPointSelector"` + Calldata bool `json:"calldata"` + MaxFee bool `json:"maxFee"` + Type bool `json:"type"` + SenderAddress bool `json:"senderAddress"` + Version bool `json:"version"` + Signature bool `json:"signature"` + Nonce bool `json:"nonce"` + ClassHash bool `json:"classHash"` + CompiledClassHash bool `json:"compiledClassHash"` + ContractAddressSalt bool `json:"contractAddressSalt"` + ConstructorCalldata bool `json:"constructorCalldata"` +} + +type EventField struct { + Keys bool `json:"keys"` +} + +type MessageField struct { + FromAddress bool `json:"fromAddress"` + ToAddress bool `json:"toAddress"` + Payload bool `json:"payload"` +} + +type Trace struct { + Events bool `json:"events"` +} + +type TransactionWithTrace struct { + Traces bool `json:"traces"` + Events bool `json:"events"` +} + +func NewRequest(level uint64) *Request { + return &Request{ + Type: "starknet", + FromBlock: level, + IncludeAllBlocks: true, + Fields: Fields{ + Block: BlockField{ + Timestamp: true, + }, + StateUpdate: StateUpdateField{ + NewRoot: true, + OldRoot: true, + DeprecatedDeclaredClasses: true, + DeclaredClasses: true, + DeployedContracts: true, + ReplacedClasses: true, + Nonces: true, + }, + StorageDiff: StorageDiffField{ + Value: true, + }, + Trace: TraceField{ + TraceType: true, + InvocationType: true, + CallerAddress: true, + ContractAddress: true, + CallType: true, + ClassHash: true, + EntryPointSelector: true, + EntryPointType: true, + Calldata: true, + Result: true, + }, + Transaction: TransactionField{ + TransactionHash: true, + ContractAddress: true, + EntryPointSelector: true, + Calldata: true, + MaxFee: true, + Type: true, + SenderAddress: true, + Version: true, + Signature: true, + Nonce: true, + ClassHash: true, + CompiledClassHash: true, + ContractAddressSalt: true, + ConstructorCalldata: true, + }, + Event: EventField{ + Keys: true, + }, + Message: MessageField{ + FromAddress: true, + ToAddress: true, + Payload: true, + }, + }, + StateUpdates: []map[string]any{ + {}, + }, + StorageDiffs: []map[string]any{ + {}, + }, + Traces: []Trace{ + {Events: true}, + }, + Messages: []map[string]any{ + {}, + }, + Transactions: []TransactionWithTrace{ + {Traces: true, Events: true}, + }, + } +} diff --git a/pkg/indexer/receiver/subsquid/response.go b/pkg/indexer/receiver/subsquid/response.go new file mode 100644 index 0000000..5bc239e --- /dev/null +++ b/pkg/indexer/receiver/subsquid/response.go @@ -0,0 +1,79 @@ +package subsquid + +type SqdBlockResponse struct { + Header BlockHeader `json:"header"` + Transactions []Transaction `json:"transactions"` + Traces []TraceResponse `json:"traces"` + Messages []Message `json:"messages"` + StateUpdates []StateUpdate `json:"state_updates"` + StorageDiffs []StorageDiff `json:"storage_diffs"` +} + +type BlockHeader struct { + Number uint64 `example:"321" json:"number"` + Hash string `example:"0x44529f2c44d9113e0ba4e53cb6e84f425ec186cda27545827b5a72d5540bfdc" json:"hash"` + Timestamp int64 `example:"1641950335" json:"timestamp"` +} + +type Transaction struct { + TransactionIndex uint `example:"0" json:"transactionIndex"` + TransactionHash string `example:"0x794fae89c8c4b8f5f77a4996948d2547740f90e54bb4a5cc6119a7c70eca42c" json:"transactionHash"` + ContractAddress *string `example:"0x1cee8364383aea317eefc181dbd8732f1504fd4511aed58f32c369dd546da0d" json:"contractAddress"` + EntryPointSelector *string `example:"0x317eb442b72a9fae758d4fb26830ed0d9f31c8e7da4dbff4e8c59ea6a158e7f" json:"entryPointSelector"` + Calldata *[]string `json:"calldata"` + MaxFee *string `example:"0x0" json:"maxFee"` + Type string `example:"INVOKE" json:"type"` + SenderAddress *string `json:"senderAddress"` + Version string `example:"0x0" json:"version"` + Signature *[]string `json:"signature"` + Nonce *uint64 `json:"nonce"` + ClassHash *string `json:"classHash"` + CompiledClassHash *string `json:"compiledClassHash"` + ContractAddressSalt *string `json:"contractAddressSalt"` + ConstructorCalldata *[]string `json:"constructorCalldata"` +} + +type TraceResponse struct { + TransactionIndex uint `json:"transaction_index"` + TraceAddress []int `json:"trace_address"` + TraceType string `json:"traceType"` + InvocationType string `json:"invocationType"` + CallerAddress string `json:"callerAddress"` + ContractAddress string `json:"contractAddress"` + CallType *string `json:"callType"` + ClassHash *string `json:"classHash"` + EntryPointSelector *string `json:"entryPointSelector"` + EntryPointType *string `json:"entryPointType"` + Calldata []string `json:"calldata"` + Result []string `json:"result"` +} + +type Message struct { + TransactionIndex uint `json:"transaction_index"` + TraceAddress []int `json:"trace_address"` + Order uint `json:"order"` + FromAddress *string `json:"fromAddress"` + ToAddress string `json:"toAddress"` + Payload []string `json:"payload"` +} + +type StateUpdate struct { + NewRoot string `json:"newRoot"` + OldRoot string `json:"oldRoot"` + DeprecatedClasses []any `json:"deprecatedDeclaredClasses"` + DeclaredClasses []any `json:"declaredClasses"` + DeployedContracts []DeployedContract `json:"deployedContracts"` + ReplacedClasses []any `json:"replacedClasses"` + Nonces []any `json:"nonces"` +} + +type DeployedContract struct { + Address string `json:"address"` + ClassHash string `json:"class_hash"` +} + +type StorageDiff struct { + Address string `json:"address"` + Key string `json:"key"` + Value string `json:"value"` +} diff --git a/pkg/indexer/receiver/subsquid/subsquid.go b/pkg/indexer/receiver/subsquid/subsquid.go new file mode 100644 index 0000000..f173afb --- /dev/null +++ b/pkg/indexer/receiver/subsquid/subsquid.go @@ -0,0 +1,79 @@ +package subsquid + +import ( + "context" + "fmt" + "github.com/dipdup-net/go-lib/config" + fastshot "github.com/opus-domini/fast-shot" + "github.com/opus-domini/fast-shot/constant/mime" + "strconv" +) + +type Subsquid struct { + httpClient fastshot.ClientHttpMethods +} + +func NewSubsquid(cfg config.DataSource) *Subsquid { + var httpClient = fastshot.NewClient(cfg.URL). + Build() + + return &Subsquid{ + httpClient: httpClient, + } +} + +func (s *Subsquid) GetWorkerUrl(ctx context.Context, startLevel uint64) (string, error) { + path := fmt.Sprintf("/%d/worker", startLevel) + response, err := s.httpClient. + GET(path). + Send() + + if err != nil { + return "", err + } + + return response.Body().AsString() +} + +func (s *Subsquid) GetData(ctx context.Context, startLevel uint64) { + workerUrl, err := s.GetWorkerUrl(ctx, startLevel) + if err != nil { + return + } + + var workerClient = fastshot.NewClient(workerUrl). + Build() + + response, err := workerClient.POST(""). + Header().AddContentType(mime.JSON). + Body().AsJSON(NewRequest(startLevel)). + Send() + + if err != nil { + return + } + + var result []SqdBlockResponse + err = response.Body().AsJSON(&result) + if err != nil { + return + } + fmt.Println("done") +} + +func (s *Subsquid) GetHead(ctx context.Context) (uint64, error) { + response, err := s.httpClient. + GET("/height"). + Send() + + if err != nil { + return 0, err + } + + stringResponse, err := response.Body().AsString() + if err != nil { + return 0, err + } + + return strconv.ParseUint(stringResponse, 10, 64) +}