Skip to content

Commit

Permalink
chore: add db.sql stat collector (#5146)
Browse files (browse the repository at this point in the history)
  • Loading branch information
lvrach authored Oct 1, 2024
1 parent ef57540 commit 61e947c
Show file tree
Hide file tree
Showing 36 changed files with 315 additions and 195 deletions.
32 changes: 21 additions & 11 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ func (a *embeddedApp) Setup() error {

func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) error {
config := config.Default
statsFactory := stats.Default

if !a.setupDone {
return fmt.Errorf("embedded rudder core cannot start, database is not setup")
}
Expand Down Expand Up @@ -126,7 +128,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)

fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)

rsourcesService, err := NewRsourcesService(deploymentType, true)
rsourcesService, err := NewRsourcesService(deploymentType, true, statsFactory)
if err != nil {
return err
}
Expand All @@ -143,6 +145,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
gatewayDB := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStats(statsFactory),
)
if err = gatewayDB.Start(); err != nil {
return fmt.Errorf("could not start gateway: %w", err)
Expand All @@ -156,20 +159,23 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
"rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.routerDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
"batch_rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.batchRouterDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
)
defer batchRouterDB.Close()

Expand All @@ -179,12 +185,14 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
)
defer errDBForRead.Close()
errDBForWrite := jobsdb.NewForWrite(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
)
if err = errDBForWrite.Start(); err != nil {
return fmt.Errorf("could not start errDBForWrite: %w", err)
Expand All @@ -196,6 +204,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
)
defer schemaDB.Close()

Expand All @@ -204,6 +213,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithJobMaxAge(
func() time.Duration {
return config.GetDuration("archival.jobRetention", 24, time.Hour)
Expand All @@ -219,9 +229,9 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
return err
}
defer client.Close()
schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, schemaDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config, stats.Default)
schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, schemaDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config, statsFactory)
} else {
schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, schemaDB, logger.NewLogger().Child("jobs_forwarder"), config, stats.Default)
schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, schemaDB, logger.NewLogger().Child("jobs_forwarder"), config, statsFactory)
}

modeProvider, err := resolveModeProvider(a.log, deploymentType)
Expand All @@ -231,7 +241,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)

adaptiveLimit := payload.SetupAdaptiveLimiter(ctx, g)

enrichers, err := setupPipelineEnrichers(config, a.log, stats.Default)
enrichers, err := setupPipelineEnrichers(config, a.log, statsFactory)
if err != nil {
return fmt.Errorf("setting up pipeline enrichers: %w", err)
}
Expand Down Expand Up @@ -263,7 +273,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
trackedUsersReporter,
processor.WithAdaptiveLimit(adaptiveLimit),
)
throttlerFactory, err := rtThrottler.NewFactory(config, stats.Default)
throttlerFactory, err := rtThrottler.NewFactory(config, statsFactory)
if err != nil {
return fmt.Errorf("failed to create rt throttler factory: %w", err)
}
Expand Down Expand Up @@ -307,16 +317,16 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
archivalDB,
fileUploaderProvider,
config,
stats.Default,
statsFactory,
archiver.WithAdaptiveLimit(adaptiveLimit),
),
}

rateLimiter, err := gwThrottler.New(stats.Default)
rateLimiter, err := gwThrottler.New(statsFactory)
if err != nil {
return fmt.Errorf("failed to create gw rate limiter: %w", err)
}
drainConfigManager, err := drain_config.NewDrainConfigManager(config, a.log.Child("drain-config"))
drainConfigManager, err := drain_config.NewDrainConfigManager(config, a.log.Child("drain-config"), statsFactory)
if err != nil {
return fmt.Errorf("drain config manager setup: %v", err)
}
Expand All @@ -329,7 +339,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
}))
streamMsgValidator := stream.NewMessageValidator()
gw := gateway.Handle{}
err = gw.Setup(ctx, config, logger.NewLogger().Child("gateway"), stats.Default, a.app, backendconfig.DefaultBackendConfig,
err = gw.Setup(ctx, config, logger.NewLogger().Child("gateway"), statsFactory, a.app, backendconfig.DefaultBackendConfig,
gatewayDB, errDBForWrite, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
streamMsgValidator, gateway.WithInternalHttpHandlers(
map[string]http.Handler{
Expand Down Expand Up @@ -378,8 +388,8 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
})

g.Go(func() error {
replicationLagStat := stats.Default.NewStat("rsources_log_replication_lag", stats.GaugeType)
replicationSlotStat := stats.Default.NewStat("rsources_log_replication_slot", stats.GaugeType)
replicationLagStat := statsFactory.NewStat("rsources_log_replication_lag", stats.GaugeType)
replicationSlotStat := statsFactory.NewStat("rsources_log_replication_slot", stats.GaugeType)
rsourcesService.Monitor(ctx, replicationLagStat, replicationSlotStat)
return nil
})
Expand Down
11 changes: 7 additions & 4 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (a *gatewayApp) Setup() error {

func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) error {
config := config.Default
statsFactory := stats.Default
if !a.setupDone {
return fmt.Errorf("gateway cannot start, database is not setup")
}
Expand All @@ -72,6 +73,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
)
defer gatewayDB.Close()

Expand All @@ -84,6 +86,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
)
defer errDB.Close()

Expand All @@ -108,11 +111,11 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
})

