Skip to content

Commit

Permalink
fix json object array parsing. use DateTime64(3) for detected date fi…
Browse files Browse the repository at this point in the history
…eld.
  • Loading branch information
yuzhichang committed May 10, 2021
1 parent ef11a8b commit b541f81
Show file tree
Hide file tree
Showing 7 changed files with 89 additions and 39 deletions.
8 changes: 4 additions & 4 deletions go.test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ for i in `seq 1 10000`;do
echo "{\"time\" : \"${now}\", \"name\" : \"name$i\", \"value\" : $i }"
done > a.json
for i in `seq 10001 30000`;do
echo "{\"time\" : \"${now}\", \"name\" : \"name$i\", \"value\" : $i, \"newkey1\" : $i }"
echo "{\"time\" : \"${now}\", \"name\" : \"name$i\", \"value\" : $i, \"newkey01\" : $i }"
done >> a.json
for i in `seq 30001 50000`;do
echo "{\"time\" : \"${now}\", \"name\" : \"name$i\", \"value\" : $i, \"newkey2\" : $i.123, \"newkey3\" : \"name$i\", \"newkey4\" : \"${now}\" }"
echo "{\"time\" : \"${now}\", \"name\" : \"name$i\", \"value\" : $i, \"newkey02\" : $i.123, \"newkey03\" : \"name$i\", \"newkey04\" : \"${now}\", \"newkey05\" : {\"k1\": 1, \"k2\": 2} }"
done >> a.json
for i in `seq 50001 70000`;do
echo "{\"time\" : \"${now}\", \"name\" : \"name$i\", \"value\" : $i, \"newkey5\" : [$i], \"newkey6\" : [$i.123], \"newkey7\" : [\"name$i\"], \"newkey8\" : [\"${now}\"] }"
echo "{\"time\" : \"${now}\", \"name\" : \"name$i\", \"value\" : $i, \"newkey06\" : [$i], \"newkey07\" : [$i.123], \"newkey08\" : [\"name$i\"], \"newkey09\" : [\"${now}\"], \"newkey10\" : [{\"k1\": 1, \"k2\": 2}, {\"k3\": 3, \"k4\": 4}] }"
done >> a.json
for i in `seq 70001 100000`;do
echo "{\"time\" : \"${now}\", \"name\" : \"name$i\", \"value\" : $i }"
Expand Down Expand Up @@ -62,7 +62,7 @@ echo "Got test_auto_schema count => $count"

