Skip to content

Commit

Permalink
DexScreener v25 backport (#8386)
Browse files Browse the repository at this point in the history
* Initial commit for DexScreener POC

* Added some boilerplate items to be integrated with the core ingestion logic

* feat: Indexer service wiring (#8385)

* feat: Indexer service wiring

* wire supply listener

* working version

* progress

* Added config for indexer service and wiring up app init sequences

* feat(indexer): proper supply offset handling (#8404)

* feat(indexer): proper supply offset handling

* lint

* test: token supply write listener (#8405)

* test: token supply write listener

* updates

* feat: rename ingester to publisher, added block publishing code (#8407)

* Rename ingester to publisher. Added block publishing code

* additional renaming needed for publisher

* Fixed lint

---------

Co-authored-by: Calvin <calvin@osmosis.team>

* Fixed compilation error after merge

* clean up app.go

* clean up

---------

Co-authored-by: Calvin <calvin@osmosis.team>
Co-authored-by: Calvin <1727450+cryptomatictrader@users.noreply.github.com>
  • Loading branch information
3 people authored Jun 21, 2024
1 parent a1bf555 commit 32a78ad
Show file tree
Hide file tree
Showing 18 changed files with 869 additions and 10 deletions.
64 changes: 54 additions & 10 deletions app/app.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package app

import (
"context"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -36,17 +37,20 @@ import (
"github.com/cosmos/ibc-go/v7/modules/apps/transfer"
ibc "github.com/cosmos/ibc-go/v7/modules/core"

"github.com/osmosis-labs/osmosis/v25/ingest/indexer"
indexerdomain "github.com/osmosis-labs/osmosis/v25/ingest/indexer/domain"
indexerservice "github.com/osmosis-labs/osmosis/v25/ingest/indexer/service"
indexerwritelistener "github.com/osmosis-labs/osmosis/v25/ingest/indexer/service/writelistener"
"github.com/osmosis-labs/osmosis/v25/ingest/sqs"
"github.com/osmosis-labs/osmosis/v25/ingest/sqs/domain"
sqsservice "github.com/osmosis-labs/osmosis/v25/ingest/sqs/service"
sqswritelistener "github.com/osmosis-labs/osmosis/v25/ingest/sqs/service/writelistener"
concentratedtypes "github.com/osmosis-labs/osmosis/v25/x/concentrated-liquidity/types"
cosmwasmpooltypes "github.com/osmosis-labs/osmosis/v25/x/cosmwasmpool/types"
gammtypes "github.com/osmosis-labs/osmosis/v25/x/gamm/types"

storetypes "github.com/cosmos/cosmos-sdk/store/types"

"github.com/osmosis-labs/osmosis/v25/ingest/sqs/service"
"github.com/osmosis-labs/osmosis/v25/ingest/sqs/service/writelistener"

"github.com/osmosis-labs/osmosis/osmoutils"

nodeservice "github.com/cosmos/cosmos-sdk/client/grpc/node"
Expand Down Expand Up @@ -312,7 +316,7 @@ func NewOsmosisApp(

// Create pool tracker that tracks pool updates
// made by the write listenetrs.
poolTracker := service.NewPoolTracker()
poolTracker := sqsservice.NewPoolTracker()

// Create write listeners for the SQS service.
writeListeners := getSQSServiceWriteListeners(app, appCodec, poolTracker, app.WasmKeeper)
Expand All @@ -322,16 +326,44 @@ func NewOsmosisApp(
if !ok {
panic(fmt.Sprintf("failed to retrieve %s from config.toml", rpcAddressConfigName))
}
nodeStatusChecker := service.NewNodeStatusChecker(rpcAddress)
nodeStatusChecker := sqsservice.NewNodeStatusChecker(rpcAddress)

// Create the SQS streaming service by setting up the write listeners,
// the SQS ingester, and the pool tracker.
sqsStreamingService := service.New(writeListeners, sqsIngester, poolTracker, nodeStatusChecker)
sqsStreamingService := sqsservice.New(writeListeners, sqsIngester, poolTracker, nodeStatusChecker)

// Register the SQS streaming service with the app.
app.SetStreamingService(sqsStreamingService)
}

// Initialize the config object for the indexer
indexerConfig := indexer.NewConfigFromOptions(appOpts)

// initialize indexer if enabled
if indexerConfig.IsEnabled {
indexerIngester := indexerConfig.Initialize()

// TODO: handle graceful shutdown
pubSubCtx := context.Background()

// Create cold start manager
coldStartManager := indexerdomain.NewColdStartManager()

// Create write listeners for the indexer service.
writeListeners := getIndexerServiceWriteListeners(pubSubCtx, app, indexerIngester, coldStartManager)

// Create keepers for the indexer service.
keepers := indexerdomain.Keepers{
BankKeeper: app.BankKeeper,
}

// Create the indexer streaming service.
indexerStreamingService := indexerservice.New(writeListeners, coldStartManager, indexerIngester, keepers)

// Register the SQS streaming service with the app.
app.SetStreamingService(indexerStreamingService)
}

// TODO: There is a bug here, where we register the govRouter routes in InitNormalKeepers and then
// call setupHooks afterwards. Therefore, if a gov proposal needs to call a method and that method calls a
// hook, we will get a nil pointer dereference error due to the hooks in the keeper not being
Expand Down Expand Up @@ -540,20 +572,32 @@ func getSQSServiceWriteListeners(app *OsmosisApp, appCodec codec.Codec, blockPoo
writeListeners := make(map[storetypes.StoreKey][]storetypes.WriteListener)

writeListeners[app.GetKey(concentratedtypes.ModuleName)] = []storetypes.WriteListener{
writelistener.NewConcentrated(blockPoolUpdateTracker),
sqswritelistener.NewConcentrated(blockPoolUpdateTracker),
}
writeListeners[app.GetKey(gammtypes.StoreKey)] = []storetypes.WriteListener{
writelistener.NewGAMM(blockPoolUpdateTracker, appCodec),
sqswritelistener.NewGAMM(blockPoolUpdateTracker, appCodec),
}
writeListeners[app.GetKey(cosmwasmpooltypes.StoreKey)] = []storetypes.WriteListener{
writelistener.NewCosmwasmPool(blockPoolUpdateTracker, wasmkeeper),
sqswritelistener.NewCosmwasmPool(blockPoolUpdateTracker, wasmkeeper),
}
writeListeners[app.GetKey(banktypes.StoreKey)] = []storetypes.WriteListener{
writelistener.NewCosmwasmPoolBalance(blockPoolUpdateTracker),
sqswritelistener.NewCosmwasmPoolBalance(blockPoolUpdateTracker),
}
return writeListeners
}

// getIndexerServiceWriteListeners returns the write listeners for the app that are specific to the indexer service.
func getIndexerServiceWriteListeners(ctx context.Context, app *OsmosisApp, client indexerdomain.Publisher, coldStartManager indexerdomain.ColdStartManager) map[storetypes.StoreKey][]storetypes.WriteListener {
writeListeners := make(map[storetypes.StoreKey][]storetypes.WriteListener)

// Add write listeners for the bank module.
writeListeners[app.GetKey(banktypes.ModuleName)] = []storetypes.WriteListener{
indexerwritelistener.NewBank(ctx, client, coldStartManager),
}

return writeListeners
}

// we cache the reflectionService to save us time within tests.
var cachedReflectionService *runtimeservices.ReflectionService = nil

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/osmosis-labs/osmosis/v25
go 1.21.8

require (
cloud.google.com/go/pubsub v1.36.1
cosmossdk.io/api v0.3.1
cosmossdk.io/errors v1.0.1
cosmossdk.io/math v1.3.0
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ cloud.google.com/go/kms v1.10.1/go.mod h1:rIWk/TryCkR59GMC3YtHtXeLzd634lBbKenvyy
cloud.google.com/go/kms v1.11.0/go.mod h1:hwdiYC0xjnWsKQQCQQmIQnS9asjYVSK6jtXm+zFqXLM=
cloud.google.com/go/kms v1.12.1/go.mod h1:c9J991h5DTl+kg7gi3MYomh12YEENGrf48ee/N/2CDM=
cloud.google.com/go/kms v1.15.0/go.mod h1:c9J991h5DTl+kg7gi3MYomh12YEENGrf48ee/N/2CDM=
cloud.google.com/go/kms v1.15.7 h1:7caV9K3yIxvlQPAcaFffhlT7d1qpxjB1wHBtjWa13SM=
cloud.google.com/go/kms v1.15.7/go.mod h1:ub54lbsa6tDkUwnu4W7Yt1aAIFLnspgh0kPGToDukeI=
cloud.google.com/go/language v1.4.0/go.mod h1:F9dRpNFQmJbkaop6g0JhSBXCNlO90e1KWx5iDdxbWic=
cloud.google.com/go/language v1.6.0/go.mod h1:6dJ8t3B+lUYfStgls25GusK04NLh3eDLQnWM3mdEbhI=
cloud.google.com/go/language v1.7.0/go.mod h1:DJ6dYN/W+SQOjF8e1hLQXMF21AkH2w9wiPzPCJa2MIE=
Expand Down Expand Up @@ -550,6 +552,8 @@ cloud.google.com/go/pubsub v1.28.0/go.mod h1:vuXFpwaVoIPQMGXqRyUQigu/AX1S3IWugR9
cloud.google.com/go/pubsub v1.30.0/go.mod h1:qWi1OPS0B+b5L+Sg6Gmc9zD1Y+HaM0MdUr7LsupY1P4=
cloud.google.com/go/pubsub v1.32.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc=
cloud.google.com/go/pubsub v1.33.0/go.mod h1:f+w71I33OMyxf9VpMVcZbnG5KSUkCOUHYpFd5U1GdRc=
cloud.google.com/go/pubsub v1.36.1 h1:dfEPuGCHGbWUhaMCTHUFjfroILEkx55iUmKBZTP5f+Y=
cloud.google.com/go/pubsub v1.36.1/go.mod h1:iYjCa9EzWOoBiTdd4ps7QoMtMln5NwaZQpK1hbRfBDE=
cloud.google.com/go/pubsublite v1.5.0/go.mod h1:xapqNQ1CuLfGi23Yda/9l4bBCKz/wC3KIJ5gKcxveZg=
cloud.google.com/go/pubsublite v1.6.0/go.mod h1:1eFCS0U11xlOuMFV/0iBqw3zP12kddMeCbj/F3FSj9k=
cloud.google.com/go/pubsublite v1.7.0/go.mod h1:8hVMwRXfDfvGm3fahVbtDbiLePT3gpoiJYJY+vxWxVM=
Expand Down Expand Up @@ -2074,6 +2078,8 @@ go-simpler.org/musttag v0.8.0 h1:DR4UTgetNNhPRNo02rkK1hwDTRzAPotN+ZqYpdtEwWc=
go-simpler.org/musttag v0.8.0/go.mod h1:fiNdCkXt2S6je9Eblma3okjnlva9NT1Eg/WUt19rWu8=
go-simpler.org/sloglint v0.4.0 h1:UVJuUJo63iNQNFEOtZ6o1xAgagVg/giVLLvG9nNLobI=
go-simpler.org/sloglint v0.4.0/go.mod h1:v6zJ++j/thFPhefs2wEXoCKwT10yo5nkBDYRCXyqgNQ=
go.einride.tech/aip v0.66.0 h1:XfV+NQX6L7EOYK11yoHHFtndeaWh3KbD9/cN/6iWEt8=
go.einride.tech/aip v0.66.0/go.mod h1:qAhMsfT7plxBX+Oy7Huol6YUvZ0ZzdUz26yZsQwfl1M=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA=
go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw=
Expand Down
10 changes: 10 additions & 0 deletions ingest/indexer/domain/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package domain

import "time"

type Block struct {
ChainId string `json:"chain_id"`
Height uint64 `json:"height"`
BlockTime time.Time `json:"timestamp"`
GasConsumed uint64 `json:"gas_consumed"`
}
29 changes: 29 additions & 0 deletions ingest/indexer/domain/cold_start_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package domain

// ColdStartManager is an interface for managing the cold start state of the indexer.
type ColdStartManager interface {
// HasIngestedInitialData returns true if the indexer has ingested the initial data.
HasIngestedInitialData() bool

// MarkInitialDataIngested marks the initial data as ingested.
MarkInitialDataIngested()
}

type coldStartManager struct {
hasIngestedInitialData bool
}

var _ ColdStartManager = &coldStartManager{}

// NewColdStartManager creates a new cold start manager.
func NewColdStartManager() ColdStartManager {
return &coldStartManager{}
}

func (c *coldStartManager) HasIngestedInitialData() bool {
return c.hasIngestedInitialData
}

func (c *coldStartManager) MarkInitialDataIngested() {
c.hasIngestedInitialData = true
}
5 changes: 5 additions & 0 deletions ingest/indexer/domain/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package domain

import "errors"

var ErrColdStartManagerDidNotIngest = errors.New("cold start manager has not yet ingested initial data")
9 changes: 9 additions & 0 deletions ingest/indexer/domain/keepers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package domain

import (
bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper"
)

type Keepers struct {
BankKeeper bankkeeper.Keeper
}
32 changes: 32 additions & 0 deletions ingest/indexer/domain/mocks/token_supply_publisher_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package mocks

import (
"context"

indexerdomain "github.com/osmosis-labs/osmosis/v25/ingest/indexer/domain"
)

// TokenSupplyPublisherMock is a mock for TokenSupplyPublisher.
type TokenSupplyPublisherMock struct {
CalledWithTokenSupply indexerdomain.TokenSupply
ForceTokenSupplyError error

CalledWithTokenSupplyOffset indexerdomain.TokenSupplyOffset
ForceTokenSupplyOffsetError error
}

// PublishTokenSupply implements domain.PubSubClientI.
func (p *TokenSupplyPublisherMock) PublishTokenSupply(ctx context.Context, tokenSupply indexerdomain.TokenSupply) error {
p.CalledWithTokenSupply = tokenSupply
return p.ForceTokenSupplyError
}

// PublishTokenSupplyOffset implements domain.PubSubClientI.
func (p *TokenSupplyPublisherMock) PublishTokenSupplyOffset(ctx context.Context, tokenSupplyOffset indexerdomain.TokenSupplyOffset) error {
p.CalledWithTokenSupplyOffset = tokenSupplyOffset
return p.ForceTokenSupplyOffsetError
}

var (
_ indexerdomain.TokenSupplyPublisher = (*TokenSupplyPublisherMock)(nil)
)
5 changes: 5 additions & 0 deletions ingest/indexer/domain/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package domain

type Pool struct {
// TBD
}
20 changes: 20 additions & 0 deletions ingest/indexer/domain/publisher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package domain

import (
"context"
)

// TokenSupplyPublisher is an interface for publishing token supply data.
type TokenSupplyPublisher interface {
PublishTokenSupply(ctx context.Context, tokenSupply TokenSupply) error
PublishTokenSupplyOffset(ctx context.Context, tokenSupplyOffset TokenSupplyOffset) error
}

// Publisher is an interface for publishing various types of data.
type Publisher interface {
TokenSupplyPublisher

PublishBlock(ctx context.Context, block Block) error
PublishTransaction(ctx context.Context, txn Transaction) error
PublishPool(ctx context.Context, pool Pool) error
}
13 changes: 13 additions & 0 deletions ingest/indexer/domain/token_supply.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package domain

import "github.com/osmosis-labs/osmosis/osmomath"

type TokenSupply struct {
Denom string `json:"denom"`
Supply osmomath.Int `json:"supply"`
}

type TokenSupplyOffset struct {
Denom string `json:"denom"`
SupplyOffset osmomath.Int `json:"supply_offset"`
}
5 changes: 5 additions & 0 deletions ingest/indexer/domain/transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package domain

type Transaction struct {
// TBD
}
59 changes: 59 additions & 0 deletions ingest/indexer/indexer_config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package indexer

import (
servertypes "github.com/cosmos/cosmos-sdk/server/types"

"github.com/osmosis-labs/osmosis/osmoutils"
"github.com/osmosis-labs/osmosis/v25/ingest/indexer/domain"
service "github.com/osmosis-labs/osmosis/v25/ingest/indexer/service/client"
)

// Config defines the config for the indexer.
type Config struct {
IsEnabled bool `mapstructure:"enabled"`
GCPProjectId string `mapstructure:"gcp-project-id"`
BlockTopicId string `mapstructure:"block-topic-id"`
TransactionTopicId string `mapstructure:"transaction-topic-id"`
PoolTopicId string `mapstructure:"pool-topic-id"`
TokenSupplyTopicId string `mapstructure:"token-supply-topic-id"`
TokenSupplyOffsetTopicId string `mapstructure:"token-supply-offset-topic-id"`
}

// groupOptName is the name of the indexer options group.
const (
groupOptName = "osmosis-indexer"
)

// NewConfigFromOptions returns a new indexer config from the given options.
func NewConfigFromOptions(opts servertypes.AppOptions) Config {
isEnabled := osmoutils.ParseBool(opts, groupOptName, "is-enabled", false)

if !isEnabled {
return Config{
IsEnabled: false,
}
}

gcpProjectId := osmoutils.ParseString(opts, groupOptName, "gcp-project-id")
blockTopicId := osmoutils.ParseString(opts, groupOptName, "block-topic-id")
transactionTopicId := osmoutils.ParseString(opts, groupOptName, "transaction-topic-id")
poolTopicId := osmoutils.ParseString(opts, groupOptName, "pool-topic-id")
tokenSupplyTopicId := osmoutils.ParseString(opts, groupOptName, "token-supply-topic-id")
tokenSupplyOffsetTopicId := osmoutils.ParseString(opts, groupOptName, "token-supply-offset-topic-id")

return Config{
IsEnabled: isEnabled,
GCPProjectId: gcpProjectId,
BlockTopicId: blockTopicId,
TransactionTopicId: transactionTopicId,
PoolTopicId: poolTopicId,
TokenSupplyTopicId: tokenSupplyTopicId,
TokenSupplyOffsetTopicId: tokenSupplyOffsetTopicId,
}
}

// Initialize initializes the indexer by creating a new PubSubClient and returning a new IndexerIngester.
func (c Config) Initialize() domain.Publisher {
pubSubClient := service.NewPubSubCLient(c.GCPProjectId, c.BlockTopicId, c.TransactionTopicId, c.PoolTopicId, c.TokenSupplyTopicId, c.TokenSupplyOffsetTopicId)
return NewIndexerPublisher(*pubSubClient)
}
Loading

0 comments on commit 32a78ad

Please sign in to comment.