From 53e4807aa8e1f58ac4d3d2bdd17c66b7b00aa5d6 Mon Sep 17 00:00:00 2001 From: Vee Zhang Date: Tue, 13 Dec 2022 17:57:18 +0800 Subject: [PATCH] feat: support concat and change to picker --- README.md | 11 ++ README_zh-CN.md | 10 ++ examples/v1/example.yaml | 32 ++++ examples/v2/example.yaml | 31 ++++ go.mod | 2 +- pkg/config/config.go | 153 +++++++++++------- pkg/config/config_test.go | 106 +++++++++++- .../testdata/test-parse-after-period.yaml | 2 +- .../testdata/test-parse-concat-items.yaml | 31 ++++ pkg/config/testdata/test-parse-log-path.yaml | 2 +- pkg/config/testdata/test-parse-version.yaml | 2 +- pkg/picker/picker.go | 5 +- pkg/reader/batchmgr.go | 6 +- 13 files changed, 320 insertions(+), 73 deletions(-) create mode 100644 pkg/config/testdata/test-parse-concat-items.yaml diff --git a/README.md b/README.md index 595fb054..5d9599c5 100644 --- a/README.md +++ b/README.md @@ -217,6 +217,16 @@ schema: alternativeIndices: - 7 - 8 + +# concatItems examples +schema: + type: vertex + vertex: + vid: + concatItems: + - "abc" + - 1 + function: hash ``` ##### `schema.vertex.vid` @@ -224,6 +234,7 @@ schema: **Optional**. Describes the vertex ID column and the function used for the vertex ID. * `index`: **Optional**. The column number in the CSV file. Started with 0. The default value is 0. +* `concatItems`: **Optional**. The concat item can be `string`, `int` or mixed. `string` represents a constant, and `int` represents an index column. Then connect all items.If set, the above `index` will have no effect. * `function`: **Optional**. Functions to generate the VIDs. Currently, we only support function `hash` and `uuid`. * `type`: **Optional**. The type for VIDs. The default value is `string`. * `prefix`: **Optional**. Add prefix to the original vid. When `function` is specified also, `prefix` is applied to the original vid before `function`. diff --git a/README_zh-CN.md b/README_zh-CN.md index b0ee1b2c..f8749c8e 100644 --- a/README_zh-CN.md +++ b/README_zh-CN.md @@ -191,6 +191,15 @@ schema: alternativeIndices: - 7 - 8 +# concatItems examples +schema: + type: vertex + vertex: + vid: + concatItems: + - "abc" + - 1 + function: hash ``` ##### `schema.vertex.vid` @@ -198,6 +207,7 @@ schema: **可选**。描述点 VID 所在的列和使用的函数。 - `index`:**可选**。在 CSV 文件中的列标,从 0 开始计数。默认值 0。 +- `concatItems`: **可选**. 连接项可以是`string`、`int`或者混合。`string`代表常量,`int`表示索引列。然后连接所有的项。如果设置了,上面的`index`将不生效。 - `function`:**可选**。用来生成 VID 时的函数,有 `hash` 和 `uuid` 两种函数可选。 - `prefix`: **可选**。给 原始vid 添加的前缀,当同时指定了 `function` 时, 生成 VID 的方法是先添加 `prefix` 前缀, 再用 `function`生成 VID。 diff --git a/examples/v1/example.yaml b/examples/v1/example.yaml index f07cf777..ff07642d 100644 --- a/examples/v1/example.yaml +++ b/examples/v1/example.yaml @@ -76,6 +76,38 @@ files: - name: name type: string + - path: ./course.csv + failDataPath: ./err/course-concat.csv + batchSize: 2 + inOrder: true + type: csv + csv: + withHeader: false + withLabel: false + schema: + type: vertex + vertex: + vid: + type: int + concatItems: # "c1{index0}c2{index1}2" + - "c1" + - 0 + - c2 + - 1 + - "2" + function: hash + tags: + - name: course + props: + - name: name + type: string + - name: credits + type: int + - name: building + props: + - name: name + type: string + - path: ./course-with-header.csv failDataPath: ./err/course-with-header.csv batchSize: 2 diff --git a/examples/v2/example.yaml b/examples/v2/example.yaml index 687b6a65..f001ad05 100644 --- a/examples/v2/example.yaml +++ b/examples/v2/example.yaml @@ -77,6 +77,37 @@ files: - name: name type: string + - path: ./course.csv + failDataPath: ./err/course-concat.csv + batchSize: 2 + inOrder: true + type: csv + csv: + withHeader: false + withLabel: false + schema: + type: vertex + vertex: + vid: + type: string + concatItems: # "c1{index0}c2{index1}2" + - "c1" + - 0 + - c2 + - 1 + - "2" + tags: + - name: course + props: + - name: name + type: string + - name: credits + type: int + - name: building + props: + - name: name + type: string + - path: ./course-with-header.csv failDataPath: ./err/course-with-header.csv batchSize: 2 diff --git a/go.mod b/go.mod index 7ca22567..bd337ce6 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220425030225-cdb52399b40a gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v2 v2.4.0 - gopkg.in/yaml.v3 v3.0.1 // indirect + gopkg.in/yaml.v3 v3.0.1 ) go 1.13 diff --git a/pkg/config/config.go b/pkg/config/config.go index 4ed3dbc5..2fa7a1f8 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -14,6 +14,7 @@ import ( "github.com/vesoft-inc/nebula-importer/pkg/base" ierrors "github.com/vesoft-inc/nebula-importer/pkg/errors" "github.com/vesoft-inc/nebula-importer/pkg/logger" + "github.com/vesoft-inc/nebula-importer/pkg/picker" "gopkg.in/yaml.v2" ) @@ -58,13 +59,16 @@ type Prop struct { NullValue string `json:"nullValue" yaml:"nullValue"` AlternativeIndices []int `json:"alternativeIndices" yaml:"alternativeIndices"` DefaultValue *string `json:"defaultValue" yaml:"defaultValue"` + picker picker.Picker } type VID struct { - Index *int `json:"index" yaml:"index"` - Function *string `json:"function" yaml:"function"` - Type *string `json:"type" yaml:"type"` - Prefix *string `json:"prefix" yaml:"prefix"` + Index *int `json:"index" yaml:"index"` + ConcatItems []interface{} `json:"concatItems" yaml:"concatItems"` // only string and int is support, int is for Index + Function *string `json:"function" yaml:"function"` + Type *string `json:"type" yaml:"type"` + Prefix *string `json:"prefix" yaml:"prefix"` + picker picker.Picker } type Rank struct { @@ -494,7 +498,6 @@ func (s *Schema) validateAndReset(prefix string) error { func (v *VID) ParseFunction(str string) (err error) { i := strings.Index(str, "(") j := strings.Index(str, ")") - err = nil if i < 0 && j < 0 { v.Function = nil v.Type = &kDefaultVidType @@ -533,25 +536,11 @@ func (v *VID) String(vid string) string { } func (v *VID) FormatValue(record base.Record) (string, error) { - if len(record) <= *v.Index { - return "", fmt.Errorf("vid index(%d) out of range record length(%d)", *v.Index, len(record)) - } - vid := record[*v.Index] - if v.Prefix != nil { - vid = *v.Prefix + vid - } - if v.Function == nil || *v.Function == "" { - if err := checkVidFormat(vid, *v.Type == "int"); err != nil { - return "", err - } - if *v.Type == "string" { - return fmt.Sprintf("%q", vid), nil - } else { - return vid, nil - } - } else { - return fmt.Sprintf("%s(%q)", *v.Function, vid), nil + value, err := v.picker.Pick(record) + if err != nil { + return "", err } + return value.Val, nil } func (v *VID) checkFunction(prefix string) error { @@ -585,7 +574,48 @@ func (v *VID) validateAndReset(prefix string, defaultVal int) error { v.Type = &kDefaultVidType logger.Log.Warnf("Not set %s.Type, reset to default value `%s'", prefix, *v.Type) } - return nil + + return v.InitPicker() +} + +func (v *VID) InitPicker() error { + pickerConfig := picker.Config{ + Type: *v.Type, + Function: v.Function, + } + + hasPrefix := v.Prefix != nil && *v.Prefix != "" + + if len(v.ConcatItems) > 0 { + if hasPrefix { + pickerConfig.ConcatItems.AddConstant(*v.Prefix) + } + for i, item := range v.ConcatItems { + switch val := item.(type) { + case int: + pickerConfig.ConcatItems.AddIndex(val) + case string: + pickerConfig.ConcatItems.AddConstant(val) + default: + return fmt.Errorf("ConcatItems only support int or string, but the %d is %v", i, val) + } + } + } else if hasPrefix { + pickerConfig.ConcatItems.AddConstant(*v.Prefix) + pickerConfig.ConcatItems.AddIndex(*v.Index) + } else { + pickerConfig.Indices = []int{*v.Index} + } + + if (v.Function == nil || *v.Function == "") && strings.EqualFold(*v.Type, "int") { + pickerConfig.CheckOnPost = func(v *picker.Value) error { + return checkVidFormat(v.Val, true) + } + } + + var err error + v.picker, err = pickerConfig.Build() + return err } func (r *Rank) validateAndReset(prefix string, defaultVal int) error { @@ -690,22 +720,23 @@ func (e *Edge) validateAndReset(prefix string) error { if e.Name == nil { return fmt.Errorf("Please configure edge name in: %s.name", prefix) } - if e.SrcVID != nil { - if err := e.SrcVID.validateAndReset(fmt.Sprintf("%s.srcVID", prefix), 0); err != nil { - return err - } - } else { + + if e.SrcVID == nil { index := 0 e.SrcVID = &VID{Index: &index, Type: &kDefaultVidType} } - if e.DstVID != nil { - if err := e.DstVID.validateAndReset(fmt.Sprintf("%s.dstVID", prefix), 1); err != nil { - return err - } - } else { + if err := e.SrcVID.validateAndReset(fmt.Sprintf("%s.srcVID", prefix), 0); err != nil { + return err + } + + if e.DstVID == nil { index := 1 e.DstVID = &VID{Index: &index, Type: &kDefaultVidType} } + if err := e.DstVID.validateAndReset(fmt.Sprintf("%s.dstVID", prefix), 1); err != nil { + return err + } + start := 2 if e.Rank != nil { if err := e.Rank.validateAndReset(fmt.Sprintf("%s.rank", prefix), 2); err != nil { @@ -792,14 +823,13 @@ func (v *Vertex) validateAndReset(prefix string) error { // if v.Tags == nil { // return fmt.Errorf("Please configure %.tags", prefix) // } - if v.VID != nil { - if err := v.VID.validateAndReset(fmt.Sprintf("%s.vid", prefix), 0); err != nil { - return err - } - } else { + if v.VID == nil { index := 0 v.VID = &VID{Index: &index, Type: &kDefaultVidType} } + if err := v.VID.validateAndReset(fmt.Sprintf("%s.vid", prefix), 0); err != nil { + return err + } j := 1 for i := range v.Tags { if v.Tags[i] != nil { @@ -834,28 +864,11 @@ func (p *Prop) IsGeographyType() bool { } func (p *Prop) FormatValue(record base.Record) (string, error) { - r, isNull, err := p.getValue(record) + value, err := p.picker.Pick(record) if err != nil { return "", err } - if isNull { - return r, err - } - if p.IsStringType() { - return fmt.Sprintf("%q", r), nil - } - if p.IsDateOrTimeType() { - if p.IsTimestampType() && reTimestampInteger.MatchString(r) { - return fmt.Sprintf("%s(%s)", strings.ToLower(*p.Type), r), nil - } - return fmt.Sprintf("%s(%q)", strings.ToLower(*p.Type), r), nil - } - // Only support wkt for geography currently - if p.IsGeographyType() { - return fmt.Sprintf("ST_GeogFromText(%q)", r), nil - } - - return r, nil + return value.Val, nil } func (p *Prop) getValue(record base.Record) (string, bool, error) { @@ -903,7 +916,29 @@ func (p *Prop) validateAndReset(prefix string, val int) error { return fmt.Errorf("Invalid prop index: %d, name: %s, type: %s", *p.Index, *p.Name, *p.Type) } } - return nil + return p.InitPicker() +} + +func (p *Prop) InitPicker() error { + pickerConfig := picker.Config{ + Indices: []int{*p.Index}, + Type: *p.Type, + } + + if p.Nullable { + pickerConfig.Nullable = func(s string) bool { + return s == p.NullValue + } + pickerConfig.NullValue = dbNULL + if len(p.AlternativeIndices) > 0 { + pickerConfig.Indices = append(pickerConfig.Indices, p.AlternativeIndices...) + } + pickerConfig.DefaultValue = p.DefaultValue + } + + var err error + p.picker, err = pickerConfig.Build() + return err } func (t *Tag) FormatValues(record base.Record) (string, bool, error) { diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 53b4be59..a034a9fe 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -312,6 +312,87 @@ func TestParseLogPath(t *testing.T) { } } +func TestParseConcatItems(t *testing.T) { + testcases := []struct { + concatItems string + fnCheck func(ast *assert.Assertions, concatItems []interface{}) + }{ + { + concatItems: "", + fnCheck: func(ast *assert.Assertions, concatItems []interface{}) { + ast.Len(concatItems, 0) + }, + }, + { + concatItems: "concatItems: [\"c1\"]", + fnCheck: func(ast *assert.Assertions, concatItems []interface{}) { + if ast.Len(concatItems, 1) { + ast.Equal(concatItems[0], "c1") + } + }, + }, + { + concatItems: "concatItems: [3]", + fnCheck: func(ast *assert.Assertions, concatItems []interface{}) { + if ast.Len(concatItems, 1) { + ast.Equal(concatItems[0], 3) + } + }, + }, + { + concatItems: "concatItems: [3, \"c1\", 1, \"c2\", 2]", + fnCheck: func(ast *assert.Assertions, concatItems []interface{}) { + if ast.Len(concatItems, 5) { + ast.Equal(concatItems[0], 3) + ast.Equal(concatItems[1], "c1") + ast.Equal(concatItems[2], 1) + ast.Equal(concatItems[3], "c2") + ast.Equal(concatItems[4], 2) + } + }, + }, + { + concatItems: "concatItems: [\"c1\", 3, \"c2\", 1, \"2\"]", + fnCheck: func(ast *assert.Assertions, concatItems []interface{}) { + if ast.Len(concatItems, 5) { + ast.Equal(concatItems[0], "c1") + ast.Equal(concatItems[1], 3) + ast.Equal(concatItems[2], "c2") + ast.Equal(concatItems[3], 1) + ast.Equal(concatItems[4], "2") + } + }, + }, + } + + for _, tc := range testcases { + t.Run(tc.concatItems, func(t *testing.T) { + ast := assert.New(t) + + tmpl, err := template.ParseFiles("testdata/test-parse-concat-items.yaml") + ast.NoError(err) + + f, err := os.CreateTemp("testdata", ".test-parse-concat-items.yaml") + ast.NoError(err) + filename := f.Name() + defer func() { + _ = f.Close() + _ = os.Remove(filename) + }() + + err = tmpl.ExecuteTemplate(f, "test-parse-concat-items.yaml", map[string]string{ + "ConcatItems": tc.concatItems, + }) + ast.NoError(err) + + c, err := Parse(filename, logger.NewRunnerLogger("")) + if ast.NoError(err) { + tc.fnCheck(ast, c.Files[0].Schema.Edge.SrcVID.ConcatItems) + } + }) + } +} + func TestParseNoFiles(t *testing.T) { _, err := Parse("./testdata/test-parse-no-files.yaml", logger.NewRunnerLogger("")) assert.Error(t, err) @@ -386,10 +467,11 @@ func TestVidFormatValue(t *testing.T) { name: "index out of range", vid: VID{ Index: &idx1, + Type: &tString, }, want: "", record: base.Record{""}, - wantErrString: "out of range record length", + wantErrString: "out range", }, { name: "type string", @@ -459,6 +541,7 @@ func TestVidFormatValue(t *testing.T) { name: "function hash", vid: VID{ Index: &idx0, + Type: &tString, Function: &fHash, }, record: base.Record{"str"}, @@ -478,6 +561,9 @@ func TestVidFormatValue(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { ast := assert.New(t) + + ast.NoError(tc.vid.InitPicker()) + str, err := tc.vid.FormatValue(tc.record) if tc.wantErrString != "" { ast.Error(err) @@ -609,6 +695,7 @@ func TestPropFormatValue(t *testing.T) { name: "index out of range", prop: Prop{ Index: &idx1, + Type: &tString, }, want: "", record: base.Record{""}, @@ -727,7 +814,7 @@ func TestPropFormatValue(t *testing.T) { Type: &tTime, }, record: base.Record{"18:38:23.284"}, - want: "time(\"18:38:23.284\")", + want: "TIME(\"18:38:23.284\")", }, { name: "type time null", @@ -746,7 +833,7 @@ func TestPropFormatValue(t *testing.T) { Type: &tTimestamp, }, record: base.Record{"2020-01-11T19:28:23"}, - want: "timestamp(\"2020-01-11T19:28:23\")", + want: "TIMESTAMP(\"2020-01-11T19:28:23\")", }, { name: "type timestamp integer", @@ -755,7 +842,7 @@ func TestPropFormatValue(t *testing.T) { Type: &tTimestamp, }, record: base.Record{"1578770903"}, - want: "timestamp(1578770903)", + want: "TIMESTAMP(1578770903)", }, { name: "type timestamp integer", @@ -764,7 +851,7 @@ func TestPropFormatValue(t *testing.T) { Type: &tTimestamp, }, record: base.Record{"0123"}, - want: "timestamp(0123)", + want: "TIMESTAMP(0123)", }, { name: "type timestamp integer", @@ -773,7 +860,7 @@ func TestPropFormatValue(t *testing.T) { Type: &tTimestamp, }, record: base.Record{"0XF0"}, - want: "timestamp(0XF0)", + want: "TIMESTAMP(0XF0)", }, { name: "type timestamp null", @@ -792,7 +879,7 @@ func TestPropFormatValue(t *testing.T) { Type: &tDate, }, record: base.Record{"2020-01-02"}, - want: "date(\"2020-01-02\")", + want: "DATE(\"2020-01-02\")", }, { name: "type date null", @@ -811,7 +898,7 @@ func TestPropFormatValue(t *testing.T) { Type: &tDatetime, }, record: base.Record{"2020-01-11T19:28:23.284"}, - want: "datetime(\"2020-01-11T19:28:23.284\")", + want: "DATETIME(\"2020-01-11T19:28:23.284\")", }, { name: "type datetime null", @@ -1004,6 +1091,9 @@ func TestPropFormatValue(t *testing.T) { for _, tc := range testcases { t.Run(tc.name, func(t *testing.T) { ast := assert.New(t) + + ast.NoError(tc.prop.InitPicker()) + str, err := tc.prop.FormatValue(tc.record) if tc.wantErrString != "" { ast.Error(err) diff --git a/pkg/config/testdata/test-parse-after-period.yaml b/pkg/config/testdata/test-parse-after-period.yaml index 30849383..bd060eb8 100644 --- a/pkg/config/testdata/test-parse-after-period.yaml +++ b/pkg/config/testdata/test-parse-after-period.yaml @@ -14,7 +14,7 @@ clientSettings: commands: SHOW HOSTS {{ .AfterPeriod }} files: - - path: ../../examples/v2/choose.csv + - path: ../../../examples/v2/choose.csv batchSize: 2 inOrder: false type: csv diff --git a/pkg/config/testdata/test-parse-concat-items.yaml b/pkg/config/testdata/test-parse-concat-items.yaml new file mode 100644 index 00000000..f3c45c11 --- /dev/null +++ b/pkg/config/testdata/test-parse-concat-items.yaml @@ -0,0 +1,31 @@ +version: v2 +description: example +removeTempFiles: false +clientSettings: + retry: 3 + concurrency: 2 # number of graph clients + channelBufferSize: 1 + space: importer_test + connection: + user: root + password: nebula + address: 127.0.0.1:9669 +files: + - path: ../../../examples/v2/choose.csv + batchSize: 2 + inOrder: false + type: csv + csv: + withHeader: false + withLabel: false + schema: + type: edge + edge: + srcVID: + {{ .ConcatItems }} + type: string + name: choose + withRanking: false + props: + - name: grade + type: int diff --git a/pkg/config/testdata/test-parse-log-path.yaml b/pkg/config/testdata/test-parse-log-path.yaml index afddb1e0..f759f90b 100644 --- a/pkg/config/testdata/test-parse-log-path.yaml +++ b/pkg/config/testdata/test-parse-log-path.yaml @@ -12,7 +12,7 @@ clientSettings: address: 127.0.0.1:9669 {{ .LogPath }} files: - - path: ../../examples/v2/choose.csv + - path: ../../../examples/v2/choose.csv batchSize: 2 inOrder: false type: csv diff --git a/pkg/config/testdata/test-parse-version.yaml b/pkg/config/testdata/test-parse-version.yaml index 88539654..58d3a312 100644 --- a/pkg/config/testdata/test-parse-version.yaml +++ b/pkg/config/testdata/test-parse-version.yaml @@ -11,7 +11,7 @@ clientSettings: password: nebula address: 127.0.0.1:9669 files: - - path: ../../examples/v2/choose.csv + - path: ../../../examples/v2/choose.csv batchSize: 2 inOrder: false type: csv diff --git a/pkg/picker/picker.go b/pkg/picker/picker.go index f1fd6431..7329ecc0 100644 --- a/pkg/picker/picker.go +++ b/pkg/picker/picker.go @@ -1,6 +1,9 @@ package picker -var _ Picker = ConverterPicker{} +var ( + _ Picker = ConverterPicker{} + _ Picker = NullablePickers{} +) type ( Picker interface { diff --git a/pkg/reader/batchmgr.go b/pkg/reader/batchmgr.go index 3fa8c39c..8bc2e384 100644 --- a/pkg/reader/batchmgr.go +++ b/pkg/reader/batchmgr.go @@ -42,7 +42,6 @@ func (bm *BatchMgr) Done() { } func (bm *BatchMgr) InitSchema(header base.Record, runnerLogger *logger.RunnerLogger) (err error) { - err = nil if bm.initializedSchema { logger.Log.Info("Batch manager schema has been initialized!") return @@ -56,12 +55,15 @@ func (bm *BatchMgr) InitSchema(header base.Record, runnerLogger *logger.RunnerLo case strings.HasPrefix(c, base.LABEL_VID): *bm.Schema.Vertex.VID.Index = i err = bm.Schema.Vertex.VID.ParseFunction(c) + _ = bm.Schema.Vertex.VID.InitPicker() case strings.HasPrefix(c, base.LABEL_SRC_VID): *bm.Schema.Edge.SrcVID.Index = i err = bm.Schema.Edge.SrcVID.ParseFunction(c) + _ = bm.Schema.Edge.SrcVID.InitPicker() case strings.HasPrefix(c, base.LABEL_DST_VID): *bm.Schema.Edge.DstVID.Index = i err = bm.Schema.Edge.DstVID.ParseFunction(c) + _ = bm.Schema.Edge.DstVID.InitPicker() case c == base.LABEL_RANK: if bm.Schema.Edge.Rank == nil { rank := i @@ -100,6 +102,7 @@ func (bm *BatchMgr) addVertexTags(r string, i int) { Type: &columnType, Index: &i, } + _ = p.InitPicker() tag.Props = append(tag.Props, &p) } @@ -115,6 +118,7 @@ func (bm *BatchMgr) addEdgeProps(r string, i int) { Type: &columnType, Index: &i, } + _ = p.InitPicker() bm.Schema.Edge.Props = append(bm.Schema.Edge.Props, &p) }