Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: one (*sql.DB) pool for all jobsdb #5170

Merged
merged 14 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"database/sql"
"fmt"
"net/http"
"time"
Expand Down Expand Up @@ -139,14 +140,25 @@
FeaturesRetryMaxAttempts: 10,
})

var dbPool *sql.DB
if config.GetBoolVar(true, "db.pool.shared") {
dbPool, err = misc.NewDatabaseConnectionPool(ctx, config, statsFactory, "embedded-app")
if err != nil {
return err

Check warning on line 147 in app/apphandlers/embeddedAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/embeddedAppHandler.go#L147

Added line #L147 was not covered by tests
}
defer dbPool.Close()
}

// This separate gateway db is created just to be used with gateway because in case of degraded mode,
// the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it
// will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode.
gatewayDB := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer gatewayDB.Close()
if err = gatewayDB.Start(); err != nil {
return fmt.Errorf("could not start gateway: %w", err)
}
Expand All @@ -160,6 +172,7 @@
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
Expand All @@ -168,14 +181,15 @@
jobsdb.WithDSLimit(a.config.routerDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer routerDB.Close()
Sidddddarth marked this conversation as resolved.
Show resolved Hide resolved
batchRouterDB := jobsdb.NewForReadWrite(
"batch_rt",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.batchRouterDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer batchRouterDB.Close()

Expand All @@ -186,14 +200,17 @@
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer errDBForRead.Close()
errDBForWrite := jobsdb.NewForWrite(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer errDBForWrite.Close()
if err = errDBForWrite.Start(); err != nil {
return fmt.Errorf("could not start errDBForWrite: %w", err)
}
Expand All @@ -205,6 +222,7 @@
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer schemaDB.Close()

Expand All @@ -219,6 +237,7 @@
return config.GetDuration("archival.jobRetention", 24, time.Hour)
},
),
jobsdb.WithDBHandle(dbPool),
)
defer archivalDB.Close()

Expand Down
13 changes: 13 additions & 0 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"database/sql"
"fmt"
"net/http"
"time"
Expand All @@ -23,6 +24,7 @@
"github.com/rudderlabs/rudder-server/jobsdb"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
"github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)

Expand Down Expand Up @@ -68,12 +70,22 @@
}
defer sourceHandle.Stop()

var dbPool *sql.DB
if config.GetBoolVar(true, "db.gateway.pool.shared", "db.pool.shared") {
dbPool, err = misc.NewDatabaseConnectionPool(ctx, config, statsFactory, "gateway-app")
if err != nil {
return err

Check warning on line 77 in app/apphandlers/gatewayAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/gatewayAppHandler.go#L77

Added line #L77 was not covered by tests
}
defer dbPool.Close()
}

gatewayDB := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer gatewayDB.Close()

Expand All @@ -87,6 +99,7 @@
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer errDB.Close()

Expand Down
18 changes: 18 additions & 0 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import (
"context"
"database/sql"
"fmt"
"net/http"
"strconv"
Expand Down Expand Up @@ -145,12 +146,22 @@
FeaturesRetryMaxAttempts: 10,
})

var dbPool *sql.DB
if config.GetBoolVar(true, "db.processor.pool.shared", "db.pool.shared") {
dbPool, err = misc.NewDatabaseConnectionPool(ctx, config, statsFactory, "processor-app")
if err != nil {
return err

Check warning on line 153 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L149-L153

Added lines #L149 - L153 were not covered by tests
}
defer dbPool.Close()

Check warning on line 155 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L155

Added line #L155 was not covered by tests
}

gwDBForProcessor := jobsdb.NewForRead(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),

Check warning on line 164 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L164

Added line #L164 was not covered by tests
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
Expand All @@ -159,6 +170,7 @@
jobsdb.WithDSLimit(a.config.routerDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),

Check warning on line 173 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L173

Added line #L173 was not covered by tests
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
Expand All @@ -167,6 +179,7 @@
jobsdb.WithDSLimit(a.config.batchRouterDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),

