feat: add flow control and simple monitoring features, and preset max/min values for the split feature (#13)
* feat: add the flow control feature

* feat & docs: add channel statistics; rename rdbm to dbms in reader and writer; add preset max/min values for the split primary key

* feat & fix: add traffic monitoring and fix issues in the rate-limiting feature

* feat & test: fix result printing and test on Go 1.19 and 1.20

* test & fix: fix the channel count not taking effect; test split's default values.

* fix: set the channel count to 4
Breeze0806 authored May 28, 2023
1 parent 14665f4 commit c26eb4e
Showing 104 changed files with 1,386 additions and 473 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/Build.yml
Original file line number Diff line number Diff line change
@@ -14,7 +14,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
go: [ '1.16', '1.17' , '1.18']
go: [ '1.16', '1.17' , '1.18', '1.19', '1.20']
name: Go ${{ matrix.go }} sample
steps:
- uses: actions/checkout@v3
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -4,6 +4,7 @@ cmd/datax/*.log
cmd/datax/run.*
cmd/datax/version.go
cmd/datax/tools/testData/config
cmd/datax/examples/**/*.csv
datax/plugin/plugin.go
datax/plugin/reader/**/plugin.go
datax/plugin/writer/**/plugin.go
2 changes: 1 addition & 1 deletion Makefile
@@ -9,7 +9,7 @@ export LD_LIBRARY_PATH=${DB2HOME}/lib
# stable release.
GO_VERSION := $(shell go version | cut -d " " -f 3)
GO_MINOR_VERSION := $(word 2,$(subst ., ,$(GO_VERSION)))
LINTABLE_MINOR_VERSIONS := 18
LINTABLE_MINOR_VERSIONS := 20
ifneq ($(filter $(LINTABLE_MINOR_VERSIONS),$(GO_MINOR_VERSION)),)
SHOULD_LINT := true
endif
4 changes: 2 additions & 2 deletions README.md
@@ -50,8 +50,8 @@ go-etl provides the following ETL capabilities:
#### plan

- [x] Implement task splitting for relational databases
- [ ] Implement the monitoring module
- [ ] Implement the flow control module
- [x] Implement the monitoring module
- [x] Implement the flow control module
- [ ] Implement resumable (checkpoint restart) loading into relational databases

### storage
37 changes: 23 additions & 14 deletions README_USER.md
@@ -17,9 +17,9 @@ make release
### windows

#### Dependencies
1.Requires a mingw-w64 environment with gcc 7.2.0 or above to build
2.golang 1.16 or above
3.Minimum build environment is Windows 7
1. Requires a mingw-w64 environment with gcc 7.2.0 or above to build
2. golang 1.16 or above
3. Minimum build environment is Windows 7

#### Build
```bash
@@ -76,14 +76,6 @@ datax -c tools/testData/xlsx.json -w tools/testData/wizard.csv
"retryIntervalInMsec":0
}
}
},
"transport":{
"channel":{
"speed":{
"byte": 100,
"record":100
}
}
}
},
"job":{
@@ -129,13 +121,30 @@ datax -c tools/testData/xlsx.json -w tools/testData/wizard.csv
],
"setting":{
"speed":{
"byte":3000,
"record":400,
"byte":0,
"record":1024,
"channel":4
}
}
}
}
```
#### Flow control configuration

Previously, the byte and record settings under speed had no effect. With the new flow-control feature they now take effect: byte limits the number of buffered message bytes, and record limits the number of buffered message records. Setting byte too small shrinks the buffer and can cause synchronization to fail. When byte is 0 or negative, the limiter is disabled. For example, a byte value of 10485760 means 10 MiB (10*1024*1024).
```json
{
"job":{
"setting":{
"speed":{
"byte":10485760,
"record":1024,
"channel":4
}
}
}
}

```

#### Source/destination configuration wizard file
@@ -295,7 +304,7 @@ datax -c examples/postgresxlsx/config.json

#### Using a split key

This assumes the data is evenly distributed over the split key; a well-chosen split key makes synchronization faster
This assumes the data is evenly distributed over the split key; a well-chosen split key makes synchronization faster. In addition, to speed up the query for the maximum and minimum values, you can preset them for large tables

##### Test method
- Use a program to generate mysql data and produce split.csv
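For a large table, presetting the split key's minimum and maximum lets the engine skip the potentially expensive MIN/MAX query before splitting. A hypothetical reader configuration might look like the sketch below; the field names (`split`, `key`, `range`, `left`, `right`) are illustrative assumptions, not confirmed from this commit.

```json
{
    "reader":{
        "name":"mysqlreader",
        "parameter":{
            "split":{
                "key":"id",
                "range":{
                    "type":"bigInt",
                    "left":"1",
                    "right":"1000000"
                }
            }
        }
    }
}
```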
18 changes: 2 additions & 16 deletions cmd/datax/examples/csvpostgres/config.json
@@ -4,20 +4,6 @@
"job":{
"id": 1,
"sleepInterval":100
},
"taskGroup":{
"id": 1,
"failover":{
"retryIntervalInMsec":0
}
}
},
"transport":{
"channel":{
"speed":{
"byte": 100,
"record":100
}
}
}
},
@@ -64,8 +50,8 @@
],
"setting":{
"speed":{
"byte":3000,
"record":400,
"byte":0,
"record":1024,
"channel":4
}
}
18 changes: 2 additions & 16 deletions cmd/datax/examples/db2/config.json
@@ -4,20 +4,6 @@
"job":{
"id": 1,
"sleepInterval":100
},
"taskGroup":{
"id": 1,
"failover":{
"retryIntervalInMsec":0
}
}
},
"transport":{
"channel":{
"speed":{
"byte": 100,
"record":100
}
}
}
},
@@ -64,8 +50,8 @@
],
"setting":{
"speed":{
"byte":3000,
"record":400,
"byte":0,
"record":1024,
"channel":4
}
}
40 changes: 40 additions & 0 deletions cmd/datax/examples/limit/csv.json
@@ -0,0 +1,40 @@
{
"core" : {
"container": {
"job":{
"id": 1,
"sleepInterval":100
}
}
},
"job":{
"content":[
{
"reader":{
"name": "csvreader",
"parameter": {
"path":["examples/limit/src.csv"],
"encoding":"utf-8",
"delimiter":","
}
},
"writer":{
"name": "csvwriter",
"parameter": {
"path":["examples/limit/dest.csv"],
"encoding":"utf-8",
"delimiter":","
}
},
"transformer":[]
}
],
"setting":{
"speed":{
"byte":1024,
"record":100,
"channel":4
}
}
}
}
48 changes: 48 additions & 0 deletions cmd/datax/examples/limit/main.go
@@ -0,0 +1,48 @@
// Copyright 2020 the go-etl Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
"encoding/base64"
"encoding/csv"
"fmt"
"os"
"strconv"
"time"

"github.com/Breeze0806/go-etl/element"
)

func main() {
f, err := os.Create("src.csv")
if err != nil {
fmt.Println("create file failed. err:", err)
return
}
defer f.Close()

w := csv.NewWriter(f)
for i := 0; i < 1000000; i++ {
record := []string{strconv.Itoa(i),
time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC).AddDate(0, 0, i/1000).Format(element.DefaultTimeFormat[:10]),
base64.StdEncoding.EncodeToString([]byte{byte(i / 100 / 100), byte((i / 100) % 100), byte(i % 100)}),
}
w.Write(record)
if (i+1)%1000 == 0 {
w.Flush()
}
}
w.Flush()
}
18 changes: 2 additions & 16 deletions cmd/datax/examples/mysql/config.json
@@ -4,20 +4,6 @@
"job":{
"id": 1,
"sleepInterval":100
},
"taskGroup":{
"id": 1,
"failover":{
"retryIntervalInMsec":0
}
}
},
"transport":{
"channel":{
"speed":{
"byte": 100,
"record":100
}
}
}
},
@@ -65,8 +51,8 @@
],
"setting":{
"speed":{
"byte":3000,
"record":400,
"byte":0,
"record":1024,
"channel":4
}
}
18 changes: 2 additions & 16 deletions cmd/datax/examples/oracle/config.json
@@ -4,20 +4,6 @@
"job":{
"id": 1,
"sleepInterval":100
},
"taskGroup":{
"id": 1,
"failover":{
"retryIntervalInMsec":0
}
}
},
"transport":{
"channel":{
"speed":{
"byte": 100,
"record":100
}
}
}
},
@@ -64,8 +50,8 @@
],
"setting":{
"speed":{
"byte":3000,
"record":400,
"byte":0,
"record":1024,
"channel":4
}
}
18 changes: 2 additions & 16 deletions cmd/datax/examples/postgres/config.json
@@ -4,20 +4,6 @@
"job":{
"id": 1,
"sleepInterval":100
},
"taskGroup":{
"id": 1,
"failover":{
"retryIntervalInMsec":0
}
}
},
"transport":{
"channel":{
"speed":{
"byte": 100,
"record":100
}
}
}
},
@@ -66,8 +52,8 @@
],
"setting":{
"speed":{
"byte":3000,
"record":400,
"byte":0,
"record":1024,
"channel":4
}
}
18 changes: 2 additions & 16 deletions cmd/datax/examples/postgrescsv/config.json
@@ -4,20 +4,6 @@
"job":{
"id": 1,
"sleepInterval":100
},
"taskGroup":{
"id": 1,
"failover":{
"retryIntervalInMsec":0
}
}
},
"transport":{
"channel":{
"speed":{
"byte": 100,
"record":100
}
}
}
},
@@ -63,8 +49,8 @@
],
"setting":{
"speed":{
"byte":3000,
"record":400,
"byte":0,
"record":1024,
"channel":4
}
}