Skip to content

Commit

Permalink
in-code migration to fill posts table
Browse files Browse the repository at this point in the history
  • Loading branch information
poszu committed Jul 1, 2024
1 parent 752b5d4 commit b7cba29
Show file tree
Hide file tree
Showing 5 changed files with 267 additions and 12 deletions.
5 changes: 4 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ import (
"github.com/spacemeshos/go-spacemesh/sql/layers"
"github.com/spacemeshos/go-spacemesh/sql/localsql"
dbmetrics "github.com/spacemeshos/go-spacemesh/sql/metrics"
"github.com/spacemeshos/go-spacemesh/sql/migrations"
"github.com/spacemeshos/go-spacemesh/syncer"
"github.com/spacemeshos/go-spacemesh/syncer/atxsync"
"github.com/spacemeshos/go-spacemesh/syncer/blockssync"
Expand Down Expand Up @@ -1901,14 +1902,16 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
if err := os.MkdirAll(dbPath, os.ModePerm); err != nil {
return fmt.Errorf("failed to create %s: %w", dbPath, err)
}
dbLog := app.addLogger(StateDbLogger, lg)
m21 := migrations.New0021Migration(dbLog.Zap(), 100_000)
migrations, err := sql.StateMigrations()
if err != nil {
return fmt.Errorf("failed to load migrations: %w", err)
}
dbLog := app.addLogger(StateDbLogger, lg)
dbopts := []sql.Opt{
sql.WithLogger(dbLog.Zap()),
sql.WithMigrations(migrations),
sql.WithMigration(m21),
sql.WithConnections(app.Config.DatabaseConnections),
sql.WithLatencyMetering(app.Config.DatabaseLatencyMetering),
sql.WithVacuumState(app.Config.DatabaseVacuumState),
Expand Down
17 changes: 10 additions & 7 deletions sql/atxs/atxs.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,16 +453,19 @@ func Add(db sql.Executor, atx *types.ActivationTx) error {
return fmt.Errorf("insert ATX ID %v: %w", atx.ID(), err)
}

enc = func(stmt *sql.Statement) {
stmt.BindBytes(1, atx.ID().Bytes())
stmt.BindBytes(2, atx.Blob)
stmt.BindInt64(3, int64(atx.Version))
return AddBlob(db, atx.ID(), atx.Blob, atx.Version)
}

func AddBlob(db sql.Executor, id types.ATXID, blob []byte, version types.AtxVersion) error {
enc := func(stmt *sql.Statement) {
stmt.BindBytes(1, id.Bytes())
stmt.BindBytes(2, blob)
stmt.BindInt64(3, int64(version))
}
_, err = db.Exec("insert into atx_blobs (id, atx, version) values (?1, ?2, ?3)", enc, nil)
_, err := db.Exec("insert into atx_blobs (id, atx, version) values (?1, ?2, ?3)", enc, nil)
if err != nil {
return fmt.Errorf("insert ATX blob %v: %w", atx.ID(), err)
return fmt.Errorf("insert ATX blob %v: %w", id, err)
}

return nil
}

Expand Down
5 changes: 1 addition & 4 deletions sql/migrations/state/0021_atx_posts.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
-- Table showing the exact number of PoST units commited by smesher in given ATX.
-- TODO(poszu): Migrate data for existing ATXs (require decoding blobs to be correct).
-- Alternatively, we could take the effective numUnits from `atxs` table,
-- which would be faster but it could cause temporary harm for ATXs growing in size.
CREATE TABLE posts (
atxid CHAR(32) NOT NULL,
units INT NOT NULL,
pubkey CHAR(32) NOT NULL,
units INT NOT NULL,
UNIQUE (atxid, pubkey)
);

Expand Down
156 changes: 156 additions & 0 deletions sql/migrations/state_0021_migration.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package migrations

import (
"errors"
"fmt"

"go.uber.org/zap"

"github.com/spacemeshos/go-spacemesh/activation/wire"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
)

type migration0021 struct {
batch int
logger *zap.Logger
}

func New0021Migration(log *zap.Logger, batch int) *migration0021 {
return &migration0021{
logger: log,
batch: batch,
}
}

func (*migration0021) Name() string {
return "populate posts table with units for each ATX"
}

func (*migration0021) Order() int {
return 21
}

func (*migration0021) Rollback() error {
return nil
}

func (m *migration0021) Apply(db sql.Executor) error {
if err := m.createTable(db); err != nil {
return err
}
var total int
_, err := db.Exec("SELECT count(*) FROM atx_blobs", nil, func(s *sql.Statement) bool {
total = s.ColumnInt(0)
return false
})
if err != nil {
return fmt.Errorf("counting all ATXs %w", err)
}
m.logger.Info("applying migration 21", zap.Int("total", total))

for offset := 0; ; offset += m.batch {
n, err := m.processBatch(db, offset, m.batch)
if err != nil {
return err
}

processed := offset + n
progress := float64(processed) * 100.0 / float64(total)
m.logger.Info("processed ATXs", zap.Float64("progress [%]", progress))
if processed >= total {
return nil
}
}
}

func (m *migration0021) createTable(db sql.Executor) error {
query := `CREATE TABLE posts (
atxid CHAR(32) NOT NULL,
pubkey CHAR(32) NOT NULL,
units INT NOT NULL,
UNIQUE (atxid, pubkey)
);`
_, err := db.Exec(query, nil, nil)
if err != nil {
return fmt.Errorf("creating posts table: %w", err)
}

query = "CREATE INDEX posts_by_atxid_by_pubkey ON posts (atxid, pubkey);"
_, err = db.Exec(query, nil, nil)
if err != nil {
return fmt.Errorf("creating index `posts_by_atxid_by_pubkey`: %w", err)
}
return nil
}

type update struct {
id types.NodeID
units uint32
}

func (m *migration0021) processBatch(db sql.Executor, offset, size int) (int, error) {
var blob sql.Blob
var id types.ATXID
var procErr error
updates := make(map[types.ATXID]*update)
rows, err := db.Exec("SELECT id, atx, version FROM atx_blobs LIMIT ?1 OFFSET ?2",
func(s *sql.Statement) {
s.BindInt64(1, int64(size))
s.BindInt64(2, int64(offset))
},
func(stmt *sql.Statement) bool {
_, procErr = stmt.ColumnReader(0).Read(id[:])
if procErr != nil {
return false
}

blob.FromColumn(stmt, 1)
version := types.AtxVersion(stmt.ColumnInt(2))

upd, err := processATX(types.AtxBlob{Blob: blob.Bytes, Version: version})
if err != nil {
procErr = fmt.Errorf("processing ATX %s: %w", id, err)
return false
}
updates[id] = upd
return true
},
)

if err := errors.Join(err, procErr); err != nil {
return 0, fmt.Errorf("getting ATX blobs: %w", err)
}
if rows == 0 {
return 0, nil
}

if err := m.applyPendingUpdates(db, updates); err != nil {
return 0, fmt.Errorf("applying updates: %w", err)
}
return rows, nil
}

func (m *migration0021) applyPendingUpdates(db sql.Executor, updates map[types.ATXID]*update) error {
for id, upd := range updates {
atxs.SetUnits(db, id, map[types.NodeID]uint32{upd.id: upd.units})
}
return nil
}

func processATX(blob types.AtxBlob) (*update, error) {
switch blob.Version {
case 0:
fallthrough
case types.AtxV1:
var watx wire.ActivationTxV1
if err := codec.Decode(blob.Blob, &watx); err != nil {
return nil, fmt.Errorf("decoding ATX V1: %w", err)
}
return &update{watx.SmesherID, watx.NumUnits}, nil
default:
return nil, fmt.Errorf("unsupported ATX version: %d", blob.Version)
}
}
96 changes: 96 additions & 0 deletions sql/migrations/state_0021_migration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package migrations

import (
"strings"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"github.com/spacemeshos/go-spacemesh/activation/wire"
"github.com/spacemeshos/go-spacemesh/codec"
"github.com/spacemeshos/go-spacemesh/common/types"
"github.com/spacemeshos/go-spacemesh/signing"
"github.com/spacemeshos/go-spacemesh/sql"
"github.com/spacemeshos/go-spacemesh/sql/atxs"
)

// Test that in-code migration results in the same schema as the .sql one.
func Test0021Migration_CompatibleSchema(t *testing.T) {
db := sql.InMemory(
sql.WithLogger(zaptest.NewLogger(t)),
sql.WithMigration(New0021Migration(zaptest.NewLogger(t), 1000)),
)

var schemasInCode []string
_, err := db.Exec("SELECT sql FROM sqlite_schema;", nil, func(stmt *sql.Statement) bool {
sql := stmt.ColumnText(0)
sql = strings.Join(strings.Fields(sql), " ") // remove whitespace
schemasInCode = append(schemasInCode, sql)
return true
})
require.NoError(t, err)
require.NoError(t, db.Close())

db = sql.InMemory()

var schemasInFile []string
_, err = db.Exec("SELECT sql FROM sqlite_schema;", nil, func(stmt *sql.Statement) bool {
sql := stmt.ColumnText(0)
sql = strings.Join(strings.Fields(sql), " ") // remove whitespace
schemasInFile = append(schemasInFile, sql)
return true
})
require.NoError(t, err)
require.NoError(t, db.Close())

require.Equal(t, schemasInFile, schemasInCode)
}

func Test0021Migration(t *testing.T) {
db := sql.InMemory(
sql.WithLogger(zaptest.NewLogger(t)),
sql.WithSkipMigrations(21),
)

var signers [177]*signing.EdSigner
for i := range signers {
var err error
signers[i], err = signing.NewEdSigner()
require.NoError(t, err)
}
type post struct {
id types.NodeID
units uint32
}
allPosts := make(map[types.EpochID]map[types.ATXID]post)
for epoch := range types.EpochID(40) {
allPosts[epoch] = make(map[types.ATXID]post)
for _, signer := range signers {
watx := wire.ActivationTxV1{
InnerActivationTxV1: wire.InnerActivationTxV1{
NumUnits: epoch.Uint32() * 10,
Coinbase: types.Address(types.RandomBytes(24)),
},
SmesherID: signer.NodeID(),
}
require.NoError(t, atxs.AddBlob(db, watx.ID(), codec.MustEncode(&watx), 0))
allPosts[epoch][watx.ID()] = post{
id: signer.NodeID(),
units: watx.NumUnits,
}
}
}

m := New0021Migration(zaptest.NewLogger(t), 1000)
require.Equal(t, 21, m.Order())
require.NoError(t, m.Apply(db))

for _, posts := range allPosts {
for atx, post := range posts {
units, err := atxs.Units(db, atx, post.id)
require.NoError(t, err)
require.Equal(t, post.units, units)
}
}
}

0 comments on commit b7cba29

Please sign in to comment.