Skip to content

Commit

Permalink
Feature flag to disable asset stats (#668)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartekn authored Sep 19, 2018
1 parent bd60abd commit 9eca875
Show file tree
Hide file tree
Showing 12 changed files with 113 additions and 41 deletions.
12 changes: 6 additions & 6 deletions services/horizon/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var dbBackfillCmd = &cobra.Command{
initConfig()
hlog.DefaultLogger.Logger.Level = config.LogLevel

i := ingestSystem()
i := ingestSystem(ingest.Config{})
i.SkipCursorUpdate = true
parsed, err := strconv.ParseUint(args[0], 10, 32)
if err != nil {
Expand All @@ -49,7 +49,7 @@ var dbClearCmd = &cobra.Command{
initConfig()
hlog.DefaultLogger.Logger.Level = config.LogLevel

i := ingestSystem()
i := ingestSystem(ingest.Config{})
err := i.ClearAll()
if err != nil {
hlog.Error(err)
Expand Down Expand Up @@ -138,7 +138,7 @@ var dbRebaseCmd = &cobra.Command{
initConfig()
hlog.DefaultLogger.Logger.Level = config.LogLevel

i := ingestSystem()
i := ingestSystem(ingest.Config{})
i.SkipCursorUpdate = true

err := i.RebaseHistory()
Expand All @@ -156,7 +156,7 @@ var dbReingestCmd = &cobra.Command{
initConfig()
hlog.DefaultLogger.Logger.Level = config.LogLevel

i := ingestSystem()
i := ingestSystem(ingest.Config{})
i.SkipCursorUpdate = true
logStatus := func(stage string) {
count := i.Metrics.IngestLedgerTimer.Count()
Expand Down Expand Up @@ -207,7 +207,7 @@ func init() {
dbCmd.AddCommand(dbRebaseCmd)
}

func ingestSystem() *ingest.System {
func ingestSystem(ingestConfig ingest.Config) *ingest.System {
hdb, err := db.Open("postgres", config.DatabaseURL)
if err != nil {
log.Fatal(err)
Expand All @@ -223,7 +223,7 @@ func ingestSystem() *ingest.System {
log.Fatal("network-passphrase is blank: reingestion requires manually setting passphrase")
}

i := ingest.New(passphrase, config.StellarCoreURL, cdb, hdb)
i := ingest.New(passphrase, config.StellarCoreURL, cdb, hdb, ingestConfig)
return i
}

Expand Down
20 changes: 19 additions & 1 deletion services/horizon/internal/actions_assets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package horizon
import (
"testing"

"github.com/stellar/go/protocols/horizon/base"
"github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/protocols/horizon/base"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/support/render/hal"
)

Expand Down Expand Up @@ -271,3 +272,20 @@ func TestInvalidAssetIssuer(t *testing.T) {
w = ht.Get("/assets?asset_issuer=invalid")
ht.Assert.Equal(400, w.Code)
}

func TestAssetStatsDisabled(t *testing.T) {
ht := StartHTTPTest(t, "ingest_asset_stats")
defer ht.Finish()

// Ugly but saves us time needed to change each `StartHTTPTest` occurence.
appConfig := NewTestConfig()
appConfig.DisableAssetStats = true

var err error
ht.App, err = NewApp(appConfig)
ht.Assert.Nil(err)
ht.RH = test.NewRequestHelper(ht.App.web.router)

w := ht.Get("/assets?asset_issuer=GC23QF2HUE52AMXUFUH3AYJAXXGXXV2VHXYYR6EYXETPKDXZSAW67XO4")
ht.Assert.Equal(404, w.Code)
}
7 changes: 5 additions & 2 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ type Config struct {
// determining a "retention duration", each ledger roughly corresponds to 10
// seconds of real time.
HistoryRetentionCount uint

// StaleThreshold represents the number of ledgers a history database may be
// out-of-date by before horizon begins to respond with an error to history
// requests.
StaleThreshold uint

// SkipCursorUpdate causes the ingestor to skip reporting the "last imported
// ledger" state to stellar-core.
SkipCursorUpdate bool
// DisableAssetStats is a feature flag that determines whether to calculate
// asset stats during the ingestion and expose `/assets` endpoint.
// Disabling it will save CPU when ingesting ledgers full of many different
// assets related operations.
DisableAssetStats bool
}
43 changes: 40 additions & 3 deletions services/horizon/internal/ingest/ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestAssetIngest(t *testing.T) {

tt := test.Start(t).ScenarioWithoutHorizon("kahuna")
defer tt.Finish()
s := ingest(tt)
s := ingest(tt, false)
tt.Require.NoError(s.Err)
q := history.Q{Session: s.Ingestion.DB}

Expand All @@ -75,7 +75,7 @@ func TestAssetIngest(t *testing.T) {
func TestAssetStatsIngest(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("ingest_asset_stats")
defer tt.Finish()
s := ingest(tt)
s := ingest(tt, false)
tt.Require.NoError(s.Err)
q := history.Q{Session: s.Ingestion.DB}

Expand Down Expand Up @@ -139,12 +139,49 @@ func TestAssetStatsIngest(t *testing.T) {
}, assetStats[2])
}

func TestAssetStatsDisabledIngest(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("ingest_asset_stats")
defer tt.Finish()
s := ingest(tt, true)
tt.Require.NoError(s.Err)
q := history.Q{Session: s.Ingestion.DB}

type AssetStatResult struct {
Type string `db:"asset_type"`
Code string `db:"asset_code"`
Issuer string `db:"asset_issuer"`
Amount int64 `db:"amount"`
NumAccounts int32 `db:"num_accounts"`
Flags int8 `db:"flags"`
Toml string `db:"toml"`
}
assetStats := []AssetStatResult{}
err := q.Select(
&assetStats,
sq.
Select(
"hist.asset_type",
"hist.asset_code",
"hist.asset_issuer",
"stats.amount",
"stats.num_accounts",
"stats.flags",
"stats.toml",
).
From("history_assets hist").
Join("asset_stats stats ON hist.id = stats.id").
OrderBy("hist.asset_code ASC", "hist.asset_issuer ASC"),
)
tt.Require.NoError(err)
tt.Assert.Equal(0, len(assetStats))
}

func TestTradeIngestTimestamp(t *testing.T) {
//ingest trade scenario and verify that the trade timestamp
//matches the appropriate ledger's timestamp
tt := test.Start(t).ScenarioWithoutHorizon("trades")
defer tt.Finish()
s := ingest(tt)
s := ingest(tt, false)
q := history.Q{Session: s.Ingestion.DB}

var ledgers []history.Ledger
Expand Down
28 changes: 15 additions & 13 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ type Cursor struct {
data *LedgerBundle
}

// Config allows passing some configuration values to System and Session.
type Config struct {
// DisableAssetStats is a feature flag that determines whether to calculate
// asset stats in this ingestion system.
DisableAssetStats bool
}

// EffectIngestion is a helper struct to smooth the ingestion of effects. this
// struct will track what the correct operation to use and order to use when
// adding effects into an ingestion.
Expand All @@ -88,27 +95,23 @@ type LedgerBundle struct {

// System represents the data ingestion subsystem of horizon.
type System struct {
// Config allows passing some configuration values to System.
Config Config
// HorizonDB is the connection to the horizon database that ingested data will
// be written to.
HorizonDB *db.Session

// CoreDB is the stellar-core db that data is ingested from.
CoreDB *db.Session

CoreDB *db.Session
Metrics IngesterMetrics

// Network is the passphrase for the network being imported
Network string

// StellarCoreURL is the http endpoint of the stellar-core that data is being
// ingested from.
StellarCoreURL string

// SkipCursorUpdate causes the ingestor to skip
// reporting the "last imported ledger" cursor to
// stellar-core
SkipCursorUpdate bool

// HistoryRetentionCount is the desired minimum number of ledgers to
// keep in the history database, working backwards from the latest core
// ledger. 0 represents "all ledgers".
Expand Down Expand Up @@ -150,24 +153,22 @@ type Ingestion struct {
// Session represents a single attempt at ingesting data into the history
// database.
type Session struct {
// Config allows passing some configuration values to System.
Config Config
Cursor *Cursor
Ingestion *Ingestion
// Network is the passphrase for the network being imported
Network string

// StellarCoreURL is the http endpoint of the stellar-core that data is being
// ingested from.
StellarCoreURL string

// ClearExisting causes the session to clear existing data from the horizon db
// when the session is run.
ClearExisting bool

// SkipCursorUpdate causes the session to skip
// reporting the "last imported ledger" cursor to
// stellar-core
SkipCursorUpdate bool

// Metrics is a reference to where the session should record its metric information
Metrics *IngesterMetrics

Expand All @@ -177,16 +178,16 @@ type Session struct {

// Err is the error that caused this session to fail, if any.
Err error

// Ingested is the number of ledgers that were successfully ingested during
// this session.
Ingested int
}

// New initializes the ingester, causing it to begin polling the stellar-core
// database for now ledgers and ingesting data into the horizon database.
func New(network string, coreURL string, core, horizon *db.Session) *System {
func New(network string, coreURL string, core, horizon *db.Session, config Config) *System {
i := &System{
Config: config,
Network: network,
StellarCoreURL: coreURL,
HorizonDB: horizon,
Expand Down Expand Up @@ -215,6 +216,7 @@ func NewSession(i *System) *Session {
hdb := i.HorizonDB.Clone()

return &Session{
Config: i.Config,
Ingestion: &Ingestion{
DB: hdb,
},
Expand Down
13 changes: 7 additions & 6 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestIngest_Kahuna1(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("kahuna")
defer tt.Finish()

s := ingest(tt)
s := ingest(tt, false)

tt.Require.NoError(s.Err)
tt.Assert.Equal(62, s.Ingested)
Expand All @@ -34,7 +34,7 @@ func TestIngest_Kahuna2(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("kahuna-2")
defer tt.Finish()

s := ingest(tt)
s := ingest(tt, false)

tt.Require.NoError(s.Err)
tt.Assert.Equal(6, s.Ingested)
Expand All @@ -52,7 +52,7 @@ func TestIngest_Kahuna2(t *testing.T) {
func TestTick(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("base")
defer tt.Finish()
sys := sys(tt)
sys := sys(tt, false)

// ingest by tick
s := sys.Tick()
Expand All @@ -65,20 +65,21 @@ func TestTick(t *testing.T) {
tt.Require.NoError(s.Err)
}

func ingest(tt *test.T) *Session {
sys := sys(tt)
func ingest(tt *test.T, disableAssetStats bool) *Session {
sys := sys(tt, disableAssetStats)
s := NewSession(sys)
s.Cursor = NewCursor(1, ledger.CurrentState().CoreLatest, sys)
s.Run()

return s
}

func sys(tt *test.T) *System {
func sys(tt *test.T, disableAssetStats bool) *System {
return New(
network.TestNetworkPassphrase,
"",
tt.CoreSession(),
tt.HorizonSession(),
Config{DisableAssetStats: disableAssetStats},
)
}
5 changes: 4 additions & 1 deletion services/horizon/internal/ingest/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ func (is *Session) Run() {
break
}
}
is.Cursor.AssetsModified.UpdateAssetStats(is)

if !is.Config.DisableAssetStats {
is.Cursor.AssetsModified.UpdateAssetStats(is)
}

if is.Err != nil {
is.Ingestion.Rollback()
Expand Down
8 changes: 4 additions & 4 deletions services/horizon/internal/ingest/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func Test_ingestSignerEffects(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("set_options")
defer tt.Finish()

s := ingest(tt)
s := ingest(tt, false)
tt.Require.NoError(s.Err)

q := &history.Q{Session: tt.HorizonSession()}
Expand All @@ -33,7 +33,7 @@ func Test_ingestOperationEffects(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("set_options")
defer tt.Finish()

s := ingest(tt)
s := ingest(tt, false)
tt.Require.NoError(s.Err)

q := &history.Q{Session: tt.HorizonSession()}
Expand All @@ -49,7 +49,7 @@ func Test_ingestOperationEffects(t *testing.T) {

// HACK(scott): switch to kahuna recipe mid-stream. We need to integrate our test scenario loader to be compatible with go subtests/
tt.ScenarioWithoutHorizon("kahuna")
s = ingest(tt)
s = ingest(tt, false)
tt.Require.NoError(s.Err)
pq, err := db2.NewPageQuery("", "asc", 200)
tt.Require.NoError(err)
Expand Down Expand Up @@ -80,7 +80,7 @@ func Test_ingestBumpSeq(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("kahuna")
defer tt.Finish()

s := ingest(tt)
s := ingest(tt, false)
tt.Require.NoError(s.Err)

q := &history.Q{Session: tt.HorizonSession()}
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/ingest/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestBackfill(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("kahuna")
defer tt.Finish()
is := sys(tt)
is := sys(tt, false)

err := is.ReingestSingle(10)
tt.Require.NoError(err)
Expand All @@ -34,7 +34,7 @@ func TestBackfill(t *testing.T) {
func TestClearAll(t *testing.T) {
tt := test.Start(t).Scenario("kahuna")
defer tt.Finish()
is := sys(tt)
is := sys(tt, false)

err := is.ClearAll()

Expand All @@ -51,7 +51,7 @@ func TestValidation(t *testing.T) {
tt := test.Start(t).Scenario("kahuna")
defer tt.Finish()

sys := New(network.TestNetworkPassphrase, "", tt.CoreSession(), tt.HorizonSession())
sys := New(network.TestNetworkPassphrase, "", tt.CoreSession(), tt.HorizonSession(), Config{})

// intact chain
for i := int32(2); i <= 57; i++ {
Expand Down
Loading

0 comments on commit 9eca875

Please sign in to comment.