Skip to content

Commit

Permalink
restore mainloop & entrance (pingcap#2)
Browse files Browse the repository at this point in the history
* init application mainloop

* restore mainloop

* address PR comment

* configure pprof via profile server port

* address comment
  • Loading branch information
silentsai authored Feb 8, 2018
1 parent c1b64d2 commit 8962074
Show file tree
Hide file tree
Showing 12 changed files with 1,375 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bin
.idea
70 changes: 70 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
### Makefile for tidb-lightning

GOPATH ?= $(shell go env GOPATH)

# Ensure GOPATH is set before running build process.
ifeq "$(GOPATH)" ""
$(error Please set the environment variable GOPATH before running `make`)
endif

TIDBDIR := $(GOPATH)/src/github.com/pingcap/tidb/
path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH)))
export PATH := $(path_to_add):$(PATH)

GO := go
GOBUILD := CGO_ENABLED=0 $(GO) build $(BUILD_FLAG)
GOTEST := CGO_ENABLED=1 $(GO) test -p 3
# OVERALLS := CGO_ENABLED=1 overalls
# GOVERALLS := goveralls

ARCH := "`uname -s`"
LINUX := "Linux"
MAC := "Darwin"
# PACKAGES := $$(go list ./...| grep -vE "vendor")
# FILES := $$(find . -name "*.go" | grep -vE "vendor")
# TOPDIRS := $$(ls -d */ | grep -vE "vendor")

TARGET = ""

.PHONY: all build parser clean parserlib

default: lightning buildsucc

build:
$(GOBUILD)

buildsucc:
@echo Build Lightning Successfully!

goyacc:
$(GOBUILD) -o $(TIDBDIR)/bin/goyacc $(TIDBDIR)/parser/goyacc/main.go

parser: goyacc
$(TIDBDIR)/bin/goyacc -o /dev/null $(TIDBDIR)/parser/parser.y
$(TIDBDIR)/bin/goyacc -o $(TIDBDIR)/parser/parser.go $(TIDBDIR)/parser/parser.y 2>&1 | egrep "(shift|reduce)/reduce" | awk '{print} END {if (NR > 0) {print "Find conflict in parser.y. Please check y.output for more information."; exit 1;}}'
rm -f y.output

@if [ $(ARCH) = $(LINUX) ]; \
then \
sed -i -e 's|//line.*||' -e 's/yyEofCode/yyEOFCode/' $(TIDBDIR)/parser/parser.go; \
elif [ $(ARCH) = $(MAC) ]; \
then \
/usr/bin/sed -i "" 's|//line.*||' $(TIDBDIR)/parser/parser.go; \
/usr/bin/sed -i "" 's/yyEofCode/yyEOFCode/' $(TIDBDIR)/parser/parser.go; \
fi

@awk 'BEGIN{print "// Code generated by goyacc"} {print $0}' $(TIDBDIR)/parser/parser.go > tmp_parser.go && mv tmp_parser.go $(TIDBDIR)/parser/parser.go;

parserlib: parser/parser.go

parser/parser.go: $(TIDBDIR)/parser/parser.y
make parser

RACE_FLAG =
ifeq ("$(WITH_RACE)", "1")
RACE_FLAG = -race
GOBUILD = GOPATH=$(TIDBDIR)/_vendor:$(GOPATH) CGO_ENABLED=1 $(GO) build
endif

lightning: parserlib
$(GOBUILD) $(RACE_FLAG) -o bin/tidb-lightning cmd/main.go
85 changes: 85 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package main

import (
"flag"
"net/http"
_ "net/http/pprof"
"os"
"os/signal"
"path"
"syscall"

"github.com/ngaut/log"

"github.com/pingcap/tidb-lightning/ingest"
"github.com/pingcap/tidb-lightning/ingest/common"
"github.com/pingcap/tidb-lightning/ingest/config"
)

var (
cfgFile = flag.String("c", "tidb-lighting.toml", "tidb-lighting configuration file")
)

func initEnv(cfg *config.Config) error {
common.EnsureDir(cfg.Dir)
// initLogger(cfg.Dir)

if len(cfg.ProfilePort) > 0 {
go func() { // TODO : config to enable it in debug mode
log.Info(http.ListenAndServe(":"+cfg.ProfilePort, nil))
}()
}

return nil
}

func initLogger(dir string) error {
logDir := path.Join(dir, "log")
logFile := path.Join(logDir, "ingest.log")
if err := os.MkdirAll(logDir, os.ModePerm); err != nil {
return err
}

log.SetRotateByDay()
log.SetHighlighting(false)
log.SetLevel(log.LOG_LEVEL_WARN)
if err := log.SetOutputByName(logFile); err != nil {
return err
}

return nil
}

func onExitSignal() {
sc := make(chan os.Signal, 1)
signal.Notify(sc,
os.Kill,
os.Interrupt,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)

sig := <-sc
log.Infof("Got signal %d to exit.", sig)
}

func main() {
flag.Parse()

cfg, err := config.LoadConfig(*cfgFile)
if err != nil {
log.Errorf("load config failed (%s) : %s", *cfgFile, err.Error())
return
}

initEnv(cfg)

mainloop := ingest.Mainloop(cfg)
mainloop.Run()

// TODO : onExitSignal() --> mainloop.Stop()

log.Info("tidb ingest exit.")
return
}
63 changes: 63 additions & 0 deletions ingest/common/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package common