var gw gateway.Handle
rateLimiter, err := gwThrottler.New(stats.Default)
rateLimiter, err := gwThrottler.New(statsFactory)
if err != nil {
return fmt.Errorf("failed to create rate limiter: %w", err)
}
rsourcesService, err := NewRsourcesService(deploymentType, false)
rsourcesService, err := NewRsourcesService(deploymentType, false, statsFactory)
if err != nil {
return err
}
Expand All @@ -121,7 +124,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
TransformerURL: config.GetString("DEST_TRANSFORM_URL", "http://localhost:9090"),
FeaturesRetryMaxAttempts: 10,
})
drainConfigManager, err := drain_config.NewDrainConfigManager(config, a.log.Child("drain-config"))
drainConfigManager, err := drain_config.NewDrainConfigManager(config, a.log.Child("drain-config"), statsFactory)
if err != nil {
a.log.Errorw("drain config manager setup failed while starting gateway", "error", err)
}
Expand All @@ -132,7 +135,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
drainConfigHttpHandler = drainConfigManager.DrainConfigHttpHandler()
}
streamMsgValidator := stream.NewMessageValidator()
err = gw.Setup(ctx, config, logger.NewLogger().Child("gateway"), stats.Default, a.app, backendconfig.DefaultBackendConfig,
err = gw.Setup(ctx, config, logger.NewLogger().Child("gateway"), statsFactory, a.app, backendconfig.DefaultBackendConfig,
gatewayDB, errDB, rateLimiter, a.versionHandler, rsourcesService, transformerFeaturesService, sourceHandle,
streamMsgValidator, gateway.WithInternalHttpHandlers(
map[string]http.Handler{
Expand Down
26 changes: 17 additions & 9 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (a *processorApp) Setup() error {

func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options) error {
config := config.Default
statsFactory := stats.Default
if !a.setupDone {
return fmt.Errorf("processor service cannot start, database is not setup")
}
Expand Down Expand Up @@ -133,7 +134,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options

fileUploaderProvider := fileuploader.NewProvider(ctx, backendconfig.DefaultBackendConfig)

rsourcesService, err := NewRsourcesService(deploymentType, true)
rsourcesService, err := NewRsourcesService(deploymentType, true, statsFactory)
if err != nil {
return err
}
Expand All @@ -149,33 +150,38 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
"rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.routerDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
"batch_rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.batchRouterDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
)
defer batchRouterDB.Close()
errDBForRead := jobsdb.NewForRead(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
)
defer errDBForRead.Close()
errDBForWrite := jobsdb.NewForWrite(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
)
if err = errDBForWrite.Start(); err != nil {
return fmt.Errorf("could not start errDBForWrite: %w", err)
Expand All @@ -185,6 +191,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
"esch",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithStats(statsFactory),
)
defer schemaDB.Close()

Expand All @@ -193,6 +200,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithJobMaxAge(
func() time.Duration {
return config.GetDuration("archival.jobRetention", 24, time.Hour)
Expand All @@ -208,9 +216,9 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
return err
}
defer client.Close()
schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, schemaDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config, stats.Default)
schemaForwarder = schema_forwarder.NewForwarder(terminalErrFn, schemaDB, &client, backendconfig.DefaultBackendConfig, logger.NewLogger().Child("jobs_forwarder"), config, statsFactory)
} else {
schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, schemaDB, logger.NewLogger().Child("jobs_forwarder"), config, stats.Default)
schemaForwarder = schema_forwarder.NewAbortingForwarder(terminalErrFn, schemaDB, logger.NewLogger().Child("jobs_forwarder"), config, statsFactory)
}

modeProvider, err := resolveModeProvider(a.log, deploymentType)
Expand All @@ -220,7 +228,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options

adaptiveLimit := payload.SetupAdaptiveLimiter(ctx, g)

enrichers, err := setupPipelineEnrichers(config, a.log, stats.Default)
enrichers, err := setupPipelineEnrichers(config, a.log, statsFactory)
if err != nil {
return fmt.Errorf("setting up pipeline enrichers: %w", err)
}
Expand All @@ -231,7 +239,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
}
}()

