Skip to content

Commit

Permalink
fix: mapping mysql tables into clickhouse with Engine MySQL (#18877)
Browse files Browse the repository at this point in the history
Co-authored-by: Qiu Jian <qiujian@yunionyun.com>
  • Loading branch information
swordqiu and Qiu Jian authored Dec 4, 2023
1 parent 0692c20 commit 1b997dc
Show file tree
Hide file tree
Showing 19 changed files with 372 additions and 48 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ require (
k8s.io/client-go v0.19.3
k8s.io/cluster-bootstrap v0.19.3
moul.io/http2curl/v2 v2.3.0
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20231204065558-c4edc0938f2e
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20231204130443-d54a8b16c7b7
yunion.io/x/executor v0.0.0-20230705125604-c5ac3141db32
yunion.io/x/jsonutils v1.0.1-0.20230613121553-0f3b41e2ef19
yunion.io/x/log v1.0.1-0.20230411060016-feb3f46ab361
yunion.io/x/ovsdb v0.0.0-20230306173834-f164f413a900
yunion.io/x/pkg v1.0.1-0.20231101105448-abef64cdc142
yunion.io/x/s3cli v0.0.0-20190917004522-13ac36d8687e
yunion.io/x/sqlchemy v1.1.2-0.20231201052514-97026b18ccf0
yunion.io/x/sqlchemy v1.1.2-0.20231204175132-1eb294922a51
yunion.io/x/structarg v0.0.0-20231017124457-df4d5009457c
)

Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1192,8 +1192,8 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.1/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20231204065558-c4edc0938f2e h1:QmYmnMJjVijGbLi5haZS49d2CaNgA/JLphLn7oL8q94=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20231204065558-c4edc0938f2e/go.mod h1:aj1gR9PPb6eqqKOwvANe26CoZFY8ydmXy0fuvgKYXH0=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20231204130443-d54a8b16c7b7 h1:fZ3sE1acOojrgXGtqLxXcfFhg4bT3shKx2o73J+Yat8=
yunion.io/x/cloudmux v0.3.10-0-alpha.1.0.20231204130443-d54a8b16c7b7/go.mod h1:aj1gR9PPb6eqqKOwvANe26CoZFY8ydmXy0fuvgKYXH0=
yunion.io/x/executor v0.0.0-20230705125604-c5ac3141db32 h1:v7POYkQwo1XzOxBoIoRVr/k0V9Y5JyjpshlIFa9raug=
yunion.io/x/executor v0.0.0-20230705125604-c5ac3141db32/go.mod h1:Uxuou9WQIeJXNpy7t2fPLL0BYLvLiMvGQwY7Qc6aSws=
yunion.io/x/jsonutils v0.0.0-20190625054549-a964e1e8a051/go.mod h1:4N0/RVzsYL3kH3WE/H1BjUQdFiWu50JGCFQuuy+Z634=
Expand All @@ -1211,7 +1211,7 @@ yunion.io/x/pkg v1.0.1-0.20231101105448-abef64cdc142 h1:L6LqxfP08eWUx+A6yQdrL6VB
yunion.io/x/pkg v1.0.1-0.20231101105448-abef64cdc142/go.mod h1:ksCJVQ+DwKrJ5QBEoU8pzrDFfDaZVAFH/iJ6yQCYxJk=
yunion.io/x/s3cli v0.0.0-20190917004522-13ac36d8687e h1:v+EzIadodSwkdZ/7bremd7J8J50Cise/HCylsOJngmo=
yunion.io/x/s3cli v0.0.0-20190917004522-13ac36d8687e/go.mod h1:0iFKpOs1y4lbCxeOmq3Xx/0AcQoewVPwj62eRluioEo=
yunion.io/x/sqlchemy v1.1.2-0.20231201052514-97026b18ccf0 h1:+MaykFV6YCakTLnHR3v31tovXhkvWgkBPFa83MuCekA=
yunion.io/x/sqlchemy v1.1.2-0.20231201052514-97026b18ccf0/go.mod h1:uuPVZEyEq3sWd5vf9VjGSy6lZzof22X87OEHw9sddJQ=
yunion.io/x/sqlchemy v1.1.2-0.20231204175132-1eb294922a51 h1:XgvhXxYul4W85vVJVKCKOWw6Ea/YroI3TBxyLcQ7vx4=
yunion.io/x/sqlchemy v1.1.2-0.20231204175132-1eb294922a51/go.mod h1:uuPVZEyEq3sWd5vf9VjGSy6lZzof22X87OEHw9sddJQ=
yunion.io/x/structarg v0.0.0-20231017124457-df4d5009457c h1:QuLab2kSRECZRxo4Lo2KcYn6XjQFDGaZ1+x0pYDVVwQ=
yunion.io/x/structarg v0.0.0-20231017124457-df4d5009457c/go.mod h1:EP6NSv2C0zzqBDTKumv8hPWLb3XvgMZDHQRfyuOrQng=
17 changes: 17 additions & 0 deletions pkg/cloudcommon/consts/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,21 @@ var (
QueryOffsetOptimization = false

OpsLogWithClickhouse = false

defaultDBDialect string

defaultDBConnectionString string
)

func SetDefaultDB(dialect, connStr string) {
defaultDBDialect = dialect
defaultDBConnectionString = connStr
}

func DefaultDBDialect() string {
return defaultDBDialect
}

func DefaultDBConnStr() string {
return defaultDBConnectionString
}
7 changes: 5 additions & 2 deletions pkg/cloudcommon/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"yunion.io/x/onecloud/pkg/cloudcommon/informer"
"yunion.io/x/onecloud/pkg/cloudcommon/notifyclient"
common_options "yunion.io/x/onecloud/pkg/cloudcommon/options"
"yunion.io/x/onecloud/pkg/util/dbutils"
)

