Skip to content

Commit

Permalink
chore: cleanup unused code (#2561)
Browse files Browse the repository at this point in the history
  • Loading branch information
cisse21 authored Oct 13, 2022
1 parent 8cba05e commit 4de6105
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 212 deletions.
71 changes: 34 additions & 37 deletions app/apphandlers/embeddedAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
}))
}

enableGateway := true
var modeProvider cluster.ChangeEventProvider

switch deploymentType {
Expand Down Expand Up @@ -183,45 +182,43 @@ func (embedded *EmbeddedApp) StartRudderCore(ctx context.Context, options *app.O
MultiTenantStat: multitenantStats,
}

if enableGateway {
rateLimiter := ratelimiter.HandleT{}
rateLimiter.SetUp()
gw := gateway.HandleT{}
// This separate gateway db is created just to be used with gateway because in case of degraded mode,
// the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it
// will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode.
gatewayDB = jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStatusHandler(),
)
defer gwDBForProcessor.Close()
if err = gatewayDB.Start(); err != nil {
return fmt.Errorf("could not start gateway: %w", err)
}
defer gatewayDB.Stop()
rateLimiter := ratelimiter.HandleT{}
rateLimiter.SetUp()
gw := gateway.HandleT{}
// This separate gateway db is created just to be used with gateway because in case of degraded mode,
// the earlier created gwDb (which was created to be used mainly with processor) will not be running, and it
// will cause issues for gateway because gateway is supposed to receive jobs even in degraded mode.
gatewayDB = jobsdb.NewForWrite(
"gw",
jobsdb.WithClearDB(options.ClearDB),
jobsdb.WithStatusHandler(),
)
defer gwDBForProcessor.Close()
if err = gatewayDB.Start(); err != nil {
return fmt.Errorf("could not start gateway: %w", err)
}
defer gatewayDB.Stop()

gw.SetReadonlyDBs(&readonlyGatewayDB, &readonlyRouterDB, &readonlyBatchRouterDB)
err = gw.Setup(
embedded.App, backendconfig.DefaultBackendConfig, gatewayDB,
&rateLimiter, embedded.VersionHandler, rsourcesService,
)
if err != nil {
return fmt.Errorf("could not setup gateway: %w", err)
gw.SetReadonlyDBs(&readonlyGatewayDB, &readonlyRouterDB, &readonlyBatchRouterDB)
err = gw.Setup(
embedded.App, backendconfig.DefaultBackendConfig, gatewayDB,
&rateLimiter, embedded.VersionHandler, rsourcesService,
)
if err != nil {
return fmt.Errorf("could not setup gateway: %w", err)
}
defer func() {
if err := gw.Shutdown(); err != nil {
pkgLogger.Warnf("Gateway shutdown error: %v", err)
}
defer func() {
if err := gw.Shutdown(); err != nil {
pkgLogger.Warnf("Gateway shutdown error: %v", err)
}
}()
}()

g.Go(func() error {
return gw.StartAdminHandler(ctx)
})
g.Go(func() error {
return gw.StartWebHandler(ctx)
})
}
g.Go(func() error {
return gw.StartAdminHandler(ctx)
})
g.Go(func() error {
return gw.StartWebHandler(ctx)
})
if enableReplay {
var replayDB jobsdb.HandleT
err := replayDB.Setup(
Expand Down
56 changes: 26 additions & 30 deletions app/apphandlers/gatewayAppHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.
}
defer gatewayDB.Stop()

enableGateway := true
g, ctx := errgroup.WithContext(ctx)

var modeProvider cluster.ChangeEventProvider
Expand Down Expand Up @@ -88,37 +87,34 @@ func (gatewayApp *GatewayApp) StartRudderCore(ctx context.Context, options *app.
return dm.Run(ctx)
})

if enableGateway {
var gw gateway.HandleT
var rateLimiter ratelimiter.HandleT
var gw gateway.HandleT
var rateLimiter ratelimiter.HandleT

rateLimiter.SetUp()
gw.SetReadonlyDBs(&readonlyGatewayDB, &readonlyRouterDB, &readonlyBatchRouterDB)
rsourcesService, err := NewRsourcesService(deploymentType)
if err != nil {
return err
}
err = gw.Setup(
gatewayApp.App, backendconfig.DefaultBackendConfig, gatewayDB,
&rateLimiter, gatewayApp.VersionHandler, rsourcesService,
)
if err != nil {
return fmt.Errorf("failed to setup gateway: %w", err)
}
defer func() {
if err := gw.Shutdown(); err != nil {
pkgLogger.Warnf("Gateway shutdown error: %v", err)
}
}()

g.Go(func() error {
return gw.StartAdminHandler(ctx)
})
g.Go(func() error {
return gw.StartWebHandler(ctx)
})
rateLimiter.SetUp()
gw.SetReadonlyDBs(&readonlyGatewayDB, &readonlyRouterDB, &readonlyBatchRouterDB)
rsourcesService, err := NewRsourcesService(deploymentType)
if err != nil {
return err
}
err = gw.Setup(
gatewayApp.App, backendconfig.DefaultBackendConfig, gatewayDB,
&rateLimiter, gatewayApp.VersionHandler, rsourcesService,
)
if err != nil {
return fmt.Errorf("failed to setup gateway: %w", err)
}
// go readIOforResume(router) //keeping it as input from IO, to be replaced by UI
defer func() {
if err := gw.Shutdown(); err != nil {
pkgLogger.Warnf("Gateway shutdown error: %v", err)
}
}()

g.Go(func() error {
return gw.StartAdminHandler(ctx)
})
g.Go(func() error {
return gw.StartWebHandler(ctx)
})
return g.Wait()
}

Expand Down
129 changes: 0 additions & 129 deletions app/apphandlers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,24 @@ import (
"errors"
"fmt"
"net/http"
"sync"
"time"

"github.com/rudderlabs/rudder-server/app"
"github.com/rudderlabs/rudder-server/config"
backendconfig "github.com/rudderlabs/rudder-server/config/backend-config"
"github.com/rudderlabs/rudder-server/jobsdb"
"github.com/rudderlabs/rudder-server/processor"
"github.com/rudderlabs/rudder-server/router"
"github.com/rudderlabs/rudder-server/router/batchrouter"
"github.com/rudderlabs/rudder-server/services/diagnostics"
"github.com/rudderlabs/rudder-server/services/multitenant"
"github.com/rudderlabs/rudder-server/services/rsources"
"github.com/rudderlabs/rudder-server/services/transientsource"
"github.com/rudderlabs/rudder-server/services/validators"
"github.com/rudderlabs/rudder-server/utils/logger"
"github.com/rudderlabs/rudder-server/utils/misc"
utilsync "github.com/rudderlabs/rudder-server/utils/sync"
"github.com/rudderlabs/rudder-server/utils/types"
"github.com/rudderlabs/rudder-server/utils/types/deployment"
warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils"
)

var (
enableProcessor, enableRouter, enableReplay bool
objectStorageDestinations []string
asyncDestinations []string
routerLoaded utilsync.First
processorLoaded utilsync.First
pkgLogger logger.Logger
Diagnostics diagnostics.DiagnosticsI
readonlyGatewayDB, readonlyRouterDB, readonlyBatchRouterDB jobsdb.ReadonlyHandleT
Expand Down Expand Up @@ -73,8 +62,6 @@ func loadConfig() {
config.RegisterBoolConfigVariable(true, &enableProcessor, false, "enableProcessor")
config.RegisterBoolConfigVariable(types.DEFAULT_REPLAY_ENABLED, &enableReplay, false, "Replay.enabled")
config.RegisterBoolConfigVariable(true, &enableRouter, false, "enableRouter")
objectStorageDestinations = []string{"S3", "GCS", "AZURE_BLOB", "MINIO", "DIGITAL_OCEAN_SPACES"}
asyncDestinations = []string{"MARKETO_BULK_UPLOAD"}
}

func rudderCoreDBValidator() {
Expand Down Expand Up @@ -109,122 +96,6 @@ func rudderCoreBaseSetup() {
router.RegisterAdminHandlers(&readonlyRouterDB, &readonlyBatchRouterDB)
}

// StartProcessor atomically starts processor process if not already started
func StartProcessor(
ctx context.Context, clearDB *bool, gatewayDB, routerDB, batchRouterDB,
procErrorDB *jobsdb.HandleT, reporting types.ReportingI, multitenantStat multitenant.MultiTenantI,
transientSources transientsource.Service, rsourcesService rsources.JobService,
) error {
if !processorLoaded.First() {
pkgLogger.Debug("processor started by another go routine")
return nil
}

processorInstance := processor.NewProcessor()
processorInstance.Setup(
backendconfig.DefaultBackendConfig, gatewayDB, routerDB, batchRouterDB, procErrorDB,
clearDB, reporting, multitenantStat, transientSources, rsourcesService,
)
defer processorInstance.Shutdown()
return processorInstance.Start(ctx)
}

// StartRouter atomically starts router process if not already started
func StartRouter(
ctx context.Context, routerDB jobsdb.MultiTenantJobsDB, batchRouterDB *jobsdb.HandleT,
procErrorDB *jobsdb.HandleT, reporting types.ReportingI, multitenantStat multitenant.MultiTenantI,
transientSources transientsource.Service, rsourcesService rsources.JobService,
) {
if !routerLoaded.First() {
pkgLogger.Debug("processor started by an other go routine")
return
}

routerFactory := router.Factory{
BackendConfig: backendconfig.DefaultBackendConfig,
Reporting: reporting,
Multitenant: multitenantStat,
RouterDB: routerDB,
ProcErrorDB: procErrorDB,
TransientSources: transientSources,
RsourcesService: rsourcesService,
}

batchRouterFactory := batchrouter.Factory{
BackendConfig: backendconfig.DefaultBackendConfig,
Reporting: reporting,
Multitenant: multitenantStat,
ProcErrorDB: procErrorDB,
RouterDB: batchRouterDB,
TransientSources: transientSources,
RsourcesService: rsourcesService,
}

monitorDestRouters(ctx, &routerFactory, &batchRouterFactory)
}

// Gets the config from config backend and extracts enabled writekeys
func monitorDestRouters(ctx context.Context, routerFactory *router.Factory, batchRouterFactory *batchrouter.Factory) {
ch := backendconfig.DefaultBackendConfig.Subscribe(ctx, backendconfig.TopicBackendConfig)
dstToRouter := make(map[string]*router.HandleT)
dstToBatchRouter := make(map[string]*batchrouter.HandleT)
cleanup := make([]func(), 0)

// Crash recover routerDB, batchRouterDB
// Note: The following cleanups can take time if there are too many
// rt / batch_rt tables and there would be a delay reading from channel `ch`
// However, this shouldn't be the problem since backend config pushes config
// to its subscribers in separate goroutines to prevent blocking.
routerFactory.RouterDB.DeleteExecuting()
batchRouterFactory.RouterDB.DeleteExecuting()

for data := range ch {
config := data.Data.(map[string]backendconfig.ConfigT)
for _, wConfig := range config {
for i := range wConfig.Sources {
source := &wConfig.Sources[i] // Copy of large value inside loop: CRT-P0006
for k := range source.Destinations {
destination := &source.Destinations[k] // Copy of large value inside loop: CRT-P0006
// For batch router destinations
if misc.Contains(objectStorageDestinations, destination.DestinationDefinition.Name) ||
misc.Contains(warehouseutils.WarehouseDestinations, destination.DestinationDefinition.Name) ||
misc.Contains(asyncDestinations, destination.DestinationDefinition.Name) {
_, ok := dstToBatchRouter[destination.DestinationDefinition.Name]
if !ok {
pkgLogger.Info("Starting a new Batch Destination Router ", destination.DestinationDefinition.Name)
brt := batchRouterFactory.New(destination.DestinationDefinition.Name)
brt.Start()
cleanup = append(cleanup, brt.Shutdown)
dstToBatchRouter[destination.DestinationDefinition.Name] = brt
}
} else {
routerIdentifier := destination.DestinationDefinition.Name
_, ok := dstToRouter[routerIdentifier]
if !ok {
pkgLogger.Infof("Starting a new Destination: %s", routerIdentifier)
rt := routerFactory.New(destination, routerIdentifier)
rt.Start()
cleanup = append(cleanup, rt.Shutdown)
dstToRouter[routerIdentifier] = rt
}
}
}
}
}
}

var wg sync.WaitGroup
for _, f := range cleanup {
f := f
wg.Add(1)
go func() {
defer wg.Done()
f()
}()
}
wg.Wait()
}

// NewRsourcesService produces a rsources.JobService through environment configuration (env variables & config file)
func NewRsourcesService(deploymentType deployment.Type) (rsources.JobService, error) {
var rsourcesConfig rsources.JobServiceConfig
Expand Down
6 changes: 3 additions & 3 deletions jobsdb/jobsdb_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Function to return an ordered list of datasets and datasetRanges
Most callers use the in-memory list of dataset and datasetRanges
*/
func getDSList(jd assertInterface, dbHandle sqlDbOrTx, tablePrefix string) []dataSetT {
datasetList := []dataSetT{}
var datasetList []dataSetT

// Read the table names from PG
tableNames := mustGetAllTableNames(jd, dbHandle)
Expand All @@ -31,7 +31,7 @@ func getDSList(jd assertInterface, dbHandle sqlDbOrTx, tablePrefix string) []dat

jobNameMap := map[string]string{}
jobStatusNameMap := map[string]string{}
dnumList := []string{}
var dnumList []string

for _, t := range tableNames {
if strings.HasPrefix(t, tablePrefix+"_jobs_") {
Expand Down Expand Up @@ -94,7 +94,7 @@ func getAllTableNames(dbHandle sqlDbOrTx) ([]string, error) {
if err != nil {
return tableNames, err
}
defer rows.Close()
defer func() { _ = rows.Close() }()
for rows.Next() {
var tbName string
err = rows.Scan(&tbName)
Expand Down
2 changes: 1 addition & 1 deletion jobsdb/readonly_jobsdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,7 +684,7 @@ func (jd *ReadonlyHandleT) GetJobIDStatus(jobID, _ string) (string, error) {
}
sqlStatement = fmt.Sprintf(`SELECT job_id, job_state, attempt, exec_time, retry_time,error_code, error_response FROM %[1]s WHERE job_id = %[2]s;`, dsPair.JobStatusTable, jobID)
var statusCode sql.NullString
eventList := []JobStatusT{}
var eventList []JobStatusT
rows, err := jd.DbHandle.Query(sqlStatement)
if err != nil {
return "", err
Expand Down
6 changes: 3 additions & 3 deletions jobsdb/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ func (jd *HandleT) setupDatabaseTables(l lock.LockToken, clearAll bool) {
psqlInfo := misc.GetConnectionString()
db, err := sql.Open("postgres", psqlInfo)
if err != nil {
panic(fmt.Errorf("Error DB for migrate open: %w", err))
panic(fmt.Errorf("error DB for migrate open: %w", err))
}

defer db.Close()
defer func() { _ = db.Close() }()

// setup migrator with appropriate schema migrations table
m := &migrator.Migrator{
Expand All @@ -57,7 +57,7 @@ func (jd *HandleT) setupDatabaseTables(l lock.LockToken, clearAll bool) {
// execute any necessary migrations
err = m.MigrateFromTemplates("jobsdb", templateData)
if err != nil {
panic(fmt.Errorf("Error while migrating '%v' jobsdb tables: %w", jd.tablePrefix, err))
panic(fmt.Errorf("error while migrating '%v' jobsdb tables: %w", jd.tablePrefix, err))
}
}

Expand Down
7 changes: 0 additions & 7 deletions processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,6 @@ func Init() {
pkgLogger = logger.NewLogger().Child("processor")
}

// NewProcessor creates a new Processor instance
func NewProcessor() *HandleT {
return &HandleT{
transformer: transformer.NewTransformer(),
}
}

func (proc *HandleT) Status() interface{} {
proc.stats.transformEventsByTimeMutex.RLock()
defer proc.stats.transformEventsByTimeMutex.RUnlock()
Expand Down
Loading

0 comments on commit 4de6105

Please sign in to comment.