Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: one (*sql.DB) pool for all jobsdb #5170

Merged
merged 14 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package apphandlers

import (
"context"
"database/sql"
"fmt"
"net/http"
"time"
Expand Down Expand Up @@ -139,14 +140,25 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
FeaturesRetryMaxAttempts: 10,
})

var dbPool *sql.DB
if config.GetBoolVar(true, "db.pool.shared") {
dbPool, err = misc.NewDatabaseConnectionPool(ctx, config, statsFactory, "embedded-app")
if err != nil {
return err
}
defer dbPool.Close()
}

// This separate gateway db is created just to be used with gateway because in case of degraded mode,
// the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it
// will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode.
gatewayDB := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer gatewayDB.Close()
if err = gatewayDB.Start(); err != nil {
return fmt.Errorf("could not start gateway: %w", err)
}
Expand All @@ -160,6 +172,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
Expand All @@ -168,6 +181,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithDSLimit(a.config.routerDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer routerDB.Close()
Sidddddarth marked this conversation as resolved.
Show resolved Hide resolved
batchRouterDB := jobsdb.NewForReadWrite(
Expand All @@ -176,6 +190,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithDSLimit(a.config.batchRouterDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer batchRouterDB.Close()

Expand All @@ -186,14 +201,17 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer errDBForRead.Close()
errDBForWrite := jobsdb.NewForWrite(
"proc_error",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer errDBForWrite.Close()
if err = errDBForWrite.Start(); err != nil {
return fmt.Errorf("could not start errDBForWrite: %w", err)
}
Expand All @@ -205,6 +223,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer schemaDB.Close()

Expand All @@ -219,6 +238,7 @@ func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options)
return config.GetDuration("archival.jobRetention", 24, time.Hour)
},
),
jobsdb.WithDBHandle(dbPool),
)
defer archivalDB.Close()

Expand Down
13 changes: 13 additions & 0 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package apphandlers

import (
"context"
"database/sql"
"fmt"
"net/http"
"time"
Expand All @@ -23,6 +24,7 @@ import (
"github.com/rudderlabs/rudder-server/jobsdb"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
"github.com/rudderlabs/rudder-server/services/transformer"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
)

Expand Down Expand Up @@ -68,12 +70,22 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
}
defer sourceHandle.Stop()

var dbPool *sql.DB
if config.GetBoolVar(true, "db.gateway.pool.shared", "db.pool.shared") {
dbPool, err = misc.NewDatabaseConnectionPool(ctx, config, statsFactory, "gateway-app")
if err != nil {
return err
}
defer dbPool.Close()
}

gatewayDB := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer gatewayDB.Close()

Expand All @@ -87,6 +99,7 @@ func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options)
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer errDB.Close()

Expand Down
18 changes: 18 additions & 0 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package apphandlers

import (
"context"
"database/sql"
"fmt"
"net/http"
"strconv"
Expand Down Expand Up @@ -145,12 +146,22 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
FeaturesRetryMaxAttempts: 10,
})

var dbPool *sql.DB
if config.GetBoolVar(true, "db.processor.pool.shared", "db.pool.shared") {
dbPool, err = misc.NewDatabaseConnectionPool(ctx, config, statsFactory, "processor-app")
if err != nil {
return err
}
defer dbPool.Close()
}