func InitDB(options *common_options.DBOptions) {
Expand Down Expand Up @@ -77,6 +78,8 @@ func InitDB(options *common_options.DBOptions) {
log.Fatalf("cannot use clickhouse as primary database")
}
log.Infof("database dialect: %s sqlStr: %s", dialect, sqlStr)
// save configuration to consts
consts.SetDefaultDB(dialect, sqlStr)
dbConn, err := sql.Open(dialect, sqlStr)
if err != nil {
panic(err)
Expand All @@ -87,11 +90,11 @@ func InitDB(options *common_options.DBOptions) {
if err == nil {
// connect to clickcloud
// force convert sqlstr from clickhouse v2 to v1
sqlStr, err = clickhouseSqlStrV2ToV1(sqlStr)
sqlStr, err = dbutils.ClickhouseSqlStrV2ToV1(sqlStr)
if err != nil {
log.Fatalf("fail to convert clickhouse sqlstr from v2 to v1: %s", err)
}
err = validateClickhouseV1Str(sqlStr)
err = dbutils.ValidateClickhouseV1Str(sqlStr)
if err != nil {
log.Fatalf("invalid clickhouse sqlstr: %s", err)
}
Expand Down
26 changes: 26 additions & 0 deletions pkg/cloudcommon/db/modelbase.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ import (
"yunion.io/x/pkg/util/rbacscope"
"yunion.io/x/pkg/util/version"
"yunion.io/x/sqlchemy"
"yunion.io/x/sqlchemy/backends/clickhouse"

"yunion.io/x/onecloud/pkg/apis"
"yunion.io/x/onecloud/pkg/appsrv"
"yunion.io/x/onecloud/pkg/cloudcommon/consts"
"yunion.io/x/onecloud/pkg/cloudcommon/policy"
"yunion.io/x/onecloud/pkg/httperrors"
"yunion.io/x/onecloud/pkg/mcclient"
"yunion.io/x/onecloud/pkg/util/dbutils"
"yunion.io/x/onecloud/pkg/util/logclient"
"yunion.io/x/onecloud/pkg/util/splitable"
"yunion.io/x/onecloud/pkg/util/stringutils2"
Expand Down Expand Up @@ -83,6 +86,29 @@ func NewModelBaseManagerWithSplitableDBName(model interface{}, tableName string,
return modelMan
}

func NewModelBaseManagerWithClickhouseMapping(manager IModelManager, tblname, keyword, keywordPlural string) SModelBaseManager {
ots := manager.TableSpec()
var extraOpts sqlchemy.TableExtraOptions
switch consts.DefaultDBDialect() {
case "mysql":
cfg := dbutils.ParseMySQLConnStr(consts.DefaultDBConnStr())
err := cfg.Validate()
if err != nil {
panic(fmt.Sprintf("invalid mysql connection string %s", consts.DefaultDBConnStr()))
}
extraOpts = clickhouse.MySQLExtraOptions(cfg.Hostport, cfg.Database, ots.Name(), cfg.Username, cfg.Password)
default:
panic(fmt.Sprintf("unsupport dialect %s to be backend of clickhouse", consts.DefaultDBDialect()))
}
nts := newClickhouseTableSpecFromMySQL(ots, tblname, ClickhouseDB, extraOpts)
modelMan := SModelBaseManager{
tableSpec: nts,
keyword: keyword,
keywordPlural: keywordPlural,
}
return modelMan
}

func (manager *SModelBaseManager) CreateByInsertOrUpdate() bool {
return true
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/cloudcommon/db/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ func mustCheckModelManager(modelMan IModelManager) {

func tableSpecId(tableSpec ITableSpec) string {
keys := []string{
string(tableSpec.GetDBName()),
tableSpec.Name(),
}
for _, c := range tableSpec.Columns() {
Expand Down Expand Up @@ -151,6 +152,7 @@ func CheckSync(autoSync bool, enableChecksumTables bool, skipInitChecksum bool)
tableSpec := modelMan.TableSpec()
tableKey := tableSpecId(tableSpec)
if _, ok := processedTbl[tableKey]; ok {
log.Warningf("table %s has been synced!", tableKey)
continue
}
processedTbl[tableKey] = true
Expand Down
7 changes: 7 additions & 0 deletions pkg/cloudcommon/db/tablespec.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func newTableSpec(model interface{}, tableName string, indexField string, dateFi
}
}

func newClickhouseTableSpecFromMySQL(spec ITableSpec, name string, dbName sqlchemy.DBName, extraOpts sqlchemy.TableExtraOptions) ITableSpec {
itbl := sqlchemy.NewTableSpecFromISpecWithDBName(spec.(*sTableSpec).ITableSpec, name, dbName, extraOpts)
return &sTableSpec{
ITableSpec: itbl,
}
}

func (ts *sTableSpec) GetSplitTable() *splitable.SSplitTableSpec {
sts, ok := ts.ITableSpec.(*splitable.SSplitTableSpec)
if ok {
Expand Down
10 changes: 5 additions & 5 deletions pkg/cloudcommon/clickhouse.go → pkg/util/dbutils/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package cloudcommon
package dbutils

import (
"fmt"
Expand All @@ -27,7 +27,7 @@ import (
// convert clickhouse sqlstr v1 to v2
// v1: tcp://192.168.222.4:9000?database=yunionmeter&read_timeout=10&write_timeout=20
// v2: clickhouse://username:password@host1:9000,host2:9000/database?dial_timeout=200ms&max_execution_time=60
func clickhouseSqlStrV1ToV2(sqlstr string) (string, error) {
func ClickhouseSqlStrV1ToV2(sqlstr string) (string, error) {
if strings.HasPrefix(sqlstr, "clickhouse://") {
// already v2 format
return sqlstr, nil
Expand Down Expand Up @@ -57,7 +57,7 @@ func clickhouseSqlStrV1ToV2(sqlstr string) (string, error) {
return fmt.Sprintf("clickhouse://%s/%s?dial_timeout=200ms&max_execution_time=60", hostPart, dbname), nil
}

func clickhouseSqlStrV2ToV1(sqlstr string) (string, error) {
func ClickhouseSqlStrV2ToV1(sqlstr string) (string, error) {
if strings.HasPrefix(sqlstr, "tcp://") {
// already v1 format
return sqlstr, nil
Expand Down Expand Up @@ -89,7 +89,7 @@ func clickhouseSqlStrV2ToV1(sqlstr string) (string, error) {
return fmt.Sprintf("tcp://%s?%s&read_timeout=10&write_timeout=20", hostPart, jsonutils.Marshal(qs).QueryString()), nil
}

func validateClickhouseV2Str(sqlstr string) error {
func ValidateClickhouseV2Str(sqlstr string) error {
if !strings.HasPrefix(sqlstr, "clickhouse://") {
return errors.Wrapf(httperrors.ErrInputParameter, "must start with clickhouse://")
}
Expand All @@ -108,7 +108,7 @@ func validateClickhouseV2Str(sqlstr string) error {
return nil
}

func validateClickhouseV1Str(sqlstr string) error {
func ValidateClickhouseV1Str(sqlstr string) error {
if !strings.HasPrefix(sqlstr, "tcp://") {
return errors.Wrapf(httperrors.ErrInputParameter, "must start with tcp://")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package cloudcommon
package dbutils

import (
"testing"
Expand All @@ -36,7 +36,7 @@ func TestClickhouseSqlStrV1ToV2(t *testing.T) {
want: "clickhouse://admin:pass@192.168.222.4:9000/yunionmeter?dial_timeout=200ms&max_execution_time=60",
},
} {
got, err := clickhouseSqlStrV1ToV2(c.in)
got, err := ClickhouseSqlStrV1ToV2(c.in)
if err != nil {
t.Errorf("%s", err)
} else if got != c.want {
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestClickhouseSqlStrV2ToV1(t *testing.T) {
want: "tcp://192.168.222.4:9000?database=yunionmeter&password=pass&username=admin&read_timeout=10&write_timeout=20",
},
} {
got, err := clickhouseSqlStrV2ToV1(c.in)
got, err := ClickhouseSqlStrV2ToV1(c.in)
if err != nil {
t.Errorf("%s", err)
} else if got != c.want {
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestValidateClickhouseSqlstrV1(t *testing.T) {
valid: false,
},
} {
err := validateClickhouseV1Str(c.in)
err := ValidateClickhouseV1Str(c.in)
if err != nil && c.valid {
t.Errorf("%s", err)
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestValidateClickhouseSqlstrV2(t *testing.T) {
valid: true,
},
} {
err := validateClickhouseV2Str(c.in)
err := ValidateClickhouseV2Str(c.in)
if err != nil && c.valid {
t.Errorf("%s", err)
}
Expand Down
15 changes: 15 additions & 0 deletions pkg/util/dbutils/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbutils // import "yunion.io/x/onecloud/pkg/util/dbutils"
72 changes: 72 additions & 0 deletions pkg/util/dbutils/mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbutils

import (
"strings"

"yunion.io/x/pkg/errors"
)

type SDBConfig struct {
Hostport string
Database string
Username string
Password string
}

func (cfg SDBConfig) Validate() error {
errs := make([]error, 0)
if len(cfg.Hostport) == 0 {
errs = append(errs, errors.Error("empty host port"))
}
if len(cfg.Username) == 0 {
errs = append(errs, errors.Error("empty username"))
}
if len(cfg.Database) == 0 {
errs = append(errs, errors.Error("empty database"))
}
return errors.NewAggregate(errs)
}

func ParseMySQLConnStr(connStr string) SDBConfig {
// fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?%s", user, passwd, host, port, dburl, query.Encode())
cfg := SDBConfig{}
index := strings.Index(connStr, "@tcp(")
if index > 0 {
userpass := connStr[:index]
hostdb := connStr[index+len("@tcp("):]

index = strings.Index(userpass, ":")
if index > 0 {
cfg.Username = userpass[:index]
cfg.Password = userpass[index+1:]
} else if index < 0 {
cfg.Username = userpass
}
index = strings.Index(hostdb, ")/")
if index > 0 {
cfg.Hostport = hostdb[:index]
dbstr := hostdb[index+len(")/"):]
index = strings.Index(dbstr, "?")
if index > 0 {
cfg.Database = dbstr[:index]
} else if index < 0 {
cfg.Database = dbstr
}
}
}
return cfg
}
44 changes: 44 additions & 0 deletions pkg/util/dbutils/mysql_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2019 Yunion
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package dbutils

import (
"testing"

"yunion.io/x/jsonutils"
)

func TestParseMySQLConnStr(t *testing.T) {
cases := []struct {
connStr string
cfg SDBConfig
}{
{
connStr: "regionadmin:abcdefg@tcp(192.168.222.22:3306)/yunioncloud?parseTime=True",
cfg: SDBConfig{
Hostport: "192.168.222.22:3306",
Database: "yunioncloud",
Username: "regionadmin",
Password: "abcdefg",
},
},
}
for _, c := range cases {
got := ParseMySQLConnStr(c.connStr)
if jsonutils.Marshal(got).String() != jsonutils.Marshal(c.cfg).String() {
t.Errorf("parse %s got %s want %s", c.connStr, jsonutils.Marshal(got), jsonutils.Marshal(c.cfg))
}
}
}
Loading

0 comments on commit 1b997dc

Please sign in to comment.