Skip to content

Commit

Permalink
fixup! refactor: safe dababase initialisation
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Nov 24, 2022
1 parent a7b4df5 commit 3b7a7d6
Show file tree
Hide file tree
Showing 11 changed files with 268 additions and 206 deletions.
92 changes: 60 additions & 32 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"net/http"

"github.com/rudderlabs/rudder-server/config"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/types/deployment"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -34,17 +36,40 @@ import (
"github.com/rudderlabs/rudder-server/utils/types/servermode"
)

// EmbeddedApp is the type for embedded type implementation
type EmbeddedApp struct {
App app.App
VersionHandler func(w http.ResponseWriter, r *http.Request)
// embeddedApp is the type for embedded type implementation
type embeddedApp struct {
setupDone bool
app app.App
versionHandler func(w http.ResponseWriter, r *http.Request)
log logger.Logger
config struct {
enableProcessor bool
enableRouter bool
enableReplay bool
processorDSLimit int
routerDSLimit int
batchRouterDSLimit int
gatewayDSLimit int
}
}

func (*EmbeddedApp) GetAppType() string {
return fmt.Sprintf("rudder-server-%s", app.EMBEDDED)
func (a *embeddedApp) loadConfiguration() {
config.RegisterBoolConfigVariable(true, &a.config.enableProcessor, false, "enableProcessor")
config.RegisterBoolConfigVariable(types.DEFAULT_REPLAY_ENABLED, &a.config.enableReplay, false, "Replay.enabled")
config.RegisterBoolConfigVariable(true, &a.config.enableRouter, false, "enableRouter")
config.RegisterIntConfigVariable(0, &a.config.processorDSLimit, true, 1, "Processor.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.gatewayDSLimit, true, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.routerDSLimit, true, 1, "Router.jobsDB.dsLimit", "JobsDB.dsLimit")
config.RegisterIntConfigVariable(0, &a.config.batchRouterDSLimit, true, 1, "BatchRouter.jobsDB.dsLimit", "JobsDB.dsLimit")
}

func (appHandler *EmbeddedApp) PrepareDB() error {
func (a *embeddedApp) Setup(options *app.Options) error {
a.loadConfiguration()

if err := db.HandleEmbeddedRecovery(options.NormalMode, options.DegradedMode, misc.AppStartTime, app.EMBEDDED); err != nil {
return err
}

if err := rudderCoreDBValidator(); err != nil {
return err
}
Expand All @@ -54,36 +79,43 @@ func (appHandler *EmbeddedApp) PrepareDB() error {
if err := rudderCoreNodeSetup(); err != nil {
return err
}
a.setupDone = true
return nil
}

func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.Options) error {
pkgLogger.Info("Embedded mode: Starting Rudder Core")
func (a *embeddedApp) StartRudderCore(ctx context.Context, options *app.Options) error {
if !a.setupDone {
return fmt.Errorf("embedded rudder core cannot start, database is not setup")
}
a.log.Info("Embedded mode: Starting Rudder Core")

rudderCoreBaseSetup()
readonlyGatewayDB, readonlyRouterDB, readonlyBatchRouterDB, err := setupReadonlyDBs()
if err != nil {
return err
}

g, ctx := errgroup.WithContext(ctx)

deploymentType, err := deployment.GetFromEnv()
if err != nil {
return fmt.Errorf("failed to get deployment type: %w", err)
}
pkgLogger.Infof("Configured deployment type: %q", deploymentType)
a.log.Infof("Configured deployment type: %q", deploymentType)

reporting := embedded.App.Features().Reporting.Setup(backendconfig.DefaultBackendConfig)
reporting := a.app.Features().Reporting.Setup(backendconfig.DefaultBackendConfig)

g.Go(func() error {
reporting.AddClient(ctx, types.Config{ConnInfo: misc.GetConnectionString()})
return nil
})

pkgLogger.Info("Clearing DB ", options.ClearDB)
a.log.Info("Clearing DB ", options.ClearDB)

transformationdebugger.Setup()
destinationdebugger.Setup(backendconfig.DefaultBackendConfig)
sourcedebugger.Setup(backendconfig.DefaultBackendConfig)

reportingI := embedded.App.Features().Reporting.GetReportingInstance()
reportingI := a.app.Features().Reporting.GetReportingInstance()
transientSources := transientsource.NewService(ctx, backendconfig.DefaultBackendConfig)
prebackupHandlers := []prebackup.Handler{
prebackup.DropSourceIds(transientSources.SourceIdsSupplier()),
Expand All @@ -103,7 +135,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&gatewayDSLimit),
jobsdb.WithDSLimit(&a.config.gatewayDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer gwDBForProcessor.Close()
Expand All @@ -112,7 +144,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&routerDSLimit),
jobsdb.WithDSLimit(&a.config.routerDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer routerDB.Close()
Expand All @@ -121,7 +153,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&batchRouterDSLimit),
jobsdb.WithDSLimit(&a.config.batchRouterDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer batchRouterDB.Close()
Expand All @@ -130,7 +162,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStatusHandler(),
jobsdb.WithPreBackupHandlers(prebackupHandlers),
jobsdb.WithDSLimit(&processorDSLimit),
jobsdb.WithDSLimit(&a.config.processorDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)

Expand All @@ -154,12 +186,12 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O

switch deploymentType {
case deployment.MultiTenantType:
pkgLogger.Info("using ETCD Based Dynamic Cluster Manager")
a.log.Info("using ETCD Based Dynamic Cluster Manager")
modeProvider = state.NewETCDDynamicProvider()
case deployment.DedicatedType:
// FIXME: hacky way to determine server mode
pkgLogger.Info("using Static Cluster Manager")
if enableProcessor && enableRouter {
a.log.Info("using Static Cluster Manager")
if a.config.enableProcessor && a.config.enableRouter {
modeProvider = state.NewStaticProvider(servermode.NormalMode)
} else {
modeProvider = state.NewStaticProvider(servermode.DegradedMode)
Expand Down Expand Up @@ -206,7 +238,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
// This separate gateway db is created just to be used with gateway because in case of degraded mode,
// the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it
// will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode.
gatewayDB = jobsdb.NewForWrite(
gatewayDB := jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStatusHandler(),
Expand All @@ -217,18 +249,18 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
}
defer gatewayDB.Stop()

gw.SetReadonlyDBs(&readonlyGatewayDB, &readonlyRouterDB, &readonlyBatchRouterDB)
gw.SetReadonlyDBs(readonlyGatewayDB, readonlyRouterDB, readonlyBatchRouterDB)
err = gw.Setup(
ctx,
embedded.App, backendconfig.DefaultBackendConfig, gatewayDB,
&rateLimiter, embedded.VersionHandler, rsourcesService,
a.app, backendconfig.DefaultBackendConfig, gatewayDB,
&rateLimiter, a.versionHandler, rsourcesService,
)
if err != nil {
return fmt.Errorf("could not setup gateway: %w", err)
}
defer func() {
if err := gw.Shutdown(); err != nil {
pkgLogger.Warnf("Gateway shutdown error: %v", err)
a.log.Warnf("Gateway shutdown error: %v", err)
}
}()

Expand All @@ -238,7 +270,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
g.Go(func() error {
return gw.StartWebHandler(ctx)
})
if enableReplay {
if a.config.enableReplay {
var replayDB jobsdb.HandleT
err := replayDB.Setup(
jobsdb.ReadWrite, options.ClearDB, "replay",
Expand All @@ -248,7 +280,7 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
return fmt.Errorf("could not setup replayDB: %w", err)
}
defer replayDB.TearDown()
embedded.App.Features().Replay.Setup(ctx, &replayDB, gatewayDB, routerDB, batchRouterDB)
a.app.Features().Replay.Setup(ctx, &replayDB, gatewayDB, routerDB, batchRouterDB)
}

g.Go(func() error {
Expand All @@ -271,7 +303,3 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O

return g.Wait()
}

func (*EmbeddedApp) HandleRecovery(options *app.Options) {
db.HandleEmbeddedRecovery(options.NormalMode, options.DegradedMode, misc.AppStartTime, app.EMBEDDED)
}
66 changes: 42 additions & 24 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,50 +10,72 @@ import (
"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/app/cluster"
"github.com/rudderlabs/rudder-server/app/cluster/state"
"github.com/rudderlabs/rudder-server/config"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/gateway"
"github.com/rudderlabs/rudder-server/jobsdb"
ratelimiter "github.com/rudderlabs/rudder-server/rate-limiter"
"github.com/rudderlabs/rudder-server/services/db"
sourcedebugger "github.com/rudderlabs/rudder-server/services/debugger/source"
fileuploader "github.com/rudderlabs/rudder-server/services/fileuploader"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
"github.com/rudderlabs/rudder-server/utils/types/servermode"
)

// GatewayApp is the type for Gateway type implementation
type GatewayApp struct {
App app.App
VersionHandler func(w http.ResponseWriter, r *http.Request)
// gatewayApp is the type for Gateway type implementation
type gatewayApp struct {
setupDone bool
app app.App
versionHandler func(w http.ResponseWriter, r *http.Request)
log logger.Logger
config struct {
enableProcessor bool
enableRouter bool
gatewayDSLimit int
}
}

func (*GatewayApp) GetAppType() string {
return fmt.Sprintf("rudder-server-%s", app.GATEWAY)
func (a *gatewayApp) loadConfiguration() {
config.RegisterBoolConfigVariable(true, &a.config.enableProcessor, false, "enableProcessor")
config.RegisterBoolConfigVariable(true, &a.config.enableRouter, false, "enableRouter")
config.RegisterIntConfigVariable(0, &a.config.gatewayDSLimit, true, 1, "Gateway.jobsDB.dsLimit", "JobsDB.dsLimit")
}

func (appHandler *GatewayApp) PrepareDB() error {
func (a *gatewayApp) Setup(options *app.Options) error {
a.loadConfiguration()
if err := db.HandleNullRecovery(options.NormalMode, options.DegradedMode, misc.AppStartTime, app.GATEWAY); err != nil {
return err
}
if err := rudderCoreDBValidator(); err != nil {
return err
}
if err := rudderCoreWorkSpaceTableSetup(); err != nil {
return err
}
a.setupDone = true
return nil
}

func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.Options) error {
pkgLogger.Info("Gateway starting")
func (a *gatewayApp) StartRudderCore(ctx context.Context, options *app.Options) error {
if !a.setupDone {
return fmt.Errorf("gateway cannot start, database is not setup")
}
a.log.Info("Gateway starting")

rudderCoreBaseSetup()
readonlyGatewayDB, readonlyRouterDB, readonlyBatchRouterDB, err := setupReadonlyDBs()
if err != nil {
return err
}

deploymentType, err := deployment.GetFromEnv()
if err != nil {
return fmt.Errorf("failed to get deployment type: %v", err)
}

pkgLogger.Infof("Configured deployment type: %q", deploymentType)
pkgLogger.Info("Clearing DB ", options.ClearDB)
a.log.Infof("Configured deployment type: %q", deploymentType)
a.log.Info("Clearing DB ", options.ClearDB)

sourcedebugger.Setup(backendconfig.DefaultBackendConfig)

Expand All @@ -63,7 +85,7 @@ func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStatusHandler(),
jobsdb.WithDSLimit(&gatewayDSLimit),
jobsdb.WithDSLimit(&a.config.gatewayDSLimit),
jobsdb.WithFileUploaderProvider(fileUploaderProvider),
)
defer gatewayDB.Close()
Expand All @@ -78,11 +100,11 @@ func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.

switch deploymentType {
case deployment.MultiTenantType:
pkgLogger.Info("using ETCD Based Dynamic Cluster Manager")
a.log.Info("using ETCD Based Dynamic Cluster Manager")
modeProvider = state.NewETCDDynamicProvider()
case deployment.DedicatedType:
pkgLogger.Info("using Static Cluster Manager")
if enableProcessor && enableRouter {
a.log.Info("using Static Cluster Manager")
if a.config.enableProcessor && a.config.enableRouter {
modeProvider = state.NewStaticProvider(servermode.NormalMode)
} else {
modeProvider = state.NewStaticProvider(servermode.DegradedMode)
Expand All @@ -103,22 +125,22 @@ func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.
var rateLimiter ratelimiter.HandleT

rateLimiter.SetUp()
gw.SetReadonlyDBs(&readonlyGatewayDB, &readonlyRouterDB, &readonlyBatchRouterDB)
gw.SetReadonlyDBs(readonlyGatewayDB, readonlyRouterDB, readonlyBatchRouterDB)
rsourcesService, err := NewRsourcesService(deploymentType)
if err != nil {
return err
}
err = gw.Setup(
ctx,
gatewayApp.App, backendconfig.DefaultBackendConfig, gatewayDB,
&rateLimiter, gatewayApp.VersionHandler, rsourcesService,
a.app, backendconfig.DefaultBackendConfig, gatewayDB,
&rateLimiter, a.versionHandler, rsourcesService,
)
if err != nil {
return fmt.Errorf("failed to setup gateway: %w", err)
}
defer func() {
if err := gw.Shutdown(); err != nil {
pkgLogger.Warnf("Gateway shutdown error: %v", err)
a.log.Warnf("Gateway shutdown error: %v", err)
}
}()

Expand All @@ -130,7 +152,3 @@ func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.
})
return g.Wait()
}

func (*GatewayApp) HandleRecovery(options *app.Options) {
db.HandleNullRecovery(options.NormalMode, options.DegradedMode, misc.AppStartTime, app.GATEWAY)
}
Loading

0 comments on commit 3b7a7d6

Please sign in to comment.