gwDBForProcessor := jobsdb.NewForRead(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.gatewayDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Gateway.jobsDB.skipMaintenanceError", true)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer gwDBForProcessor.Close()
routerDB := jobsdb.NewForReadWrite(
Expand All @@ -159,6 +170,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithDSLimit(a.config.routerDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Router.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer routerDB.Close()
batchRouterDB := jobsdb.NewForReadWrite(
Expand All @@ -167,6 +179,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithDSLimit(a.config.batchRouterDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("BatchRouter.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer batchRouterDB.Close()
errDBForRead := jobsdb.NewForRead(
Expand All @@ -175,14 +188,17 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", false)),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer errDBForRead.Close()
// errDBForWrite is the write-only jobsdb handle for the "proc_error" tables.
// It shares dbPool with the other handles when a pool was created above
// (dbPool is nil otherwise).
errDBForWrite := jobsdb.NewForWrite(
	"proc_error",
	jobsdb.WithClearDB(options.ClearDB),
	jobsdb.WithSkipMaintenanceErr(config.GetBool("Processor.jobsDB.skipMaintenanceError", true)),
	jobsdb.WithStats(statsFactory),
	jobsdb.WithDBHandle(dbPool),
)
// Close must be deferred, like every other jobsdb handle in this function:
// calling it right away would close the handle before Start runs and would
// leave nothing to release it when StartRudderCore returns.
defer errDBForWrite.Close()
if err = errDBForWrite.Start(); err != nil {
	return fmt.Errorf("could not start errDBForWrite: %w", err)
}
Expand All @@ -192,6 +208,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithDSLimit(a.config.processorDSLimit),
jobsdb.WithStats(statsFactory),
jobsdb.WithDBHandle(dbPool),
)
defer schemaDB.Close()

Expand All @@ -206,6 +223,7 @@ func (a *processorApp) StartRudderCore(ctx context.Context, options *app.Options
return config.GetDuration("archival.jobRetention", 24, time.Hour)
},
),
jobsdb.WithDBHandle(dbPool),
)
defer archivalDB.Close()

Expand Down
76 changes: 53 additions & 23 deletions integration_test/docker_test/docker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,24 +78,58 @@ type event struct {
}

func TestMainFlow(t *testing.T) {
if os.Getenv("SLOW") == "0" {
t.Skip("Skipping tests. Remove 'SLOW=0' env var to run them.")
}

hold = os.Getenv("HOLD") == "true"

var tearDownStart time.Time
defer func() {
if tearDownStart == (time.Time{}) {
t.Log("--- Teardown done (unexpected)")
} else {
t.Logf("--- Teardown done (%s)", time.Since(tearDownStart))
}
}()
t.Run("common connection pool to database", func(t *testing.T) {
var tearDownStart time.Time
defer func() {
if tearDownStart == (time.Time{}) {
t.Log("--- Teardown done (unexpected)")
} else {
t.Logf("--- Teardown done (%s)", time.Since(tearDownStart))
}
}()

svcCtx, svcCancel := context.WithCancel(context.Background())
svcDone := setupMainFlow(svcCtx, t, true)
sendEventsToGateway(t)

svcCtx, svcCancel := context.WithCancel(context.Background())
svcDone := setupMainFlow(svcCtx, t)
sendEventsToGateway(t)
testCases(t)

blockOnHold(t)
svcCancel()
t.Log("Waiting for service to stop")
<-svcDone

tearDownStart = time.Now()
})

// Runs the main flow with setupMainFlow's commonPool argument set to false,
// which makes setupMainFlow export RSERVER_DB_POOL_SHARED=false for this subtest.
t.Run("separate connection pools to database", func(t *testing.T) {
// tearDownStart keeps its zero value until the service has been cancelled and
// svcDone has been received; the deferred log uses that to distinguish a
// normal teardown from the subtest exiting before reaching that point.
var tearDownStart time.Time
defer func() {
if tearDownStart == (time.Time{}) {
t.Log("--- Teardown done (unexpected)")
} else {
t.Logf("--- Teardown done (%s)", time.Since(tearDownStart))
}
}()

svcCtx, svcCancel := context.WithCancel(context.Background())
svcDone := setupMainFlow(svcCtx, t, false)
sendEventsToGateway(t)

testCases(t)

// blockOnHold is defined elsewhere; presumably it pauses here when HOLD=true
// so the environment can be inspected — TODO confirm.
blockOnHold(t)
// Cancel the service context, then wait on svcDone before timing the teardown.
svcCancel()
t.Log("Waiting for service to stop")
<-svcDone

tearDownStart = time.Now()
})
}

func testCases(t *testing.T) {
t.Run("webhook", func(t *testing.T) {
require.Eventually(t, func() bool {
return webhook.RequestsCount() == 11
Expand Down Expand Up @@ -262,16 +296,9 @@ func TestMainFlow(t *testing.T) {
}`)
sendEvent(t, payload, "beacon/v1/batch", writeKey)
})

blockOnHold(t)
svcCancel()
t.Log("Waiting for service to stop")
<-svcDone

tearDownStart = time.Now()
}

func setupMainFlow(svcCtx context.Context, t *testing.T) <-chan struct{} {
func setupMainFlow(svcCtx context.Context, t *testing.T, commonPool bool) <-chan struct{} {
setupStart := time.Now()
if testing.Verbose() {
t.Setenv("LOG_LEVEL", "DEBUG")
Expand Down Expand Up @@ -326,6 +353,9 @@ func setupMainFlow(svcCtx context.Context, t *testing.T) <-chan struct{} {
t.Setenv("WAREHOUSE_JOBS_DB_PORT", postgresContainer.Port)
t.Setenv("DEST_TRANSFORM_URL", transformerContainer.TransformerURL)
t.Setenv("DEPLOYMENT_TYPE", string(deployment.DedicatedType))
if !commonPool {
t.Setenv("RSERVER_DB_POOL_SHARED", strconv.FormatBool(commonPool))
}

httpPortInt, err := kithelper.GetFreePort()
require.NoError(t, err)
Expand Down
Loading
Loading