From eba2a528d2268c163434a9e4015676565c303b2f Mon Sep 17 00:00:00 2001 From: Matee ullah Date: Wed, 31 Jul 2024 04:49:53 +0500 Subject: [PATCH] [PSL-1240] pool & insert keys in metadata database, pool and update last_accessed field --- p2p/kademlia/store.go | 3 - p2p/kademlia/store/mem/mem.go | 5 - p2p/kademlia/store/sqlite/meta_worker.go | 238 +++++++++++++++++++++++ p2p/kademlia/store/sqlite/replication.go | 3 + p2p/kademlia/store/sqlite/sqlite.go | 24 +-- 5 files changed, 250 insertions(+), 23 deletions(-) create mode 100644 p2p/kademlia/store/sqlite/meta_worker.go diff --git a/p2p/kademlia/store.go b/p2p/kademlia/store.go index 3ca7f0de8..ef6f558b3 100644 --- a/p2p/kademlia/store.go +++ b/p2p/kademlia/store.go @@ -15,9 +15,6 @@ type Store interface { // Retrieve the queries key/value from store Retrieve(ctx context.Context, key []byte) ([]byte, error) - // RetrieveWithType gets data with type - RetrieveWithType(_ context.Context, key []byte) ([]byte, int, error) - // Delete a key/value pair from the store Delete(ctx context.Context, key []byte) diff --git a/p2p/kademlia/store/mem/mem.go b/p2p/kademlia/store/mem/mem.go index 239882f08..dc7919232 100644 --- a/p2p/kademlia/store/mem/mem.go +++ b/p2p/kademlia/store/mem/mem.go @@ -97,11 +97,6 @@ func (s *Store) StoreBatch(_ context.Context, values [][]byte, _ int, _ bool) er return nil } -// RetrieveWithType will return the queries key/value if it exists -func (s *Store) RetrieveWithType(_ context.Context, _ []byte) ([]byte, int, error) { - return []byte{}, 0, nil -} - // NewStore returns a new memory store func NewStore() *Store { return &Store{ diff --git a/p2p/kademlia/store/sqlite/meta_worker.go b/p2p/kademlia/store/sqlite/meta_worker.go new file mode 100644 index 000000000..62d0bf92b --- /dev/null +++ b/p2p/kademlia/store/sqlite/meta_worker.go @@ -0,0 +1,238 @@ +package sqlite + +import ( + "context" + "fmt" + + "os" + "path" + "sync" + "time" + + "github.com/jmoiron/sqlx" + _ "github.com/mattn/go-sqlite3" + "github.com/pastelnetwork/gonode/common/log" +) + +var ( + commitLastAccessedInterval = 60 * time.Second + migrationMetaDB = "data001-migration-meta.sqlite3" + accessUpdateBufferSize = 100000 + commitInsertsInterval = 90 * time.Second + updateChannel chan UpdateMessage + insertChannel chan UpdateMessage +) + +func init() { + updateChannel = make(chan UpdateMessage, accessUpdateBufferSize) + insertChannel = make(chan UpdateMessage, accessUpdateBufferSize) +} + +type UpdateMessages []UpdateMessage + +// AccessUpdate holds the key and the last accessed time. +type UpdateMessage struct { + Key string + LastAccessTime time.Time + Size int +} + +// MigrationMetaStore manages database operations. +type MigrationMetaStore struct { + db *sqlx.DB + + updateTicker *time.Ticker + insertTicker *time.Ticker + + updates sync.Map + inserts sync.Map +} + +// NewMigrationMetaStore initializes the MigrationMetaStore. +func NewMigrationMetaStore(ctx context.Context, dataDir string) (*MigrationMetaStore, error) { + if _, err := os.Stat(dataDir); os.IsNotExist(err) { + if err := os.MkdirAll(dataDir, 0750); err != nil { + return nil, fmt.Errorf("mkdir %q: %w", dataDir, err) + } + } else if err != nil { + return nil, fmt.Errorf("cannot create data folder: %w", err) + } + + dbFile := path.Join(dataDir, migrationMetaDB) + db, err := sqlx.Connect("sqlite3", dbFile) + if err != nil { + return nil, fmt.Errorf("cannot open sqlite database: %w", err) + } + db.SetMaxOpenConns(20) + db.SetMaxIdleConns(10) + + handler := &MigrationMetaStore{ + db: db, + updateTicker: time.NewTicker(commitLastAccessedInterval), + insertTicker: time.NewTicker(commitInsertsInterval), + } + go handler.startLastAccessedUpdateWorker(ctx) + go handler.startInsertWorker(ctx) + + return handler, nil +} + +// PostAccessUpdate sends access updates to be handled by the worker. +func PostAccessUpdate(updates []string) { + for _, update := range updates { + select { + case updateChannel <- UpdateMessage{ + Key: update, + LastAccessTime: time.Now(), + }: + // Inserted + default: + log.WithContext(context.Background()).Error("updateChannel is full, dropping update") + } + + } +} + +// startWorker listens for updates and commits them periodically. +func (d *MigrationMetaStore) startLastAccessedUpdateWorker(ctx context.Context) { + for { + select { + case update := <-updateChannel: + d.updates.Store(update.Key, update.LastAccessTime) + + case <-d.updateTicker.C: + d.commitLastAccessedUpdates(ctx) + case <-ctx.Done(): + log.WithContext(ctx).Info("Shutting down last accessed update worker") + d.commitLastAccessedUpdates(ctx) // Commit any remaining updates before shutdown + return + } + } +} + +func (d *MigrationMetaStore) commitLastAccessedUpdates(ctx context.Context) { + tx, err := d.db.BeginTxx(ctx, nil) + if err != nil { + log.WithContext(ctx).WithError(err).Error("Error starting transaction (commitLastAccessedUpdates)") + return + } + + stmt, err := tx.Prepare("UPDATE meta SET last_access_time = ? WHERE key = ?") + if err != nil { + log.WithContext(ctx).WithError(err).Error("Error preparing statement (commitLastAccessedUpdates)") + return + } + defer stmt.Close() + + keysToUpdate := make(map[string]time.Time) + d.updates.Range(func(key, value interface{}) bool { + k, ok := key.(string) + if !ok { + return false + } + v, ok := value.(time.Time) + if !ok { + return false + } + _, err := stmt.Exec(v, k) + if err != nil { + log.WithContext(ctx).WithError(err).WithField("key", key).Error("Error executing statement (commitLastAccessedUpdates)") + return true // continue + } + keysToUpdate[k] = v + + return true // continue + }) + + if err := tx.Commit(); err != nil { + tx.Rollback() + log.WithContext(ctx).WithError(err).Error("Error committing transaction (commitLastAccessedUpdates)") + return + } + + // Clear updated keys from map after successful commit + for k := range keysToUpdate { + d.updates.Delete(k) + } + + log.WithContext(ctx).WithField("count", len(keysToUpdate)).Info("Committed last accessed updates") +} + +func PostKeysInsert(updates []UpdateMessage) { + for _, update := range updates { + select { + case insertChannel <- update: + // Inserted + default: + log.WithContext(context.Background()).Error("insertChannel is full, dropping update") + } + } +} + +// startInsertWorker listens for updates and commits them periodically. +func (d *MigrationMetaStore) startInsertWorker(ctx context.Context) { + for { + select { + case update := <-insertChannel: + d.inserts.Store(update.Key, update) + case <-d.insertTicker.C: + d.commitInserts(ctx) + case <-ctx.Done(): + log.WithContext(ctx).Info("Shutting down insert meta keys worker") + d.commitInserts(ctx) + return + } + } +} + +func (d *MigrationMetaStore) commitInserts(ctx context.Context) { + tx, err := d.db.BeginTxx(ctx, nil) + if err != nil { + log.WithContext(ctx).WithError(err).Error("Error starting transaction (commitInserts)") + return + } + + // Prepare an INSERT OR REPLACE statement that handles new insertions or updates existing entries + stmt, err := tx.Preparex("INSERT OR REPLACE INTO meta (key, last_accessed, access_count, data_size) VALUES (?, ?, ?, ?)") + if err != nil { + tx.Rollback() // Ensure to rollback in case of an error + log.WithContext(ctx).WithError(err).Error("Error preparing statement (commitInserts)") + return + } + defer stmt.Close() + + keysToUpdate := make(map[string]bool) + d.inserts.Range(func(key, value interface{}) bool { + k, ok := key.(string) + if !ok { + return false + } + v, ok := value.(UpdateMessage) + if !ok { + return false + } + // Default values for access_count and data_size can be configured here + accessCount := 1 + _, err := stmt.Exec(k, v.LastAccessTime, accessCount, v.Size) + if err != nil { + log.WithContext(ctx).WithError(err).WithField("key", k).Error("Error executing statement (commitInserts)") + return true // continue + } + keysToUpdate[k] = true + + return true // continue + }) + + if err := tx.Commit(); err != nil { + tx.Rollback() // Rollback transaction if commit fails + log.WithContext(ctx).WithError(err).Error("Error committing transaction (commitInserts)") + return + } + + // Clear updated keys from map after successful commit + for k := range keysToUpdate { + d.inserts.Delete(k) + } + + log.WithContext(ctx).WithField("count", len(keysToUpdate)).Info("Committed inserts") +} diff --git a/p2p/kademlia/store/sqlite/replication.go b/p2p/kademlia/store/sqlite/replication.go index 85ccce6d3..aa2e8a4ec 100644 --- a/p2p/kademlia/store/sqlite/replication.go +++ b/p2p/kademlia/store/sqlite/replication.go @@ -443,6 +443,7 @@ func (s *Store) RetrieveBatchValues(ctx context.Context, keys []string) ([][]byt values := make([][]byte, len(keys)) keysFound := 0 + for rows.Next() { var key string var value []byte @@ -453,6 +454,8 @@ func (s *Store) RetrieveBatchValues(ctx context.Context, keys []string) ([][]byt if idx, found := keyToIndex[key]; found { values[idx] = value keysFound++ + + PostAccessUpdate([]string{key}) } } diff --git a/p2p/kademlia/store/sqlite/sqlite.go b/p2p/kademlia/store/sqlite/sqlite.go index ba22d3b7e..8c3cf260a 100644 --- a/p2p/kademlia/store/sqlite/sqlite.go +++ b/p2p/kademlia/store/sqlite/sqlite.go @@ -382,20 +382,9 @@ func (s *Store) Retrieve(_ context.Context, key []byte) ([]byte, error) { return nil, fmt.Errorf("failed to get record by key %s: %w", hkey, err) } - return r.Data, nil -} - -// RetrieveWithType will return the queries key/value if it exists -func (s *Store) RetrieveWithType(_ context.Context, key []byte) ([]byte, int, error) { - hkey := hex.EncodeToString(key) + PostAccessUpdate([]string{hkey}) - r := Record{} - err := s.db.Get(&r, `SELECT data,datatype FROM data WHERE key = ?`, hkey) - if err != nil { - return nil, 0, fmt.Errorf("failed to get record with type by key %s: %w", hkey, err) - } - - return r.Data, r.Datatype, nil + return r.Data, nil } // Checkpoint method for the store @@ -446,9 +435,8 @@ func (s *Store) performJob(j Job) error { // storeRecord will store a key/value pair for the queries node func (s *Store) storeRecord(key []byte, value []byte, typ int, isOriginal bool) error { + hkey := hex.EncodeToString(key) operation := func() error { - hkey := hex.EncodeToString(key) - now := time.Now().UTC() r := Record{Key: hkey, Data: value, UpdatedAt: now, Datatype: typ, Isoriginal: isOriginal, CreatedAt: now} res, err := s.db.NamedExec(`INSERT INTO data(key, data, datatype, is_original, createdAt, updatedAt) values(:key, :data, :datatype, :isoriginal, :createdat, :updatedat) ON CONFLICT(key) DO UPDATE SET data=:data,updatedAt=:updatedat`, r) @@ -472,12 +460,15 @@ func (s *Store) storeRecord(key []byte, value []byte, typ int, isOriginal bool) if err != nil { return fmt.Errorf("error storing data: %w", err) } + PostKeysInsert([]UpdateMessage{{Key: hkey, LastAccessTime: time.Now(), Size: len(value)}}) return nil } // storeBatchRecord will store a batch of values with their SHA256 hash as the key func (s *Store) storeBatchRecord(values [][]byte, typ int, isOriginal bool) error { + hkeys := make([]UpdateMessage, len(values)) + operation := func() error { tx, err := s.db.Beginx() if err != nil { @@ -505,6 +496,7 @@ func (s *Store) storeBatchRecord(values [][]byte, typ int, isOriginal bool) erro } hkey := hex.EncodeToString(hashed) + hkeys[i] = UpdateMessage{Key: hkey, LastAccessTime: now, Size: len(values[i])} r := Record{Key: hkey, Data: values[i], CreatedAt: now, UpdatedAt: now, Datatype: typ, Isoriginal: isOriginal} // Execute the insert statement @@ -532,6 +524,8 @@ func (s *Store) storeBatchRecord(values [][]byte, typ int, isOriginal bool) erro return fmt.Errorf("error storing data: %w", err) } + PostKeysInsert(hkeys) + return nil }