diff --git a/.gitignore b/.gitignore index 244b3a1..644fa86 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,7 @@ datax/plugin_test.go datax/plugin.go *.txt *.out -*.exe \ No newline at end of file +*.exe +release/* +*.zip +*.tar.gz \ No newline at end of file diff --git a/Makefile b/Makefile index 4924c52..2246670 100644 --- a/Makefile +++ b/Makefile @@ -59,7 +59,7 @@ cover: .PHONY: release release: @go generate ./... && cd cmd/datax && go build && cd ../.. - + @go run tools/datax/release/main.go .PHONY: doc doc: @godoc -http=:6080 \ No newline at end of file diff --git a/README.md b/README.md index 4d651da..01342c6 100644 --- a/README.md +++ b/README.md @@ -38,14 +38,14 @@ go-etl将提供的etl能力如下: 使用[go-etl用户手册](README_USER.md)开始数据同步 -### 开发宝典 +### 数据同步开发宝典 -参考[go-etl开发者文档](datax/README.md)来帮助开发 +参考[go-etl数据同步开发者文档](datax/README.md)来帮助开发 ## 模块简介 ### datax -本包将提供类似于阿里巴巴[DataX](https://github.com/alibaba/DataX)的接口去实现go的etl框架,目前主要实现了job框架内的数据同步能力,监控等功能还未实现. +本包将提供类似于阿里巴巴[DataX](https://github.com/alibaba/DataX)的接口去实现go的etl框架,目前主要实现了job框架内的数据同步能力. #### plan diff --git a/README_USER.md b/README_USER.md index 7812219..a005084 100644 --- a/README_USER.md +++ b/README_USER.md @@ -1,67 +1,55 @@ # go-etl用户手册 -## 从源码进行编译 +go-etl的datax是一个数据同步工具,目前支持MySQL,postgres,oracle,SQL SERVER,DB2等主流关系型数据库以及csv,xlsx文件之间的数据同步。 -### linux +## 1 从源码进行编译 -#### 依赖 +### 1.1 linux + +#### 1.1.1 依赖 1. golang 1.16以及以上 -#### 构建 +#### 1.1.2 构建 ```bash make dependencies make release ``` -### windows +### 1.2 windows -#### 依赖 +#### 1.2.1 依赖 1. 需要mingw-w64 with gcc 7.2.0以上的环境进行编译 2. golang 1.16以及以上 3. 
最小编译环境为win7 -#### 构建 +#### 1.2.2 构建 ```bash release.bat ``` -## 如何开始 +## 2 如何开始 下载对应操作系统的datax,在linux下如Makefile所示export LD_LIBRARY_PATH=${DB2HOME}/lib,否则无法运行 可以使用[ibm db2](https://public.dhe.ibm.com/ibmdl/export/pub/software/data/db2/drivers/odbc_cli/)以及[oracle](https://www.oracle.com/database/technologies/instant-client/downloads.html)下载到对应64位版本odbc依赖,也可以在**QQ群185188648**群共享中中下载到。 -### 查看版本 - -``` -datax version -v0.1.0 (git commit: c82eb302218f38cd3851df4b425256e93f85160d) complied by go version go1.16.5 windows/amd64 -``` - -### 使用方式 +### 2.1 单任务数据同步 +调用datax十分简单,只要直接调用它即可 ```bash -Usage of datax: - -c string - config (default "config.json") - -w string - wizard +data -c config.json ``` +-c 指定数据源配置文件 -### 批量生成配置集和执行脚本 - -```bash -datax -c tools/testData/xlsx.json -w tools/testData/wizard.csv -``` --c 指定数据源配置文件 -w 指定源目的配置向导文件。 +当返回值是0,并且显示run success,表示执行成功 -执行结果会在数据源配置文件目录文件生成源目的配置向导文件行数的配置集,分别以指定数据源配置文件1.json,指定数据源配置文件2.json,...,指定数据源配置文件[n].json的配置集。 +当返回值是1,并且显示run fail,并告知执行失败的原因 -另外,在当前目录会生成执行脚本run.bat或者run.sh。 -#### 数据源配置文件 +#### 2.1.1 数据源配置文件 数据源配置文件是json文件,使用数据源相互组合,如从mysql同步到postgres中 + ```json { "core" : { @@ -69,12 +57,6 @@ datax -c tools/testData/xlsx.json -w tools/testData/wizard.csv "job":{ "id": 1, "sleepInterval":100 - }, - "taskGroup":{ - "id": 1, - "failover":{ - "retryIntervalInMsec":0 - } } } }, @@ -118,14 +100,7 @@ datax -c tools/testData/xlsx.json -w tools/testData/wizard.csv }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":0, - "record":1024, - "channel":4 - } - } + ] } } ``` @@ -147,74 +122,23 @@ datax -c tools/testData/xlsx.json -w tools/testData/wizard.csv ``` -#### 源目的配置向导文件 -源目的配置向导文件是csv文件,每行配置可以配置如下: -```csv -path[table],path[table] -``` -每一列可以是路径或者是表名,注意所有的表要配置库名或者模式名,需要在数据源配置文件配置。 - -#### 测试结果 -可以运行cmd/datax/testData的测试数据 -```bash -cd cmd/datax -datax -c testData/xlsx.json -w testData/wizard.csv -``` -结果会在testData下生成wizard.csv行数的配置文件,分别以xlsx1.json,xlsx2.json,...,xlsx[n].json的配置集。 - -### 数据同步 - -调用datax十分简单,只要直接调用它即可 
+`reader`和`writer`的配置如下: -```bash -data -c config.json -``` --c 指定数据源配置文件 +| 类型 | 数据源 | Reader(读) | Writer(写) | 文档 | +| ------------ | ------------------ | ------------ | ---------- | ------------------------------------------------------------ | +| 关系型数据库 | MySQL/Mariadb/Tidb | √ | √ | [读](datax/plugin/reader/mysql/README.md)、[写](datax/plugin/writer/mysql/README.md) | +| | Postgres/Greenplum | √ | √ | [读](datax/plugin/reader/postgres/README.md)、[写](datax/plugin/writer/postgres/README.md) | +| | DB2 LUW | √ | √ | [读](datax/plugin/reader/db2/README.md)、[写](datax/plugin/writer/db2/README.md) | +| | SQL Server | √ | √ | [读](datax/plugin/reader/sqlserver/README.md)、[写](datax/plugin/writer/sqlserver/README.md) | +| | Oracle | √ | √ | [读](datax/plugin/reader/oracle/README.md)、[写](datax/plugin/writer/oracle/README.md) | +| 无结构流 | CSV | √ | √ | [读](datax/plugin/reader/csv/README.md)、[写](datax/plugin/writer/csv/README.md) | +| | XLSX(excel) | √ | √ | [读](datax/plugin/reader/xlsx/README.md)、[写](datax/plugin/writer/xlsx/README.md) | -当返回值是0,并且显示run success,表示执行成功 +#### 2.1.2 使用示例 -当返回值是1,并且显示run fail,并告知执行失败的原因 +注意在linux下如Makefile所示export LD_LIBRARY_PATH=${DB2HOME}/lib -#### 数据库全局配置 - -```json -{ - "job":{ - "setting":{ - "pool":{ - "maxOpenConns":8, - "maxIdleConns":8, - "connMaxIdleTime":"40m", - "connMaxLifetime":"40m" - }, - "retry":{ - "type":"ntimes", - "strategy":{ - "n":3, - "wait":"1s" - }, - "ignoreOneByOneError":true - } - } - } -} -``` -##### 连接池pool -+ maxOpenConns: 最大连接打开数 -+ maxIdleConns: 最大空闲连接打开数 -+ connMaxIdleTime: 最大空闲时间 -+ connMaxLifetime: 最大生存时间 - -##### 重试retry -ignoreOneByOneError 是否忽略一个个重试错误 -+ 重试类型type和重试策略 -1. 类型有`ntimes`,指n次数重复重试策略,`"strategy":{"n":3,"wait":"1s"}`,n代表重试次数,wait代表等待时间 -2. 类型有`forever`,指永久重复重试策略,`"strategy":{"wait":"1s"}`,wait代表等待时间 -3. 
类型有`exponential`,指幂等重复重试策略,`"strategy":{"init":"100ms","max":"4s"}`,init代表开始时间,max代表最大时间 - -### 使用示例 - -#### 使用mysql同步 +##### 2.1.2.1 使用mysql同步 - 使用cmd/datax/examples/mysql/init.sql初始化数据库**用于测试** - 开启同步mysql命令 @@ -223,7 +147,7 @@ ignoreOneByOneError 是否忽略一个个重试错误 datax -c examples/mysql/config.json ``` -#### 使用postgres同步 +##### 2.1.2.2 使用postgres同步 - 使用cmd/datax/examples/postgres/init.sql初始化数据库**用于测试** - 开启同步postgres命令 @@ -232,7 +156,7 @@ datax -c examples/mysql/config.json datax -c examples/postgres/config.json ``` -#### 使用db2同步 +##### 2.1.2.3 使用db2同步 - 注意使用前请下载相应的db2的odbc库,如linux的make dependencies和release.bat - 注意在linux下如Makefile所示export LD_LIBRARY_PATH=${DB2HOME}/lib @@ -244,7 +168,7 @@ datax -c examples/postgres/config.json datax -c examples/db2/config.json ``` -#### 使用oracle同步 +##### 2.1.2.4 使用oracle同步 - 注意使用前请下载相应的[Oracle Instant Client]( https://www.oracle.com/database/technologies/instant-client/downloads.html),例如,连接oracle 11g最好下载12.x版本。 - 注意在linux下如export LD_LIBRARY_PATH=/opt/oracle/instantclient_21_1:$LD_LIBRARY_PATH,另需要安装libaio @@ -257,7 +181,7 @@ Oracle Instant Client 19不再支持windows7 datax -c examples/oracle/config.json ``` -#### 使用sql server同步 +##### 2.1.2.5 使用sql server同步 - 使用cmd/datax/examples/sqlserver/init.sql初始化数据库**用于测试** - 开启同步sql server命令 @@ -266,7 +190,7 @@ datax -c examples/oracle/config.json datax -c examples/sqlserver/config.json ``` -#### 使用csv同步到postgres +##### 2.1.2.6 使用csv同步到postgres - 使用cmd/datax/examples/csvpostgres/init.sql初始化数据库**用于测试** - 开启同步命令 @@ -275,7 +199,7 @@ datax -c examples/sqlserver/config.json datax -c examples/csvpostgres/config.json ``` -#### 使用xlsx同步到postgres +##### 2.1.2.7 使用xlsx同步到postgres - 使用cmd/examples/datax/csvpostgres/init.sql初始化数据库**用于测试** - 开启同步命令 @@ -284,7 +208,7 @@ datax -c examples/csvpostgres/config.json datax -c examples/xlsxpostgres/config.json ``` -#### 使用postgres同步csv +##### 2.1.2.8 使用postgres同步csv - 使用cmd/datax/examples/csvpostgres/init.sql初始化数据库**用于测试** - 开启同步命令 @@ -293,7 +217,7 @@ datax -c 
examples/xlsxpostgres/config.json datax -c examples/postgrescsv/config.json ``` -#### 使用postgres同步xlsx +##### 2.1.2.9 使用postgres同步xlsx - 使用cmd/datax/examples/csvpostgres/init.sql初始化数据库**用于测试** - 开启同步命令 @@ -302,11 +226,52 @@ datax -c examples/postgrescsv/config.json datax -c examples/postgresxlsx/config.json ``` -#### 使用切分键 +##### 2.1.2.10 其他同步例子 + +除了上述例子外,在go-etl特性中所列出的数据源都可以交叉使用,还配置例如mysql到postgresql数据源,mysql到oracle,oracle到db2等等, + +#### 2.1.3 数据库全局配置 + +```json +{ + "job":{ + "setting":{ + "pool":{ + "maxOpenConns":8, + "maxIdleConns":8, + "connMaxIdleTime":"40m", + "connMaxLifetime":"40m" + }, + "retry":{ + "type":"ntimes", + "strategy":{ + "n":3, + "wait":"1s" + }, + "ignoreOneByOneError":true + } + } + } +} +``` +##### 2.1.3.1 连接池pool ++ maxOpenConns: 最大连接打开数 ++ maxIdleConns: 最大空闲连接打开数 ++ connMaxIdleTime: 最大空闲时间 ++ connMaxLifetime: 最大生存时间 + +##### 2.1.3.2 重试retry +ignoreOneByOneError 是否忽略一个个重试错误 ++ 重试类型type和重试策略 +1. 类型有`ntimes`,指n次数重复重试策略,`"strategy":{"n":3,"wait":"1s"}`,n代表重试次数,wait代表等待时间 +2. 类型有`forever`,指永久重复重试策略,`"strategy":{"wait":"1s"}`,wait代表等待时间 +3. 类型有`exponential`,指幂等重复重试策略,`"strategy":{"init":"100ms","max":"4s"}`,init代表开始时间,max代表最大时间 + +#### 2.1.4 使用切分键 这里假设数据按切分键分布是均匀的,合理使用这样的切分键可以使同步更快,另外为了加快对最大值和最小值的查询,这里对于大表可以预设最大最小值 -##### 测试方式 +##### 2.1.4.1 测试方式 - 使用程序生成mysql数据产生split.csv ```bash cd cmd/datax/examples/split @@ -318,25 +283,184 @@ go run main.go cd ../.. 
datax -c examples/split/csv.json ``` -- 修改examples/split/mysql.json的split的key为id,dt,str +- 修改examples/split/config.json的split的key为id,dt,str - mysql数据库切分同步整形,日期,字符串类型 ```bash -datax -c examples/split/mysql.json +datax -c examples/split/config.json ``` -#### 使用preSql和postSql +#### 2.1.5 使用preSql和postSql preSql和postSql分别是写入数据前和写入数据后的sql语句组 -##### 测试方式 +##### 2.1.5.1 测试方式 在本例子中,采用了全量导入的方式 1.写入数据前先建立了一个临时表 2.在写入数据后,将原表删除,将临时表重名为新表 ```bash -datax -c examples/prePostSql/mysql.json +datax -c examples/prePostSql/config.json ``` -#### 其他同步例子 +#### 2.1.6 流控配置 -除了上述例子外,在go-etl特性中所列出的数据源都可以交叉使用,还配置例如mysql到postgresql数据源,mysql到oracle,oracle到db2等等, \ No newline at end of file +之前speed的byte和record配置并不会生效,现在加入流控特性后,byte和record将会生效,byte会限制缓存消息字节数,而record会限制缓存消息条数,如果byte设置过小会导致缓存过小而导致同步数据失败。当byte为0或负数时,限制器将不会工作, 例如byte为10485760,即10Mb(10x1024x1024)。 + +```json +{ + "job":{ + "setting":{ + "speed":{ + "byte":10485760, + "record":1024, + "channel":4 + } + } + } +} +``` +##### 2.1.6.1 流控测试 +- 使用程序生成src.csv,发起流控测试 +```bash +cd cmd/datax/examples/limit +go run main.go +cd ../.. 
+datax -c examples/limit/config.json +``` + + +### 2.2 多任务数据同步 + +#### 2.2.1 使用方式 + +##### 2.2.1.1 数据源配置文件 + +配置数据源配置文件,如从mysql同步到postgres中 + +```json +{ + "core" : { + "container": { + "job":{ + "id": 1, + "sleepInterval":100 + } + } + }, + "job":{ + "content":[ + { + "reader":{ + "name": "mysqlreader", + "parameter": { + "username": "test:", + "password": "test:", + "column": ["*"], + "connection": { + "url": "tcp(192.168.15.130:3306)/source?parseTime=false", + "table": { + "db":"source", + "name":"type_table" + } + }, + "where": "" + } + }, + "writer":{ + "name": "postgreswriter", + "parameter": { + "username": "postgres", + "password": "123456", + "writeMode": "insert", + "column": ["*"], + "preSql": [], + "connection": { + "url": "postgres://192.168.15.130:5432/postgres?sslmode=disable&connect_timeout=2", + "table": { + "schema":"destination", + "name":"type_table" + } + }, + "batchTimeout": "1s", + "batchSize":1000 + } + }, + "transformer":[] + } + ] + } +} +``` + +##### 2.2.1.2 源目的配置向导文件 + +源目的配置向导文件是csv文件,每行配置可以配置如下: + +```csv +path[table],path[table] +``` + +每一列可以是路径或者是表名,注意所有的表要配置库名或者模式名,需要在数据源配置文件配置。 + +##### 2.2.1.3 批量生成数据配置集和执行脚本 + +```bash +datax -c tools/testData/xlsx.json -w tools/testData/wizard.csv +``` +-c 指定数据源配置文件 -w 指定源目的配置向导文件。 + +执行结果会在数据源配置文件目录文件生成源目的配置向导文件行数的配置集,分别以指定数据源配置文件1.json,指定数据源配置文件2.json,...,指定数据源配置文件[n].json的配置集。 + +另外,在当前目录会生成执行脚本run.bat或者run.sh。 + +##### 2.2.1.4 批量生成数据配置集和执行脚本 + +###### windows + +```bash +run.bat +``` + +linux + +```bash +run.sh +``` + +#### 2.2.2 测试结果 +可以运行cmd/datax/testData的测试数据 +```bash +cd cmd/datax +datax -c testData/xlsx.json -w testData/wizard.csv +``` +结果会在testData下生成wizard.csv行数的配置文件,分别以xlsx1.json,xlsx2.json,...,xlsx[n].json的配置集。 + +### 2.3 数据同步帮助手册 + +#### 2.3.1 帮助命令 + +``` +datax -h +``` + +帮助显示 + +```bash +Usage of datax: + -c string #数据源配置文件 + config (default "config.json") + -w string #源目的配置向导文件 + wizard +``` + +#### 2.3.2 查看版本 + +``` +datax version +``` + +显示`版本号`(git commit: `git提交号`) complied 
by go version `go版本号` + +``` +v0.1.0 (git commit: c82eb302218f38cd3851df4b425256e93f85160d) complied by go version go1.16.5 windows/amd64 +``` \ No newline at end of file diff --git a/cmd/datax/examples/csvpostgres/config.json b/cmd/datax/examples/csvpostgres/config.json index 0f5db49..36902bf 100644 --- a/cmd/datax/examples/csvpostgres/config.json +++ b/cmd/datax/examples/csvpostgres/config.json @@ -47,13 +47,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":0, - "record":1024, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/examples/db2/config.json b/cmd/datax/examples/db2/config.json index 25b5dff..e6c61eb 100644 --- a/cmd/datax/examples/db2/config.json +++ b/cmd/datax/examples/db2/config.json @@ -47,13 +47,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":0, - "record":1024, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/examples/limit/config.json b/cmd/datax/examples/limit/config.json new file mode 100644 index 0000000..3e03009 --- /dev/null +++ b/cmd/datax/examples/limit/config.json @@ -0,0 +1,40 @@ +{ + "core" : { + "container": { + "job":{ + "id": 1, + "sleepInterval":100 + } + } + }, + "job":{ + "content":[ + { + "reader":{ + "name": "csvreader", + "parameter": { + "path":["examples/limit/src.csv"], + "encoding":"utf-8", + "delimiter":"," + } + }, + "writer":{ + "name": "csvwriter", + "parameter": { + "path":["examples/limit/dest.csv"], + "encoding":"utf-8", + "delimiter":"," + } + }, + "transformer":[] + } + ], + "setting":{ + "speed":{ + "byte":10485760, + "record":1024, + "channel":4 + } + } + } +} \ No newline at end of file diff --git a/cmd/datax/examples/mysql/config.json b/cmd/datax/examples/mysql/config.json index f4c5482..40576ee 100644 --- a/cmd/datax/examples/mysql/config.json +++ b/cmd/datax/examples/mysql/config.json @@ -48,13 +48,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":0, - "record":1024, - "channel":4 - } - } + ] } 
} \ No newline at end of file diff --git a/cmd/datax/examples/oracle/config.json b/cmd/datax/examples/oracle/config.json index a6d7461..f8e62e5 100644 --- a/cmd/datax/examples/oracle/config.json +++ b/cmd/datax/examples/oracle/config.json @@ -47,13 +47,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":0, - "record":1024, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/examples/postgres/config.json b/cmd/datax/examples/postgres/config.json index 802efd9..e4451ae 100644 --- a/cmd/datax/examples/postgres/config.json +++ b/cmd/datax/examples/postgres/config.json @@ -49,13 +49,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":0, - "record":1024, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/examples/postgrescsv/config.json b/cmd/datax/examples/postgrescsv/config.json index 02ca115..8266c2d 100644 --- a/cmd/datax/examples/postgrescsv/config.json +++ b/cmd/datax/examples/postgrescsv/config.json @@ -46,13 +46,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":0, - "record":1024, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/examples/postgresxlsx/config.json b/cmd/datax/examples/postgresxlsx/config.json index 7d3d03b..2936503 100644 --- a/cmd/datax/examples/postgresxlsx/config.json +++ b/cmd/datax/examples/postgresxlsx/config.json @@ -49,13 +49,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":0, - "record":1024, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/examples/prePostSql/mysql.json b/cmd/datax/examples/prePostSql/config.json similarity index 92% rename from cmd/datax/examples/prePostSql/mysql.json rename to cmd/datax/examples/prePostSql/config.json index e0322be..adbe75d 100644 --- a/cmd/datax/examples/prePostSql/mysql.json +++ b/cmd/datax/examples/prePostSql/config.json @@ -52,13 +52,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":0, - 
"record":1024, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/examples/split/config.json b/cmd/datax/examples/split/config.json new file mode 100644 index 0000000..ef111f0 --- /dev/null +++ b/cmd/datax/examples/split/config.json @@ -0,0 +1,68 @@ +{ + "core" : { + "container": { + "job":{ + "id": 1, + "sleepInterval":100 + } + } + }, + "job":{ + "content":[ + { + "reader":{ + "name": "mysqlreader", + "parameter": { + "username": "root", + "password": "123456", + "split" : { + "key":"id", + "range":{ + "type":"bigInt", + "left":"100000", + "right":"900000" + } + }, + "column": ["*"], + "connection": { + "url": "tcp(192.168.15.130:3306)/source?parseTime=false", + "table": { + "db":"source", + "name":"split" + } + }, + "where": "" + } + }, + "writer":{ + "name": "mysqlwriter", + "parameter": { + "username": "root", + "password": "123456", + "writeMode": "insert", + "column": ["*"], + "session": [], + "preSql": [], + "connection": { + "url": "tcp(192.168.15.130:3306)/mysql?parseTime=false", + "table": { + "db":"destination", + "name":"split" + } + }, + "batchTimeout": "1s", + "batchSize":1000 + } + }, + "transformer":[] + } + ], + "setting":{ + "speed":{ + "byte":0, + "record":1024, + "channel":4 + } + } + } +} \ No newline at end of file diff --git a/cmd/datax/examples/split/csv.json b/cmd/datax/examples/split/csv.json index 2928a1c..763c399 100644 --- a/cmd/datax/examples/split/csv.json +++ b/cmd/datax/examples/split/csv.json @@ -4,20 +4,6 @@ "job":{ "id": 1, "sleepInterval":100 - }, - "taskGroup":{ - "id": 1, - "failover":{ - "retryIntervalInMsec":0 - } - } - }, - "transport":{ - "channel":{ - "speed":{ - "byte": 100, - "record":100 - } } } }, diff --git a/cmd/datax/examples/sqlserver/config.json b/cmd/datax/examples/sqlserver/config.json index e952e87..bf767a4 100644 --- a/cmd/datax/examples/sqlserver/config.json +++ b/cmd/datax/examples/sqlserver/config.json @@ -52,13 +52,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - 
"byte":0, - "record":1024, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/examples/xlsxpostgres/config.json b/cmd/datax/examples/xlsxpostgres/config.json index 8bcbbf8..d35080b 100644 --- a/cmd/datax/examples/xlsxpostgres/config.json +++ b/cmd/datax/examples/xlsxpostgres/config.json @@ -50,13 +50,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":0, - "record":1024, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/log.go b/cmd/datax/log.go index 4f0d140..8ab74e3 100644 --- a/cmd/datax/log.go +++ b/cmd/datax/log.go @@ -23,7 +23,7 @@ import ( var log = mylog.NewDefaultLogger(os.Stdout, mylog.DebugLevel, "[datax]") func init() { - f, err := os.Create("datax.log") + f, err := os.OpenFile("datax.log", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { panic(err) } diff --git a/cmd/datax/main.go b/cmd/datax/main.go index 64ec160..1623e8c 100644 --- a/cmd/datax/main.go +++ b/cmd/datax/main.go @@ -36,6 +36,8 @@ func main() { return } + log.Infof("config: %v\n", *configFile) + e := newEnveronment(*configFile) defer e.close() if err := e.build(); err != nil { diff --git a/cmd/datax/tools/testData/csv.json b/cmd/datax/tools/testData/csv.json index 2137958..c836943 100644 --- a/cmd/datax/tools/testData/csv.json +++ b/cmd/datax/tools/testData/csv.json @@ -4,20 +4,6 @@ "job":{ "id": 1, "sleepInterval":100 - }, - "taskGroup":{ - "id": 1, - "failover":{ - "retryIntervalInMsec":0 - } - } - }, - "transport":{ - "channel":{ - "speed":{ - "byte": 100, - "record":100 - } } } }, @@ -58,13 +44,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":3000, - "record":400, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/tools/testData/db2.json b/cmd/datax/tools/testData/db2.json index 793f6bf..e6c61eb 100644 --- a/cmd/datax/tools/testData/db2.json +++ b/cmd/datax/tools/testData/db2.json @@ -4,20 +4,6 @@ "job":{ "id": 1, "sleepInterval":100 - }, - "taskGroup":{ 
- "id": 1, - "failover":{ - "retryIntervalInMsec":0 - } - } - }, - "transport":{ - "channel":{ - "speed":{ - "byte": 100, - "record":100 - } } } }, @@ -61,13 +47,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":3000, - "record":400, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/tools/testData/mysql.json b/cmd/datax/tools/testData/mysql.json index e5a1d0d..40576ee 100644 --- a/cmd/datax/tools/testData/mysql.json +++ b/cmd/datax/tools/testData/mysql.json @@ -4,20 +4,6 @@ "job":{ "id": 1, "sleepInterval":100 - }, - "taskGroup":{ - "id": 1, - "failover":{ - "retryIntervalInMsec":0 - } - } - }, - "transport":{ - "channel":{ - "speed":{ - "byte": 100, - "record":100 - } } } }, @@ -62,13 +48,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":3000, - "record":400, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/tools/testData/oracle.json b/cmd/datax/tools/testData/oracle.json index a955db1..f8e62e5 100644 --- a/cmd/datax/tools/testData/oracle.json +++ b/cmd/datax/tools/testData/oracle.json @@ -4,20 +4,6 @@ "job":{ "id": 1, "sleepInterval":100 - }, - "taskGroup":{ - "id": 1, - "failover":{ - "retryIntervalInMsec":0 - } - } - }, - "transport":{ - "channel":{ - "speed":{ - "byte": 100, - "record":100 - } } } }, @@ -61,13 +47,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":3000, - "record":400, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/tools/testData/postgres.json b/cmd/datax/tools/testData/postgres.json index a360e1f..19fff56 100644 --- a/cmd/datax/tools/testData/postgres.json +++ b/cmd/datax/tools/testData/postgres.json @@ -4,20 +4,6 @@ "job":{ "id": 1, "sleepInterval":100 - }, - "taskGroup":{ - "id": 1, - "failover":{ - "retryIntervalInMsec":0 - } - } - }, - "transport":{ - "channel":{ - "speed":{ - "byte": 100, - "record":100 - } } } }, @@ -62,13 +48,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - 
"byte":3000, - "record":400, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/cmd/datax/tools/testData/sqlserver.json b/cmd/datax/tools/testData/sqlserver.json index 12ecac3..504365d 100644 --- a/cmd/datax/tools/testData/sqlserver.json +++ b/cmd/datax/tools/testData/sqlserver.json @@ -4,20 +4,6 @@ "job":{ "id": 1, "sleepInterval":100 - }, - "taskGroup":{ - "id": 1, - "failover":{ - "retryIntervalInMsec":0 - } - } - }, - "transport":{ - "channel":{ - "speed":{ - "byte": 100, - "record":100 - } } } }, diff --git a/cmd/datax/tools/testData/xlsx.json b/cmd/datax/tools/testData/xlsx.json index b139fbc..439c8a1 100644 --- a/cmd/datax/tools/testData/xlsx.json +++ b/cmd/datax/tools/testData/xlsx.json @@ -4,20 +4,6 @@ "job":{ "id": 1, "sleepInterval":100 - }, - "taskGroup":{ - "id": 1, - "failover":{ - "retryIntervalInMsec":0 - } - } - }, - "transport":{ - "channel":{ - "speed":{ - "byte": 100, - "record":100 - } } } }, @@ -64,13 +50,6 @@ }, "transformer":[] } - ], - "setting":{ - "speed":{ - "byte":3000, - "record":400, - "channel":4 - } - } + ] } } \ No newline at end of file diff --git a/datax/README.md b/datax/README.md index fce9f93..51d7c9b 100644 --- a/datax/README.md +++ b/datax/README.md @@ -1,4 +1,4 @@ -# go-etl开发者指南 +# go-etl数据同步开发者指南 ## 同步框架简介 @@ -112,14 +112,14 @@ cd tools/go-etl/plugin go run main.go -t reader -p Mysql ``` -这个命令会在go-etl/plugin/reader中自动生成一个如下DB2的reader模板来帮助开发 +这个命令会在datax/plugin/reader中自动生成一个如下mysql的reader模板来帮助开发 ``` - plugin --- reader---mysql--+-----resources--+--plugin.json - |--job.go |--plugin_job_template.json - |--reader.go - |--README.md - |--task.go + reader---mysql--+-----resources--+--plugin.json + |--job.go |--plugin_job_template.json + |--reader.go + |--README.md + |--task.go ``` 如下,不要忘了在plugin.json加入开发者名字和描述 @@ -142,9 +142,9 @@ go run main.go -t reader -p Mysql 查看[数据库存储开发者指南](../storage/database/README.md),不仅能帮助你更快地实现Reader插件接口,而且能帮助你更快地实现Writer插件接口 -##### rdbm reader +##### dbms reader -rdbm 
reader通过抽象数据库存储的DBWrapper结构体成如下Querier,然后利用Querier完成Job和Task的实现 +dbms reader通过抽象数据库存储的DBWrapper结构体成如下Querier,然后利用Querier完成Job和Task的实现 ```go //Querier 查询器 @@ -166,7 +166,7 @@ type Querier interface { } ``` -像mysql实现Job和Reader,对于Task需要使用rdbm.StartRead函数实现StartRead方法 +像mysql实现Job和Reader,对于Task需要使用dbms.StartRead函数实现StartRead方法 #### 二维表文件流 @@ -239,14 +239,13 @@ cd tools/go-etl/plugin go run main.go -t writer -p Mysql ``` -这个命令会在go-etl/plugin/writer中自动生成如下一个DB2的writer模板来帮助开发 - +这个命令会在datax/plugin/writer中自动生成如下一个mysql的writer模板来帮助开发 ``` - plugin ---- writer--mysql---+-----resources--+--plugin.json - |--job.go |--plugin_job_template.json - |--README.md - |--task.go - |--writer.go + writer--mysql---+-----resources--+--plugin.json + |--job.go |--plugin_job_template.json + |--README.md + |--task.go + |--writer.go ``` 如下,不要忘了在plugin.json加入开发者名字和描述 @@ -261,17 +260,17 @@ go run main.go -t writer -p Mysql 另外,这个可以帮助开发者避免在使用插件注册命令后编译时报错。 -#### 关系型数据库 +#### 数据库 -如果你想帮忙实现关系型数据库的数据源,根据以下方式去实现你的数据源将更加方便 +如果你想帮忙实现数据库的数据源,根据以下方式去实现你的数据源将更加方便,当然前提你所使用的驱动库必须实现golang标准库的database/sql的接口。 ##### 数据库存储 查看[数据库存储开发者指南](../storage/database/README.md),不仅能帮助你更快地实现Reader插件接口,而且能帮助你更快地实现Writer插件接口 -##### rdbm writer +##### dbms writer -rdbm writer通过抽象数据库存储的DBWrapper结构体成如下Execer,然后利用Execer完成Job和Task的实现 +dbms writer通过抽象数据库存储的DBWrapper结构体成如下Execer,然后利用Execer完成Job和Task的实现 ```go //Execer 执行器 @@ -299,7 +298,7 @@ type Execer interface { } ``` -像mysql实现Job和Writer,对于Task需要使用rdbm.StartWrite函数实现StartWrite方法 +像mysql实现Job和Writer,对于Task需要使用dbms.StartWrite函数实现StartWrite方法 #### 二维表文件流 @@ -408,7 +407,7 @@ type Execer interface { - 使用正确的数据类型。比如,bool类型的值使用`true`/`false`,而非`"yes"`/`"true"`/`0`等。 - 合理使用集合类型,比如,用数组替代有分隔符的字符串。 -- 类似通用:遵守同一类型的插件的习惯,比如关系型数据库的`connection`参数都是如下结构: +- 类似通用:遵守同一类型的插件的习惯,比如数据库的`connection`参数都是如下结构: ```json { diff --git a/datax/common/plugin/job_collector.go b/datax/common/plugin/job_collector.go index f5523d2..013931a 100644 --- a/datax/common/plugin/job_collector.go +++ 
b/datax/common/plugin/job_collector.go @@ -14,9 +14,11 @@ package plugin +import "github.com/Breeze0806/go/encoding" + //JobCollector 工作信息采集器,用于统计整个工作的进度,错误信息等 //toto 当前未实现监控模块,为此需要在后面来实现这个接口的结构体 type JobCollector interface { - MessageMap() map[string][]string - MessageByKey(key string) []string + MessageMap() *encoding.JSON + MessageByKey(key string) *encoding.JSON } diff --git a/datax/common/plugin/job_test.go b/datax/common/plugin/job_test.go index 4f32d93..d2dec75 100644 --- a/datax/common/plugin/job_test.go +++ b/datax/common/plugin/job_test.go @@ -17,16 +17,18 @@ package plugin import ( "reflect" "testing" + + "github.com/Breeze0806/go/encoding" ) type mockJobCollector struct { } -func (m *mockJobCollector) MessageMap() map[string][]string { +func (m *mockJobCollector) MessageMap() *encoding.JSON { return nil } -func (m *mockJobCollector) MessageByKey(key string) []string { +func (m *mockJobCollector) MessageByKey(key string) *encoding.JSON { return nil } diff --git a/datax/core/container.go b/datax/core/container.go index 6a84c0c..133c1a0 100644 --- a/datax/core/container.go +++ b/datax/core/container.go @@ -16,7 +16,6 @@ package core import ( "github.com/Breeze0806/go-etl/config" - "github.com/Breeze0806/go-etl/datax/core/statistics/communication" ) //Container 容器 @@ -27,7 +26,6 @@ type Container interface { //BaseCotainer 基础容器 type BaseCotainer struct { conf *config.JSON - com *communication.Communication } //NewBaseCotainer 创建基础容器 @@ -44,13 +42,3 @@ func (b *BaseCotainer) SetConfig(conf *config.JSON) { func (b *BaseCotainer) Config() *config.JSON { return b.conf } - -//SetCommunication 未真正使用 -func (b *BaseCotainer) SetCommunication(com *communication.Communication) { - b.com = com -} - -//Communication 未真正使用 -func (b *BaseCotainer) Communication() *communication.Communication { - return b.com -} diff --git a/datax/core/container_test.go b/datax/core/container_test.go index 6c49589..2d0218a 100644 --- a/datax/core/container_test.go +++ 
b/datax/core/container_test.go @@ -19,7 +19,6 @@ import ( "testing" "github.com/Breeze0806/go-etl/config" - "github.com/Breeze0806/go-etl/datax/core/statistics/communication" ) func testJSONFromString(s string) *config.JSON { @@ -58,32 +57,3 @@ func TestBaseCotainer_SetConfig(t *testing.T) { }) } } - -func TestBaseCotainer_SetCommunication(t *testing.T) { - type args struct { - com *communication.Communication - } - tests := []struct { - name string - b *BaseCotainer - args args - want *communication.Communication - }{ - { - name: "1", - b: NewBaseCotainer(), - args: args{ - com: &communication.Communication{}, - }, - want: &communication.Communication{}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - tt.b.SetCommunication(tt.args.com) - if got := tt.b.Communication(); !reflect.DeepEqual(got, tt.want) { - t.Errorf("Communication() = %v, want: %v", got, tt.want) - } - }) - } -} diff --git a/datax/core/job/container.go b/datax/core/job/container.go index a66bb35..716634e 100644 --- a/datax/core/job/container.go +++ b/datax/core/job/container.go @@ -83,31 +83,38 @@ func (c *Container) Start() (err error) { log.Debugf("DataX jobContainer %v starts to preHandle.", c.jobID) if err = c.preHandle(); err != nil { + log.Errorf("DataX jobContainer %v preHandle failed.", c.jobID, err) return } log.Infof("DataX jobContainer %v starts to init.", c.jobID) if err = c.init(); err != nil { + log.Errorf("DataX jobContainer %v init failed.", c.jobID, err) return } log.Infof("DataX jobContainer %v starts to prepare.", c.jobID) if err = c.prepare(); err != nil { + log.Errorf("DataX jobContainer %v prepare failed.", c.jobID, err) return } log.Infof("DataX jobContainer %v starts to split.", c.jobID) if err = c.split(); err != nil { + log.Errorf("DataX jobContainer %v split failed.", c.jobID, err) return } log.Infof("DataX jobContainer %v starts to schedule.", c.jobID) if err = c.schedule(); err != nil { + log.Errorf("DataX jobContainer %v schedule failed.", 
c.jobID, err) return } log.Infof("DataX jobContainer %v starts to post.", c.jobID) if err = c.post(); err != nil { + log.Errorf("DataX jobContainer %v post failed.", c.jobID, err) return } log.Debugf("DataX jobContainer %v starts to postHandle.", c.jobID) if err = c.postHandle(); err != nil { + log.Errorf("DataX jobContainer %v postHandle failed.", c.jobID, err) return } @@ -171,7 +178,7 @@ func (c *Container) init() (err error) { writerConfig.Set(coreconst.DataxJobSetting, jobSettingConf) - collector := statplugin.NewDefaultJobCollector(c.Communication()) + collector := statplugin.NewDefaultJobCollector() c.jobReader, err = c.initReaderJob(collector, readerConfig, writerConfig) if err != nil { return @@ -379,7 +386,8 @@ func (c *Container) distributeTaskIntoTaskGroup() (confs []*config.JSON, err err var speed *config.JSON speed, err = c.Config().GetConfig(coreconst.DataxJobSettingSpeed) if err != nil { - return + speed, _ = config.NewJSONFromString("{}") + err = nil } speed.Remove("channel") @@ -415,8 +423,8 @@ func (c *Container) adjustChannelNumber() error { var needChannelNumberByByte int64 = math.MaxInt32 var needChannelNumberByRecord int64 = math.MaxInt32 - if isChannelLimit := c.Config().GetInt64OrDefaullt(coreconst.DataxJobSettingSpeedChannel, 0) > 0; isChannelLimit { - c.needChannelNumber, _ = c.Config().GetInt64(coreconst.DataxJobSettingSpeedChannel) + if isChannelLimit := c.Config().GetInt64OrDefaullt(coreconst.DataxJobSettingSpeedChannel, 1) > 0; isChannelLimit { + c.needChannelNumber = c.Config().GetInt64OrDefaullt(coreconst.DataxJobSettingSpeedChannel, 1) log.Infof("DataX jobContainer %v set Channel-Number to %v channels.", c.jobID, c.needChannelNumber) return nil } diff --git a/datax/core/statistics/communication/comminication.go b/datax/core/statistics/communication/comminication.go deleted file mode 100644 index c74c21b..0000000 --- a/datax/core/statistics/communication/comminication.go +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2020 the go-etl 
Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package communication - -//Communication todo 暂时未实现 -type Communication struct { -} diff --git a/datax/core/statistics/container/communicator.go b/datax/core/statistics/container/communicator.go deleted file mode 100644 index 1a392dc..0000000 --- a/datax/core/statistics/container/communicator.go +++ /dev/null @@ -1,38 +0,0 @@ -// Copyright 2020 the go-etl Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -package container - -import ( - "github.com/Breeze0806/go-etl/config" - "github.com/Breeze0806/go-etl/datax/core/statistics/communication" -) - -//State 状态 -type State int - -//Communicator 交换器 todo 未使用 -type Communicator interface { - RegisterCommunication(configs []*config.JSON) - - Collect() Communicator - - Report(communication communication.Communication) - - CollectState() State - - GetCommunication(id int64) communication.Communication - - GetCommunicationMap() map[int64]communication.Communication -} diff --git a/datax/core/statistics/container/plugin/default_job_collector.go b/datax/core/statistics/container/plugin/default_job_collector.go index a02cfbe..d6afa10 100644 --- a/datax/core/statistics/container/plugin/default_job_collector.go +++ b/datax/core/statistics/container/plugin/default_job_collector.go @@ -16,23 +16,23 @@ package plugin import ( "github.com/Breeze0806/go-etl/datax/common/plugin" - "github.com/Breeze0806/go-etl/datax/core/statistics/communication" + "github.com/Breeze0806/go/encoding" ) //DefaultJobCollector 默认工作收集器 type DefaultJobCollector struct{} //NewDefaultJobCollector 创建默认工作收集器 -func NewDefaultJobCollector(*communication.Communication) plugin.JobCollector { +func NewDefaultJobCollector() plugin.JobCollector { return &DefaultJobCollector{} } //MessageMap 空方法 -func (d *DefaultJobCollector) MessageMap() map[string][]string { +func (d *DefaultJobCollector) MessageMap() *encoding.JSON { return nil } //MessageByKey 空方法 -func (d *DefaultJobCollector) MessageByKey(key string) []string { +func (d *DefaultJobCollector) MessageByKey(key string) *encoding.JSON { return nil } diff --git a/datax/core/taskgroup/task_execer.go b/datax/core/taskgroup/task_execer.go index c9f7a9a..9bbfd2b 100644 --- a/datax/core/taskgroup/task_execer.go +++ b/datax/core/taskgroup/task_execer.go @@ -24,7 +24,6 @@ import ( coreconst "github.com/Breeze0806/go-etl/datax/common/config/core" "github.com/Breeze0806/go-etl/datax/common/plugin/loader" 
"github.com/Breeze0806/go-etl/datax/common/spi/writer" - "github.com/Breeze0806/go-etl/datax/core/statistics/communication" "github.com/Breeze0806/go-etl/datax/core/taskgroup/runner" "github.com/Breeze0806/go-etl/datax/core/transport/channel" "github.com/Breeze0806/go-etl/datax/core/transport/exchange" @@ -43,14 +42,13 @@ type taskExecer struct { readerRunner runner.Runner //执行运行器 wg sync.WaitGroup errors chan error - //todo: taskCommunication没用 - taskCommunication communication.Communication - destroy sync.Once - key string - exchanger *exchange.RecordExchanger - cancalMutex sync.Mutex //由于取消函数会被多线程调用,需要加锁 - cancel context.CancelFunc //取消函数 - attemptCount *atomic.Int32 //执行次数 + + destroy sync.Once + key string + exchanger *exchange.RecordExchanger + cancalMutex sync.Mutex //由于取消函数会被多线程调用,需要加锁 + cancel context.CancelFunc //取消函数 + attemptCount *atomic.Int32 //执行次数 } //newTaskExecer 根据上下文ctx,任务配置taskConf,前缀关键字prefixKey diff --git a/release.bat b/release.bat index 4173968..2f63c55 100644 --- a/release.bat +++ b/release.bat @@ -8,4 +8,5 @@ go mod vendor go generate ./... cd cmd\datax go build -cd ../.. \ No newline at end of file +cd ../.. 
+go run tools/datax/release/main.go \ No newline at end of file diff --git a/storage/database/README.md b/storage/database/README.md index 1ec4e42..49a00c6 100644 --- a/storage/database/README.md +++ b/storage/database/README.md @@ -1,6 +1,6 @@ # 数据库存储开发者指南 -数据库存储是数据库查询和执行SQL的框架,用于关系型数据库的抽象 +数据库存储是数据库查询和执行SQL的框架,用于数据库的抽象,其底层是借助golang标准库的database/sql的接口来实现的。 ## 数据库存储简介 @@ -45,7 +45,7 @@ type Source interface { 具体实现Source接口时,可以组合BaseSource以简化具体实现Source接口的实现Table方法可以返回具体的表结构接口。可以看mysql包source.go的实现。 -另外,连接信息依赖Config的依赖。目前Config需要用下面的方式定义,否则无法使用rdbm包来实现datax的插件,可以看mysql包config.go的实现。 +另外,连接信息依赖Config的依赖。目前Config需要用下面的方式定义,否则无法使用dbms包来实现datax的插件,可以看mysql包config.go的实现。 ```go type Config struct { diff --git a/tools/datax/build/main.go b/tools/datax/build/main.go index b4a44ee..7b092f4 100644 --- a/tools/datax/build/main.go +++ b/tools/datax/build/main.go @@ -371,6 +371,7 @@ func main() { } } +//生成plugin的reader/writer插件文件 type pluginParser struct { infos []pluginInfo } @@ -475,6 +476,8 @@ func writeVersionCode() (err error) { return } +//通过git获取git版本号 `tag`` (git commit: `git version`) complied by gp version `go version` +//例如 v0.1.2 (git commit: c26eb4e15751e41d32402cbf3c7f1ea8af4e3e47) complied by go version go1.16.14 windows/amd64 func getVersion() (version string, err error) { output := "" if output, err = cmdOutput("git", "describe", "--abbrev=0", "--tags"); err != nil { diff --git a/tools/datax/plugin/main.go b/tools/datax/plugin/main.go index d8108ff..31647c9 100644 --- a/tools/datax/plugin/main.go +++ b/tools/datax/plugin/main.go @@ -181,11 +181,23 @@ func main() { }) switch *typ { + // datax/plugin/reader中自动生成一个如下mysql的reader模板来帮助开发 + // reader---mysql--+-----resources--+--plugin.json + // |--job.go |--plugin_job_template.json + // |--reader.go + // |--README.md + // |--task.go case "reader": files = append(files, file{ filename: filepath.Join(packPath, "reader.go"), content: fmt.Sprintf(readerFile, p), }) + // datax/plugin/writer中自动生成如下一个mysql的writer模板来帮助开发 + // 
writer--mysql---+-----resources--+--plugin.json + // |--job.go |--plugin_job_template.json + // |--README.md + // |--task.go + // |--writer.go case "writer": files = append(files, file{ filename: filepath.Join(packPath, "writer.go"), diff --git a/tools/datax/release/main.go b/tools/datax/release/main.go new file mode 100644 index 0000000..df087a6 --- /dev/null +++ b/tools/datax/release/main.go @@ -0,0 +1,246 @@ +// Copyright 2020 the go-etl Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "archive/tar" + "archive/zip" + "bytes" + "compress/gzip" + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "runtime" + "strings" +) + +var ( + sourceUserPath = "datax/" + destUserPath = "release/datax/" + + sourceExamplePath = "cmd/datax/examples" + destExamplePath = "release/examples" +) + +func main() { + err := copyMarkdown("plugin/reader") + if err != nil { + fmt.Println("copyMarkdown reader fail. err:", err) + os.Exit(1) + } + + err = copyMarkdown("plugin/writer") + if err != nil { + fmt.Println("copyMarkdown writer fail. err:", err) + os.Exit(1) + } + + err = copyConfig() + if err != nil { + fmt.Println("copyConfig fail. err:", err) + os.Exit(1) + } + + data, err := ioutil.ReadFile("README_USER.md") + if err != nil { + fmt.Println("ReadFile README_USER.md fail. 
err:", err) + os.Exit(1) + } + + err = ioutil.WriteFile("release/README_USER.md", data, 0644) + if err != nil { + fmt.Println("WriteFile release/README_USER.md fail. err:", err) + os.Exit(1) + } + + output := "" + if output, err = cmdOutput("git", "describe", "--abbrev=0", "--tags"); err != nil { + fmt.Printf("use git to tag version fail. error: %v\n", err) + os.Exit(1) + } + tagVersion := strings.ReplaceAll(output, "\r", "") + tagVersion = strings.ReplaceAll(tagVersion, "\n", "") + + if runtime.GOOS == "windows" { + os.Rename("cmd/datax/datax.exe", "release/datax.exe") + if err = zipDir("release", "datax-"+tagVersion+"-windows-x86_64.zip"); err != nil { + fmt.Printf("uzipDir fail. error: %v\n", err) + os.Exit(1) + } + } else if runtime.GOOS == "linux" { + os.Rename("cmd/datax/datax", "release/datax") + if err = tarDir("release", "datax-"+tagVersion+"-linux-x86_64.tar.gz"); err != nil { + fmt.Printf("tarDir fail. error: %v\n", err) + os.Exit(1) + } + } else { + fmt.Printf("OS: %v\n", runtime.GOOS) + os.Exit(1) + } +} + +func copyMarkdown(path string) (err error) { + var list []os.FileInfo + list, err = ioutil.ReadDir(filepath.Join(sourceUserPath, path)) + if err != nil { + return err + } + var data []byte + for _, v := range list { + if v.IsDir() { + data, err = ioutil.ReadFile(filepath.Join(sourceUserPath, path, v.Name(), "README.md")) + if err != nil { + err = nil + continue + } + os.MkdirAll(filepath.Join(destUserPath, path, v.Name()), 0644) + err = ioutil.WriteFile(filepath.Join(destUserPath, path, v.Name(), "README.md"), data, 0644) + if err != nil { + return + } + } + } + return +} + +func copyConfig() (err error) { + os.MkdirAll(destExamplePath, 0644) + var list []os.FileInfo + list, err = ioutil.ReadDir(sourceExamplePath) + if err != nil { + return err + } + var data []byte + for _, v := range list { + if v.IsDir() { + data, err = ioutil.ReadFile(filepath.Join(sourceExamplePath, v.Name(), "config.json")) + if err != nil { + err = nil + continue + } + 
os.MkdirAll(filepath.Join(destExamplePath, v.Name()), 0644) + err = ioutil.WriteFile(filepath.Join(destExamplePath, v.Name(), "config.json"), data, 0644) + if err != nil { + return + } + } + } + return +} + +func zipDir(src, dest string) error { + zipfile, err := os.Create(dest) + if err != nil { + return err + } + defer zipfile.Close() + + archive := zip.NewWriter(zipfile) + defer archive.Close() + + filepath.Walk(src, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + header, err := zip.FileInfoHeader(info) + if err != nil { + return err + } + + header.Name = path + if info.IsDir() { + header.Name += "/" + } else { + header.Method = zip.Deflate + } + + writer, err := archive.CreateHeader(header) + if err != nil { + return err + } + + if !info.IsDir() { + file, err := os.Open(path) + if err != nil { + return err + } + defer file.Close() + _, err = io.Copy(writer, file) + } + return err + }) + + return err +} + +func tarDir(src, dst string) error { + fw, err := os.Create(dst) + if err != nil { + return err + } + defer fw.Close() + + gw := gzip.NewWriter(fw) + defer gw.Close() + + tw := tar.NewWriter(gw) + defer tw.Close() + return filepath.Walk(src, func(fileName string, fi os.FileInfo, err error) error { + if err != nil { + return err + } + hdr, err := tar.FileInfoHeader(fi, "") + if err != nil { + return err + } + hdr.Name = strings.TrimPrefix(fileName, string(filepath.Separator)) + + if err := tw.WriteHeader(hdr); err != nil { + return err + } + + if !fi.Mode().IsRegular() { + return nil + } + + fr, err := os.Open(fileName) + defer fr.Close() + if err != nil { + return err + } + + _, err = io.Copy(tw, fr) + if err != nil { + return err + } + return nil + }) +} + +func cmdOutput(cmd string, arg ...string) (output string, err error) { + c := exec.Command(cmd, arg...) 
+	var stdout, stderr bytes.Buffer
+	c.Stdout = &stdout
+	c.Stderr = &stderr
+	if err = c.Run(); err != nil {
+		err = fmt.Errorf("%v(%s)", err, stderr.String())
+		return
+	}
+	return stdout.String(), nil
+}
diff --git a/tools/license/main.go b/tools/license/main.go
index 445aab4..7357ac9 100644
--- a/tools/license/main.go
+++ b/tools/license/main.go
@@ -100,6 +100,7 @@ func main() {
 	wg.Wait()
 }
 
+//读取path目录下的所有包路径
 func readPackages(path string) (packages []string, err error) {
 	var list []os.FileInfo
 	list, err = ioutil.ReadDir(path)
@@ -119,6 +120,7 @@ func readPackages(path string) (packages []string, err error) {
 	return
 }
 
+//为文件filename添加许可证头
 func addLicenseHeader(filename string) error {
 	data, err := ioutil.ReadFile(filename)
 	if err != nil {