Skip to content

Commit

Permalink
feat: support concat and change to picker (#248)
Browse files Browse the repository at this point in the history
  • Loading branch information
veezhang authored Dec 14, 2022
1 parent b35eeab commit 1e1882a
Show file tree
Hide file tree
Showing 13 changed files with 320 additions and 73 deletions.
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -217,13 +217,24 @@ schema:
alternativeIndices:
- 7
- 8
# concatItems examples
schema:
type: vertex
vertex:
vid:
concatItems:
- "abc"
- 1
function: hash
```

##### `schema.vertex.vid`

**Optional**. Describes the vertex ID column and the function used for the vertex ID.

* `index`: **Optional**. The column number in the CSV file, starting from 0. The default value is 0.
* `concatItems`: **Optional**. A list of items that are concatenated, in order, to form the vertex ID. Each item is either a `string` or an `int`, and the two can be mixed: a `string` is a constant, and an `int` is the index of a CSV column. If set, the `index` option above has no effect.
* `function`: **Optional**. The function used to generate the VIDs. Currently, only the `hash` and `uuid` functions are supported.
* `type`: **Optional**. The type for VIDs. The default value is `string`.
* `prefix`: **Optional**. A prefix added to the original vid. When `function` is also specified, `prefix` is applied to the original vid before `function`.
Expand Down
10 changes: 10 additions & 0 deletions README_zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,23 @@ schema:
alternativeIndices:
- 7
- 8
# concatItems examples
schema:
type: vertex
vertex:
vid:
concatItems:
- "abc"
- 1
function: hash
```
##### `schema.vertex.vid`
**可选**。描述点 VID 所在的列和使用的函数。
- `index`:**可选**。在 CSV 文件中的列标,从 0 开始计数。默认值 0。
- `concatItems`:**可选**。连接项可以是 `string`、`int` 或者两者混合。`string` 代表常量,`int` 表示 CSV 文件中的列索引。所有连接项会按顺序拼接。如果设置了该项,上面的 `index` 将不生效。
- `function`:**可选**。用来生成 VID 时的函数,有 `hash` 和 `uuid` 两种函数可选。
- `prefix`: **可选**。给 原始vid 添加的前缀,当同时指定了 `function` 时, 生成 VID 的方法是先添加 `prefix` 前缀, 再用 `function`生成 VID。
Expand Down
32 changes: 32 additions & 0 deletions examples/v1/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,38 @@ files:
- name: name
type: string

- path: ./course.csv
failDataPath: ./err/course-concat.csv
batchSize: 2
inOrder: true
type: csv
csv:
withHeader: false
withLabel: false
schema:
type: vertex
vertex:
vid:
type: int
concatItems: # "c1{index0}c2{index1}2"
- "c1"
- 0
- c2
- 1
- "2"
function: hash
tags:
- name: course
props:
- name: name
type: string
- name: credits
type: int
- name: building
props:
- name: name
type: string

- path: ./course-with-header.csv
failDataPath: ./err/course-with-header.csv
batchSize: 2
Expand Down
31 changes: 31 additions & 0 deletions examples/v2/example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,37 @@ files:
- name: name
type: string

- path: ./course.csv
failDataPath: ./err/course-concat.csv
batchSize: 2
inOrder: true
type: csv
csv:
withHeader: false
withLabel: false
schema:
type: vertex
vertex:
vid:
type: string
concatItems: # "c1{index0}c2{index1}2"
- "c1"
- 0
- c2
- 1
- "2"
tags:
- name: course
props:
- name: name
type: string
- name: credits
type: int
- name: building
props:
- name: name
type: string

- path: ./course-with-header.csv
failDataPath: ./err/course-with-header.csv
batchSize: 2
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220425030225-cdb52399b40a
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0
gopkg.in/yaml.v3 v3.0.1 // indirect
gopkg.in/yaml.v3 v3.0.1
)

go 1.13
153 changes: 94 additions & 59 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/vesoft-inc/nebula-importer/pkg/base"
ierrors "github.com/vesoft-inc/nebula-importer/pkg/errors"
"github.com/vesoft-inc/nebula-importer/pkg/logger"
"github.com/vesoft-inc/nebula-importer/pkg/picker"
"gopkg.in/yaml.v2"
)

Expand Down Expand Up @@ -58,13 +59,16 @@ type Prop struct {
NullValue string `json:"nullValue" yaml:"nullValue"`
AlternativeIndices []int `json:"alternativeIndices" yaml:"alternativeIndices"`
DefaultValue *string `json:"defaultValue" yaml:"defaultValue"`
picker picker.Picker
}

