diff --git a/app/apphandlers/embeddedAppHandler.go b/app/apphandlers/embeddedAppHandler.go index 03876e8081..aef1468585 100644 --- a/app/apphandlers/embeddedAppHandler.go +++ b/app/apphandlers/embeddedAppHandler.go @@ -2,6 +2,7 @@ package apphandlers import ( "context" + "database/sql" "fmt" "net/http" "time" @@ -139,6 +140,15 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) FeaturesRetryMaxAttempts: 10, }) + var dbPool *sql.DB + if config.GetBoolVar(true, "db.pool.shared") { + dbPool, err = misc.NewDatabaseConnectionPool(ctx, config, statsFactory, "embedded-app") + if err != nil { + return err + } + defer dbPool.Close() + } + // This separate gateway db is created just to be used with gateway because in case of degraded mode, // the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it // will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode. @@ -146,7 +156,9 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) "gw", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) + defer gatewayDB.Close() if err = gatewayDB.Start(); err != nil { return fmt.Errorf("could not start gateway: %w", err) } @@ -160,6 +172,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithDSLimit(a.config.gatewayDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) defer gwDBForProcessor.Close() routerDB := jobsdb.NewForReadWrite( @@ -168,6 +181,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithDSLimit(a.config.routerDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) defer 
routerDB.Close() batchRouterDB := jobsdb.NewForReadWrite( @@ -176,6 +190,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithDSLimit(a.config.batchRouterDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) defer batchRouterDB.Close() @@ -186,6 +201,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithDSLimit(a.config.processorDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) defer errDBForRead.Close() errDBForWrite := jobsdb.NewForWrite( @@ -193,7 +209,9 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithClearDB(options.ClearDB), jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) + defer errDBForWrite.Close() if err = errDBForWrite.Start(); err != nil { return fmt.Errorf("could not start errDBForWrite: %w", err) } @@ -205,6 +223,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithDSLimit(a.config.processorDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) defer schemaDB.Close() @@ -219,6 +238,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) return config.GetDuration("archival.jobRetention", 24, time.Hour) }, ), + jobsdb.WithDBHandle(dbPool), ) defer archivalDB.Close() diff --git a/app/apphandlers/gatewayAppHandler.go b/app/apphandlers/gatewayAppHandler.go index 99bb25bebf..1a4f01fa5c 100644 --- a/app/apphandlers/gatewayAppHandler.go +++ b/app/apphandlers/gatewayAppHandler.go @@ -2,6 +2,7 @@ 
package apphandlers import ( "context" + "database/sql" "fmt" "net/http" "time" @@ -23,6 +24,7 @@ import ( "github.com/rudderlabs/rudder-server/jobsdb" sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source" "github.com/rudderlabs/rudder-server/services/transformer" + "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/utils/types/deployment" ) @@ -68,12 +70,22 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) } defer sourceHandle.Stop() + var dbPool *sql.DB + if config.GetBoolVar(true, "db.gateway.pool.shared", "db.pool.shared") { + dbPool, err = misc.NewDatabaseConnectionPool(ctx, config, statsFactory, "gateway-app") + if err != nil { + return err + } + defer dbPool.Close() + } + gatewayDB := jobsdb.NewForWrite( "gw", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithDSLimit(a.config.gatewayDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) defer gatewayDB.Close() @@ -87,6 +99,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) jobsdb.WithClearDB(options.ClearDB), jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) defer errDB.Close() diff --git a/app/apphandlers/processorAppHandler.go b/app/apphandlers/processorAppHandler.go index c462ec0e2c..0e97da041d 100644 --- a/app/apphandlers/processorAppHandler.go +++ b/app/apphandlers/processorAppHandler.go @@ -2,6 +2,7 @@ package apphandlers import ( "context" + "database/sql" "fmt" "net/http" "strconv" @@ -145,12 +146,22 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options FeaturesRetryMaxAttempts: 10, }) + var dbPool *sql.DB + if config.GetBoolVar(true, "db.processor.pool.shared", "db.pool.shared") { + dbPool, err = misc.NewDatabaseConnectionPool(ctx, 
config, statsFactory, "processor-app") + if err != nil { + return err + } + defer dbPool.Close() + } + gwDBForProcessor := jobsdb.NewForRead( "gw", jobsdb.WithClearDB(options.ClearDB), jobsdb.WithDSLimit(a.config.gatewayDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) defer gwDBForProcessor.Close() routerDB := jobsdb.NewForReadWrite( @@ -159,6 +170,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options jobsdb.WithDSLimit(a.config.routerDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) defer routerDB.Close() batchRouterDB := jobsdb.NewForReadWrite( @@ -167,6 +179,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options jobsdb.WithDSLimit(a.config.batchRouterDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) defer batchRouterDB.Close() errDBForRead := jobsdb.NewForRead( @@ -175,6 +188,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options jobsdb.WithDSLimit(a.config.processorDSLimit), jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) defer errDBForRead.Close() errDBForWrite := jobsdb.NewForWrite( @@ -182,7 +196,9 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options jobsdb.WithClearDB(options.ClearDB), jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) + defer errDBForWrite.Close() if err = errDBForWrite.Start(); err != nil { return fmt.Errorf("could not start errDBForWrite: %w", err) } @@ 
-192,6 +208,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options jobsdb.WithClearDB(options.ClearDB), jobsdb.WithDSLimit(a.config.processorDSLimit), jobsdb.WithStats(statsFactory), + jobsdb.WithDBHandle(dbPool), ) defer schemaDB.Close() @@ -206,6 +223,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options return config.GetDuration("archival.jobRetention", 24, time.Hour) }, ), + jobsdb.WithDBHandle(dbPool), ) defer archivalDB.Close() diff --git a/integration_test/docker_test/docker_test.go b/integration_test/docker_test/docker_test.go index 7892714450..59a31baeb2 100644 --- a/integration_test/docker_test/docker_test.go +++ b/integration_test/docker_test/docker_test.go @@ -78,24 +78,58 @@ type event struct { } func TestMainFlow(t *testing.T) { - if os.Getenv("SLOW") == "0" { - t.Skip("Skipping tests. Remove 'SLOW=0' env var to run them.") - } - hold = os.Getenv("HOLD") == "true" - var tearDownStart time.Time - defer func() { - if tearDownStart == (time.Time{}) { - t.Log("--- Teardown done (unexpected)") - } else { - t.Logf("--- Teardown done (%s)", time.Since(tearDownStart)) - } - }() + t.Run("common connection pool to database", func(t *testing.T) { + var tearDownStart time.Time + defer func() { + if tearDownStart == (time.Time{}) { + t.Log("--- Teardown done (unexpected)") + } else { + t.Logf("--- Teardown done (%s)", time.Since(tearDownStart)) + } + }() + + svcCtx, svcCancel := context.WithCancel(context.Background()) + svcDone := setupMainFlow(svcCtx, t, true) + sendEventsToGateway(t) - svcCtx, svcCancel := context.WithCancel(context.Background()) - svcDone := setupMainFlow(svcCtx, t) - sendEventsToGateway(t) + testCases(t) + + blockOnHold(t) + svcCancel() + t.Log("Waiting for service to stop") + <-svcDone + + tearDownStart = time.Now() + }) + + t.Run("separate connection pools to database", func(t *testing.T) { + var tearDownStart time.Time + defer func() { + if tearDownStart == (time.Time{}) { + 
t.Log("--- Teardown done (unexpected)") + } else { + t.Logf("--- Teardown done (%s)", time.Since(tearDownStart)) + } + }() + + svcCtx, svcCancel := context.WithCancel(context.Background()) + svcDone := setupMainFlow(svcCtx, t, false) + sendEventsToGateway(t) + + testCases(t) + + blockOnHold(t) + svcCancel() + t.Log("Waiting for service to stop") + <-svcDone + + tearDownStart = time.Now() + }) +} + +func testCases(t *testing.T) { t.Run("webhook", func(t *testing.T) { require.Eventually(t, func() bool { return webhook.RequestsCount() == 11 @@ -262,16 +296,9 @@ func TestMainFlow(t *testing.T) { }`) sendEvent(t, payload, "beacon/v1/batch", writeKey) }) - - blockOnHold(t) - svcCancel() - t.Log("Waiting for service to stop") - <-svcDone - - tearDownStart = time.Now() } -func setupMainFlow(svcCtx context.Context, t *testing.T) <-chan struct{} { +func setupMainFlow(svcCtx context.Context, t *testing.T, commonPool bool) <-chan struct{} { setupStart := time.Now() if testing.Verbose() { t.Setenv("LOG_LEVEL", "DEBUG") @@ -326,6 +353,9 @@ func setupMainFlow(svcCtx context.Context, t *testing.T) <-chan struct{} { t.Setenv("WAREHOUSE_JOBS_DB_PORT", postgresContainer.Port) t.Setenv("DEST_TRANSFORM_URL", transformerContainer.TransformerURL) t.Setenv("DEPLOYMENT_TYPE", string(deployment.DedicatedType)) + if !commonPool { + t.Setenv("RSERVER_DB_POOL_SHARED", strconv.FormatBool(commonPool)) + } httpPortInt, err := kithelper.GetFreePort() require.NoError(t, err) diff --git a/jobsdb/jobsdb.go b/jobsdb/jobsdb.go index e1a159772c..9f3ed2f8a1 100644 --- a/jobsdb/jobsdb.go +++ b/jobsdb/jobsdb.go @@ -446,11 +446,12 @@ Handle is the main type implementing the database for implementing jobs. 
The caller must call the SetUp function on a Handle object */ type Handle struct { - dbHandle *sql.DB - ownerType OwnerType - tablePrefix string - logger logger.Logger - stats stats.Stats + dbHandle *sql.DB + sharedConnectionPool bool + ownerType OwnerType + tablePrefix string + logger logger.Logger + stats stats.Stats datasetList []dataSetT datasetRangeList []dataSetRangeT @@ -769,7 +770,9 @@ func (jd *Handle) init() { jd.loadConfig() // Initialize dbHandle if not already set - if jd.dbHandle == nil { + if jd.dbHandle != nil { + jd.sharedConnectionPool = true + } else { var err error psqlInfo := misc.GetConnectionString(jd.config, "jobsdb_"+jd.tablePrefix) jd.dbHandle, err = sql.Open("postgres", psqlInfo) @@ -783,29 +786,29 @@ func (jd *Handle) init() { ), ), ) - } - var maxConns int - if !jd.conf.enableReaderQueue || !jd.conf.enableWriterQueue { - maxConns = jd.conf.maxOpenConnections - } else { - maxConns = 2 // buffer - maxConns += jd.conf.maxReaders + jd.conf.maxWriters - switch jd.ownerType { - case Read: - maxConns += 2 // migrate, refreshDsList - case Write: - maxConns += 1 // addNewDS - case ReadWrite: - maxConns += 3 // migrate, addNewDS, archive - } - if maxConns >= jd.conf.maxOpenConnections { + var maxConns int + if !jd.conf.enableReaderQueue || !jd.conf.enableWriterQueue { maxConns = jd.conf.maxOpenConnections + } else { + maxConns = 2 // buffer + maxConns += jd.conf.maxReaders + jd.conf.maxWriters + switch jd.ownerType { + case Read: + maxConns += 2 // migrate, refreshDsList + case Write: + maxConns += 1 // addNewDS + case ReadWrite: + maxConns += 3 // migrate, addNewDS, archive + } + if maxConns >= jd.conf.maxOpenConnections { + maxConns = jd.conf.maxOpenConnections + } } - } - jd.dbHandle.SetMaxOpenConns(maxConns) + jd.dbHandle.SetMaxOpenConns(maxConns) - jd.assertError(jd.dbHandle.Ping()) + jd.assertError(jd.dbHandle.Ping()) + } jd.workersAndAuxSetup() @@ -1091,8 +1094,14 @@ func (jd *Handle) TearDown() { // Close closes the database connection. 
// // Stop should be called before Close. +// +// Noop if the connection pool is shared with the handle. func (jd *Handle) Close() { - _ = jd.dbHandle.Close() + if !jd.sharedConnectionPool { + if err := jd.dbHandle.Close(); err != nil { + jd.logger.Errorw("error closing db connection", "error", err) + } + } } /* diff --git a/utils/misc/dbutils.go b/utils/misc/dbutils.go index b63c78d025..a1a197c2cf 100644 --- a/utils/misc/dbutils.go +++ b/utils/misc/dbutils.go @@ -1,6 +1,8 @@ package misc import ( + "context" + "database/sql" "fmt" "net/url" "os" @@ -9,6 +11,9 @@ import ( "github.com/lib/pq" "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/stats" + "github.com/rudderlabs/rudder-go-kit/stats/collectors" + "github.com/rudderlabs/rudder-server/rruntime" ) // GetConnectionString Returns Jobs DB connection configuration @@ -35,6 +40,77 @@ func GetConnectionString(c *config.Config, componentName string) string { ) } +// NewDatabaseConnectionPool opens a postgres connection pool for componentName, verifies it with a ping and +// registers a database stats collector for it. Pool limits are read from reloadable config and re-applied every +// 5 seconds whenever they change, until ctx is done. Closing the returned pool is left to the caller. +func NewDatabaseConnectionPool( + ctx context.Context, + conf *config.Config, + stat stats.Stats, + componentName string, +) (*sql.DB, error) { + connStr := GetConnectionString(conf, componentName) + db, err := sql.Open("postgres", connStr) + if err != nil { + return nil, fmt.Errorf("opening connection to database: %w", err) + } + if err := db.Ping(); err != nil { + _ = db.Close() // best-effort cleanup so the pool is not leaked; the ping error is the one reported + return nil, fmt.Errorf("Error pinging database: %w", err) + } + if err := stat.RegisterCollector( + collectors.NewDatabaseSQLStats( + componentName, + db, + ), + ); err != nil { + _ = db.Close() // best-effort cleanup so the pool is not leaked; the registration error is the one reported + return nil, fmt.Errorf("Error registering database stats collector: %w", err) + } + + maxConnsVar := conf.GetReloadableIntVar(40, 1, "db."+componentName+".pool.maxOpenConnections", "db.pool.maxOpenConnections") + maxConns := maxConnsVar.Load() + db.SetMaxOpenConns(maxConns) + + maxIdleConnsVar := conf.GetReloadableIntVar(5, 1, "db."+componentName+".pool.maxIdleConnections", "db.pool.maxIdleConnections") + maxIdleConns := maxIdleConnsVar.Load() + db.SetMaxIdleConns(maxIdleConns) + +
maxIdleTimeVar := conf.GetReloadableDurationVar(15, time.Minute, "db."+componentName+".pool.maxIdleTime", "db.pool.maxIdleTime") + maxIdleTime := maxIdleTimeVar.Load() + db.SetConnMaxIdleTime(maxIdleTime) + + maxConnLifetimeVar := conf.GetReloadableDurationVar(0, 0, "db."+componentName+".pool.maxConnLifetime", "db.pool.maxConnLifetime") + maxConnLifetime := maxConnLifetimeVar.Load() + db.SetConnMaxLifetime(maxConnLifetime) + + ticker := time.NewTicker(5 * time.Second) + rruntime.Go(func() { + defer ticker.Stop() // stop the ticker when the refresh loop exits, not when this constructor returns + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + updatePoolConfig(db.SetMaxOpenConns, &maxConns, maxConnsVar) + updatePoolConfig(db.SetConnMaxIdleTime, &maxIdleTime, maxIdleTimeVar) + updatePoolConfig(db.SetMaxIdleConns, &maxIdleConns, maxIdleConnsVar) + updatePoolConfig(db.SetConnMaxLifetime, &maxConnLifetime, maxConnLifetimeVar) + } + } + }) + return db, nil +} + +// updatePoolConfig loads the value from conf and, only when it differs from *current, passes it to setter and stores it in *current. +func updatePoolConfig[T comparable](setter func(T), current *T, conf config.ValueLoader[T]) { + newValue := conf.Load() + if newValue != *current { + setter(newValue) + *current = newValue + } +} + // SetAppNameInDBConnURL sets application name in db connection url // if application name is already present in dns it will get override by the appName func SetAppNameInDBConnURL(connectionUrl, appName string) (string, error) {