This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Add more log and compress option #202

Merged · 8 commits · Nov 20, 2020
6 changes: 6 additions & 0 deletions cmd/dumpling/main.go
@@ -19,6 +19,7 @@ import (
_ "net/http/pprof"
"os"

"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"go.uber.org/zap"

@@ -56,6 +57,11 @@ func main() {
os.Exit(1)
}

registry := prometheus.NewRegistry()
registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
registry.MustRegister(prometheus.NewGoCollector())
export.RegisterMetrics(registry)
prometheus.DefaultGatherer = registry
err = export.Dump(context.Background(), conf)
if err != nil {
log.Error("dump failed error stack info", zap.Error(err))
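Reviewer note: the hunk above builds a dedicated registry and repoints prometheus.DefaultGatherer at it, which is what lets the promhttp handler added in http_handler.go serve these collectors. A minimal standalone sketch of that pattern (the listen address is hypothetical, not from this PR):

```go
package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	// A fresh registry avoids the global default one and its pre-registered state.
	registry := prometheus.NewRegistry()
	registry.MustRegister(prometheus.NewProcessCollector(prometheus.ProcessCollectorOpts{}))
	registry.MustRegister(prometheus.NewGoCollector())

	// promhttp.Handler() gathers from prometheus.DefaultGatherer, so repointing
	// the gatherer is enough for /metrics to expose the registry above.
	prometheus.DefaultGatherer = registry
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe("127.0.0.1:2112", nil) // hypothetical address
}
```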
3 changes: 2 additions & 1 deletion go.mod
@@ -9,12 +9,13 @@ require (
github.com/go-sql-driver/mysql v1.5.0
github.com/mattn/go-colorable v0.1.7 // indirect
github.com/mattn/go-runewidth v0.0.9 // indirect
github.com/pingcap/br v0.0.0-20201027124415-c2ed897feada
github.com/pingcap/br v0.0.0-20201119111016-600102357a27
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20200917111840-a15ef68f753d
github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce
github.com/pingcap/log v0.0.0-20200828042413-fce0951f1463
github.com/pingcap/tidb-tools v4.0.8-0.20200927084250-e47e0e12c7f3+incompatible
github.com/prometheus/client_golang v1.5.1
github.com/soheilhy/cmux v0.1.4
github.com/spf13/pflag v1.0.5
github.com/tikv/pd v1.1.0-beta.0.20200910042021-254d1345be09
78 changes: 76 additions & 2 deletions go.sum

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions tests/naughty_strings/run.sh
@@ -10,6 +10,14 @@ run_sql_file "$DUMPLING_BASE_NAME/data/naughty_strings.t.sql"
run_dumpling --escape-backslash=false
# FIXME should compare the schemas too, but they differ too much among MySQL versions.
diff "$DUMPLING_BASE_NAME/expect/naughty_strings.t.sql" "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql"

# run with compress option
rm "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql"
run_dumpling --escape-backslash=false --compress
file_should_exist "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql.gz"
gzip "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql.gz" -d
diff "$DUMPLING_BASE_NAME/expect/naughty_strings.t.sql" "$DUMPLING_OUTPUT_DIR/naughty_strings.t.000000000.sql"

run_sql_file "$DUMPLING_BASE_NAME/data/naughty_strings.escape-schema.sql"
run_sql_file "$DUMPLING_BASE_NAME/data/naughty_strings.escape.sql"
run_dumpling --escape-backslash=true
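Side note: the test relies on the .gz artifact round-tripping through `gzip -d`, so the compressed output is plainly a gzip stream over the SQL text. A minimal sketch with the standard library of producing a file of that shape (file name and contents here are illustrative only, not dumpling's writer code):

```go
package main

import (
	"compress/gzip"
	"os"
)

func main() {
	// File name matches what the test expects; contents are illustrative only.
	f, err := os.Create("naughty_strings.t.000000000.sql.gz")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	zw := gzip.NewWriter(f)
	// Closing flushes the gzip footer (size + CRC); skipping it would make
	// the `gzip -d` step in the test above fail on a truncated stream.
	defer zw.Close()

	if _, err := zw.Write([]byte("INSERT INTO t VALUES (1);\n")); err != nil {
		panic(err)
	}
}
```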
9 changes: 9 additions & 0 deletions v4/export/config.go
@@ -16,6 +16,7 @@ import (
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/errors"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"go.uber.org/zap"

@@ -64,6 +65,7 @@ const (
flagParams = "params"
flagReadTimeout = "read-timeout"
flagTransactionalConsistency = "transactional-consistency"
flagCompress = "compress"

FlagHelp = "help"
)
@@ -108,6 +110,7 @@ type Config struct {
CsvSeparator string
CsvDelimiter string
ReadTimeout time.Duration
Compress bool

TableFilter filter.Filter `json:"-"`
Rows uint64
@@ -121,6 +124,7 @@ type Config struct {
SessionParams map[string]interface{}

PosAfterConnect bool
Labels prometheus.Labels

ExternalStorage storage.ExternalStorage `json:"-"`
}
@@ -233,6 +237,7 @@ func (conf *Config) DefineFlags(flags *pflag.FlagSet) {
flags.Duration(flagReadTimeout, 15*time.Minute, "I/O read timeout for db connection.")
flags.MarkHidden(flagReadTimeout)
flags.Bool(flagTransactionalConsistency, true, "Only support transactional consistency")
flags.BoolP(flagCompress, "c", false, "Compress output files")
}

// GetDSN generates DSN from Config
@@ -374,6 +379,10 @@ func (conf *Config) ParseFromFlags(flags *pflag.FlagSet) error {
if err != nil {
return errors.Trace(err)
}
conf.Compress, err = flags.GetBool(flagCompress)
if err != nil {
return errors.Trace(err)
}

if conf.Threads <= 0 {
return errors.Errorf("--threads is set to %d. It should be greater than 0", conf.Threads)
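For readers unfamiliar with the DefineFlags/ParseFromFlags split: flags are declared on a pflag.FlagSet first and read back only after parsing. A self-contained sketch of the same two-step flow (the flag set name and arguments are made up; only the compress flag mirrors the hunks above):

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

func main() {
	flags := pflag.NewFlagSet("dumpling", pflag.ExitOnError)
	// Step 1: declare the flag with its shorthand and default, as DefineFlags does.
	flags.BoolP("compress", "c", false, "Compress output files")

	// Step 2: parse the command line, then read the value back, as ParseFromFlags does.
	_ = flags.Parse([]string{"--compress"})
	compress, err := flags.GetBool("compress")
	if err != nil {
		panic(err)
	}
	fmt.Println("compress =", compress) // prints: compress = true
}
```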
100 changes: 68 additions & 32 deletions v4/export/dump.go
@@ -10,6 +10,7 @@ import (
"github.com/pingcap/dumpling/v4/log"

_ "github.com/go-sql-driver/mysql"
"github.com/pingcap/br/pkg/summary"
"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
@@ -43,6 +44,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return errors.Trace(err)
}
resolveAutoConsistency(conf)
log.Info("finish config adjustment", zap.String("config", conf.String()))

ctx, cancel := context.WithCancel(pCtx)
defer cancel()
@@ -226,6 +228,9 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
return errors.Errorf("unsupported filetype %s", conf.FileType)
}

summary.SetLogCollector(summary.NewLogCollector(log.Info))
summary.SetUnit(summary.BackupUnit)
defer summary.Summary(summary.BackupUnit)
if conf.Sql == "" {
if err = dumpDatabases(ctx, conf, connectPool, writer, func(conn *sql.Conn) (*sql.Conn, error) {
// make sure that the lock connection is still alive
@@ -257,13 +262,16 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
}
}

summary.SetSuccessStatus(true)
m.recordFinishTime(time.Now())
return nil
}

func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsPool, writer Writer, rebuildConnFunc func(*sql.Conn) (*sql.Conn, error)) error {
allTables := conf.Tables
g, ctx := errgroup.WithContext(pCtx)
tableDataIRTotal := make([]TableDataIR, 0, len(allTables))
splitChunkStart := time.Now()
for dbName, tables := range allTables {
createDatabaseSQL, err := ShowCreateDatabase(connectPool.extraConn(), dbName)
if err != nil {
@@ -282,39 +290,55 @@ func dumpDatabases(pCtx context.Context, conf *Config, connectPool *connectionsP
if err != nil {
return err
}
for _, tableIR := range tableDataIRArray {
tableIR := tableIR
g.Go(func() error {
conn := connectPool.getConn()
defer func() {
connectPool.releaseConn(conn)
}()
retryTime := 0
var lastErr error
return utils.WithRetry(ctx, func() (err error) {
defer func() {
lastErr = err
}()
retryTime += 1
log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()),
zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()), zap.NamedError("lastError", lastErr))
if retryTime > 1 {
conn, err = rebuildConnFunc(conn)
if err != nil {
return
}
}
err = tableIR.Start(ctx, conn)
if err != nil {
return
}
return writer.WriteTableData(ctx, tableIR)
}, newDumpChunkBackoffer(canRebuildConn(conf.Consistency, conf.TransactionalConsistency)))
})
}
tableDataIRTotal = append(tableDataIRTotal, tableDataIRArray...)
}
}
return g.Wait()
summary.CollectDuration("split chunks", time.Since(splitChunkStart))
progressPrinter := utils.StartProgress(ctx, "dumpling", int64(len(tableDataIRTotal)), shouldRedirectLog(conf), log.Info)
defer progressPrinter.Close()
tableDataStartTime := time.Now()
for _, tableIR := range tableDataIRTotal {
tableIR := tableIR
g.Go(func() error {
conn := connectPool.getConn()
defer func() {
connectPool.releaseConn(conn)
}()
retryTime := 0
var lastErr error
return utils.WithRetry(ctx, func() (err error) {
defer func() {
lastErr = err
if err == nil {
progressPrinter.Inc()
} else {
errorCount.With(conf.Labels).Inc()
}
}()
retryTime += 1
log.Debug("trying to dump table chunk", zap.Int("retryTime", retryTime), zap.String("db", tableIR.DatabaseName()),
zap.String("table", tableIR.TableName()), zap.Int("chunkIndex", tableIR.ChunkIndex()), zap.NamedError("lastError", lastErr))
if retryTime > 1 {
conn, err = rebuildConnFunc(conn)
if err != nil {
return
}
}
err = tableIR.Start(ctx, conn)
if err != nil {
return
}
return writer.WriteTableData(ctx, tableIR)
}, newDumpChunkBackoffer(canRebuildConn(conf.Consistency, conf.TransactionalConsistency)))
})
}
if err := g.Wait(); err != nil {
summary.CollectFailureUnit("dump", err)
return err
} else {
summary.CollectSuccessUnit("dump cost", len(tableDataIRTotal), time.Since(tableDataStartTime))
}
return nil
}

func prepareTableListToDump(conf *Config, pool *sql.Conn) error {
@@ -346,7 +370,15 @@ func dumpSql(ctx context.Context, conf *Config, connectPool *connectionsPool, wr
return err
}

return writer.WriteTableData(ctx, tableIR)
tableDataStartTime := time.Now()
err = writer.WriteTableData(ctx, tableIR)
if err != nil {
summary.CollectFailureUnit("dump", err)
return err
} else {
summary.CollectSuccessUnit("dump cost", 1, time.Since(tableDataStartTime))
}
return nil
}

func dumpTable(ctx context.Context, conf *Config, db *sql.Conn, dbName string, table *TableInfo, writer Writer) ([]TableDataIR, error) {
Expand Down Expand Up @@ -462,3 +494,7 @@ func canRebuildConn(consistency string, trxConsistencyOnly bool) bool {
return false
}
}

func shouldRedirectLog(conf *Config) bool {
return conf.Logger != nil || conf.LogFile != ""
}
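The dump loop above defers its retry policy to br's utils.WithRetry plus newDumpChunkBackoffer. Reduced to a self-contained sketch, the contract looks roughly like this; the backoffer interface, timings, and names below are simplified stand-ins, not br's actual definitions:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// backoffer decides how long to sleep between attempts and when to give up.
type backoffer interface {
	NextBackoff(err error) time.Duration
	Attempt() int
}

// fixedBackoffer allows a fixed number of attempts with a constant delay.
type fixedBackoffer struct{ left int }

func (b *fixedBackoffer) NextBackoff(error) time.Duration {
	b.left--
	return 100 * time.Millisecond
}

func (b *fixedBackoffer) Attempt() int { return b.left }

// withRetry keeps calling fn until it succeeds, the backoffer is exhausted,
// or the context is cancelled; the same control flow the dump loop relies on.
func withRetry(ctx context.Context, fn func() error, bo backoffer) error {
	var lastErr error
	for bo.Attempt() > 0 {
		lastErr = fn()
		if lastErr == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(bo.NextBackoff(lastErr)):
		}
	}
	return lastErr
}

func main() {
	calls := 0
	err := withRetry(context.Background(), func() error {
		calls++
		if calls < 3 {
			return errors.New("transient")
		}
		return nil
	}, &fixedBackoffer{left: 5})
	fmt.Println(err, "after", calls, "calls") // <nil> after 3 calls
}
```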
2 changes: 2 additions & 0 deletions v4/export/http_handler.go
@@ -10,6 +10,7 @@ import (
"github.com/pingcap/dumpling/v4/log"

"github.com/pingcap/errors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/soheilhy/cmux"
"go.uber.org/zap"
)
@@ -20,6 +21,7 @@ var (

func startHTTPServer(lis net.Listener) {
router := http.NewServeMux()
router.Handle("/metrics", promhttp.Handler())

router.HandleFunc("/debug/pprof/", pprof.Index)
router.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
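startHTTPServer takes a net.Listener rather than an address because the caller multiplexes one port with cmux. A rough sketch of that wiring (the address and matcher choice are assumptions, not taken from this PR):

```go
package main

import (
	"net"
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
	"github.com/soheilhy/cmux"
)

func main() {
	l, err := net.Listen("tcp", "127.0.0.1:8281") // hypothetical port
	if err != nil {
		panic(err)
	}

	// cmux inspects the first bytes of each connection and hands HTTP/1
	// traffic to a sub-listener; other protocols could share the same port.
	m := cmux.New(l)
	httpL := m.Match(cmux.HTTP1Fast())

	router := http.NewServeMux()
	router.Handle("/metrics", promhttp.Handler())
	go func() { _ = http.Serve(httpL, router) }()

	_ = m.Serve() // blocks, dispatching connections to matched listeners
}
```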
2 changes: 1 addition & 1 deletion v4/export/ir_impl_test.go
@@ -88,7 +88,7 @@ func (s *testIRImplSuite) TestChunkRowIter(c *C) {
sqlRowIter := newRowIter(rows, 2)

res := newSimpleRowReceiver(2)
wp := newWriterPipe(nil, testFileSize, testStatementSize)
wp := newWriterPipe(nil, testFileSize, testStatementSize, nil)

var resSize [][]uint64
for sqlRowIter.HasNext() {
3 changes: 2 additions & 1 deletion v4/export/metadata.go
@@ -207,7 +207,8 @@ func recordGlobalMetaData(db *sql.Conn, buffer *bytes.Buffer, serverType ServerT
}

func (m *globalMetadata) writeGlobalMetaData(ctx context.Context) error {
fileWriter, tearDown, err := buildFileWriter(ctx, m.storage, metadataPath)
// keep consistent with mydumper: never compress the metadata file
fileWriter, tearDown, err := buildFileWriter(ctx, m.storage, metadataPath, false)
if err != nil {
return err
}
62 changes: 62 additions & 0 deletions v4/export/metrics.go
@@ -0,0 +1,62 @@
package export

import (
"github.com/prometheus/client_golang/prometheus"
)

var (
finishedSizeCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "finished_size",
Help: "counter for dumpling finished file size",
}, []string{})
finishedRowsCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "finished_rows",
Help: "counter for dumpling finished rows",
}, []string{})
writeTimeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dumpling",
Subsystem: "write",
Name: "write_duration_time",
Help: "Bucketed histogram of write time (s) of files",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
}, []string{})
receiveWriteChunkTimeHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "dumpling",
Subsystem: "write",
Name: "receive_chunk_duration_time",
Help: "Bucketed histogram of write time (s) of files",
Buckets: prometheus.ExponentialBuckets(0.00005, 2, 20),
}, []string{})
errorCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "dumpling",
Subsystem: "dump",
Name: "error_count",
Help: "Total error count during dumping progress",
}, []string{})
)

// RegisterMetrics registers metrics.
func RegisterMetrics(registry *prometheus.Registry) {
registry.MustRegister(finishedSizeCounter)
registry.MustRegister(finishedRowsCounter)
registry.MustRegister(writeTimeHistogram)
registry.MustRegister(receiveWriteChunkTimeHistogram)
registry.MustRegister(errorCount)
}

// RemoveLabelValuesWithTaskInMetrics deletes the series carrying the given labels from all dumpling metrics.
func RemoveLabelValuesWithTaskInMetrics(labels prometheus.Labels) {
finishedSizeCounter.Delete(labels)
finishedRowsCounter.Delete(labels)
writeTimeHistogram.Delete(labels)
receiveWriteChunkTimeHistogram.Delete(labels)
errorCount.Delete(labels)
}
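A hedged usage sketch for these vectors, assuming nothing beyond what the file declares. Note the empty []string{} label-name lists: with no label names declared, only an empty prometheus.Labels set is valid, and passing a non-empty label set to With would panic at runtime:

```go
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

// errorCount mirrors the declaration above: a counter vec with no label names.
var errorCount = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "dumpling",
		Subsystem: "dump",
		Name:      "error_count",
		Help:      "Total error count during dumping progress",
	}, []string{})

func main() {
	registry := prometheus.NewRegistry()
	registry.MustRegister(errorCount)

	// With a zero-length label-name list, only an empty label set is valid;
	// a non-empty prometheus.Labels here would panic.
	errorCount.With(prometheus.Labels{}).Inc()
	fmt.Println(testutil.ToFloat64(errorCount)) // 1

	// Mirrors RemoveLabelValuesWithTaskInMetrics: drop the series again.
	errorCount.Delete(prometheus.Labels{})
}
```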
2 changes: 1 addition & 1 deletion v4/export/sql.go
@@ -630,7 +630,7 @@ func pickupPossibleField(dbName, tableName string, db *sql.Conn, conf *Config) (
}

func estimateCount(dbName, tableName string, db *sql.Conn, field string, conf *Config) uint64 {
query := fmt.Sprintf("EXPLAIN SELECT `%s` FROM `%s`.`%s`", field, escapeString(dbName), escapeString(tableName))
query := fmt.Sprintf("EXPLAIN SELECT `%s` FROM `%s`.`%s`", escapeString(field), escapeString(dbName), escapeString(tableName))

if conf.Where != "" {
query += " WHERE "
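The one-line fix above matters because field is interpolated into a backtick-quoted identifier. Assuming escapeString doubles backticks when applied to identifiers (standard MySQL identifier escaping; the helper below is a stand-in, not dumpling's actual implementation), the effect is:

```go
package main

import (
	"fmt"
	"strings"
)

// escapeBackticks stands in for escapeString as applied to identifiers:
// doubling backticks keeps a crafted name from closing the quoted identifier.
func escapeBackticks(s string) string {
	return strings.ReplaceAll(s, "`", "``")
}

func main() {
	field := "weird`name"
	query := fmt.Sprintf("EXPLAIN SELECT `%s` FROM `%s`.`%s`",
		escapeBackticks(field), escapeBackticks("db"), escapeBackticks("t"))
	fmt.Println(query) // EXPLAIN SELECT `weird``name` FROM `db`.`t`
}
```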