Skip to content

Commit

Permalink
Merge pull request #11352 from vegaprotocol/datanode-fixes
Browse files Browse the repository at this point in the history
Remove nested transactions, connection source can't handle them
  • Loading branch information
EVODelavega authored Jun 14, 2024
2 parents dcde144 + d24fdcc commit 70ae513
Show file tree
Hide file tree
Showing 102 changed files with 745 additions and 670 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
- [11319](https://github.com/vegaprotocol/vega/issues/11319) - Do not leak Ethereum client secrets in the logs.
- [11336](https://github.com/vegaprotocol/vega/issues/11336) - Add support for decay factor in governance recurring transfers and report the proposal amount rather than 0 when the proposal gets enacted.
- [11368](https://github.com/vegaprotocol/vega/issues/11368) - Add support for update vesting stats in REST API and fix summing the quantum balance for vesting stats.
- [11136](https://github.com/vegaprotocol/vega/issues/11136) - Fix premature invocation of post commit hooks in case of fee stats event.

## 0.76.1

Expand Down
16 changes: 9 additions & 7 deletions blockexplorer/store/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,22 @@ func TestMain(m *testing.M) {
}

cancel()
connectionSource, err = sqlstore.NewTransactionalConnectionSource(log, sqlConfig.ConnectionConfig)
ctx, cancel = context.WithCancel(context.Background())
connectionSource, err = sqlstore.NewTransactionalConnectionSource(ctx, log, sqlConfig.ConnectionConfig)
if err != nil {
cancel()
panic(err)
}
defer embeddedPostgres.Stop()

if err = sqlstore.WipeDatabaseAndMigrateSchemaToLatestVersion(log, sqlConfig.ConnectionConfig, store.EmbedMigrations, false); err != nil {
log.Errorf("failed to wipe database and migrate schema, dumping postgres log:\n %s", postgresLog.String())
cancel()
panic(err)
}

code := m.Run()
cancel()
os.Exit(code)
}

Expand All @@ -119,7 +123,6 @@ type txResult struct {
func addTestTxResults(ctx context.Context, t *testing.T, txResultTable string, txResults ...txResult) []*pb.Transaction {
t.Helper()

conn := connectionSource.Connection
rows := make([]*pb.Transaction, 0, len(txResults))
blockIDs := make(map[int64]int64)

Expand All @@ -136,14 +139,14 @@ func addTestTxResults(ctx context.Context, t *testing.T, txResultTable string, t
var ok bool

if blockID, ok = blockIDs[txr.height]; !ok {
require.NoError(t, conn.QueryRow(ctx, blockSQL, txr.height, "test-chain", txr.createdAt).Scan(&blockID))
require.NoError(t, connectionSource.QueryRow(ctx, blockSQL, txr.height, "test-chain", txr.createdAt).Scan(&blockID))
blockIDs[txr.height] = blockID
}

index := txr.index

var rowID int64
require.NoError(t, conn.QueryRow(ctx, resultSQL, blockID, index, txr.createdAt, txr.txHash, txr.txResult, txr.submitter, txr.cmdType).Scan(&rowID))
require.NoError(t, connectionSource.QueryRow(ctx, resultSQL, blockID, index, txr.createdAt, txr.txHash, txr.txResult, txr.submitter, txr.cmdType).Scan(&rowID))

row := entities.TxResultRow{
RowID: rowID,
Expand All @@ -168,10 +171,9 @@ func addTestTxResults(ctx context.Context, t *testing.T, txResultTable string, t
func cleanupTransactionsTest(ctx context.Context, t *testing.T) {
t.Helper()

conn := connectionSource.Connection
_, err := conn.Exec(ctx, `DELETE FROM tx_results`)
_, err := connectionSource.Exec(ctx, `DELETE FROM tx_results`)
require.NoError(t, err)
_, err = conn.Exec(ctx, `DELETE FROM blocks`)
_, err = connectionSource.Exec(ctx, `DELETE FROM blocks`)
require.NoError(t, err)
}

Expand Down
6 changes: 3 additions & 3 deletions cmd/data-node/commands/networkhistory/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (cmd *fetchCmd) Execute(args []string) error {
return fmt.Errorf("failed to fix config:%w", err)
}

err = verifyChainID(cmd.SQLStore.ConnectionConfig, cmd.ChainID)
err = verifyChainID(ctx, cmd.SQLStore.ConnectionConfig, cmd.ChainID)
if err != nil {
return fmt.Errorf("failed to verify chain id:%w", err)
}
Expand Down Expand Up @@ -99,8 +99,8 @@ func (cmd *fetchCmd) Execute(args []string) error {
return nil
}

func verifyChainID(connConfig sqlstore.ConnectionConfig, chainID string) error {
connSource, err := sqlstore.NewTransactionalConnectionSource(logging.NewTestLogger(), connConfig)
func verifyChainID(ctx context.Context, connConfig sqlstore.ConnectionConfig, chainID string) error {
connSource, err := sqlstore.NewTransactionalConnectionSource(ctx, logging.NewTestLogger(), connConfig)
if err != nil {
return fmt.Errorf("failed to create new transactional connection source: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/data-node/commands/networkhistory/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (cmd *loadCmd) Execute(args []string) error {
return fmt.Errorf("failed to kill all connections to database: %w", err)
}

connPool, err := getCommandConnPool(cmd.Config.SQLStore.ConnectionConfig)
connPool, err := getCommandConnPool(ctx, cmd.Config.SQLStore.ConnectionConfig)
if err != nil {
return fmt.Errorf("failed to get command connection pool: %w", err)
}
Expand All @@ -89,7 +89,7 @@ func (cmd *loadCmd) Execute(args []string) error {
}

if hasSchema {
err = verifyChainID(cmd.SQLStore.ConnectionConfig, cmd.ChainID)
err = verifyChainID(ctx, cmd.SQLStore.ConnectionConfig, cmd.ChainID)
if err != nil {
if !errors.Is(err, networkhistory.ErrChainNotFound) {
return fmt.Errorf("failed to verify chain id:%w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/data-node/commands/networkhistory/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (cmd *rollbackCmd) Execute(args []string) error {
return fmt.Errorf("failed to kill all connections to database: %w", err)
}

connPool, err := getCommandConnPool(cmd.Config.SQLStore.ConnectionConfig)
connPool, err := getCommandConnPool(ctx, cmd.Config.SQLStore.ConnectionConfig)
if err != nil {
return fmt.Errorf("failed to get command connection pool: %w", err)
}
Expand Down
12 changes: 7 additions & 5 deletions cmd/data-node/commands/networkhistory/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ func (o *showOutput) printHuman(allSegments bool) {
}

func (cmd *showCmd) Execute(_ []string) error {
ctx, cfunc := context.WithCancel(context.Background())
defer cfunc()
cfg := logging.NewDefaultConfig()
cfg.Custom.Zap.Level = logging.WarnLevel
cfg.Environment = "custom"
Expand Down Expand Up @@ -111,7 +113,7 @@ func (cmd *showCmd) Execute(_ []string) error {
}
defer func() { _ = conn.Close() }()

response, err := client.ListAllNetworkHistorySegments(context.Background(), &v2.ListAllNetworkHistorySegmentsRequest{})
response, err := client.ListAllNetworkHistorySegments(ctx, &v2.ListAllNetworkHistorySegmentsRequest{})
if err != nil {
handleErr(log, cmd.Output.IsJSON(), "failed to list all network history segments", err)
os.Exit(1)
Expand All @@ -127,13 +129,13 @@ func (cmd *showCmd) Execute(_ []string) error {
segments := segment.Segments[*v2.HistorySegment](response.Segments)
output.ContiguousHistories = segments.AllContigousHistories()

pool, err := getCommandConnPool(cmd.Config.SQLStore.ConnectionConfig)
pool, err := getCommandConnPool(ctx, cmd.Config.SQLStore.ConnectionConfig)
if err != nil {
handleErr(log, cmd.Output.IsJSON(), "failed to get command conn pool", err)
}
defer pool.Close()

span, err := sqlstore.GetDatanodeBlockSpan(context.Background(), pool)
span, err := sqlstore.GetDatanodeBlockSpan(ctx, pool)
if err != nil {
handleErr(log, cmd.Output.IsJSON(), "failed to get datanode block span", err)
os.Exit(1)
Expand All @@ -156,10 +158,10 @@ func (cmd *showCmd) Execute(_ []string) error {
return nil
}

func getCommandConnPool(conf sqlstore.ConnectionConfig) (*pgxpool.Pool, error) {
func getCommandConnPool(ctx context.Context, conf sqlstore.ConnectionConfig) (*pgxpool.Pool, error) {
conf.MaxConnPoolSize = 3

connPool, err := sqlstore.CreateConnectionPool(conf)
connPool, err := sqlstore.CreateConnectionPool(ctx, conf)
if err != nil {
return nil, fmt.Errorf("failed to create connection pool: %w", err)
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/data-node/commands/start/node_pre.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (l *NodeCommand) persistentPre([]string) (err error) {

preLog.Info("Enabling SQL stores")

l.transactionalConnectionSource, err = sqlstore.NewTransactionalConnectionSource(preLog, l.conf.SQLStore.ConnectionConfig)
l.transactionalConnectionSource, err = sqlstore.NewTransactionalConnectionSource(l.ctx, preLog, l.conf.SQLStore.ConnectionConfig)
if err != nil {
return fmt.Errorf("failed to create transactional connection source: %w", err)
}
Expand Down Expand Up @@ -211,7 +211,7 @@ func (l *NodeCommand) initialiseDatabase(preLog *logging.Logger) error {
var err error
conf := l.conf.SQLStore.ConnectionConfig
conf.MaxConnPoolSize = 1
pool, err := sqlstore.CreateConnectionPool(conf)
pool, err := sqlstore.CreateConnectionPool(l.ctx, conf)
if err != nil {
return fmt.Errorf("failed to create connection pool: %w", err)
}
Expand Down Expand Up @@ -333,7 +333,7 @@ func (l *NodeCommand) initialiseNetworkHistory(preLog *logging.Logger, connConfi
connConfig.MaxConnPoolSize = 3
connConfig.MinConnPoolSize = 3

networkHistoryPool, err := sqlstore.CreateConnectionPool(connConfig)
networkHistoryPool, err := sqlstore.CreateConnectionPool(l.ctx, connConfig)
if err != nil {
return fmt.Errorf("failed to create network history connection pool: %w", err)
}
Expand Down
20 changes: 2 additions & 18 deletions datanode/api/trading_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package api_test
import (
"context"
"fmt"
"io"
"net"
"testing"
"time"
Expand All @@ -40,8 +39,6 @@ import (

"github.com/golang/mock/gomock"
"github.com/golang/protobuf/proto"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v4"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
Expand Down Expand Up @@ -98,9 +95,8 @@ func getTestGRPCServer(t *testing.T, ctx context.Context) (tidy func(), conn *gr

conf.CandlesV2.CandleStore.DefaultCandleIntervals = ""

sqlConn := &sqlstore.ConnectionSource{
Connection: dummyConnection{},
}
sqlConn := &sqlstore.ConnectionSource{}
sqlConn.ToggleTest() // ensure calls to query and copyTo do not fail

bro, err := broker.New(ctx, logging.NewTestLogger(), conf.Broker, "", eventSource)
if err != nil {
Expand Down Expand Up @@ -257,18 +253,6 @@ func getTestGRPCServer(t *testing.T, ctx context.Context) (tidy func(), conn *gr
return tidy, conn, mockCoreServiceClient, err
}

type dummyConnection struct {
sqlstore.Connection
}

func (d dummyConnection) Query(context.Context, string, ...interface{}) (pgx.Rows, error) {
return nil, pgx.ErrNoRows
}

func (d dummyConnection) CopyTo(context.Context, io.Writer, string, ...any) (pgconn.CommandTag, error) {
return pgconn.CommandTag{}, nil
}

func TestSubmitTransaction(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
Expand Down
10 changes: 1 addition & 9 deletions datanode/broker/sqlstore_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func (b *SQLStoreBroker) Receive(ctx context.Context) error {
return err
}

dbContext, err := b.transactionManager.WithConnection(context.Background())
dbContext, err := b.transactionManager.WithConnection(ctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -228,14 +228,6 @@ func (b *SQLStoreBroker) processBlock(ctx context.Context, dbContext context.Con
betweenBlocks := false
refreshMaterializedViews := false
for {
// Do a pre-check on ctx.Done() since select() cases are randomized, this reduces
// the number of things we'll keep trying to handle after we are cancelled.
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
}

blockTimer.stopTimer()
select {
case <-ctx.Done():
Expand Down
16 changes: 8 additions & 8 deletions datanode/networkhistory/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,13 @@ func TestMain(t *testing.M) {
}
}()

exitCode := databasetest.TestMain(t, func(config sqlstore.Config, source *sqlstore.ConnectionSource,
exitCode := databasetest.TestMain(t, outerCtx, func(config sqlstore.Config, source *sqlstore.ConnectionSource,
pgLog *bytes.Buffer,
) {
sqlConfig = config
log.Infof("DB Connection String: ", sqlConfig.ConnectionConfig.GetConnectionString())

pool, err := sqlstore.CreateConnectionPool(sqlConfig.ConnectionConfig)
pool, err := sqlstore.CreateConnectionPool(outerCtx, sqlConfig.ConnectionConfig)
if err != nil {
panic(fmt.Errorf("failed to create connection pool: %w", err))
}
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestMain(t *testing.M) {
return nil
})

preUpgradeBroker, err := setupSQLBroker(ctx, sqlConfig, snapshotService,
preUpgradeBroker, err := setupSQLBroker(outerCtx, sqlConfig, snapshotService,
func(ctx context.Context, service *snapshot.Service, chainId string, lastCommittedBlockHeight int64, snapshotTaken bool) {
if lastCommittedBlockHeight > 0 && lastCommittedBlockHeight%snapshotInterval == 0 {
lastSnapshot, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestMain(t *testing.M) {
return nil
})

postUpgradeBroker, err := setupSQLBroker(ctx, sqlConfig, snapshotService,
postUpgradeBroker, err := setupSQLBroker(outerCtx, sqlConfig, snapshotService,
func(ctx context.Context, service *snapshot.Service, chainId string, lastCommittedBlockHeight int64, snapshotTaken bool) {
if lastCommittedBlockHeight > 0 && lastCommittedBlockHeight%snapshotInterval == 0 {
lastSnapshot, err := service.CreateSnapshotAsynchronously(ctx, chainId, lastCommittedBlockHeight)
Expand Down Expand Up @@ -862,7 +862,7 @@ func TestRestoreFromPartialHistoryAndProcessEvents(t *testing.T) {
assert.Equal(t, int64(2001), loaded.LoadedFromHeight)
assert.Equal(t, int64(3000), loaded.LoadedToHeight)

connSource, err := sqlstore.NewTransactionalConnectionSource(logging.NewTestLogger(), sqlConfig.ConnectionConfig)
connSource, err := sqlstore.NewTransactionalConnectionSource(ctx, logging.NewTestLogger(), sqlConfig.ConnectionConfig)
require.NoError(t, err)
defer connSource.Close()

Expand Down Expand Up @@ -951,7 +951,7 @@ func TestRestoreFromFullHistorySnapshotAndProcessEvents(t *testing.T) {
assert.Equal(t, int64(1), loaded.LoadedFromHeight)
assert.Equal(t, int64(2000), loaded.LoadedToHeight)

connSource, err := sqlstore.NewTransactionalConnectionSource(logging.NewTestLogger(), sqlConfig.ConnectionConfig)
connSource, err := sqlstore.NewTransactionalConnectionSource(ctx, logging.NewTestLogger(), sqlConfig.ConnectionConfig)
require.NoError(t, err)
defer connSource.Close()

Expand Down Expand Up @@ -1054,7 +1054,7 @@ func TestRestoreFromFullHistorySnapshotWithIndexesAndOrderTriggersAndProcessEven
assert.Equal(t, int64(1), loaded.LoadedFromHeight)
assert.Equal(t, int64(2000), loaded.LoadedToHeight)

connSource, err := sqlstore.NewTransactionalConnectionSource(logging.NewTestLogger(), sqlConfig.ConnectionConfig)
connSource, err := sqlstore.NewTransactionalConnectionSource(ctx, logging.NewTestLogger(), sqlConfig.ConnectionConfig)
require.NoError(t, err)
defer connSource.Close()

Expand Down Expand Up @@ -1310,7 +1310,7 @@ func setupSQLBroker(ctx context.Context, testDbConfig sqlstore.Config, snapshotS
onBlockCommitted func(ctx context.Context, service *snapshot.Service, chainId string,
lastCommittedBlockHeight int64, snapshotTaken bool), evtSource eventSource, protocolUpdateHandler ProtocolUpgradeHandler,
) (sqlStoreBroker, error) {
transactionalConnectionSource, err := sqlstore.NewTransactionalConnectionSource(logging.NewTestLogger(), testDbConfig.ConnectionConfig)
transactionalConnectionSource, err := sqlstore.NewTransactionalConnectionSource(ctx, logging.NewTestLogger(), testDbConfig.ConnectionConfig)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions datanode/service/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func (m *Markets) Upsert(ctx context.Context, market *entities.Market) error {
return err
}
m.cacheLock.Lock()
if market.State == entities.MarketStateSettled || market.State == entities.MarketStateRejected {
// a settled or rejected market can be safely removed from this map.
if market.State == entities.MarketStateSettled || market.State == entities.MarketStateRejected || market.State == entities.MarketStateCancelled {
// a settled, cancelled, or rejected market can be safely removed from this map.
delete(m.sf, market.ID)
delete(m.isSpotCache, market.ID)
} else {
Expand Down
Loading

0 comments on commit 70ae513

Please sign in to comment.