drainConfigManager, err := drain_config.NewDrainConfigManager(config, a.log.Child("drain-config"))
drainConfigManager, err := drain_config.NewDrainConfigManager(config, a.log.Child("drain-config"), statsFactory)
if err != nil {
return fmt.Errorf("drain config manager setup: %v", err)
}
Expand Down Expand Up @@ -264,7 +272,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
trackedUsersReporter,
proc.WithAdaptiveLimit(adaptiveLimit),
)
throttlerFactory, err := throttler.NewFactory(config, stats.Default)
throttlerFactory, err := throttler.NewFactory(config, statsFactory)
if err != nil {
return fmt.Errorf("failed to create throttler factory: %w", err)
}
Expand Down Expand Up @@ -309,7 +317,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
archivalDB,
fileUploaderProvider,
config,
stats.Default,
statsFactory,
archiver.WithAdaptiveLimit(adaptiveLimit),
),
}
Expand All @@ -330,8 +338,8 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
})

g.Go(func() error {
replicationLagStat := stats.Default.NewStat("rsources_log_replication_lag", stats.GaugeType)
replicationSlotStat := stats.Default.NewStat("rsources_log_replication_slot", stats.GaugeType)
replicationLagStat := statsFactory.NewStat("rsources_log_replication_lag", stats.GaugeType)
replicationSlotStat := statsFactory.NewStat("rsources_log_replication_slot", stats.GaugeType)
rsourcesService.Monitor(ctx, replicationLagStat, replicationSlotStat)
return nil
})
Expand Down
4 changes: 2 additions & 2 deletions app/apphandlers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func rudderCoreNodeSetup() error {
}

// NewRsourcesService produces an rsources.JobService through environment configuration (env variables & config file)
func NewRsourcesService(deploymentType deployment.Type, shouldSetupSharedDB bool) (rsources.JobService, error) {
func NewRsourcesService(deploymentType deployment.Type, shouldSetupSharedDB bool, stats stats.Stats) (rsources.JobService, error) {
var rsourcesConfig rsources.JobServiceConfig
rsourcesConfig.MaxPoolSize = config.GetInt("Rsources.MaxPoolSize", 3)
rsourcesConfig.MinPoolSize = config.GetInt("Rsources.MinPoolSize", 1)
Expand All @@ -80,7 +80,7 @@ func NewRsourcesService(deploymentType deployment.Type, shouldSetupSharedDB bool

rsourcesConfig.ShouldSetupSharedDB = shouldSetupSharedDB

return rsources.NewJobService(rsourcesConfig)
return rsources.NewJobService(rsourcesConfig, stats)
}

func resolveModeProvider(log logger.Logger, deploymentType deployment.Type) (cluster.ChangeEventProvider, error) {
Expand Down
16 changes: 8 additions & 8 deletions app/cluster/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,22 +103,22 @@ func TestDynamicClusterManager(t *testing.T) {
mockTransformer := mocksTransformer.NewMockTransformer(mockCtrl)
mockRsourcesService := rsources.NewMockJobService(mockCtrl)

gwDB := jobsdb.NewForReadWrite("gw")
gwDB := jobsdb.NewForReadWrite("gw", jobsdb.WithStats(stats.NOP))
defer gwDB.TearDown()
eschDB := jobsdb.NewForReadWrite("esch")
eschDB := jobsdb.NewForReadWrite("esch", jobsdb.WithStats(stats.NOP))
defer eschDB.TearDown()
archiveDB := jobsdb.NewForReadWrite("archive")
archiveDB := jobsdb.NewForReadWrite("archive", jobsdb.WithStats(stats.NOP))
defer archiveDB.TearDown()
rtDB := jobsdb.NewForReadWrite("rt")
rtDB := jobsdb.NewForReadWrite("rt", jobsdb.WithStats(stats.NOP))
defer rtDB.TearDown()
brtDB := jobsdb.NewForReadWrite("batch_rt")
brtDB := jobsdb.NewForReadWrite("batch_rt", jobsdb.WithStats(stats.NOP))
defer brtDB.TearDown()

archDB := jobsdb.NewForReadWrite("archival")
archDB := jobsdb.NewForReadWrite("archival", jobsdb.WithStats(stats.NOP))
defer archDB.TearDown()
readErrDB := jobsdb.NewForRead("proc_error")
readErrDB := jobsdb.NewForRead("proc_error", jobsdb.WithStats(stats.NOP))
defer readErrDB.TearDown()
writeErrDB := jobsdb.NewForWrite("proc_error")
writeErrDB := jobsdb.NewForWrite("proc_error", jobsdb.WithStats(stats.NOP))
require.NoError(t, writeErrDB.Start())
defer writeErrDB.TearDown()

Expand Down
Loading

0 comments on commit 61e947c

Please sign in to comment.