import (
"fmt"
"sort"
"strings"
"sync"
"time"
)

type Metrics struct {
lock sync.Mutex
Timing map[string]*TimeCost
}

func NewMetrics() *Metrics {
return &Metrics{
Timing: make(map[string]*TimeCost),
}
}

func (m *Metrics) MarkTiming(name string, since time.Time) {
m.costTimeNS(name, time.Since(since).Nanoseconds())
}

func (m *Metrics) costTimeNS(name string, ns int64) {
m.lock.Lock()
defer m.lock.Unlock()

t, ok := m.Timing[name]
if !ok {
t = &TimeCost{total: 0, times: 0}
m.Timing[name] = t
}
t.total += ns
t.times++
}

func (m *Metrics) DumpTiming() string {
marks := make([]string, 0, len(m.Timing))
for mark, _ := range m.Timing {
marks = append(marks, mark)
}
sort.Strings(marks)

lines := make([]string, 0, len(marks))
for _, mark := range marks {
t := m.Timing[mark]
l := fmt.Sprintf("%-40s : total = %.3f s / times = %d / avg = %.3f s", mark, t.Total(), t.Times(), t.Avg())
lines = append(lines, l)
}

return strings.Join(lines, "\n")
}

type TimeCost struct {
total int64
times int
}

func (t *TimeCost) Total() float64 { return float64(t.total) / 1000000000 }
func (t *TimeCost) Times() int { return t.times }
func (t *TimeCost) Avg() float64 { return t.Total() / float64(t.times) }
92 changes: 92 additions & 0 deletions ingest/common/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package common

import (
"fmt"
"os"
"strings"

"database/sql"
"path/filepath"

_ "github.com/go-sql-driver/mysql"
"github.com/ngaut/log"
)

func Percent(a int, b int) string {
return fmt.Sprintf("%.2f %%", float64(a)/float64(b)*100)
}

func ConnectDB(host string, port int, user string, psw string) *sql.DB {
dbDSN := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8", user, psw, host, port)
db, err := sql.Open("mysql", dbDSN)
if err != nil {
log.Errorf("can not open db file [%s]", err)
return nil
}

return db
}

func GetFileSize(file string) (int64, error) {
fd, err := os.Open(file)
if err != nil {
return -1, err
}
defer fd.Close()

fstat, err := fd.Stat()
if err != nil {
return -1, err
}

return fstat.Size(), nil
}

func FileExists(file string) bool {
_, err := os.Stat(file)
return err == nil
}

// IsDirExists checks if dir exists.
func IsDirExists(name string) bool {
f, err := os.Stat(name)
if err != nil {
return false
}
return f != nil && f.IsDir()
}

func EnsureDir(dir string) error {
if !FileExists(dir) {
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return err
}
}
return nil
}

func ListFiles(dir string) map[string]string {
files := make(map[string]string)
filepath.Walk(dir, func(path string, f os.FileInfo, err error) error {
if err != nil {
log.Errorf("list file failed : %s", err.Error())
return nil
}

if f == nil {
return nil
}

if f.IsDir() {
return nil
}

// relPath, _ := filepath.Rel(dir, path)
fname := strings.TrimSpace(f.Name())
files[path] = fname

return nil
})

return files
}
63 changes: 63 additions & 0 deletions ingest/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package config

import (
"io/ioutil"

"github.com/BurntSushi/toml"
)

type DataSource struct {
Type string `toml:"type"`
URL string `toml:"url"`
}

type DBStore struct {
Host string `toml:"host"`
Port int `toml:"port"`
User string `toml:"user"`
Pwd string `toml:"password"`
Database string `toml:"database"`
}

type Config struct {
Dir string `toml:"dir"`
SourceDir string `toml:"data_source_dir"`

PdAddr string `toml:"pd_backend"`
KvDeliverAddr string `toml:"kv_import_backend"`
TiDB DBStore `toml:"tidb"`

ProfilePort string `toml:"pprof_port"`
ProgressStore DBStore `toml:"progress_store"`

Mydump MydumperRuntime `toml:"mydumper"`
KvDev KVDeliverRuntime `toml:"kv-ingest"`
}

type MydumperRuntime struct {
ReadBlockSize int64 `toml:"read-block-size"`
MinRegionSize int64 `toml:"region-min-size"`
}

type KVDeliverRuntime struct {
MaxFlushSize int64 `toml:"max-flush-size"`
}

func LoadConfig(file string) (*Config, error) {
data, err := ioutil.ReadFile(file)
if err != nil {
return nil, err
}

cfg := new(Config)
if err = toml.Unmarshal(data, cfg); err != nil {
return nil, err
}

// TODO ... adjust
// cfg.Mydump.MinRegionSize = MinRegionSize
// cfg.Mydump.ReadBlockSize = ReadBlockSize
cfg.KvDev.MaxFlushSize = MaxFlushSize

return cfg, nil
}
14 changes: 14 additions & 0 deletions ingest/config/const.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package config

const (
_K = int64(1 << 10)
_M = _K << 10
_G = _M << 10

// mydumper
// ReadBlockSize int64 = 32 * _K
// MinRegionSize int64 = 256 * _M

// // kv-deliver
MaxFlushSize int64 = 200 * _G
)
Loading

0 comments on commit 8962074

Please sign in to comment.