Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature flag to disable asset stats #668

Merged
merged 1 commit into from
Sep 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions services/horizon/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ var dbBackfillCmd = &cobra.Command{
initConfig()
hlog.DefaultLogger.Logger.Level = config.LogLevel

i := ingestSystem()
i := ingestSystem(ingest.Config{})
i.SkipCursorUpdate = true
parsed, err := strconv.ParseUint(args[0], 10, 32)
if err != nil {
Expand All @@ -49,7 +49,7 @@ var dbClearCmd = &cobra.Command{
initConfig()
hlog.DefaultLogger.Logger.Level = config.LogLevel

i := ingestSystem()
i := ingestSystem(ingest.Config{})
err := i.ClearAll()
if err != nil {
hlog.Error(err)
Expand Down Expand Up @@ -138,7 +138,7 @@ var dbRebaseCmd = &cobra.Command{
initConfig()
hlog.DefaultLogger.Logger.Level = config.LogLevel

i := ingestSystem()
i := ingestSystem(ingest.Config{})
i.SkipCursorUpdate = true

err := i.RebaseHistory()
Expand All @@ -156,7 +156,7 @@ var dbReingestCmd = &cobra.Command{
initConfig()
hlog.DefaultLogger.Logger.Level = config.LogLevel

i := ingestSystem()
i := ingestSystem(ingest.Config{})
i.SkipCursorUpdate = true
logStatus := func(stage string) {
count := i.Metrics.IngestLedgerTimer.Count()
Expand Down Expand Up @@ -207,7 +207,7 @@ func init() {
dbCmd.AddCommand(dbRebaseCmd)
}

func ingestSystem() *ingest.System {
func ingestSystem(ingestConfig ingest.Config) *ingest.System {
hdb, err := db.Open("postgres", config.DatabaseURL)
if err != nil {
log.Fatal(err)
Expand All @@ -223,7 +223,7 @@ func ingestSystem() *ingest.System {
log.Fatal("network-passphrase is blank: reingestion requires manually setting passphrase")
}

i := ingest.New(passphrase, config.StellarCoreURL, cdb, hdb)
i := ingest.New(passphrase, config.StellarCoreURL, cdb, hdb, ingestConfig)
return i
}

Expand Down
20 changes: 19 additions & 1 deletion services/horizon/internal/actions_assets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ package horizon
import (
"testing"

"github.com/stellar/go/protocols/horizon/base"
"github.com/stellar/go/protocols/horizon"
"github.com/stellar/go/protocols/horizon/base"
"github.com/stellar/go/services/horizon/internal/test"
"github.com/stellar/go/support/render/hal"
)

Expand Down Expand Up @@ -271,3 +272,20 @@ func TestInvalidAssetIssuer(t *testing.T) {
w = ht.Get("/assets?asset_issuer=invalid")
ht.Assert.Equal(400, w.Code)
}

func TestAssetStatsDisabled(t *testing.T) {
ht := StartHTTPTest(t, "ingest_asset_stats")
defer ht.Finish()

// Ugly but saves us time needed to change each `StartHTTPTest` occurence.
appConfig := NewTestConfig()
appConfig.DisableAssetStats = true

var err error
ht.App, err = NewApp(appConfig)
ht.Assert.Nil(err)
ht.RH = test.NewRequestHelper(ht.App.web.router)

w := ht.Get("/assets?asset_issuer=GC23QF2HUE52AMXUFUH3AYJAXXGXXV2VHXYYR6EYXETPKDXZSAW67XO4")
ht.Assert.Equal(404, w.Code)
}
7 changes: 5 additions & 2 deletions services/horizon/internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,16 @@ type Config struct {
// determining a "retention duration", each ledger roughly corresponds to 10
// seconds of real time.
HistoryRetentionCount uint

// StaleThreshold represents the number of ledgers a history database may be
// out-of-date by before horizon begins to respond with an error to history
// requests.
StaleThreshold uint

// SkipCursorUpdate causes the ingestor to skip reporting the "last imported
// ledger" state to stellar-core.
SkipCursorUpdate bool
// DisableAssetStats is a feature flag that determines whether to calculate
// asset stats during the ingestion and expose `/assets` endpoint.
// Disabling it will save CPU when ingesting ledgers full of many different
// assets related operations.
DisableAssetStats bool
}
43 changes: 40 additions & 3 deletions services/horizon/internal/ingest/ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestAssetIngest(t *testing.T) {

tt := test.Start(t).ScenarioWithoutHorizon("kahuna")
defer tt.Finish()
s := ingest(tt)
s := ingest(tt, false)
tt.Require.NoError(s.Err)
q := history.Q{Session: s.Ingestion.DB}

Expand All @@ -75,7 +75,7 @@ func TestAssetIngest(t *testing.T) {
func TestAssetStatsIngest(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("ingest_asset_stats")
defer tt.Finish()
s := ingest(tt)
s := ingest(tt, false)
tt.Require.NoError(s.Err)
q := history.Q{Session: s.Ingestion.DB}

Expand Down Expand Up @@ -139,12 +139,49 @@ func TestAssetStatsIngest(t *testing.T) {
}, assetStats[2])
}

func TestAssetStatsDisabledIngest(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("ingest_asset_stats")
defer tt.Finish()
s := ingest(tt, true)
tt.Require.NoError(s.Err)
q := history.Q{Session: s.Ingestion.DB}

type AssetStatResult struct {
Type string `db:"asset_type"`
Code string `db:"asset_code"`
Issuer string `db:"asset_issuer"`
Amount int64 `db:"amount"`
NumAccounts int32 `db:"num_accounts"`
Flags int8 `db:"flags"`
Toml string `db:"toml"`
}
assetStats := []AssetStatResult{}
err := q.Select(
&assetStats,
sq.
Select(
"hist.asset_type",
"hist.asset_code",
"hist.asset_issuer",
"stats.amount",
"stats.num_accounts",
"stats.flags",
"stats.toml",
).
From("history_assets hist").
Join("asset_stats stats ON hist.id = stats.id").
OrderBy("hist.asset_code ASC", "hist.asset_issuer ASC"),
)
tt.Require.NoError(err)
tt.Assert.Equal(0, len(assetStats))
}

func TestTradeIngestTimestamp(t *testing.T) {
//ingest trade scenario and verify that the trade timestamp
//matches the appropriate ledger's timestamp
tt := test.Start(t).ScenarioWithoutHorizon("trades")
defer tt.Finish()
s := ingest(tt)
s := ingest(tt, false)
q := history.Q{Session: s.Ingestion.DB}

var ledgers []history.Ledger
Expand Down
28 changes: 15 additions & 13 deletions services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ type Cursor struct {
data *LedgerBundle
}

// Config allows passing some configuration values to System and Session.
type Config struct {
// DisableAssetStats is a feature flag that determines whether to calculate
// asset stats in this ingestion system.
DisableAssetStats bool
}

// EffectIngestion is a helper struct to smooth the ingestion of effects. this
// struct will track what the correct operation to use and order to use when
// adding effects into an ingestion.
Expand All @@ -88,27 +95,23 @@ type LedgerBundle struct {

// System represents the data ingestion subsystem of horizon.
type System struct {
// Config allows passing some configuration values to System.
Config Config
// HorizonDB is the connection to the horizon database that ingested data will
// be written to.
HorizonDB *db.Session

// CoreDB is the stellar-core db that data is ingested from.
CoreDB *db.Session

CoreDB *db.Session
Metrics IngesterMetrics

// Network is the passphrase for the network being imported
Network string

// StellarCoreURL is the http endpoint of the stellar-core that data is being
// ingested from.
StellarCoreURL string

// SkipCursorUpdate causes the ingestor to skip
// reporting the "last imported ledger" cursor to
// stellar-core
SkipCursorUpdate bool

// HistoryRetentionCount is the desired minimum number of ledgers to
// keep in the history database, working backwards from the latest core
// ledger. 0 represents "all ledgers".
Expand Down Expand Up @@ -150,24 +153,22 @@ type Ingestion struct {
// Session represents a single attempt at ingesting data into the history
// database.
type Session struct {
// Config allows passing some configuration values to System.
Config Config
Cursor *Cursor
Ingestion *Ingestion
// Network is the passphrase for the network being imported
Network string

// StellarCoreURL is the http endpoint of the stellar-core that data is being
// ingested from.
StellarCoreURL string

// ClearExisting causes the session to clear existing data from the horizon db
// when the session is run.
ClearExisting bool

// SkipCursorUpdate causes the session to skip
// reporting the "last imported ledger" cursor to
// stellar-core
SkipCursorUpdate bool

// Metrics is a reference to where the session should record its metric information
Metrics *IngesterMetrics

Expand All @@ -177,16 +178,16 @@ type Session struct {

// Err is the error that caused this session to fail, if any.
Err error

// Ingested is the number of ledgers that were successfully ingested during
// this session.
Ingested int
}

// New initializes the ingester, causing it to begin polling the stellar-core
// database for now ledgers and ingesting data into the horizon database.
func New(network string, coreURL string, core, horizon *db.Session) *System {
func New(network string, coreURL string, core, horizon *db.Session, config Config) *System {
i := &System{
Config: config,
Network: network,
StellarCoreURL: coreURL,
HorizonDB: horizon,
Expand Down Expand Up @@ -215,6 +216,7 @@ func NewSession(i *System) *Session {
hdb := i.HorizonDB.Clone()

return &Session{
Config: i.Config,
Ingestion: &Ingestion{
DB: hdb,
},
Expand Down
13 changes: 7 additions & 6 deletions services/horizon/internal/ingest/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func TestIngest_Kahuna1(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("kahuna")
defer tt.Finish()

s := ingest(tt)
s := ingest(tt, false)

tt.Require.NoError(s.Err)
tt.Assert.Equal(62, s.Ingested)
Expand All @@ -34,7 +34,7 @@ func TestIngest_Kahuna2(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("kahuna-2")
defer tt.Finish()

s := ingest(tt)
s := ingest(tt, false)

tt.Require.NoError(s.Err)
tt.Assert.Equal(6, s.Ingested)
Expand All @@ -52,7 +52,7 @@ func TestIngest_Kahuna2(t *testing.T) {
func TestTick(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("base")
defer tt.Finish()
sys := sys(tt)
sys := sys(tt, false)

// ingest by tick
s := sys.Tick()
Expand All @@ -65,20 +65,21 @@ func TestTick(t *testing.T) {
tt.Require.NoError(s.Err)
}

func ingest(tt *test.T) *Session {
sys := sys(tt)
func ingest(tt *test.T, disableAssetStats bool) *Session {
sys := sys(tt, disableAssetStats)
s := NewSession(sys)
s.Cursor = NewCursor(1, ledger.CurrentState().CoreLatest, sys)
s.Run()

return s
}

func sys(tt *test.T) *System {
func sys(tt *test.T, disableAssetStats bool) *System {
return New(
network.TestNetworkPassphrase,
"",
tt.CoreSession(),
tt.HorizonSession(),
Config{DisableAssetStats: disableAssetStats},
)
}
5 changes: 4 additions & 1 deletion services/horizon/internal/ingest/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ func (is *Session) Run() {
break
}
}
is.Cursor.AssetsModified.UpdateAssetStats(is)

if !is.Config.DisableAssetStats {
is.Cursor.AssetsModified.UpdateAssetStats(is)
}

if is.Err != nil {
is.Ingestion.Rollback()
Expand Down
8 changes: 4 additions & 4 deletions services/horizon/internal/ingest/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func Test_ingestSignerEffects(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("set_options")
defer tt.Finish()

s := ingest(tt)
s := ingest(tt, false)
tt.Require.NoError(s.Err)

q := &history.Q{Session: tt.HorizonSession()}
Expand All @@ -33,7 +33,7 @@ func Test_ingestOperationEffects(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("set_options")
defer tt.Finish()

s := ingest(tt)
s := ingest(tt, false)
tt.Require.NoError(s.Err)

q := &history.Q{Session: tt.HorizonSession()}
Expand All @@ -49,7 +49,7 @@ func Test_ingestOperationEffects(t *testing.T) {

// HACK(scott): switch to kahuna recipe mid-stream. We need to integrate our test scenario loader to be compatible with go subtests/
tt.ScenarioWithoutHorizon("kahuna")
s = ingest(tt)
s = ingest(tt, false)
tt.Require.NoError(s.Err)
pq, err := db2.NewPageQuery("", "asc", 200)
tt.Require.NoError(err)
Expand Down Expand Up @@ -80,7 +80,7 @@ func Test_ingestBumpSeq(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("kahuna")
defer tt.Finish()

s := ingest(tt)
s := ingest(tt, false)
tt.Require.NoError(s.Err)

q := &history.Q{Session: tt.HorizonSession()}
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/ingest/system_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
func TestBackfill(t *testing.T) {
tt := test.Start(t).ScenarioWithoutHorizon("kahuna")
defer tt.Finish()
is := sys(tt)
is := sys(tt, false)

err := is.ReingestSingle(10)
tt.Require.NoError(err)
Expand All @@ -34,7 +34,7 @@ func TestBackfill(t *testing.T) {
func TestClearAll(t *testing.T) {
tt := test.Start(t).Scenario("kahuna")
defer tt.Finish()
is := sys(tt)
is := sys(tt, false)

err := is.ClearAll()

Expand All @@ -51,7 +51,7 @@ func TestValidation(t *testing.T) {
tt := test.Start(t).Scenario("kahuna")
defer tt.Finish()

sys := New(network.TestNetworkPassphrase, "", tt.CoreSession(), tt.HorizonSession())
sys := New(network.TestNetworkPassphrase, "", tt.CoreSession(), tt.HorizonSession(), Config{})

// intact chain
for i := int32(2); i <= 57; i++ {
Expand Down
Loading