type VID struct {
Index *int `json:"index" yaml:"index"`
Function *string `json:"function" yaml:"function"`
Type *string `json:"type" yaml:"type"`
Prefix *string `json:"prefix" yaml:"prefix"`
Index *int `json:"index" yaml:"index"`
ConcatItems []interface{} `json:"concatItems" yaml:"concatItems"` // only string and int is support, int is for Index
Function *string `json:"function" yaml:"function"`
Type *string `json:"type" yaml:"type"`
Prefix *string `json:"prefix" yaml:"prefix"`
picker picker.Picker
}

type Rank struct {
Expand Down Expand Up @@ -494,7 +498,6 @@ func (s *Schema) validateAndReset(prefix string) error {
func (v *VID) ParseFunction(str string) (err error) {
i := strings.Index(str, "(")
j := strings.Index(str, ")")
err = nil
if i < 0 && j < 0 {
v.Function = nil
v.Type = &kDefaultVidType
Expand Down Expand Up @@ -533,25 +536,11 @@ func (v *VID) String(vid string) string {
}

func (v *VID) FormatValue(record base.Record) (string, error) {
if len(record) <= *v.Index {
return "", fmt.Errorf("vid index(%d) out of range record length(%d)", *v.Index, len(record))
}
vid := record[*v.Index]
if v.Prefix != nil {
vid = *v.Prefix + vid
}
if v.Function == nil || *v.Function == "" {
if err := checkVidFormat(vid, *v.Type == "int"); err != nil {
return "", err
}
if *v.Type == "string" {
return fmt.Sprintf("%q", vid), nil
} else {
return vid, nil
}
} else {
return fmt.Sprintf("%s(%q)", *v.Function, vid), nil
value, err := v.picker.Pick(record)
if err != nil {
return "", err
}
return value.Val, nil
}

func (v *VID) checkFunction(prefix string) error {
Expand Down Expand Up @@ -585,7 +574,48 @@ func (v *VID) validateAndReset(prefix string, defaultVal int) error {
v.Type = &kDefaultVidType
logger.Log.Warnf("Not set %s.Type, reset to default value `%s'", prefix, *v.Type)
}
return nil

return v.InitPicker()
}

// InitPicker builds the picker used to extract the VID from a record and
// stores it in v.picker.
//
// The picker is configured from the VID settings as follows:
//   - ConcatItems is non-empty: the non-empty Prefix (if any) followed by every
//     item in order, where a string item is added as a constant and an int item
//     is added as a record column index.
//   - only a non-empty Prefix is set: the Prefix followed by the column at Index.
//   - otherwise: the column at Index alone.
//
// When no function is configured and Type equals "int" (case-insensitively),
// the picked value is additionally checked with checkVidFormat.
//
// InitPicker dereferences v.Type, and v.Index when ConcatItems is empty, so the
// caller must have set them beforehand.
//
// It returns an error when a ConcatItems entry is neither an int nor a string,
// when an int entry is negative, or when building the picker fails.
func (v *VID) InitPicker() error {
	pickerConfig := picker.Config{
		Type:     *v.Type,
		Function: v.Function,
	}

	hasPrefix := v.Prefix != nil && *v.Prefix != ""

	switch {
	case len(v.ConcatItems) > 0:
		if hasPrefix {
			pickerConfig.ConcatItems.AddConstant(*v.Prefix)
		}
		for i, item := range v.ConcatItems {
			switch val := item.(type) {
			case int:
				// A negative value can never address a record column, so
				// reject it while loading the configuration.
				if val < 0 {
					return fmt.Errorf("ConcatItems index must not be negative, but the %d is %d", i, val)
				}
				pickerConfig.ConcatItems.AddIndex(val)
			case string:
				pickerConfig.ConcatItems.AddConstant(val)
			default:
				return fmt.Errorf("ConcatItems only support int or string, but the %d is %v", i, val)
			}
		}
	case hasPrefix:
		pickerConfig.ConcatItems.AddConstant(*v.Prefix)
		pickerConfig.ConcatItems.AddIndex(*v.Index)
	default:
		pickerConfig.Indices = []int{*v.Index}
	}

	// Without a function the raw value is used as an int VID, so its format
	// is checked after picking.
	if (v.Function == nil || *v.Function == "") && strings.EqualFold(*v.Type, "int") {
		// The parameter is named picked so that it does not shadow the receiver v.
		pickerConfig.CheckOnPost = func(picked *picker.Value) error {
			return checkVidFormat(picked.Val, true)
		}
	}

	var err error
	v.picker, err = pickerConfig.Build()
	return err
}

func (r *Rank) validateAndReset(prefix string, defaultVal int) error {
Expand Down Expand Up @@ -690,22 +720,23 @@ func (e *Edge) validateAndReset(prefix string) error {
if e.Name == nil {
return fmt.Errorf("Please configure edge name in: %s.name", prefix)
}
if e.SrcVID != nil {
if err := e.SrcVID.validateAndReset(fmt.Sprintf("%s.srcVID", prefix), 0); err != nil {
return err
}
} else {

if e.SrcVID == nil {
index := 0
e.SrcVID = &VID{Index: &index, Type: &kDefaultVidType}
}
if e.DstVID != nil {
if err := e.DstVID.validateAndReset(fmt.Sprintf("%s.dstVID", prefix), 1); err != nil {
return err
}
} else {
if err := e.SrcVID.validateAndReset(fmt.Sprintf("%s.srcVID", prefix), 0); err != nil {
return err
}

if e.DstVID == nil {
index := 1
e.DstVID = &VID{Index: &index, Type: &kDefaultVidType}
}
if err := e.DstVID.validateAndReset(fmt.Sprintf("%s.dstVID", prefix), 1); err != nil {
return err
}

start := 2
if e.Rank != nil {
if err := e.Rank.validateAndReset(fmt.Sprintf("%s.rank", prefix), 2); err != nil {
Expand Down Expand Up @@ -792,14 +823,13 @@ func (v *Vertex) validateAndReset(prefix string) error {
// if v.Tags == nil {
// return fmt.Errorf("Please configure %.tags", prefix)
// }
if v.VID != nil {
if err := v.VID.validateAndReset(fmt.Sprintf("%s.vid", prefix), 0); err != nil {
return err
}
} else {
if v.VID == nil {
index := 0
v.VID = &VID{Index: &index, Type: &kDefaultVidType}
}
if err := v.VID.validateAndReset(fmt.Sprintf("%s.vid", prefix), 0); err != nil {
return err
}
j := 1
for i := range v.Tags {
if v.Tags[i] != nil {
Expand Down Expand Up @@ -834,28 +864,11 @@ func (p *Prop) IsGeographyType() bool {
}

func (p *Prop) FormatValue(record base.Record) (string, error) {
r, isNull, err := p.getValue(record)
value, err := p.picker.Pick(record)
if err != nil {
return "", err
}
if isNull {
return r, err
}
if p.IsStringType() {
return fmt.Sprintf("%q", r), nil
}
if p.IsDateOrTimeType() {
if p.IsTimestampType() && reTimestampInteger.MatchString(r) {
return fmt.Sprintf("%s(%s)", strings.ToLower(*p.Type), r), nil
}
return fmt.Sprintf("%s(%q)", strings.ToLower(*p.Type), r), nil
}
// Only support wkt for geography currently
if p.IsGeographyType() {
return fmt.Sprintf("ST_GeogFromText(%q)", r), nil
}

return r, nil
return value.Val, nil
}

func (p *Prop) getValue(record base.Record) (string, bool, error) {
Expand Down Expand Up @@ -903,7 +916,29 @@ func (p *Prop) validateAndReset(prefix string, val int) error {
return fmt.Errorf("Invalid prop index: %d, name: %s, type: %s", *p.Index, *p.Name, *p.Type)
}
}
return nil
return p.InitPicker()
}

// InitPicker builds the picker used to extract this property's value from a
// record and stores it in p.picker.
//
// The picker always reads the column at Index with the property's Type. For a
// nullable property it also treats a value equal to NullValue as null (emitted
// as dbNULL), appends AlternativeIndices to the columns it reads, and passes
// DefaultValue on to the picker configuration.
//
// InitPicker dereferences p.Index and p.Type, so the caller must have set them
// beforehand. It returns the error reported by the picker configuration's Build.
func (p *Prop) InitPicker() error {
	cfg := picker.Config{
		Type:    *p.Type,
		Indices: []int{*p.Index},
	}

	if p.Nullable {
		cfg.NullValue = dbNULL
		cfg.DefaultValue = p.DefaultValue
		cfg.Nullable = func(raw string) bool { return raw == p.NullValue }
		// Appending an empty slice is a no-op, so no length check is needed.
		cfg.Indices = append(cfg.Indices, p.AlternativeIndices...)
	}

	built, err := cfg.Build()
	p.picker = built
	return err
}

func (t *Tag) FormatValues(record base.Record) (string, bool, error) {
Expand Down
Loading

0 comments on commit 1e1882a

Please sign in to comment.