From d96d1ce84c9d8404ab970ce619f10e493eb1e5c7 Mon Sep 17 00:00:00 2001 From: j-rafique Date: Wed, 31 Jul 2024 14:17:24 +0500 Subject: [PATCH] [PSL-1245] implement meta migration tables --- p2p/kademlia/store/sqlite/meta_worker.go | 65 ++++++++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/p2p/kademlia/store/sqlite/meta_worker.go b/p2p/kademlia/store/sqlite/meta_worker.go index 62d0bf92b..c30925bc2 100644 --- a/p2p/kademlia/store/sqlite/meta_worker.go +++ b/p2p/kademlia/store/sqlite/meta_worker.go @@ -71,12 +71,77 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string) (*MigrationMetaS updateTicker: time.NewTicker(commitLastAccessedInterval), insertTicker: time.NewTicker(commitInsertsInterval), } + + if err := handler.migrateMeta(); err != nil { + log.P2P().WithContext(ctx).Errorf("cannot create meta table in sqlite database: %s", err.Error()) + } + + if err := handler.migrateMetaMigration(); err != nil { + log.P2P().WithContext(ctx).Errorf("cannot create meta-migration table in sqlite database: %s", err.Error()) + } + + if err := handler.migrateMigration(); err != nil { + log.P2P().WithContext(ctx).Errorf("cannot create migration table in sqlite database: %s", err.Error()) + } + go handler.startLastAccessedUpdateWorker(ctx) go handler.startInsertWorker(ctx) return handler, nil } +func (d *MigrationMetaStore) migrateMeta() error { + query := ` + CREATE TABLE IF NOT EXISTS meta ( + key TEXT PRIMARY KEY, + last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + access_count INTEGER DEFAULT 0, + data_size INTEGER DEFAULT 0, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ); ` + if _, err := d.db.Exec(query); err != nil { + return fmt.Errorf("failed to create table 'del_keys': %w", err) + } + + return nil +} + +func (d *MigrationMetaStore) migrateMetaMigration() error { + query := ` + CREATE TABLE IF NOT EXISTS meta_migration ( + key TEXT, + migration_id INTEGER, + score INTEGER, + is_migrated BOOLEAN, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (key, migration_id) + );` + if _, err := d.db.Exec(query); err != nil { + return fmt.Errorf("failed to create table 'del_keys': %w", err) + } + + return nil +} + +func (d *MigrationMetaStore) migrateMigration() error { + query := ` + CREATE TABLE IF NOT EXISTS migration ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + total_data_size INTEGER, + migration_started_at TIMESTAMP, + migration_finished_at TIMESTAMP, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + );` + if _, err := d.db.Exec(query); err != nil { + return fmt.Errorf("failed to create table 'del_keys': %w", err) + } + + return nil +} + // PostAccessUpdate sends access updates to be handled by the worker. func PostAccessUpdate(updates []string) { for _, update := range updates {