Skip to content

Commit

Permalink
refactor: safe dababase initialisation
Browse files Browse the repository at this point in the history
  • Loading branch information
atzoum committed Nov 24, 2022
1 parent 88f1ec1 commit a7b4df5
Show file tree
Hide file tree
Showing 9 changed files with 196 additions and 112 deletions.
16 changes: 13 additions & 3 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,22 @@ func (*EmbeddedApp) GetAppType() string {
return fmt.Sprintf("rudder-server-%s", app.EMBEDDED)
}

func (appHandler *EmbeddedApp) PrepareDB() error {
if err := rudderCoreDBValidator(); err != nil {
return err
}
if err := rudderCoreWorkSpaceTableSetup(); err != nil {
return err
}
if err := rudderCoreNodeSetup(); err != nil {
return err
}
return nil
}

func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.Options) error {
pkgLogger.Info("Embedded mode: Starting Rudder Core")

rudderCoreDBValidator()
rudderCoreWorkSpaceTableSetup()
rudderCoreNodeSetup()
rudderCoreBaseSetup()

g, ctx := errgroup.WithContext(ctx)
Expand Down
12 changes: 10 additions & 2 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,19 @@ func (*GatewayApp) GetAppType() string {
return fmt.Sprintf("rudder-server-%s", app.GATEWAY)
}

func (appHandler *GatewayApp) PrepareDB() error {
if err := rudderCoreDBValidator(); err != nil {
return err
}
if err := rudderCoreWorkSpaceTableSetup(); err != nil {
return err
}
return nil
}

func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.Options) error {
pkgLogger.Info("Gateway starting")

rudderCoreDBValidator()
rudderCoreWorkSpaceTableSetup()
rudderCoreBaseSetup()

deploymentType, err := deployment.GetFromEnv()
Expand Down
16 changes: 13 additions & 3 deletions app/apphandlers/processorAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,22 @@ func loadConfigHandler() {
config.RegisterIntConfigVariable(0, &batchRouterDSLimit, true, 1, "BatchRouter.jobsDB.dsLimit", "JobsDB.dsLimit")
}

func (processor *ProcessorApp) PrepareDB() error {
if err := rudderCoreDBValidator(); err != nil {
return err
}
if err := rudderCoreWorkSpaceTableSetup(); err != nil {
return err
}
if err := rudderCoreNodeSetup(); err != nil {
return err
}
return nil
}

func (processor *ProcessorApp) StartRudderCore(ctx context.Context, options *app.Options) error {
pkgLogger.Info("Processor starting")

rudderCoreDBValidator()
rudderCoreWorkSpaceTableSetup()
rudderCoreNodeSetup()
rudderCoreBaseSetup()
g, ctx := errgroup.WithContext(ctx)

Expand Down
13 changes: 7 additions & 6 deletions app/apphandlers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ var (
type AppHandler interface {
GetAppType() string
HandleRecovery(*app.Options)
PrepareDB() error
StartRudderCore(context.Context, *app.Options) error
}

Expand Down Expand Up @@ -64,16 +65,16 @@ func loadConfig() {
config.RegisterBoolConfigVariable(true, &enableRouter, false, "enableRouter")
}

func rudderCoreDBValidator() {
validators.ValidateEnv()
func rudderCoreDBValidator() error {
return validators.ValidateEnv()
}

func rudderCoreNodeSetup() {
validators.InitializeNodeMigrations()
func rudderCoreNodeSetup() error {
return validators.InitializeNodeMigrations()
}

func rudderCoreWorkSpaceTableSetup() {
validators.CheckAndValidateWorkspaceToken()
func rudderCoreWorkSpaceTableSetup() error {
return validators.CheckAndValidateWorkspaceToken()
}

func rudderCoreBaseSetup() {
Expand Down
12 changes: 6 additions & 6 deletions config/mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package config

// Rudder server supported config constants
const (
EmbeddedMode = "embedded"
MasterMode = "master"
MasterSlaveMode = "master_and_slave"
SlaveMode = "slave"
OffMode = "off"
PooledWHSlaveMode = "embedded_master"
EmbeddedMode = "embedded"
MasterMode = "master"
MasterSlaveMode = "master_and_slave"
SlaveMode = "slave"
OffMode = "off"
EmbeddedMasterMode = "embedded_master"
)
54 changes: 38 additions & 16 deletions runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,9 @@ type ReleaseInfo struct {

// Runner is responsible for running the application
type Runner struct {
releaseInfo ReleaseInfo
appType string
application app.App
releaseInfo ReleaseInfo
warehouseMode string
enableSuppressUserFeature bool
logger logger.Logger
Expand All @@ -112,6 +113,7 @@ func New(releaseInfo ReleaseInfo) *Runner {
return 0
}
return &Runner{
appType: strings.ToUpper(config.GetString("APP_TYPE", app.EMBEDDED)),
releaseInfo: releaseInfo,
logger: logger.NewLogger().Child("runner"),
warehouseMode: config.GetString("Warehouse.mode", "embedded"),
Expand Down Expand Up @@ -142,18 +144,17 @@ func (r *Runner) Run(ctx context.Context, args []string) int {
// application & backend setup should be done before starting any new goroutines.
r.application.Setup()

appTypeStr := strings.ToUpper(config.GetString("APP_TYPE", app.EMBEDDED))
r.appHandler = apphandlers.GetAppHandler(r.application, appTypeStr, r.versionHandler)
r.appHandler = apphandlers.GetAppHandler(r.application, r.appType, r.versionHandler)

versionDetail := r.versionInfo()
// Start bugsnag
bugsnag.Configure(bugsnag.Configuration{
APIKey: config.GetString("BUGSNAG_KEY", ""),
ReleaseStage: config.GetString("GO_ENV", "development"),
// The import paths for the Go packages containing your source files
ProjectPackages: []string{"main", "github.com/rudderlabs/rudder-server"},
// more configuration options
AppType: r.appHandler.GetAppType(),
AppVersion: versionDetail["Version"].(string),
AppVersion: r.releaseInfo.Version,
PanicHandler: func() {},
})
ctx = bugsnag.StartSession(ctx)
Expand All @@ -164,6 +165,8 @@ func (r *Runner) Run(ctx context.Context, args []string) int {
r.logger.Errorf("failed to get deployment type: %w", err)
return 1
}

// Start stats
// TODO: remove as soon as we update the configuration with statsExcludedTags where necessary
if !config.IsSet("statsExcludedTags") && deploymentType == deployment.MultiTenantType && (!config.IsSet("WORKSPACE_NAMESPACE") || strings.Contains(config.GetString("WORKSPACE_NAMESPACE", ""), "free")) {
config.Set("statsExcludedTags", []string{"workspaceId", "sourceID", "destId"})
Expand All @@ -186,23 +189,40 @@ func (r *Runner) Run(ctx context.Context, args []string) int {

configEnvHandler := r.application.Features().ConfigEnv.Setup()

if config.GetString("Warehouse.mode", "") != "slave" {
// Start backend config
if r.canStartBackendConfig() {
if err := backendconfig.Setup(configEnvHandler); err != nil {
r.logger.Errorf("Unable to setup backend config: %s", err)
return 1
}

backendconfig.DefaultBackendConfig.StartWithIDs(ctx, "")
}

// Prepare databases in sequential order, so that failure in one doesn't affect others (leaving dirty schema migration state)
if r.canStartServer() {
r.appHandler.HandleRecovery(options) // decide mode early
if err := r.appHandler.PrepareDB(); err != nil {
r.logger.Errorf("Unable to prepare rudder-core database: %s", err)
return 1
}
}
if r.canStartWarehouse() {
if err := warehouse.PrepareDB(ctx); err != nil {
r.logger.Errorf("Unable to prepare warehouse database: %s", err)
return 1
}
}
g, ctx := errgroup.WithContext(ctx)

// Start admin server
g.Go(func() error {
if err := admin.StartServer(ctx); err != nil {
return fmt.Errorf("admin server routine: %w", err)
}
return nil
})

// Start profiler
g.Go(func() error {
p := &profiler.Profiler{}
if err := p.StartServer(ctx); err != nil {
Expand All @@ -212,8 +232,9 @@ func (r *Runner) Run(ctx context.Context, args []string) int {
})

misc.AppStartTime = time.Now().Unix()

// Start rudder core
if r.canStartServer() {
r.appHandler.HandleRecovery(options)
g.Go(misc.WithBugsnag(func() (err error) {
if err := r.appHandler.StartRudderCore(ctx, options); err != nil {
return fmt.Errorf("rudder core: %w", err)
Expand All @@ -238,10 +259,11 @@ func (r *Runner) Run(ctx context.Context, args []string) int {
}))
}

// Start warehouse
// initialize warehouse service after core to handle non-normal recovery modes
if appTypeStr != app.GATEWAY && r.canStartWarehouse() {
if r.canStartWarehouse() {
g.Go(misc.WithBugsnagForWarehouse(func() error {
if err := r.startWarehouseService(ctx, r.application); err != nil {
if err := warehouse.Start(ctx, r.application); err != nil {
return fmt.Errorf("warehouse service routine: %w", err)
}
return nil
Expand Down Expand Up @@ -382,15 +404,15 @@ func (r *Runner) printVersion() {
fmt.Printf("Version Info %s\n", versionFormatted)
}

func (*Runner) startWarehouseService(ctx context.Context, application app.App) error {
return warehouse.Start(ctx, application)
}

func (r *Runner) canStartServer() bool {
r.logger.Info("warehousemode ", r.warehouseMode)
return r.warehouseMode == config.EmbeddedMode || r.warehouseMode == config.OffMode || r.warehouseMode == config.PooledWHSlaveMode
return r.warehouseMode == config.EmbeddedMode || r.warehouseMode == config.OffMode || r.warehouseMode == config.EmbeddedMasterMode
}

func (r *Runner) canStartWarehouse() bool {
return r.warehouseMode != config.OffMode
return r.appType != app.GATEWAY && r.warehouseMode != config.OffMode
}

func (r *Runner) canStartBackendConfig() bool {
return r.warehouseMode != config.SlaveMode
}
Loading

0 comments on commit a7b4df5

Please sign in to comment.