diff --git a/README.md b/README.md
index 01342c6..97de94b 100644
--- a/README.md
+++ b/README.md
@@ -34,9 +34,9 @@ go-etl将提供的etl能力如下:
 | 无结构流     | CSV                | √          | √          | [读](datax/plugin/reader/csv/README.md)、[写](datax/plugin/writer/csv/README.md) |
 |              | XLSX(excel)        | √          | √          | [读](datax/plugin/reader/xlsx/README.md)、[写](datax/plugin/writer/xlsx/README.md) |
 
-### 用户手册
+### 数据同步用户手册
 
-使用[go-etl用户手册](README_USER.md)开始数据同步
+使用[go-etl数据同步用户手册](README_USER.md)开始数据同步
 
 ### 数据同步开发宝典
 
@@ -46,14 +46,6 @@ go-etl将提供的etl能力如下:
 ### datax
 
 本包将提供类似于阿里巴巴[DataX](https://github.com/alibaba/DataX)的接口去实现go的etl框架,目前主要实现了job框架内的数据同步能力.
-
-#### plan
-
-- [x] 实现关系型数据库的任务切分
-- [x] 实现监控模块
-- [x] 实现流控模块
-- [ ] 实现关系型数据库入库断点续传
-
 ### storage
 
 #### database
@@ -65,28 +57,11 @@ go-etl将提供的etl能力如下:
 主要用于字节流的解析,如文件,消息队列,elasticsearch等,字节流格式可以是csv,json,xml等
 
 #### file
-
-#### mq
-
-##### plan
-
-暂无时间安排计划,欢迎来实现
-
-#### elasticsearch
-
-##### plan
-
-暂无时间安排计划,欢迎来实现
+主要用于文件的解析,如csv,excel等
 
 ### transform
 
-主要用于类sql数据转化
-
-#### plan
-
-- [ ] 引入tidb数据库的mysql解析能力
-- [ ] 引入tidb数据库的mysql函数计算能力
-- [ ] 运用mysql解析能力和mysql函数计算能力实现数据转化能力
+主要用于类sql数据转化,类似百度引擎
 
 ### tools
 
@@ -107,6 +82,10 @@ go generate ./...
 
 数据源插件模板新增工具,用于新增一个reader或writer模板,配合发布命令使用,减少开发者负担
 
+##### plugin
+
+数据源插件打包工具
+
 #### license
 
 用于自动新增go代码文件中许可证
diff --git a/README_USER.md b/README_USER.md
index a005084..c057fa5 100644
--- a/README_USER.md
+++ b/README_USER.md
@@ -1,4 +1,4 @@
-# go-etl用户手册
+# go-etl数据同步用户手册
 
 go-etl的datax是一个数据同步工具,目前支持MySQL,Postgres,Oracle,SQL Server,DB2等主流关系型数据库以及csv,xlsx文件之间的数据同步。
 
@@ -104,23 +104,24 @@ data -c config.json
 }
 }
 ```
-#### 流控配置
-之前speed的byte和record配置并不会生效,现在加入流控特性后,byte和record将会生效,byte会限制缓存消息字节数,而record会限制缓存消息条数,如果byte设置过小会导致缓存过小而导致同步数据失败。当byte为0或负数时,限制器将不会工作,例如byte为10485760,现在为10Mb(10*1024*1024)。
-```json
-{
-    "job":{
-        "setting":{
-            "speed":{
-                "byte":,
-                "record":10485760,
-                "channel":4
-            }
-        }
-    }
-}
+`reader`和`writer`的配置如下:
 
-```
+| 类型         | 数据源             | Reader(读) | Writer(写) | 文档                                                         |
+| ------------ | ------------------ | ---------- | ---------- | ------------------------------------------------------------ |
+| 关系型数据库 | MySQL/Mariadb/Tidb | √          | √          | [读](datax/plugin/reader/mysql/README.md)、[写](datax/plugin/writer/mysql/README.md) |
+|              | Postgres/Greenplum | √          | √          | [读](datax/plugin/reader/postgres/README.md)、[写](datax/plugin/writer/postgres/README.md) |
+|              | DB2 LUW            | √          | √          | [读](datax/plugin/reader/db2/README.md)、[写](datax/plugin/writer/db2/README.md) |
+|              | SQL Server         | √          | √          | [读](datax/plugin/reader/sqlserver/README.md)、[写](datax/plugin/writer/sqlserver/README.md) |
+|              | Oracle             | √          | √          | [读](datax/plugin/reader/oracle/README.md)、[写](datax/plugin/writer/oracle/README.md) |
+| 无结构流     | CSV                | √          | √          | [读](datax/plugin/reader/csv/README.md)、[写](datax/plugin/writer/csv/README.md) |
+|              | XLSX(excel)        | √          | √          | [读](datax/plugin/reader/xlsx/README.md)、[写](datax/plugin/writer/xlsx/README.md) |
+
+#### 2.1.2 使用示例
+
+注意:在linux下,需要如Makefile所示设置export LD_LIBRARY_PATH=${DB2HOME}/lib
+
+##### 2.1.2.1 使用mysql同步
 
 `reader`和`writer`的配置如下:
@@ -446,13 +447,17 @@ datax -h
 帮助显示
 
 ```bash
-Usage of datax:
-  -c string #数据源配置文件
+Usage of datax:
+  -c string
        config (default "config.json")
-  -w string #源目的配置向导文件
+  -http string
+        http
+  -w string
        wizard
 ```
 
+-http 新增监听端口,如:8080,开启后访问http://127.0.0.1:8080/metrics获取实时的吞吐量
+
 #### 2.3.2 查看版本
 
 ```
diff --git a/cmd/datax/enveronment.go b/cmd/datax/enveronment.go
index 271c5b7..dd44bea 100644
--- a/cmd/datax/enveronment.go
+++ b/cmd/datax/enveronment.go
@@ -16,10 +16,15 @@ package main
 
 import (
 	"context"
+	"fmt"
 	"io/ioutil"
+	"net/http"
+	_ "net/http/pprof"
+	"time"
 
"github.com/Breeze0806/go-etl/config" "github.com/Breeze0806/go-etl/datax" + "github.com/gorilla/handlers" ) type enveronment struct { @@ -28,9 +33,11 @@ type enveronment struct { err error ctx context.Context cancel context.CancelFunc + server *http.Server + addr string } -func newEnveronment(filename string) (e *enveronment) { +func newEnveronment(filename string, addr string) (e *enveronment) { e = &enveronment{} var buf []byte buf, e.err = ioutil.ReadFile(filename) @@ -42,20 +49,82 @@ func newEnveronment(filename string) (e *enveronment) { return e } e.ctx, e.cancel = context.WithCancel(context.Background()) + e.addr = addr return e } func (e *enveronment) build() error { + return e.initEngine().initServer().startEngine().err +} + +func (e *enveronment) initEngine() *enveronment { if e.err != nil { - return e.err + return e } e.engine = datax.NewEngine(e.ctx, e.config) + return e +} + +func (e *enveronment) initServer() *enveronment { + if e.err != nil { + return e + } + if e.addr != "" { + r := http.NewServeMux() + recoverHandler := handlers.RecoveryHandler(handlers.PrintRecoveryStack(true)) + r.Handle("/metrics", recoverHandler(newHandler(e.engine))) + e.server = &http.Server{ + Addr: e.addr, + Handler: handlers.CompressHandler(r), + } + go func() { + log.Debugf("listen begin: %v", e.addr) + defer log.Debugf("listen end: %v", e.addr) + if err := e.server.ListenAndServe(); err != nil { + log.Errorf("ListenAndServe fail. addr: %v err: %v", e.addr, err) + } + log.Infof("ListenAndServe success. addr: %v", e.addr) + }() + } + + return e +} + +func (e *enveronment) startEngine() *enveronment { + if e.err != nil { + return e + } + go func() { + statsTimer := time.NewTicker(5 * time.Second) + defer statsTimer.Stop() + exit := false + for { + select { + case <-statsTimer.C: + case <-e.ctx.Done(): + exit = true + default: + } + if e.engine.Container != nil { + fmt.Printf("%v\r", e.engine.Metrics().JSON()) + } + + if exit { + return + } + } + }() e.err = e.engine.Start() - return e.err + + return e } func (e *enveronment) close() { + if e.server != nil { + e.server.Shutdown(e.ctx) + } + if e.cancel != nil { e.cancel() } diff --git a/cmd/datax/examples/limit/config.json b/cmd/datax/examples/limit/config.json index 3e03009..0d5ba99 100644 --- a/cmd/datax/examples/limit/config.json +++ b/cmd/datax/examples/limit/config.json @@ -3,7 +3,7 @@ "container": { "job":{ "id": 1, - "sleepInterval":100 + "sleepInterval":1000 } } }, @@ -31,7 +31,7 @@ ], "setting":{ "speed":{ - "byte":10485760, + "byte":204800, "record":1024, "channel":4 } diff --git a/cmd/datax/examples/limit/csv.json b/cmd/datax/examples/limit/csv.json deleted file mode 100644 index 0c58e1f..0000000 --- a/cmd/datax/examples/limit/csv.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - "core" : { - "container": { - "job":{ - "id": 1, - "sleepInterval":100 - } - } - }, - "job":{ - "content":[ - { - "reader":{ - "name": "csvreader", - "parameter": { - "path":["examples/limit/src.csv"], - "encoding":"utf-8", - "delimiter":"," - } - }, - "writer":{ - "name": "csvwriter", - "parameter": { - "path":["examples/limit/dest.csv"], - "encoding":"utf-8", - "delimiter":"," - } - }, - "transformer":[] - } - ], - "setting":{ - "speed":{ - "byte":1024, - "record":100, - "channel":4 - } - } - } -} \ No newline at end of file diff --git a/cmd/datax/examples/split/mysql.json b/cmd/datax/examples/split/mysql.json deleted file mode 100644 index 3e858af..0000000 --- a/cmd/datax/examples/split/mysql.json +++ /dev/null @@ -1,63 +0,0 @@ -{ - "core" : { - "container": { - 
"job":{ - "id": 1, - "sleepInterval":100 - } - } - }, - "job":{ - "content":[ - { - "reader":{ - "name": "mysqlreader", - "parameter": { - "username": "root", - "password": "123456", - "split" : { - "key":"str" - }, - "column": ["*"], - "connection": { - "url": "tcp(192.168.15.130:3306)/source?parseTime=false", - "table": { - "db":"source", - "name":"split" - } - }, - "where": "" - } - }, - "writer":{ - "name": "mysqlwriter", - "parameter": { - "username": "root", - "password": "123456", - "writeMode": "insert", - "column": ["*"], - "session": [], - "preSql": [], - "connection": { - "url": "tcp(192.168.15.130:3306)/mysql?parseTime=false", - "table": { - "db":"destination", - "name":"split" - } - }, - "batchTimeout": "1s", - "batchSize":1000 - } - }, - "transformer":[] - } - ], - "setting":{ - "speed":{ - "byte":0, - "record":1024, - "channel":4 - } - } - } -} \ No newline at end of file diff --git a/cmd/datax/handler.go b/cmd/datax/handler.go new file mode 100644 index 0000000..2608afc --- /dev/null +++ b/cmd/datax/handler.go @@ -0,0 +1,39 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "net/http" + + "github.com/Breeze0806/go-etl/datax" +) + +type handler struct { + engine *datax.Engine +} + +func newHandler(engine *datax.Engine) *handler { + return &handler{ + engine: engine, + } +} + +func (h *handler) ServeHTTP(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusOK) + if h.engine.Metrics().JSON() == nil { + return + } + w.Write([]byte(h.engine.Metrics().JSON().String())) +} diff --git a/cmd/datax/main.go b/cmd/datax/main.go index 1623e8c..98d4486 100644 --- a/cmd/datax/main.go +++ b/cmd/datax/main.go @@ -26,6 +26,7 @@ func main() { initLog() var configFile = flag.String("c", "config.json", "config") var wizardFile = flag.String("w", "", "wizard") + var httpAddr = flag.String("http", "", "http") flag.Parse() if *wizardFile != "" { if err := tools.NewWizard(*configFile, *wizardFile).GenerateConfigsAndScripts(); err != nil { @@ -38,7 +39,7 @@ func main() { log.Infof("config: %v\n", *configFile) - e := newEnveronment(*configFile) + e := newEnveronment(*configFile, *httpAddr) defer e.close() if err := e.build(); err != nil { fmt.Printf("run fail. 
err: %v\n", err) diff --git a/datax/common/plugin/job_collector.go b/datax/common/plugin/job_collector.go index 013931a..09b13f1 100644 --- a/datax/common/plugin/job_collector.go +++ b/datax/common/plugin/job_collector.go @@ -19,6 +19,6 @@ import "github.com/Breeze0806/go/encoding" //JobCollector 工作信息采集器,用于统计整个工作的进度,错误信息等 //toto 当前未实现监控模块,为此需要在后面来实现这个接口的结构体 type JobCollector interface { - MessageMap() *encoding.JSON - MessageByKey(key string) *encoding.JSON + JSON() *encoding.JSON + JSONByKey(key string) *encoding.JSON } diff --git a/datax/common/plugin/job_test.go b/datax/common/plugin/job_test.go index d2dec75..66ca337 100644 --- a/datax/common/plugin/job_test.go +++ b/datax/common/plugin/job_test.go @@ -24,11 +24,11 @@ import ( type mockJobCollector struct { } -func (m *mockJobCollector) MessageMap() *encoding.JSON { +func (m *mockJobCollector) JSON() *encoding.JSON { return nil } -func (m *mockJobCollector) MessageByKey(key string) *encoding.JSON { +func (m *mockJobCollector) JSONByKey(key string) *encoding.JSON { return nil } diff --git a/datax/core/container.go b/datax/core/container.go index 133c1a0..7103ae9 100644 --- a/datax/core/container.go +++ b/datax/core/container.go @@ -16,16 +16,19 @@ package core import ( "github.com/Breeze0806/go-etl/config" + "github.com/Breeze0806/go-etl/datax/core/statistics/container" ) //Container 容器 type Container interface { Start() error + Metrics() *container.Metrics } //BaseCotainer 基础容器 type BaseCotainer struct { - conf *config.JSON + conf *config.JSON + metrics *container.Metrics } //NewBaseCotainer 创建基础容器 @@ -33,6 +36,16 @@ func NewBaseCotainer() *BaseCotainer { return &BaseCotainer{} } +//SetMetrics 设置指标 +func (b *BaseCotainer) SetMetrics(metrics *container.Metrics) { + b.metrics = metrics +} + +//Metrics 指标 +func (b *BaseCotainer) Metrics() *container.Metrics { + return b.metrics +} + //SetConfig 设置JSON配置 func (b *BaseCotainer) SetConfig(conf *config.JSON) { b.conf = conf diff --git a/datax/core/container_test.go b/datax/core/container_test.go index 2d0218a..24eb1da 100644 --- a/datax/core/container_test.go +++ b/datax/core/container_test.go @@ -19,6 +19,7 @@ import ( "testing" "github.com/Breeze0806/go-etl/config" + "github.com/Breeze0806/go-etl/datax/core/statistics/container" ) func testJSONFromString(s string) *config.JSON { @@ -57,3 +58,38 @@ func TestBaseCotainer_SetConfig(t *testing.T) { }) } } + +func TestBaseCotainer_SetGetMetrics(t *testing.T) { + type testStruct struct { + Path string `json:"path"` + } + + type args struct { + metrics *container.Metrics + } + m := container.NewMetrics() + m.Set("test", testStruct{Path: "value"}) + tests := []struct { + name string + b *BaseCotainer + args args + want string + }{ + { + name: "1", + b: NewBaseCotainer(), + args: args{ + metrics: m, + }, + want: `{"test":{"path":"value"}}`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.b.SetMetrics(tt.args.metrics) + if got := tt.b.Metrics().JSON().String(); got != tt.want { + t.Errorf("Metrics() = %v, want: %v", got, tt.want) + } + }) + } +} diff --git a/datax/core/job/container.go b/datax/core/job/container.go index 716634e..6c59050 100644 --- a/datax/core/job/container.go +++ b/datax/core/job/container.go @@ -16,7 +16,6 @@ package job import ( "context" - "fmt" "math" "sort" "strconv" @@ -32,6 +31,7 @@ import ( "github.com/Breeze0806/go-etl/datax/common/spi/writer" "github.com/Breeze0806/go-etl/datax/common/util" "github.com/Breeze0806/go-etl/datax/core" + 
"github.com/Breeze0806/go-etl/datax/core/statistics/container" statplugin "github.com/Breeze0806/go-etl/datax/core/statistics/container/plugin" "github.com/Breeze0806/go-etl/datax/core/taskgroup" "github.com/Breeze0806/go-etl/schedule" @@ -54,6 +54,7 @@ type Container struct { endTransferTimeStamp int64 needChannelNumber int64 totalStage int + reportInterval time.Duration //todo ErrorRecordChecker未使用 errorLimit util.ErrorRecordChecker taskSchduler *schedule.TaskSchduler @@ -68,10 +69,13 @@ func NewContainer(ctx context.Context, conf *config.JSON) (c *Container, err err ctx: ctx, } c.SetConfig(conf) + c.SetMetrics(container.NewMetrics()) c.jobID = c.Config().GetInt64OrDefaullt(coreconst.DataxCoreContainerJobID, -1) if c.jobID < 0 { return nil, errors.New("container job id is invalid") } + c.reportInterval = time.Duration(c.Config().GetFloat64OrDefaullt(coreconst.DataxCoreContainerJobReportinterval, 1)) * time.Second + c.Metrics().Set("jobID", c.jobID) return } @@ -83,38 +87,38 @@ func (c *Container) Start() (err error) { log.Debugf("DataX jobContainer %v starts to preHandle.", c.jobID) if err = c.preHandle(); err != nil { - log.Errorf("DataX jobContainer %v preHandle failed.", c.jobID, err) + log.Errorf("DataX jobContainer %v preHandle failed. err: %v", c.jobID, err) return } log.Infof("DataX jobContainer %v starts to init.", c.jobID) if err = c.init(); err != nil { - log.Errorf("DataX jobContainer %v init failed.", c.jobID, err) + log.Errorf("DataX jobContainer %v init failed. err: %v", c.jobID, err) return } log.Infof("DataX jobContainer %v starts to prepare.", c.jobID) if err = c.prepare(); err != nil { - log.Errorf("DataX jobContainer %v prepare failed.", c.jobID, err) + log.Errorf("DataX jobContainer %v prepare failed. err: %v", c.jobID, err) return } - log.Infof("DataX jobContainer %v starts to split.", c.jobID) + log.Infof("DataX jobContainer %v starts to split. err: %v", c.jobID) if err = c.split(); err != nil { - log.Errorf("DataX jobContainer %v split failed.", c.jobID, err) + log.Errorf("DataX jobContainer %v split failed. err: %v", c.jobID, err) return } - log.Infof("DataX jobContainer %v starts to schedule.", c.jobID) + log.Infof("DataX jobContainer %v starts to schedule. err: %v", c.jobID) if err = c.schedule(); err != nil { - log.Errorf("DataX jobContainer %v schedule failed.", c.jobID, err) + log.Errorf("DataX jobContainer %v schedule failed. err: %v", c.jobID, err) return } log.Infof("DataX jobContainer %v starts to post.", c.jobID) if err = c.post(); err != nil { - log.Errorf("DataX jobContainer %v post failed.", c.jobID, err) + log.Errorf("DataX jobContainer %v post failed. err: %v", c.jobID, err) return } log.Debugf("DataX jobContainer %v starts to postHandle.", c.jobID) if err = c.postHandle(); err != nil { - log.Errorf("DataX jobContainer %v postHandle failed.", c.jobID, err) + log.Errorf("DataX jobContainer %v postHandle failed. 
err: %v", c.jobID, err) return } @@ -178,7 +182,7 @@ func (c *Container) init() (err error) { writerConfig.Set(coreconst.DataxJobSetting, jobSettingConf) - collector := statplugin.NewDefaultJobCollector() + collector := statplugin.NewDefaultJobCollector(c.Metrics()) c.jobReader, err = c.initReaderJob(collector, readerConfig, writerConfig) if err != nil { return @@ -290,28 +294,26 @@ func (c *Container) schedule() (err error) { goto End } taskGroups = append(taskGroups, taskGroup) - go func(taskGroup *taskgroup.Container) { + go func(taskGroup *taskgroup.Container, i int) { defer func() { - fmt.Printf("\n") c.wg.Done() }() - // timer := time.NewTimer(taskGroup.SleepInterval) - // defer timer.Stop() + statsTimer := time.NewTicker(c.reportInterval) + defer statsTimer.Stop() for { select { case taskGroup.Err = <-errChan: + c.setStats(taskGroup, i) return case <-c.ctx.Done(): + c.setStats(taskGroup, i) return - case <-time.After(taskGroup.SleepInterval): + case <-statsTimer.C: + c.setStats(taskGroup, i) } - stats := taskGroup.Stats() - for _, v := range stats { - fmt.Printf("%s\r", v.String()) - } - } - }(taskGroup) + } + }(taskGroup, i) } End: c.wg.Wait() @@ -329,6 +331,11 @@ End: return } +func (c *Container) setStats(taskGroup *taskgroup.Container, i int) { + stats := taskGroup.Metrics().JSON().Clone() + c.Metrics().Set("metrics."+strconv.Itoa(i), stats) +} + //post 后置通知 func (c *Container) post() (err error) { if err = c.jobReader.Post(c.ctx); err != nil { diff --git a/datax/core/job/container_test.go b/datax/core/job/container_test.go index 1e49997..392aa30 100644 --- a/datax/core/job/container_test.go +++ b/datax/core/job/container_test.go @@ -44,7 +44,7 @@ func TestNewContainer(t *testing.T) { "container": { "job":{ "id": -3, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -67,7 +67,7 @@ func TestNewContainer(t *testing.T) { "container": { "job":{ "id": "1000", - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -110,7 +110,7 @@ func TestContainer_preHandle(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -135,7 +135,7 @@ func TestContainer_preHandle(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -157,7 +157,7 @@ func TestContainer_preHandle(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -183,7 +183,7 @@ func TestContainer_preHandle(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -209,7 +209,7 @@ func TestContainer_preHandle(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -251,7 +251,7 @@ func TestContainer_postHandle(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -276,7 +276,7 @@ func TestContainer_postHandle(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -298,7 +298,7 @@ func TestContainer_postHandle(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -324,7 +324,7 @@ func TestContainer_postHandle(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 
}, "taskGroup":{ "id": 30000001, @@ -350,7 +350,7 @@ func TestContainer_postHandle(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -406,7 +406,7 @@ func TestContainer_init(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -443,7 +443,7 @@ func TestContainer_init(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -471,7 +471,7 @@ func TestContainer_init(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -503,7 +503,7 @@ func TestContainer_init(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -537,7 +537,7 @@ func TestContainer_init(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -572,7 +572,7 @@ func TestContainer_init(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -610,7 +610,7 @@ func TestContainer_init(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -648,7 +648,7 @@ func TestContainer_init(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -686,7 +686,7 @@ func TestContainer_init(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -755,7 +755,7 @@ func TestContainer_prepare(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -792,7 +792,7 @@ func TestContainer_prepare(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -830,7 +830,7 @@ func TestContainer_prepare(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -899,7 +899,7 @@ func TestContainer_post(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -936,7 +936,7 @@ func TestContainer_post(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -974,7 +974,7 @@ func TestContainer_post(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -1042,7 +1042,7 @@ func TestContainer_destroy(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -1079,7 +1079,7 @@ func TestContainer_destroy(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, @@ -1117,7 +1117,7 @@ func TestContainer_destroy(t *testing.T) { "container": { "job":{ "id": 1, - "sleepInterval":100 + "reportInterval":100 }, "taskGroup":{ "id": 30000001, diff --git a/datax/core/statistics/container/metrics.go b/datax/core/statistics/container/metrics.go new file mode 100644 index 0000000..6ea777c --- /dev/null +++ b/datax/core/statistics/container/metrics.go @@ -0,0 +1,61 @@ +// Copyright 2020 the go-etl 
Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package container + +import ( + "sync" + + "github.com/Breeze0806/go/encoding" +) + +//Metrics json格式指标 +type Metrics struct { + sync.RWMutex + + metricJSON *encoding.JSON +} + +//NewMetrics json格式指标 +func NewMetrics() *Metrics { + j, _ := encoding.NewJSONFromString("{}") + return &Metrics{ + metricJSON: j, + } +} + +//JSON json格式指标 +func (m *Metrics) JSON() *encoding.JSON { + m.RLock() + defer m.RUnlock() + return m.metricJSON +} + +//Set 设置path的value +func (m *Metrics) Set(path string, value interface{}) error { + m.Lock() + defer m.Unlock() + return m.metricJSON.Set(path, value) +} + +//Get 获得path的value +func (m *Metrics) Get(key string) *encoding.JSON { + m.RLock() + defer m.RUnlock() + j, err := m.metricJSON.GetJSON(key) + if err != nil { + return nil + } + return j +} diff --git a/datax/core/statistics/container/metrics_test.go b/datax/core/statistics/container/metrics_test.go new file mode 100644 index 0000000..0a7cc9d --- /dev/null +++ b/datax/core/statistics/container/metrics_test.go @@ -0,0 +1,151 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+package container
+
+import (
+	"reflect"
+	"testing"
+
+	"github.com/Breeze0806/go/encoding"
+)
+
+func testJSON(json string) *encoding.JSON {
+	j, _ := encoding.NewJSONFromString(json)
+	return j
+}
+
+func TestMetrics_JSON(t *testing.T) {
+	tests := []struct {
+		name string
+		m    *Metrics
+		want *encoding.JSON
+	}{
+		{
+			name: "1",
+			m: &Metrics{
+				metricJSON: testJSON(`{"test":"metrics"}`),
+			},
+			want: testJSON(`{"test":"metrics"}`),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := tt.m.JSON(); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("Metrics.JSON() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestNewMetrics(t *testing.T) {
+	tests := []struct {
+		name string
+		want *Metrics
+	}{
+		{
+			name: "1",
+			want: &Metrics{
+				metricJSON: testJSON(`{}`),
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := NewMetrics(); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("NewMetrics() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestMetrics_Set(t *testing.T) {
+	type args struct {
+		path  string
+		value interface{}
+	}
+	tests := []struct {
+		name    string
+		m       *Metrics
+		args    args
+		want    string
+		wantErr bool
+	}{
+		{
+			name: "1",
+			m:    NewMetrics(),
+			args: args{
+				path:  "path",
+				value: "value",
+			},
+			want:    "value",
+			wantErr: false,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if err := tt.m.Set(tt.args.path, tt.args.value); (err != nil) != tt.wantErr {
+				t.Errorf("Metrics.Set() error = %v, wantErr %v", err, tt.wantErr)
+				return
+			}
+			if got, _ := tt.m.JSON().GetString(tt.args.path); got != tt.want {
+				t.Errorf("Metrics.Set() got = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestMetrics_Get(t *testing.T) {
+	type args struct {
+		key string
+	}
+	tests := []struct {
+		name string
+		m    *Metrics
+		args args
+		want *encoding.JSON
+	}{
+		{
+			name: "1",
+			m: &Metrics{
+				metricJSON: testJSON(`{"test":{"path":"value"}}`),
+			},
+			args: args{
+				key: "test",
+			},
+			want: testJSON(`{"path":"value"}`),
+		},
+		{
+			name: "2",
+			m: &Metrics{
+				metricJSON: testJSON(`{"test":{"path":"value"}}`),
+			},
+			args: args{
+				key: "test.path",
+			},
+			want: nil,
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := tt.m.Get(tt.args.key)
+			if tt.want == nil && got == nil {
+				return
+			}
+			if got.String() != tt.want.String() {
+				t.Errorf("Metrics.Get() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
diff --git a/datax/core/statistics/container/plugin/default_job_collector.go b/datax/core/statistics/container/plugin/default_job_collector.go
index d6afa10..b297021 100644
--- a/datax/core/statistics/container/plugin/default_job_collector.go
+++ b/datax/core/statistics/container/plugin/default_job_collector.go
@@ -16,23 +16,26 @@ package plugin
 
 import (
 	"github.com/Breeze0806/go-etl/datax/common/plugin"
+	"github.com/Breeze0806/go-etl/datax/core/statistics/container"
 	"github.com/Breeze0806/go/encoding"
 )
 
 //DefaultJobCollector 默认工作收集器
-type DefaultJobCollector struct{}
+type DefaultJobCollector struct {
+	metrics *container.Metrics
+}
 
 //NewDefaultJobCollector 创建默认工作收集器
-func NewDefaultJobCollector() plugin.JobCollector {
-	return &DefaultJobCollector{}
+func NewDefaultJobCollector(metrics *container.Metrics) plugin.JobCollector {
+	return &DefaultJobCollector{metrics: metrics}
 }
 
-//MessageMap 空方法
-func (d *DefaultJobCollector) MessageMap() *encoding.JSON {
-	return nil
+//JSON 获取json的指标
+func (d *DefaultJobCollector) JSON() *encoding.JSON {
+	return d.metrics.JSON()
 }
-//MessageByKey 空方法 -func (d *DefaultJobCollector) MessageByKey(key string) *encoding.JSON { - return nil +//JSONByKey 获取关键字是key的json的指标 +func (d *DefaultJobCollector) JSONByKey(key string) *encoding.JSON { + return d.metrics.Get(key) } diff --git a/datax/core/statistics/container/plugin/default_job_collector_test.go b/datax/core/statistics/container/plugin/default_job_collector_test.go new file mode 100644 index 0000000..a45eba0 --- /dev/null +++ b/datax/core/statistics/container/plugin/default_job_collector_test.go @@ -0,0 +1,79 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package plugin + +import ( + "reflect" + "testing" + + "github.com/Breeze0806/go-etl/datax/core/statistics/container" +) + +type testStruct struct { + Path string `json:"path"` +} + +func TestDefaultJobCollector_JSON(t *testing.T) { + m := container.NewMetrics() + m.Set("test", testStruct{Path: "value"}) + tests := []struct { + name string + d *DefaultJobCollector + want string + }{ + { + name: "1", + d: NewDefaultJobCollector(m).(*DefaultJobCollector), + want: `{"test":{"path":"value"}}`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.d.JSON().String(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("DefaultJobCollector.JSON() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestDefaultJobCollector_JSONByKey(t *testing.T) { + m := container.NewMetrics() + m.Set("test", testStruct{Path: "value"}) + type args struct { + key string + } + tests := []struct { + name string + d *DefaultJobCollector + args args + want string + }{ + { + name: "1", + d: NewDefaultJobCollector(m).(*DefaultJobCollector), + args: args{ + key: "test", + }, + want: `{"path":"value"}`, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := tt.d.JSONByKey(tt.args.key).String(); !reflect.DeepEqual(got, tt.want) { + t.Errorf("DefaultJobCollector.JSONByKey() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/datax/core/taskgroup/container.go b/datax/core/taskgroup/container.go index 7c14ca4..8ce670b 100644 --- a/datax/core/taskgroup/container.go +++ b/datax/core/taskgroup/container.go @@ -16,6 +16,7 @@ package taskgroup import ( "context" + "strconv" "strings" "sync" "time" @@ -23,6 +24,7 @@ import ( "github.com/Breeze0806/go-etl/config" coreconst "github.com/Breeze0806/go-etl/datax/common/config/core" "github.com/Breeze0806/go-etl/datax/core" + "github.com/Breeze0806/go-etl/datax/core/statistics/container" "github.com/Breeze0806/go-etl/schedule" "github.com/pingcap/errors" ) @@ -33,15 +35,15 @@ type Container struct { Err error - jobID int64 - taskGroupID int64 - scheduler *schedule.TaskSchduler - wg sync.WaitGroup - tasks *taskManager - ctx context.Context - SleepInterval time.Duration - retryInterval time.Duration - retryMaxCount int32 + jobID int64 + taskGroupID int64 + scheduler *schedule.TaskSchduler + wg sync.WaitGroup + tasks *taskManager + ctx context.Context + ReportInterval time.Duration + 
retryInterval time.Duration + retryMaxCount int32 } //NewContainer 根据JSON配置conf创建任务组容器 @@ -53,6 +55,7 @@ func NewContainer(ctx context.Context, conf *config.JSON) (c *Container, err err ctx: ctx, } c.SetConfig(conf) + c.SetMetrics(container.NewMetrics()) c.jobID, err = c.Config().GetInt64(coreconst.DataxCoreContainerJobID) if err != nil { return nil, err @@ -61,14 +64,14 @@ func NewContainer(ctx context.Context, conf *config.JSON) (c *Container, err err if err != nil { return nil, err } - - c.SleepInterval = time.Duration( - c.Config().GetInt64OrDefaullt(coreconst.DataxCoreContainerJobSleepinterval, 1000)) * time.Millisecond + c.Metrics().Set("taskGroupID", c.taskGroupID) + c.ReportInterval = time.Duration( + c.Config().GetInt64OrDefaullt(coreconst.DataxCoreContainerTaskGroupReportinterval, 1)) * time.Second c.retryInterval = time.Duration( - c.Config().GetInt64OrDefaullt(coreconst.DataxCoreContainerTaskFailoverMaxretrytimes, 10000)) * time.Millisecond + c.Config().GetInt64OrDefaullt(coreconst.DataxCoreContainerTaskFailoverRetryintervalinmsec, 1000)) * time.Millisecond c.retryMaxCount = int32(c.Config().GetInt64OrDefaullt(coreconst.DataxCoreContainerTaskFailoverMaxretrytimes, 1)) - log.Infof("datax job(%v) taskgruop(%v) sleepInterval: %v retryInterval: %v retryMaxCount: %v config: %v", - c.jobID, c.taskGroupID, c.SleepInterval, c.retryInterval, c.retryMaxCount, conf) + log.Infof("datax job(%v) taskgruop(%v) reportInterval: %v retryInterval: %v retryMaxCount: %v config: %v", + c.jobID, c.taskGroupID, c.ReportInterval, c.retryInterval, c.retryMaxCount, conf) return } @@ -87,17 +90,6 @@ func (c *Container) Do() error { return c.Start() } -//Stats 获取统计信息 -func (c *Container) Stats() (stats []Stats) { - for _, v := range c.tasks.manager.Runs() { - stat := v.(*taskExecer).Stats() - stat.JobID = c.jobID - stat.TaskGroupID = c.taskGroupID - stats = append(stats, stat) - } - return -} - //Start 开始运行,使用任务调度器执行这些JSON配置 func (c *Container) Start() (err error) { log.Infof("datax job(%v) taskgruop(%v) start", c.jobID, c.taskGroupID) @@ -135,7 +127,7 @@ func (c *Container) Start() (err error) { } } log.Infof("datax job(%v) taskgruop(%v) manage tasks", c.jobID, c.taskGroupID) - ticker := time.NewTicker(c.SleepInterval) + ticker := time.NewTicker(c.ReportInterval) defer ticker.Stop() QueueLoop: //任务队列不为空 @@ -194,32 +186,51 @@ func (c *Container) startTaskExecer(te *taskExecer) (err error) { c.wg.Done() return err } - log.Debugf("datax job(%v) taskgruop(%v) task(%v) start", c.jobID, c.taskGroupID, te.Key()) go func(te *taskExecer) { - defer c.wg.Done() - timer := time.NewTimer(c.retryInterval) - defer timer.Stop() - select { - case te.Err = <-errChan: - //当失败时,重试次数不超过最大重试次数,写入任务是否支持失败冲时,这些决定写入任务是否冲时 - if te.Err != nil && te.WriterSuportFailOverport() && te.AttemptCount() <= c.retryMaxCount { - log.Debugf("datax job(%v) taskgruop(%v) task(%v) shutdown and retry. 
attemptCount: %v err: %v", - c.jobID, c.taskGroupID, te.Key(), te.AttemptCount(), te.Err) - //关闭任务 - te.Shutdown() - select { - case <-timer.C: - case <-c.ctx.Done(): + log.Debugf("datax job(%v) taskgruop(%v) task(%v) start", c.jobID, c.taskGroupID, te.Key()) + defer func() { + log.Debugf("datax job(%v) taskgruop(%v) task(%v) end", c.jobID, c.taskGroupID, te.Key()) + c.wg.Done() + }() + statsTimer := time.NewTicker(c.ReportInterval) + defer statsTimer.Stop() + for { + select { + case te.Err = <-errChan: + //当失败时,重试次数不超过最大重试次数,写入任务是否支持失败冲时,这些决定写入任务是否冲时 + if te.Err != nil && te.WriterSuportFailOverport() && te.AttemptCount() <= c.retryMaxCount { + log.Debugf("datax job(%v) taskgruop(%v) task(%v) shutdown and retry. attemptCount: %v err: %v", + c.jobID, c.taskGroupID, te.Key(), te.AttemptCount(), te.Err) + //关闭任务 + te.Shutdown() + timer := time.NewTimer(c.retryInterval) + defer timer.Stop() + select { + case <-timer.C: + case <-c.ctx.Done(): + return + } + //从运行队列移到待执行队列 + c.tasks.removeRunAndPushRemain(te) + } else { + //从任务调度器移除 + c.tasks.removeRun(te) + c.setStats(te) } - //从运行队列移到待执行队列 - c.tasks.removeRunAndPushRemain(te) - } else { - log.Debugf("datax job(%v) taskgruop(%v) task(%v) end", c.jobID, c.taskGroupID, te.Key()) - //从任务调度器移除 - c.tasks.removeRun(te) + return + case <-c.ctx.Done(): + return + case <-statsTimer.C: + c.setStats(te) } - case <-c.ctx.Done(): } }(te) return } + +func (c *Container) setStats(te *taskExecer) { + key := "metrics." + strconv.FormatInt(te.taskID, 10) + stats := te.Stats() + + c.Metrics().Set(key, stats) +} diff --git a/datax/core/taskgroup/container_test.go b/datax/core/taskgroup/container_test.go index 3a7dab0..6573f30 100644 --- a/datax/core/taskgroup/container_test.go +++ b/datax/core/taskgroup/container_test.go @@ -40,11 +40,13 @@ func TestContainer_Do(t *testing.T) { "core" : { "container": { "job":{ - "id": 1, - "sleepInterval":100 + "id": 1 }, "taskGroup":{ "id": 1, + "reportInterval":1 + }, + "task":{ "failover":{ "retryIntervalInMsec":10 } @@ -115,11 +117,12 @@ func TestContainer_DoCancel1(t *testing.T) { "core" : { "container": { "job":{ - "id": 1, - "sleepInterval":100 + "id": 1 }, "taskGroup":{ - "id": 1, + "id": 1 + }, + "task":{ "failover":{ "retryIntervalInMsec":10 } @@ -164,11 +167,13 @@ func TestContainer_DoCancel2(t *testing.T) { "core" : { "container": { "job":{ - "id": 1, - "sleepInterval":100 + "id": 1 }, "taskGroup":{ "id": 1, + "reportInterval":1 + }, + "task":{ "failover":{ "retryIntervalInMsec":10 } @@ -196,7 +201,7 @@ func TestContainer_DoCancel2(t *testing.T) { cancel() }() - if err := c.Do(); err != context.Canceled { + if err := c.Do(); err == nil { t.Errorf("Do error: %v", err) } } @@ -214,12 +219,14 @@ func TestContainer_JobId(t *testing.T) { "container": { "job":{ "id": 30000000, - "sleepInterval":100 + "reportInterval":1 }, "taskGroup":{ - "id": 1, + "id": 1 + }, + "task":{ "failover":{ - "retryIntervalInMsec":0 + "retryIntervalInMsec":10 } } } @@ -234,12 +241,14 @@ func TestContainer_JobId(t *testing.T) { "container": { "job":{ "id": 1000000000000000000, - "sleepInterval":100 + "reportInterval":1 }, "taskGroup":{ - "id": 1, + "id": 1 + }, + "task":{ "failover":{ - "retryIntervalInMsec":0 + "retryIntervalInMsec":10 } } } @@ -271,10 +280,12 @@ func TestContainer_TaskGroupId(t *testing.T) { "container": { "job":{ "id": 30000000, - "sleepInterval":100 + "reportInterval":1 }, "taskGroup":{ - "id": 30000001, + "id": 30000001 + }, + "task":{ "failover":{ "retryIntervalInMsec":0 } @@ -291,7 +302,7 @@ func 
TestContainer_TaskGroupId(t *testing.T) { "container": { "job":{ "id": 1000000000000000000, - "sleepInterval":100 + "reportInterval":1 }, "taskGroup":{ "id": 1000000000000000001, @@ -327,10 +338,12 @@ func TestContainer_Start(t *testing.T) { "container": { "job":{ "id": 30000000, - "sleepInterval":100 + "reportInterval":1 }, "taskGroup":{ - "id": 30000001, + "id": 30000001 + }, + "task":{ "failover":{ "retryIntervalInMsec":0 } @@ -370,10 +383,12 @@ func TestNewContainer(t *testing.T) { "container": { "job":{ "id": "30000000", - "sleepInterval":100 + "reportInterval":1 }, "taskGroup":{ - "id": 30000001, + "id": 30000001 + }, + "task":{ "failover":{ "retryIntervalInMsec":0 } @@ -394,12 +409,14 @@ func TestNewContainer(t *testing.T) { "container": { "job":{ "id": 30000002, - "sleepInterval":100 + "reportInterval":1 }, "taskGroup":{ - "id": "30000001", + "id": "30000001" + }, + "task":{ "failover":{ - "retryIntervalInMsec":0 + "retryIntervalInMsec":10 } } } @@ -438,12 +455,14 @@ func TestContainer_startTaskExecer(t *testing.T) { "container": { "job":{ "id": 30000000, - "sleepInterval":100 + "reportInterval":1 }, "taskGroup":{ - "id": 30000001, + "id": 30000001 + }, + "task":{ "failover":{ - "retryIntervalInMsec":0 + "retryIntervalInMsec":10 } } } diff --git a/datax/core/taskgroup/help_test.go b/datax/core/taskgroup/help_test.go index 5ff0dda..5e1a20e 100644 --- a/datax/core/taskgroup/help_test.go +++ b/datax/core/taskgroup/help_test.go @@ -25,6 +25,7 @@ import ( "github.com/Breeze0806/go-etl/datax/common/plugin/loader" "github.com/Breeze0806/go-etl/datax/common/spi/reader" "github.com/Breeze0806/go-etl/datax/common/spi/writer" + "github.com/Breeze0806/go-etl/element" ) type mockPlugin struct { @@ -120,6 +121,7 @@ func newMockRandReaderTask(errs []error) *mockRandReaderTask { } func (m *mockRandReaderTask) StartRead(ctx context.Context, sender plugin.RecordSender) error { + defer sender.SendWriter(element.GetTerminateRecord()) if x := m.rand.Int31n(math.MaxInt16); x < math.MaxInt16/2 { return m.startReadErr } diff --git a/datax/core/taskgroup/task_execer.go b/datax/core/taskgroup/task_execer.go index 9bbfd2b..5ca3928 100644 --- a/datax/core/taskgroup/task_execer.go +++ b/datax/core/taskgroup/task_execer.go @@ -251,10 +251,8 @@ Loop: //Stats 统计信息 type Stats struct { - JobID int64 `json:"jobID"` - TaskGroupID int64 `json:"taskGroupID"` - TaskID int64 `json:"taskID"` - Channel channel.StatsJSON `json:"channel"` + TaskID int64 `json:"taskID"` + Channel channel.StatsJSON `json:"channel"` } func (s *Stats) String() string { diff --git a/datax/plugin/reader/resources/plugin.json b/datax/plugin/reader/resources/plugin.json deleted file mode 100644 index 0564ae7..0000000 --- a/datax/plugin/reader/resources/plugin.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "name" : "reader", - "developer":"Breeze0806", - "dialect": "", - "description":"" -} \ No newline at end of file diff --git a/datax/plugin/writer/resources/plugin.json b/datax/plugin/writer/resources/plugin.json deleted file mode 100644 index fabb7c3..0000000 --- a/datax/plugin/writer/resources/plugin.json +++ /dev/null @@ -1,6 +0,0 @@ -{ - "name" : "writer", - "developer":"Breeze0806", - "dialect":"", - "description":"" -} \ No newline at end of file diff --git a/go.mod b/go.mod index dddb6a8..64acdb3 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/go-sql-driver/mysql v1.6.0 github.com/godror/godror v0.33.3 github.com/google/uuid v1.3.0 + github.com/gorilla/handlers v1.5.1 github.com/ibmdb/go_ibm_db v0.4.1 github.com/lib/pq v1.10.7 
github.com/pingcap/errors v0.11.4 diff --git a/go.sum b/go.sum index 32e15c1..14f7919 100644 --- a/go.sum +++ b/go.sum @@ -11,6 +11,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw= github.com/denisenkom/go-mssqldb v0.12.3/go.mod h1:k0mtMFOnU+AihqFxPMiF05rtiDrorD1Vrm1KEz5hxDo= github.com/dnaeon/go-vcr v1.2.0/go.mod h1:R4UdLID7HZT3taECzJs4YgbbH6PIGXB6W/sc5OLb6RQ= +github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ= +github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0= @@ -32,6 +34,8 @@ github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/handlers v1.5.1 h1:9lRY6j8DEeeBT10CvO9hGW0gmky0BprnvDI5vfhUHH4= +github.com/gorilla/handlers v1.5.1/go.mod h1:t8XrUpc4KVXb7HGyJ4/cEnwQiaxrX/hz1Zv/4g96P1Q= github.com/ibmdb/go_ibm_db v0.4.1 h1:IYZqoKTzD9xtkzLIkp8u6zzg7/4v7nFOfHzF79agvak= github.com/ibmdb/go_ibm_db v0.4.1/go.mod h1:nl5aUh1IzBVExcqYXaZLApaq8RUvTEph3VP49UTmEvg= github.com/lib/pq v1.10.5/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
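
A note on the metrics surface this change introduces: `container.Metrics` is a lock-guarded JSON document, and the job and task-group containers write into it with dotted paths (`"jobID"`, `"taskGroupID"`, `"metrics.<i>"`). Below is a minimal sketch of that API; the `main` wrapper and sample values are illustrative only, and the printed key order is an assumption:

```go
package main

import (
	"fmt"

	"github.com/Breeze0806/go-etl/datax/core/statistics/container"
)

func main() {
	// NewMetrics starts from the empty JSON object {}.
	m := container.NewMetrics()

	// Set writes a value at a JSON path, mirroring what the containers do
	// with Metrics().Set("jobID", ...) and Metrics().Set("metrics."+i, stats).
	m.Set("jobID", int64(1))
	m.Set("test", struct {
		Path string `json:"path"`
	}{Path: "value"})

	// JSON returns the whole document; Get returns the sub-document at a key
	// (nil when the key is absent or not an object).
	fmt.Println(m.JSON().String())      // {"jobID":1,"test":{"path":"value"}}
	fmt.Println(m.Get("test").String()) // {"path":"value"}
}
```

At runtime this is the same JSON that `datax -c config.json -http :8080` serves at `http://127.0.0.1:8080/metrics` through the new `cmd/datax/handler.go`, and that `startEngine` prints to the console every 5 seconds.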