Skip to content

Commit

Permalink
[latest-17.0](vitessio#3881): CherryPick(vitessio#14502): VReplicatio…
Browse files Browse the repository at this point in the history
…n VPlayer: support statement and transaction batching (vitessio#3883)

* cherry pick of 14502 (vitessio#3881)

Signed-off-by: Matt Lord <mattalord@gmail.com>

* Corrections

Signed-off-by: Matt Lord <mattalord@gmail.com>

---------

Signed-off-by: Matt Lord <mattalord@gmail.com>
Co-authored-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
planetscale-actions-bot and mattlord committed Dec 7, 2023
1 parent 7383005 commit a436505
Show file tree
Hide file tree
Showing 20 changed files with 697 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,16 @@ import (
"testing"
"time"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
"vitess.io/vitess/go/test/endtoend/throttler"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"
)

type WriteMetrics struct {
Expand Down Expand Up @@ -184,6 +184,9 @@ func TestMain(m *testing.M) {
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,18 @@ import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/onlineddl"
"vitess.io/vitess/go/test/endtoend/throttler"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"
)

type testcase struct {
Expand Down Expand Up @@ -436,6 +436,9 @@ func TestMain(m *testing.M) {
"--migration_check_interval", "5s",
"--vstream_packet_size", "4096", // Keep this value small and below 10k to ensure multilple vstream iterations
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
16 changes: 10 additions & 6 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ func TestVreplicationCopyThrottling(t *testing.T) {
}

func TestBasicVreplicationWorkflow(t *testing.T) {
ogflags := extraVTTabletArgs
defer func() { extraVTTabletArgs = ogflags }()
// Test VPlayer batching mode.
extraVTTabletArgs = append(extraVTTabletArgs, fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching))
sourceKsOpts["DBTypeVersion"] = "mysql-8.0"
targetKsOpts["DBTypeVersion"] = "mysql-8.0"
testBasicVreplicationWorkflow(t, "noblob")
Expand Down Expand Up @@ -582,13 +587,12 @@ func testVStreamCellFlag(t *testing.T) {
func TestCellAliasVreplicationWorkflow(t *testing.T) {
cells := []string{"zone1", "zone2"}
mainClusterConfig.vreplicationCompressGTID = true

// Enable the bulk delete vplayer optimization in this test, which is disabled by default, to confirm that we
// don't have a regression due to the bulk delete functionality of this functionality.
oldVTTabletExtraArgs := extraVTTabletArgs
extraVTTabletArgs = append(extraVTTabletArgs, fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagVPlayerBatching))

extraVTTabletArgs = append(extraVTTabletArgs,
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
)
defer func() {
mainClusterConfig.vreplicationCompressGTID = false
extraVTTabletArgs = oldVTTabletExtraArgs
Expand Down
24 changes: 15 additions & 9 deletions go/vt/binlog/binlogplayer/binlog_player.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,12 @@ var (

// BlplQuery is the key for the stats map.
BlplQuery = "Query"
// BlplMultiQuery is the key for the stats map.
BlplMultiQuery = "MultiQuery"
// BlplTransaction is the key for the stats map.
BlplTransaction = "Transaction"
// BlplBatchTransaction is the key for the stats map.
BlplBatchTransaction = "BatchTransaction"

// VReplicationInit is for the Init state.
VReplicationInit = "Init"
Expand Down Expand Up @@ -94,14 +98,15 @@ type Stats struct {

State atomic.Value

PhaseTimings *stats.Timings
QueryTimings *stats.Timings
QueryCount *stats.CountersWithSingleLabel
BulkQueryCount *stats.CountersWithSingleLabel
CopyRowCount *stats.Counter
CopyLoopCount *stats.Counter
ErrorCounts *stats.CountersWithMultiLabels
NoopQueryCount *stats.CountersWithSingleLabel
PhaseTimings *stats.Timings
QueryTimings *stats.Timings
QueryCount *stats.CountersWithSingleLabel
BulkQueryCount *stats.CountersWithSingleLabel
TrxQueryBatchCount *stats.CountersWithSingleLabel
CopyRowCount *stats.Counter
CopyLoopCount *stats.Counter
ErrorCounts *stats.CountersWithMultiLabels
NoopQueryCount *stats.CountersWithSingleLabel

VReplicationLags *stats.Timings
VReplicationLagRates *stats.Rates
Expand Down Expand Up @@ -168,7 +173,8 @@ func NewStats() *Stats {
bps.PhaseTimings = stats.NewTimings("", "", "Phase")
bps.QueryTimings = stats.NewTimings("", "", "Phase")
bps.QueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "")
bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Phase", "")
bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "")
bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement", "")
bps.CopyRowCount = stats.NewCounter("", "")
bps.CopyLoopCount = stats.NewCounter("", "")
bps.ErrorCounts = stats.NewCountersWithMultiLabels("", "", []string{"type"})
Expand Down
40 changes: 40 additions & 0 deletions go/vt/binlog/binlogplayer/dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package binlogplayer
import (
"context"
"fmt"
"strings"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/mysql"
Expand All @@ -38,6 +39,7 @@ type DBClient interface {
Rollback() error
Close()
ExecuteFetch(query string, maxrows int) (qr *sqltypes.Result, err error)
ExecuteFetchMulti(query string, maxrows int) (qrs []*sqltypes.Result, err error)
}

// dbClientImpl is a real DBClient backed by a mysql connection.
Expand Down Expand Up @@ -140,6 +142,25 @@ func (dc *dbClientImpl) ExecuteFetch(query string, maxrows int) (*sqltypes.Resul
return mqr, nil
}

func (dc *dbClientImpl) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
results := make([]*sqltypes.Result, 0)
mqr, more, err := dc.dbConn.ExecuteFetchMulti(query, maxrows, true)
if err != nil {
dc.handleError(err)
return nil, err
}
results = append(results, mqr)
for more {
mqr, more, _, err = dc.dbConn.ReadQueryResult(maxrows, false)
if err != nil {
dc.handleError(err)
return nil, err
}
results = append(results, mqr)
}
return results, nil
}

func (dcr *dbClientImplWithSidecarDBReplacement) ExecuteFetch(query string, maxrows int) (*sqltypes.Result, error) {
// Replace any provided sidecar database qualifiers with the correct one.
uq, err := sqlparser.ReplaceTableQualifiers(query, sidecar.DefaultName, sidecar.GetName())
Expand All @@ -148,3 +169,22 @@ func (dcr *dbClientImplWithSidecarDBReplacement) ExecuteFetch(query string, maxr
}
return dcr.dbClientImpl.ExecuteFetch(uq, maxrows)
}

func (dcr *dbClientImplWithSidecarDBReplacement) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
// Replace any provided sidecar database qualifiers with the correct one.
qps, err := sqlparser.SplitStatementToPieces(query)
if err != nil {
return nil, err
}
for i, qp := range qps {
uq, err := sqlparser.ReplaceTableQualifiers(qp, sidecar.DefaultName, sidecar.GetName())
if err != nil {
return nil, err
}
qps[i] = uq
}
if err != nil {
return nil, err
}
return dcr.dbClientImpl.ExecuteFetchMulti(strings.Join(qps, ";"), maxrows)
}
4 changes: 4 additions & 0 deletions go/vt/binlog/binlogplayer/fake_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,7 @@ func (dc *fakeDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
}
return nil, fmt.Errorf("unexpected: %v", query)
}

func (dc *fakeDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
return make([]*sqltypes.Result, 0), nil
}
17 changes: 17 additions & 0 deletions go/vt/binlog/binlogplayer/mock_dbclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/sqlparser"
)

const mockClientUNameFiltered = "Filtered"
Expand Down Expand Up @@ -190,3 +191,19 @@ func (dc *MockDBClient) ExecuteFetch(query string, maxrows int) (qr *sqltypes.Re
}
return result.result, result.err
}

func (dc *MockDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
queries, err := sqlparser.SplitStatementToPieces(query)
if err != nil {
return nil, err
}
results := make([]*sqltypes.Result, 0, len(queries))
for _, query := range queries {
qr, err := dc.ExecuteFetch(query, maxrows)
if err != nil {
return nil, err
}
results = append(results, qr)
}
return results, nil
}
2 changes: 2 additions & 0 deletions go/vt/vttablet/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ import (
)

const (
// VReplicationExperimentalFlags is a bitmask of experimental features in vreplication.
VReplicationExperimentalFlagOptimizeInserts = int64(1)
VReplicationExperimentalFlagAllowNoBlobBinlogRowImage = int64(2)
VReplicationExperimentalFlagVPlayerBatching = int64(4)
)

var (
// Default flags.
VReplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage
VReplicationNetReadTimeout = 300
VReplicationNetWriteTimeout = 600
Expand Down
16 changes: 16 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,22 @@ func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Resu
return qr, err
}

func (dbc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
queries, err := sqlparser.SplitStatementToPieces(query)
if err != nil {
return nil, err
}
results := make([]*sqltypes.Result, 0, len(queries))
for _, query := range queries {
qr, err := dbc.ExecuteFetch(query, maxrows)
if err != nil {
return nil, err
}
results = append(results, qr)
}
return results, nil
}

//----------------------------------------------
// fakeTMClient

Expand Down
31 changes: 24 additions & 7 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,26 +28,25 @@ import (
"testing"
"time"

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/vttablet"

"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/dbconfigs"

"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/proto"

_flag "vitess.io/vitess/go/internal/flag"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sidecardb"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/queryservice/fakes"
"vitess.io/vitess/go/vt/vttablet/tabletconn"
Expand All @@ -72,6 +71,7 @@ var (
globalFBC = &fakeBinlogClient{}
vrepldb = "vrepl"
globalDBQueries = make(chan string, 1000)
lastMultiExecQuery = ""
testForeignKeyQueries = false
doNotLogDBQueries = false
)
Expand Down Expand Up @@ -493,6 +493,23 @@ func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Resu
return qr, err
}

func (dc *realDBClient) ExecuteFetchMulti(query string, maxrows int) ([]*sqltypes.Result, error) {
queries, err := sqlparser.SplitStatementToPieces(query)
if err != nil {
return nil, err
}
results := make([]*sqltypes.Result, 0, len(queries))
for _, query := range queries {
qr, err := dc.ExecuteFetch(query, maxrows)
if err != nil {
return nil, err
}
results = append(results, qr)
}
lastMultiExecQuery = query
return results, nil
}

func expectDeleteQueries(t *testing.T) {
t.Helper()
expectNontxQueries(t, qh.Expect(
Expand Down
Loading

0 comments on commit a436505

Please sign in to comment.