Skip to content

Commit

Permalink
mydumper: pipe mydumper log into dm-worker.log (pingcap#93)
Browse files Browse the repository at this point in the history
  • Loading branch information
kennytm authored and csuzhangxc committed Mar 27, 2019
1 parent 0d71389 commit c235ca2
Showing 1 changed file with 61 additions and 5 deletions.
66 changes: 61 additions & 5 deletions mydumper/mydumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,13 @@
package mydumper

import (
"bufio"
"bytes"
"context"
"fmt"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"time"
Expand All @@ -26,6 +29,7 @@ import (
"github.com/pingcap/dm/dm/pb"
"github.com/pingcap/dm/dm/unit"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/errors"
"github.com/siddontang/go/sync2"
)

Expand Down Expand Up @@ -67,8 +71,7 @@ func (m *Mydumper) Process(ctx context.Context, pr chan pb.ProcessResult) {
}

// Cmd cannot be reused, so we create a new cmd when begin processing
cmd := exec.CommandContext(ctx, m.cfg.MydumperPath, m.args...)
output, err := cmd.CombinedOutput()
output, err := m.spawn(ctx)

if err != nil {
mydumperExitWithErrorCounter.WithLabelValues(m.cfg.Name).Inc()
Expand All @@ -89,6 +92,60 @@ func (m *Mydumper) Process(ctx context.Context, pr chan pb.ProcessResult) {
}
}

var mydumperLogRegexp = regexp.MustCompile(
`^[0-9]{4}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2} \[(DEBUG|INFO|WARNING|ERROR)\] - `,
)

func (m *Mydumper) spawn(ctx context.Context) ([]byte, error) {
var stdout bytes.Buffer
cmd := exec.CommandContext(ctx, m.cfg.MydumperPath, m.args...)
cmd.Stdout = &stdout
stderr, err := cmd.StderrPipe()
if err != nil {
return nil, errors.Trace(err)
}
if err := cmd.Start(); err != nil {
return nil, errors.Trace(err)
}

// Read the stderr from mydumper, which contained the logs.
// mydumper's logs are all in the form
//
// 2016-01-02 15:04:05 [DEBUG] - actual message
//
// so we parse all these lines and translate into our own logs.
scanner := bufio.NewScanner(stderr)
for scanner.Scan() {
line := scanner.Bytes()
if loc := mydumperLogRegexp.FindSubmatchIndex(line); len(loc) == 4 {
level := string(line[loc[2]:loc[3]])
msg := line[loc[1]:]
switch level {
case "DEBUG":
log.Debugf("[mydumper] %s", msg)
continue
case "INFO":
log.Infof("[mydumper] %s", msg)
continue
case "WARNING":
log.Warnf("[mydumper] %s", msg)
continue
case "ERROR":
log.Errorf("[mydumper] %s", msg)
continue
}
}
stdout.Write(line)
stdout.WriteByte('\n')
}
if err := scanner.Err(); err != nil {
return stdout.Bytes(), errors.Trace(err)
}

err = cmd.Wait()
return stdout.Bytes(), errors.Trace(err)
}

// Close implements Unit.Close
func (m *Mydumper) Close() {
if m.closed.Get() {
Expand Down Expand Up @@ -186,9 +243,8 @@ func (m *Mydumper) constructArgs() []string {
func (m *Mydumper) logArgs(cfg *config.SubTaskConfig) []string {
args := make([]string, 0, 4)
if len(cfg.LogFile) > 0 {
// mydumper overwrite log file, ref: https://github.com/maxbube/mydumper/blob/a1ddcba64b6af807cf9de468b8ca59b54ca6a2a9/mydumper.c#L232
// so we need to use a different log file (mydumper-taskname.log) until we update mydumper to append log
args = append(args, "--logfile", fmt.Sprintf("mydumper-%s.log", cfg.Name))
// for writing mydumper output into stderr (fixme: won't work on Windows, if anyone cares)
args = append(args, "--logfile", "/dev/stderr")
}
switch strings.ToLower(cfg.LogLevel) {
case "fatal", "error":
Expand Down

0 comments on commit c235ca2

Please sign in to comment.