Skip to content

Commit

Permalink
[PSL-1245] implement meta migration tables
Browse files Browse the repository at this point in the history
  • Loading branch information
j-rafique committed Jul 31, 2024
1 parent eba2a52 commit d96d1ce
Showing 1 changed file with 65 additions and 0 deletions.
65 changes: 65 additions & 0 deletions p2p/kademlia/store/sqlite/meta_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,77 @@ func NewMigrationMetaStore(ctx context.Context, dataDir string) (*MigrationMetaS
updateTicker: time.NewTicker(commitLastAccessedInterval),
insertTicker: time.NewTicker(commitInsertsInterval),
}

if err := handler.migrateMeta(); err != nil {
log.P2P().WithContext(ctx).Errorf("cannot create meta table in sqlite database: %s", err.Error())
}

if err := handler.migrateMetaMigration(); err != nil {
log.P2P().WithContext(ctx).Errorf("cannot create meta-migration table in sqlite database: %s", err.Error())
}

if err := handler.migrateMigration(); err != nil {
log.P2P().WithContext(ctx).Errorf("cannot create migration table in sqlite database: %s", err.Error())
}

go handler.startLastAccessedUpdateWorker(ctx)
go handler.startInsertWorker(ctx)

return handler, nil
}

func (d *MigrationMetaStore) migrateMeta() error {
query := `
CREATE TABLE IF NOT EXISTS meta (
key TEXT PRIMARY KEY,
last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
access_count INTEGER DEFAULT 0,
data_size INTEGER DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
); `
if _, err := d.db.Exec(query); err != nil {
return fmt.Errorf("failed to create table 'del_keys': %w", err)
}

return nil
}

func (d *MigrationMetaStore) migrateMetaMigration() error {
query := `
CREATE TABLE IF NOT EXISTS meta_migration (
key TEXT,
migration_id INTEGER,
score INTEGER,
is_migrated BOOLEAN,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (key, migration_id)
);`
if _, err := d.db.Exec(query); err != nil {
return fmt.Errorf("failed to create table 'del_keys': %w", err)
}

return nil
}

func (d *MigrationMetaStore) migrateMigration() error {
query := `
CREATE TABLE IF NOT EXISTS migration (
id INTEGER PRIMARY KEY AUTOINCREMENT,
total_data_size INTEGER,
migration_started_at TIMESTAMP,
migration_finished_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);`
if _, err := d.db.Exec(query); err != nil {
return fmt.Errorf("failed to create table 'del_keys': %w", err)
}

return nil
}

// PostAccessUpdate sends access updates to be handled by the worker.
func PostAccessUpdate(updates []string) {
for _, update := range updates {
Expand Down

0 comments on commit d96d1ce

Please sign in to comment.