Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

mydumper: pipe mydumper log into dm-worker.log #93

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 61 additions & 5 deletions mydumper/mydumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
package mydumper

import (
"bufio"
"bytes"
"context"
"fmt"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
Expand All @@ -26,6 +29,7 @@ import (
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/errors"
"github.com/siddontang/go/sync2"
)

Expand Down Expand Up @@ -67,8 +71,7 @@ func (m *Mydumper) Process(ctx context.Context, pr chan pb.ProcessResult) {
}

// Cmd cannot be reused, so we create a new cmd when begin processing
cmd := exec.CommandContext(ctx, m.cfg.MydumperPath, m.args...)
output, err := cmd.CombinedOutput()
output, err := m.spawn(ctx)

if err != nil {
mydumperExitWithErrorCounter.WithLabelValues(m.cfg.Name).Inc()
Expand All @@ -89,6 +92,60 @@ func (m *Mydumper) Process(ctx context.Context, pr chan pb.ProcessResult) {
}
}

var mydumperLogRegexp = regexp.MustCompile(
`^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} \[(DEBUG|INFO|WARNING|ERROR)\] - `,
)

func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
var stdout bytes.Buffer
cmd := exec.CommandContext(ctx, m.cfg.MydumperPath, m.args...)
cmd.Stdout = &stdout
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, errors.Trace(err)
}
if err := cmd.Start(); err != nil {
return nil, errors.Trace(err)
}

// Read the stderr from mydumper, which contained the logs.
// mydumper's logs are all in the form
//
// 2016-01-02 15:04:05 [DEBUG] - actual message
//
// so we parse all these lines and translate into our own logs.
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
line := scanner.Bytes()
if loc := mydumperLogRegexp.FindSubmatchIndex(line); len(loc) == 4 {
level := string(line[loc[2]:loc[3]])
msg := line[loc[1]:]
switch level {
case "DEBUG":
log.Debugf("[mydumper] %s", msg)
continue
case "INFO":
log.Infof("[mydumper] %s", msg)
continue
case "WARNING":
log.Warnf("[mydumper] %s", msg)
continue
case "ERROR":
log.Errorf("[mydumper] %s", msg)
continue
}
}
stdout.Write(line)
stdout.WriteByte('\n')
}
if err := scanner.Err(); err != nil {
return stdout.Bytes(), errors.Trace(err)
}

err = cmd.Wait()
return stdout.Bytes(), errors.Trace(err)
}

// Close implements Unit.Close
func (m *Mydumper) Close() {
if m.closed.Get() {
Expand Down Expand Up @@ -186,9 +243,8 @@ func (m *Mydumper) constructArgs() []string {
func (m *Mydumper) logArgs(cfg *config.SubTaskConfig) []string {
args := make([]string, 0, 4)
if len(cfg.LogFile) > 0 {
// mydumper overwrite log file, ref: https://github.com/maxbube/mydumper/blob/a1ddcba64b6af807cf9de468b8ca59b54ca6a2a9/mydumper.c#L232
// so we need to use a different log file (mydumper-taskname.log) until we update mydumper to append log
args = append(args, "--logfile", fmt.Sprintf("mydumper-%s.log", cfg.Name))
// for writing mydumper output into stderr (fixme: won't work on Windows, if anyone cares)
args = append(args, "--logfile", "/dev/stderr")
}
switch strings.ToLower(cfg.LogLevel) {
case "fatal", "error":
Expand Down