Check warning on line 182 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L182

Added line #L182 was not covered by tests
)
defer batchRouterDB.Close()
errDBForRead := jobsdb.NewForRead(
Expand All @@ -175,14 +188,17 @@
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),

Check warning on line 191 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L191

Added line #L191 was not covered by tests
)
defer errDBForRead.Close()
errDBForWrite := jobsdb.NewForWrite(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),

Check warning on line 199 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L199

Added line #L199 was not covered by tests
)
errDBForWrite.Close()

Check warning on line 201 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L201

Added line #L201 was not covered by tests
if err = errDBForWrite.Start(); err != nil {
return fmt.Errorf("could not start errDBForWrite: %w", err)
}
Expand All @@ -192,6 +208,7 @@
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),

Check warning on line 211 in app/apphandlers/processorAppHandler.go

View check run for this annotation

Codecov / codecov/patch

app/apphandlers/processorAppHandler.go#L211

Added line #L211 was not covered by tests
)
defer schemaDB.Close()

Expand All @@ -206,6 +223,7 @@
return config.GetDuration("archival.jobRetention", 24, time.Hour)
},
),
jobsdb.WithDBHandle(dbPool),
)
defer archivalDB.Close()

Expand Down
38 changes: 19 additions & 19 deletions jobsdb/jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,29 +783,29 @@
),
),
)
}

var maxConns int
if !jd.conf.enableReaderQueue || !jd.conf.enableWriterQueue {
maxConns = jd.conf.maxOpenConnections
} else {
maxConns = 2 // buffer
maxConns += jd.conf.maxReaders + jd.conf.maxWriters
switch jd.ownerType {
case Read:
maxConns += 2 // migrate, refreshDsList
case Write:
maxConns += 1 // addNewDS
case ReadWrite:
maxConns += 3 // migrate, addNewDS, archive
}
if maxConns >= jd.conf.maxOpenConnections {
var maxConns int
if !jd.conf.enableReaderQueue || !jd.conf.enableWriterQueue {
maxConns = jd.conf.maxOpenConnections
} else {
maxConns = 2 // buffer
maxConns += jd.conf.maxReaders + jd.conf.maxWriters
switch jd.ownerType {
case Read:
maxConns += 2 // migrate, refreshDsList
case Write:
maxConns += 1 // addNewDS
case ReadWrite:
maxConns += 3 // migrate, addNewDS, archive
}
if maxConns >= jd.conf.maxOpenConnections {
maxConns = jd.conf.maxOpenConnections

Check warning on line 802 in jobsdb/jobsdb.go

View check run for this annotation

Codecov / codecov/patch

jobsdb/jobsdb.go#L802

Added line #L802 was not covered by tests
}
}
}
jd.dbHandle.SetMaxOpenConns(maxConns)
jd.dbHandle.SetMaxOpenConns(maxConns)

jd.assertError(jd.dbHandle.Ping())
jd.assertError(jd.dbHandle.Ping())
}

jd.workersAndAuxSetup()

Expand Down
70 changes: 70 additions & 0 deletions utils/misc/dbutils.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package misc

import (
"context"
"database/sql"
"fmt"
"net/url"
"os"
Expand All @@ -9,6 +11,9 @@
"github.com/lib/pq"

"github.com/rudderlabs/rudder-go-kit/config"
"github.com/rudderlabs/rudder-go-kit/stats"
"github.com/rudderlabs/rudder-go-kit/stats/collectors"
"github.com/rudderlabs/rudder-server/rruntime"
)

// GetConnectionString Returns Jobs DB connection configuration
Expand All @@ -35,6 +40,71 @@
)
}