schema=`curl "localhost:8123" -d 'DESC test_dynamic_schema' 2>/dev/null | grep newkey | sort | tr -d '\t' | tr '\n' ','`
echo "Got test_dynamic_schema schema => $schema"
[ $schema = "newkey1Nullable(Int64),newkey2Nullable(Float64),newkey3Nullable(String),newkey4Nullable(DateTime),newkey5Array(Int64),newkey6Array(Float64),newkey7Array(String),newkey8Array(DateTime)," ] || exit 1
[ $schema = "newkey01Nullable(Int64),newkey02Nullable(Float64),newkey03Nullable(String),newkey04Nullable(DateTime64(3)),newkey05Nullable(String),newkey06Array(Int64),newkey07Array(Float64),newkey08Array(String),newkey09Array(DateTime64(3)),newkey10Array(String)," ] || exit 1
count=`curl "localhost:8123" -d 'SELECT count() FROM test_dynamic_schema'`
echo "Got test_dynamic_schema count => $count"
[ $count -eq 100000 ] || exit 1
Expand Down
3 changes: 3 additions & 0 deletions model/value.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ func WhichType(typ string) (dataType int, nullable bool) {
}
if strings.HasPrefix(typ, "DateTime64") {
dataType = DateTime
} else if strings.HasPrefix(typ, "Array(DateTime64") {
dataType = DateTimeArray
nullable = false
} else {
util.Logger.Fatal(fmt.Sprintf("LOGIC ERROR: unsupported ClickHouse data type %v", typ))
}
Expand Down
4 changes: 2 additions & 2 deletions output/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,15 @@ func (c *ClickHouse) ChangeSchema(newKeys *sync.Map) (err error) {
case model.String:
strVal = "Nullable(String)"
case model.DateTime:
strVal = "Nullable(DateTime)"
strVal = "Nullable(DateTime64(3))"
case model.IntArray:
strVal = "Array(Int64)"
case model.FloatArray:
strVal = "Array(Float64)"
case model.StringArray:
strVal = "Array(String)"
case model.DateTimeArray:
strVal = "Array(DateTime)"
strVal = "Array(DateTime64(3))"
default:
err = errors.Errorf("%s: BUG: unsupported column type %s", taskCfg.Name, strVal)
return false
Expand Down
79 changes: 55 additions & 24 deletions parser/fastjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,37 @@ func (c *FastjsonMetric) GetArray(key string, typ int) (val interface{}) {
case model.String:
results := make([]string, 0, len(array))
for _, e := range array {
v, _ := e.StringBytes()
results = append(results, string(v))
if e.Type() == fastjson.TypeString {
v, _ := e.StringBytes()
results = append(results, string(v))
} else {
results = append(results, e.String())
}
}
val = results
case model.DateTime:
results := make([]time.Time, 0, len(array))
var err error
for _, e := range array {
v, _ := e.StringBytes()
t := c.pp.ParseDateTime(key, string(v))
var t time.Time
switch e.Type() {
case fastjson.TypeNumber:
var f float64
if f, err = e.Float64(); err != nil {
t = Epoch
} else {
t = time.Unix(int64(f), int64(f*1e9)%1e9).In(time.UTC)
}
case fastjson.TypeString:
var b []byte
if b, err = e.StringBytes(); err != nil {
t = Epoch
} else {
t = c.pp.ParseDateTime(key, string(b))
}
default:
t = Epoch
}
results = append(results, t)
}
val = results
Expand Down Expand Up @@ -210,31 +232,40 @@ func (c *FastjsonMetric) GetNewKeys(knownKeys *sync.Map, newKeys *sync.Map) (fou
}

func fjDetectType(v *fastjson.Value) (typ int) {
if vt := v.Type(); vt == fastjson.TypeNull {
} else if vt == fastjson.TypeTrue || vt == fastjson.TypeFalse {
switch v.Type() {
case fastjson.TypeNull:
case fastjson.TypeTrue:
typ = model.Int
} else if _, err := v.Int64(); err == nil {
case fastjson.TypeFalse:
typ = model.Int
} else if _, err := v.Float64(); err == nil {
case fastjson.TypeNumber:
typ = model.Float
} else if val, err := v.StringBytes(); err == nil {
if _, layout := parseInLocation(string(val), time.Local); layout != "" {
typ = model.DateTime
} else {
typ = model.String
if _, err := v.Int64(); err == nil {
typ = model.Int
}
case fastjson.TypeString:
typ = model.String
if val, err := v.StringBytes(); err == nil {
if _, layout := parseInLocation(string(val), time.Local); layout != "" {
typ = model.DateTime
}
}
} else if arr, err := v.Array(); err == nil && len(arr) > 0 {
typ2 := fjDetectType(arr[0])
switch typ2 {
case model.Int:
typ = model.IntArray
case model.Float:
typ = model.FloatArray
case model.String:
typ = model.StringArray
case model.DateTime:
typ = model.DateTimeArray
case fastjson.TypeArray:
if arr, err := v.Array(); err == nil && len(arr) > 0 {
typ2 := fjDetectType(arr[0])
switch typ2 {
case model.Int:
typ = model.IntArray
case model.Float:
typ = model.FloatArray
case model.String:
typ = model.StringArray
case model.DateTime:
typ = model.DateTimeArray
}
}
default:
typ = model.String
}
return
}
10 changes: 9 additions & 1 deletion parser/gjson.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,15 @@ func (c *GjsonMetric) GetArray(key string, typ int) (val interface{}) {
case model.DateTime:
results := make([]time.Time, 0, len(array))
for _, e := range array {
t := c.pp.ParseDateTime(key, e.String())
var t time.Time
switch e.Type {
case gjson.Number:
t = time.Unix(int64(e.Num), int64(e.Num*1e9)%1e9).In(time.UTC)
case gjson.String:
t = c.pp.ParseDateTime(key, e.Str)
default:
t = Epoch
}
results = append(results, t)
}
val = results
Expand Down
13 changes: 11 additions & 2 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var jsonSample = []byte(`{
"array_float": [1.1,2.2,3.3],
"array_string": ["aa","bb","cc"],
"array_date": ["2000-01-01","2000-01-02","2000-01-03"],
"array_object": [{"i":[1,2,3],"f":[1.1,2.2,3.3]},{"s":["aa","bb","cc"],"e":[]}],
"array_empty": [],
"bool_true": true,
"bool_false": false
Expand Down Expand Up @@ -78,12 +79,13 @@ var jsonSchema = map[string]string{
"array_float": "array",
"array_string": "array",
"array_date": "array",
"array_object": "array",
"array_empty": "array",
"bool_true": "true",
"bool_false": "false",
}

var csvSample = []byte(`1536813227,"0.11","escaped_""ws","{""i"":[1,2,3],""f"":[1.1,2.2,3.3],""s"":[""aa"",""bb"",""cc""],""e"":[]}",2019-12-16,2019-12-16T12:10:30Z,2019-12-16T12:10:30+08:00,2019-12-16 12:10:30,2019-12-16T12:10:30.123Z,2019-12-16T12:10:30.123+08:00,2019-12-16 12:10:30.123,"[1,2,3]","[1.1,2.2,3.3]","[""aa"",""bb"",""cc""]","[""2000-01-01"",""2000-01-02"",""2000-01-03""]","[]","true","false"`)
var csvSample = []byte(`1536813227,"0.11","escaped_""ws","{""i"":[1,2,3],""f"":[1.1,2.2,3.3],""s"":[""aa"",""bb"",""cc""],""e"":[]}",2019-12-16,2019-12-16T12:10:30Z,2019-12-16T12:10:30+08:00,2019-12-16 12:10:30,2019-12-16T12:10:30.123Z,2019-12-16T12:10:30.123+08:00,2019-12-16 12:10:30.123,"[1,2,3]","[1.1,2.2,3.3]","[""aa"",""bb"",""cc""]","[""2000-01-01"",""2000-01-02"",""2000-01-03""]","[{""i"":[1,2,3],""f"":[1.1,2.2,3.3]},{""s"":[""aa"",""bb"",""cc""],""e"":[]}]","[]","true","false"`)

var csvSchema = []string{
"its",
Expand All @@ -101,6 +103,7 @@ var csvSchema = []string{
"array_float",
"array_string",
"array_date",
"array_object",
"array_empty",
"bool_true",
"bool_false",
Expand Down Expand Up @@ -154,7 +157,8 @@ func initMetrics() {
}
parser = pp.Get()
if metric, initErr = parser.Parse(sample); initErr != nil {
return
msg := fmt.Sprintf("%+v", initErr)
panic(msg)
}
pools[name] = pp
parsers[name] = parser
Expand Down Expand Up @@ -261,6 +265,7 @@ func TestParserArray(t *testing.T) {
{"array_float", model.Float, []float64{1.1, 2.2, 3.3}},
{"array_string", model.String, []string{"aa", "bb", "cc"}},
{"array_date", model.DateTime, ts},
{"array_object", model.String, []string{`{"i":[1,2,3],"f":[1.1,2.2,3.3]}`, `{"s":["aa","bb","cc"],"e":[]}`}},
{"array_empty", model.Int, []int64{}},
{"array_empty", model.Float, []float64{}},
{"array_empty", model.String, []string{}},
Expand All @@ -271,6 +276,10 @@ func TestParserArray(t *testing.T) {
name := names[i]
metric := metrics[name]
for j := range testCases {
if name == "csv" && testCases[j].Field == "array_object" {
// csv parser doesn't support object array yet.
continue
}
var v interface{}
desc := fmt.Sprintf(`%s GetArray("%s", %d)`, name, testCases[j].Field, testCases[j].Type)
v = metric.GetArray(testCases[j].Field, testCases[j].Type)
Expand Down
11 changes: 5 additions & 6 deletions task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,23 +229,22 @@ func (service *Service) put(msg model.InputMessage) {
defer statistics.ParsingPoolBacklog.WithLabelValues(taskCfg.Name).Dec()
p := service.pp.Get()
metric, err = p.Parse(msg.Value)
if err == nil {
row = model.MetricToRow(metric, msg, service.dims)
}
// WARNING: Always PutElem even if there's a parsing error, so that this message can be acked to Kafka and skipped writing to ClickHouse.
if err != nil {
statistics.ParseMsgsErrorTotal.WithLabelValues(taskCfg.Name).Inc()
if service.limiter1.Allow() {
util.Logger.Error(fmt.Sprintf("failed to parse message(topic %v, partition %d, offset %v)",
msg.Topic, msg.Partition, msg.Offset), zap.String("message value", string(msg.Value)), zap.String("task", service.cfg.Task.Name), zap.Error(err))
}
} else {
row = model.MetricToRow(metric, msg, service.dims)
if taskCfg.DynamicSchema.Enable {
foundNewKeys = metric.GetNewKeys(&service.knownKeys, &service.newKeys)
}
}
// WARNING: metric.GetXXX may depend on p. Don't call them after p has been freed.
service.pp.Put(p)

if taskCfg.DynamicSchema.Enable {
foundNewKeys = metric.GetNewKeys(&service.knownKeys, &service.newKeys)
}
if foundNewKeys {
cntNewKeys := atomic.AddInt32(&service.cntNewKeys, 1)
if cntNewKeys == 1 {
Expand Down

0 comments on commit b541f81

Please sign in to comment.