diff --git a/router/router_test.go b/router/router_test.go index 32376862e66..68c4fcc88d9 100644 --- a/router/router_test.go +++ b/router/router_test.go @@ -9,26 +9,21 @@ import ( "testing" "time" + "github.com/google/uuid" + jsoniter "github.com/json-iterator/go" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/tidwall/gjson" - - "github.com/rudderlabs/rudder-server/enterprise/reporting" - - jsoniter "github.com/json-iterator/go" - "github.com/tidwall/sjson" - "go.uber.org/mock/gomock" - "github.com/google/uuid" - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-server/admin" backendconfig "github.com/rudderlabs/rudder-server/backend-config" + "github.com/rudderlabs/rudder-server/enterprise/reporting" "github.com/rudderlabs/rudder-server/jobsdb" mocksBackendConfig "github.com/rudderlabs/rudder-server/mocks/backend-config" mocksJobsDB "github.com/rudderlabs/rudder-server/mocks/jobsdb" diff --git a/services/dedup/badger/badger.go b/services/dedup/badger/badger.go index fc3154e1225..ba944cc1b55 100644 --- a/services/dedup/badger/badger.go +++ b/services/dedup/badger/badger.go @@ -2,6 +2,7 @@ package badger import ( "context" + "errors" "fmt" "strconv" "sync" @@ -77,6 +78,9 @@ func NewBadgerDB(conf *config.Config, stats stats.Stats, path string) *Dedup { } func (d *BadgerDB) Get(key string) (int64, bool, error) { + start := time.Now() + defer d.stats.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "badger"}).Since(start) + var payloadSize int64 var found bool err := d.badgerDB.View(func(txn *badger.Txn) error { @@ -90,13 +94,16 @@ func (d *BadgerDB) Get(key string) (int64, bool, error) { } return nil }) - if err != nil && err != badger.ErrKeyNotFound { + if err != nil && !errors.Is(err, badger.ErrKeyNotFound) { return 0, false, err } return payloadSize, found, nil } func (d *BadgerDB) Set(kvs []types.KeyValue) error { + start := time.Now() + defer d.stats.NewTaggedStat("dedup_commit_duration_seconds", stats.TimerType, stats.Tags{"mode": "badger"}).Since(start) + txn := d.badgerDB.NewTransaction(true) for _, message := range kvs { value := strconv.FormatInt(message.Value, 10) diff --git a/services/dedup/dedup_test.go b/services/dedup/dedup_test.go index eb4e6c9ad3f..5a978d9ac51 100644 --- a/services/dedup/dedup_test.go +++ b/services/dedup/dedup_test.go @@ -210,3 +210,60 @@ func Benchmark_Dedup(b *testing.B) { b.Log("db size:", string(out)) } + +// Benchmark_DedupModes/MirrorBadger-12 1072 1101878 ns/op +// Benchmark_DedupModes/MirrorScylla-12 566 1986533 ns/op +// Benchmark_DedupModes/Scylla-12 990 1525086 ns/op +// Benchmark_DedupModes/Badger-12 108246 9981 ns/op + +func Benchmark_DedupModes(b *testing.B) { + testCases := []struct { + name string + }{ + { + name: "Badger", + }, + { + name: "Scylla", + }, + { + name: "MirrorScylla", + }, + { + name: "MirrorBadger", + }, + { + name: "Random", + }, + } + pool, err := dockertest.NewPool("") + require.NoError(b, err) + keySpace := strings.ToUpper(rand.String(5)) + table := rand.String(5) + resource, err := scylla.Setup(pool, b, scylla.WithKeyspace(keySpace)) + require.NoError(b, err) + for _, tc := range testCases { + config.Reset() + logger.Reset() + misc.Init() + dbPath := b.TempDir() + conf := config.New() + conf.Set("Scylla.Hosts", resource.URL) + conf.Set("Scylla.Keyspace", keySpace) + conf.Set("Scylla.TableName", table) + b.Setenv("RUDDER_TMPDIR", dbPath) + conf.Set("Dedup.Mode", tc.name) + d, err := dedup.New(conf, stats.Default) + require.Nil(b, err) + b.Run(tc.name, func(b *testing.B) { + for i := 0; i < b.N; i++ { + key := uuid.New().String() + _, _, err = d.Get(types.KeyValue{Key: key, Value: int64(i + 1), WorkspaceID: "2DAZvjf8PEMrAkbVm6smqEJnh"}) + require.NoError(b, err) + err = d.Commit([]string{key}) + require.NoError(b, err) + } + }) + d.Close() + } +} diff --git a/services/dedup/mirrorBadger/mirrorBadger.go b/services/dedup/mirrorBadger/mirrorBadger.go index a3c5f4fe450..b36c553e71b 100644 --- a/services/dedup/mirrorBadger/mirrorBadger.go +++ b/services/dedup/mirrorBadger/mirrorBadger.go @@ -1,6 +1,8 @@ package mirrorBadger import ( + "time" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/services/dedup/badger" @@ -33,6 +35,9 @@ func (mb *MirrorBadger) Close() { } func (mb *MirrorBadger) Get(kv types.KeyValue) (bool, int64, error) { + start := time.Now() + defer mb.stat.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "mirror_badger"}).Since(start) + _, _, err := mb.scylla.Get(kv) if err != nil { mb.stat.NewTaggedStat("dedup_mirror_badger_get_error", stats.CountType, stats.Tags{}).Increment() @@ -41,6 +46,9 @@ func (mb *MirrorBadger) Get(kv types.KeyValue) (bool, int64, error) { } func (mb *MirrorBadger) Commit(keys []string) error { + start := time.Now() + defer mb.stat.NewTaggedStat("dedup_commit_duration_seconds", stats.TimerType, stats.Tags{"mode": "mirror_badger"}).Since(start) + _ = mb.scylla.Commit(keys) return mb.badger.Commit(keys) } diff --git a/services/dedup/mirrorScylla/mirrorScylla.go b/services/dedup/mirrorScylla/mirrorScylla.go index 918a0850aad..608231be767 100644 --- a/services/dedup/mirrorScylla/mirrorScylla.go +++ b/services/dedup/mirrorScylla/mirrorScylla.go @@ -1,6 +1,8 @@ package mirrorScylla import ( + "time" + "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/services/dedup/badger" @@ -33,6 +35,9 @@ func (ms *MirrorScylla) Close() { } func (ms *MirrorScylla) Get(kv types.KeyValue) (bool, int64, error) { + start := time.Now() + defer ms.stat.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "mirror_scylla"}).Since(start) + _, _, err := ms.badger.Get(kv) if err != nil { ms.stat.NewTaggedStat("dedup_mirror_scylla_get_error", stats.CountType, stats.Tags{}).Increment() @@ -41,6 +46,9 @@ func (ms *MirrorScylla) Get(kv types.KeyValue) (bool, int64, error) { } func (ms *MirrorScylla) Commit(keys []string) error { + start := time.Now() + defer ms.stat.NewTaggedStat("dedup_commit_duration_seconds", stats.TimerType, stats.Tags{"mode": "mirror_scylla"}).Since(start) + _ = ms.badger.Commit(keys) return ms.scylla.Commit(keys) } diff --git a/services/dedup/scylla/scylla.go b/services/dedup/scylla/scylla.go index ea419228ae3..5576d23d179 100644 --- a/services/dedup/scylla/scylla.go +++ b/services/dedup/scylla/scylla.go @@ -35,6 +35,9 @@ func (d *ScyllaDB) Close() { func (d *ScyllaDB) Get(kv types.KeyValue) (bool, int64, error) { // Create the table if it doesn't exist + start := time.Now() + defer d.stat.NewTaggedStat("dedup_get_duration_seconds", stats.TimerType, stats.Tags{"mode": "scylla"}).Since(start) + var err error d.cacheMu.Lock() defer d.cacheMu.Unlock() @@ -59,6 +62,9 @@ func (d *ScyllaDB) Get(kv types.KeyValue) (bool, int64, error) { } func (d *ScyllaDB) Commit(keys []string) error { + start := time.Now() + defer d.stat.NewTaggedStat("dedup_commit_duration_seconds", stats.TimerType, stats.Tags{"mode": "scylla"}).Since(start) + d.cacheMu.Lock() kvs := make([]types.KeyValue, len(keys)) for i, key := range keys { diff --git a/warehouse/archive/archiver.go b/warehouse/archive/archiver.go index f8e623b04e3..f611f600017 100644 --- a/warehouse/archive/archiver.go +++ b/warehouse/archive/archiver.go @@ -45,7 +45,7 @@ type uploadRecord struct { endStagingFileId int64 startLoadFileID int64 endLoadFileID int64 - uploadMetdata json.RawMessage + uploadMetadata json.RawMessage workspaceID string } @@ -101,7 +101,7 @@ func New( } func (a *Archiver) backupRecords(ctx context.Context, args backupRecordsArgs) (backupLocation string, err error) { - a.log.Infof("[Archiver]: Starting backupRecords for uploadId: %s, sourceId: %s, destinationId: %s, tableName: %s,", + a.log.Infof("[Archiver]: Starting backupRecords for uploadId: %d, sourceId: %s, destinationId: %s, tableName: %s,", args.uploadID, args.sourceID, args.destID, args.tableName, ) @@ -164,7 +164,7 @@ func (a *Archiver) backupRecords(ctx context.Context, args backupRecordsArgs) (b } backupLocation, err = tableJSONArchiver.Do() - a.log.Infof(`[Archiver]: Completed backupRecords for uploadId: %s, sourceId: %s, destinationId: %s, tableName: %s,`, + a.log.Infof(`[Archiver]: Completed backupRecords for uploadId: %d, sourceId: %s, destinationId: %s, tableName: %s,`, args.uploadID, args.sourceID, args.destID, args.tableName, ) @@ -311,7 +311,7 @@ func (a *Archiver) archiveUploads(ctx context.Context, maxArchiveLimit int) erro &u.endStagingFileId, &u.startLoadFileID, &u.endLoadFileID, - &u.uploadMetdata, + &u.uploadMetadata, &u.workspaceID, ) if err != nil { @@ -372,7 +372,7 @@ func (a *Archiver) archiveUploads(ctx context.Context, maxArchiveLimit int) erro continue } - hasUsedRudderStorage := a.usedRudderStorage(u.uploadMetdata) + hasUsedRudderStorage := a.usedRudderStorage(u.uploadMetadata) // delete load file records if err := a.deleteLoadFileRecords(ctx, txn, stagingFileIDs, hasUsedRudderStorage); err != nil { @@ -383,14 +383,14 @@ func (a *Archiver) archiveUploads(ctx context.Context, maxArchiveLimit int) erro } // update upload metadata - u.uploadMetdata, _ = sjson.SetBytes(u.uploadMetdata, "archivedStagingAndLoadFiles", true) + u.uploadMetadata, _ = sjson.SetBytes(u.uploadMetadata, "archivedStagingAndLoadFiles", true) stmt := fmt.Sprintf(` UPDATE %s SET metadata = $1 WHERE id = $2;`, pq.QuoteIdentifier(warehouseutils.WarehouseUploadsTable), ) - _, err = txn.ExecContext(ctx, stmt, u.uploadMetdata, u.uploadID) + _, err = txn.ExecContext(ctx, stmt, u.uploadMetadata, u.uploadID) if err != nil { a.log.Errorf(`[Archiver]: Error running txn while archiving upload files. Query: %s Error: %v`, stmt, err) _ = txn.Rollback() diff --git a/warehouse/integrations/postgres/postgres.go b/warehouse/integrations/postgres/postgres.go index 5c6b41ee7c6..a3ba7bce5a4 100644 --- a/warehouse/integrations/postgres/postgres.go +++ b/warehouse/integrations/postgres/postgres.go @@ -10,21 +10,17 @@ import ( "strings" "time" - "github.com/rudderlabs/rudder-server/warehouse/integrations/tunnelling" - - "github.com/rudderlabs/rudder-go-kit/stats" - - sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" - "github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader" - "github.com/rudderlabs/rudder-server/warehouse/logfield" - - "github.com/rudderlabs/rudder-server/warehouse/internal/model" - "github.com/rudderlabs/rudder-go-kit/config" "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats" "github.com/rudderlabs/rudder-server/utils/misc" "github.com/rudderlabs/rudder-server/warehouse/client" + sqlmiddleware "github.com/rudderlabs/rudder-server/warehouse/integrations/middleware/sqlquerywrapper" + "github.com/rudderlabs/rudder-server/warehouse/integrations/tunnelling" + "github.com/rudderlabs/rudder-server/warehouse/internal/model" + "github.com/rudderlabs/rudder-server/warehouse/internal/service/loadfiles/downloader" + "github.com/rudderlabs/rudder-server/warehouse/logfield" warehouseutils "github.com/rudderlabs/rudder-server/warehouse/utils" )