func NewDatabaseConnectionPool(
ctx context.Context,
conf *config.Config,
stat stats.Stats,
componentName string,
) (*sql.DB, error) {
connStr := GetConnectionString(conf, componentName)
db, err := sql.Open("postgres", connStr)
if err != nil {
return nil, fmt.Errorf("opening connection to database: %w", err)

Check warning on line 52 in utils/misc/dbutils.go

View check run for this annotation

Codecov / codecov/patch

utils/misc/dbutils.go#L52

Added line #L52 was not covered by tests
}
if err := db.Ping(); err != nil {
return nil, fmt.Errorf("Error pinging database: %w", err)

Check warning on line 55 in utils/misc/dbutils.go

View check run for this annotation

Codecov / codecov/patch

utils/misc/dbutils.go#L55

Added line #L55 was not covered by tests
}
if err := stat.RegisterCollector(
collectors.NewDatabaseSQLStats(
componentName,
db,
),
); err != nil {
return nil, fmt.Errorf("Error registering database stats collector: %w", err)

Check warning on line 63 in utils/misc/dbutils.go

View check run for this annotation

Codecov / codecov/patch

utils/misc/dbutils.go#L63

Added line #L63 was not covered by tests
}

maxConnsVar := conf.GetReloadableIntVar(40, 1, "db."+componentName+".pool.maxOpenConnections", "db.pool.maxOpenConnections")
maxConns := maxConnsVar.Load()
db.SetMaxOpenConns(maxConns)

maxIdleConnsVar := conf.GetReloadableIntVar(5, 1, "db."+componentName+".pool.maxIdleConnections", "db.pool.maxIdleConnections")
maxIdleConns := maxIdleConnsVar.Load()
db.SetMaxIdleConns(maxIdleConns)

maxIdleTimeVar := conf.GetReloadableDurationVar(15, time.Minute, "db."+componentName+".pool.maxIdleTime", "db.pool.maxIdleTime")
maxIdleTime := maxIdleTimeVar.Load()
db.SetConnMaxIdleTime(maxIdleTime)

maxConnLifetimeVar := conf.GetReloadableDurationVar(0, 0, "db."+componentName+".pool.maxConnLifetime", "db.pool.maxConnLifetime")
maxConnLifetime := maxConnLifetimeVar.Load()
db.SetConnMaxLifetime(maxConnLifetime)

ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
rruntime.Go(func() {
for {
select {
case <-ctx.Done():
return
case <-ticker.C:

Check warning on line 89 in utils/misc/dbutils.go

View check run for this annotation

Codecov / codecov/patch

utils/misc/dbutils.go#L89

Added line #L89 was not covered by tests
}
updatePoolConfig(db.SetMaxOpenConns, &maxConns, maxConnsVar)
updatePoolConfig(db.SetConnMaxIdleTime, &maxIdleTime, maxIdleTimeVar)
updatePoolConfig(db.SetMaxIdleConns, &maxIdleConns, maxIdleConnsVar)
updatePoolConfig(db.SetConnMaxLifetime, &maxConnLifetime, maxConnLifetimeVar)

Check warning on line 94 in utils/misc/dbutils.go

View check run for this annotation

Codecov / codecov/patch

utils/misc/dbutils.go#L91-L94

Added lines #L91 - L94 were not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not move this below <-ticker.C?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no reason
just used to writing it like this

}
})
return db, nil
}

func updatePoolConfig[T comparable](f func(T), current *T, conf config.ValueLoader[T]) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func updatePoolConfig[T comparable](f func(T), current *T, conf config.ValueLoader[T]) {
func updatePoolConfig[T comparable](setter func(T), current *T, conf config.ValueLoader[T]) {

[Optional]

newValue := conf.Load()
if newValue != *current {
f(newValue)
*current = newValue

Check warning on line 104 in utils/misc/dbutils.go

View check run for this annotation

Codecov / codecov/patch

utils/misc/dbutils.go#L100-L104

Added lines #L100 - L104 were not covered by tests
}
}

// SetAppNameInDBConnURL sets application name in db connection url
// if application name is already present in dns it will get override by the appName
func SetAppNameInDBConnURL(connectionUrl, appName string) (string, error) {
Expand Down
Loading