-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Handle TimeStamp columns by converting int64 to time.Time and vice versa #15
Open
echarlus
wants to merge
18
commits into
herenow:master
Choose a base branch
from
echarlus:time_column_handling
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from 14 commits
Commits
Show all changes
18 commits
Select commit
Hold shift + click to select a range
011e6e2
Since one cannot replace the driver.DefaultParameterConverter to prop…
echarlus 0def5fb
Now that GO 1.9 is released properly handle data types in the driver …
echarlus 6d87b62
Improved GeoPoint by using a struct rather than an array of floats.
echarlus 7c51c1a
Handle Crate's Array column type
echarlus b801d28
Use our own basic json encoder rather than the GO lib one to properly…
echarlus cfa2930
Convert v to Float before formatting with %f format
echarlus a6dff48
Implemented encoding of arrays of map/arrays
echarlus efa5085
Handle interface types in Array & Map
echarlus 8d02624
Fix to satisty Error interface
echarlus cdfdf1f
Fixed JSON encoding causing issue on crate 2.3.2 because a String was…
echarlus 1b3ab3c
Merge branch 'time_column_handling' of https://github.com/echarlus/go…
echarlus 7a391ae
//Prevents rounding errors seen with floats like 0.01*41 which is 0.4…
echarlus 0a627df
Complete fix for (Fix had only been applied to float in arrays, not i…
echarlus f550e9a
Prevent crash when processing nil maps.
echarlus 0c2190f
Ticket SDRMDPTH-15 : Fixed string encoding in encodeArray.
echarlus 461e53d
Prepare for Crate 4.x : support authentication
echarlus 11d19f0
Fixed map of map element handling and added support for Int into map …
echarlus c0f52ca
return a CrateErr rather than &CrateErr to allow errors.As to be used…
echarlus File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,13 +10,55 @@ import ( | |
"io" | ||
"net/http" | ||
"net/url" | ||
"reflect" | ||
"strings" | ||
"time" | ||
//"log" | ||
) | ||
|
||
// Crate conn structure | ||
type CrateDriver struct { | ||
Url string // Crate http endpoint url | ||
} | ||
|
||
//GeoPoint represents Crate GeoPoint column | ||
type GeoPoint struct { | ||
Lat float64 | ||
Lon float64 | ||
} | ||
|
||
//CrateArray represents an Array column type | ||
type CrateArray []interface{} | ||
|
||
//crateMap used to store any map and force our own MarshalJSON method to be called | ||
type crateMap map[string]interface{} | ||
|
||
//Scan : Implements Scanner interface to populate a GeoPoint when the result is an array of 2 floats | ||
func (gp *GeoPoint) Scan(src interface{}) error { | ||
if b, ok := src.([]interface{}); ok && len(b) == 2 { | ||
var err error | ||
if gp.Lon, err = b[0].(json.Number).Float64(); err == nil { | ||
if gp.Lat, err = b[1].(json.Number).Float64(); err == nil { | ||
return nil | ||
} | ||
} | ||
return fmt.Errorf("failed to convert %v to GeoPoint : %v", src, err) | ||
} | ||
return fmt.Errorf("failed to convert %v to GeoPoint", src) | ||
} | ||
|
||
//Scan : Implements Scanner interface to populate a CrateArray from the incoming data | ||
func (arr *CrateArray) Scan(src interface{}) error { | ||
if srcArr, ok := src.([]interface{}); ok { | ||
*arr = make([]interface{}, len(srcArr)) | ||
for i, obj := range srcArr { | ||
(*arr)[i] = obj | ||
} | ||
return nil | ||
} | ||
return fmt.Errorf("failed to convert %v to CrateArray", src) | ||
} | ||
|
||
// Init a new "Connection" to a Crate Data Storage instance. | ||
// Note that the connection is not tested until the first query. | ||
func (c *CrateDriver) Open(crate_url string) (driver.Conn, error) { | ||
|
@@ -53,6 +95,179 @@ type endpointQuery struct { | |
Args []driver.Value `json:"args,omitempty"` | ||
} | ||
|
||
//encodeArray will encode the array represented by obj and store the result in buf | ||
//It returns an error if obj contains a map with keys other than strings | ||
func encodeArray(buf *bytes.Buffer, obj reflect.Value) error { | ||
m := obj.Len() | ||
if m == 0 { | ||
buf.WriteString("[]") | ||
return nil | ||
} | ||
buf.WriteByte('[') | ||
var k reflect.Kind | ||
ue := false | ||
for i := 0; i < m; i++ { | ||
v := obj.Index(i) | ||
if i > 0 { | ||
buf.WriteByte(',') | ||
if ue { | ||
v = v.Elem() | ||
} | ||
} else { | ||
k = v.Kind() | ||
if k == reflect.Interface { | ||
if v.IsNil() { | ||
continue | ||
} | ||
ue = true | ||
v = v.Elem() | ||
k = v.Type().Kind() | ||
} | ||
} | ||
switch k { | ||
case reflect.Float32, reflect.Float64: | ||
//Prevents rounding errors seen with floats like 0.01*41 which is 0.41000000000000003 ... | ||
//See https://floating-point-gui.de/ | ||
buf.WriteString(fmt.Sprintf("%0.6f", v.Float())) | ||
continue | ||
case reflect.Map: | ||
t := reflect.TypeOf(v) | ||
if v.Type().Key().Kind() != reflect.String { | ||
return fmt.Errorf("cannot encode map with keys of type %v", t) | ||
} | ||
if v.IsNil() { | ||
continue | ||
} | ||
if err := encodeMap(buf, v); err != nil { | ||
return err | ||
} | ||
continue | ||
case reflect.Slice, reflect.Array: | ||
if err := encodeArray(buf, v); err != nil { | ||
return err | ||
} | ||
continue | ||
case reflect.String: | ||
buf.WriteString(fmt.Sprintf("%s", strings.Replace(v.String(), "\"", "\\\"", -1))) | ||
continue | ||
} | ||
buf.WriteString(fmt.Sprintf("%v", v)) | ||
} | ||
buf.WriteByte(']') | ||
return nil | ||
} | ||
|
||
//encodeMap will encode the map stored in obj in json and store it as a string in the buffer buf | ||
//This is used because one cannot rely on the json encoder because it will format any float with decimal part of 0 as an int | ||
//If the first value to be stored in a new object's key is an int then all further values will be stored as int | ||
//and one will loose the decimal part of each value... Our encoder will ensure that a float with a 0 decimal part | ||
//is encoded as X.0 and not X | ||
//Note it will not encode maps with keys other than strings | ||
func encodeMap(buf *bytes.Buffer, obj reflect.Value) error { | ||
if obj.Len() == 0 { | ||
buf.WriteString("{}") | ||
return nil | ||
} | ||
buf.WriteByte('{') | ||
first := true | ||
for _, k := range obj.MapKeys() { | ||
if first { | ||
first = false | ||
} else { | ||
buf.WriteByte(',') | ||
} | ||
buf.WriteString(fmt.Sprintf("\"%s\":", k)) | ||
fm := "%v" | ||
v := obj.MapIndex(k).Elem() | ||
vk := v.Kind() | ||
if vk == reflect.Interface { | ||
v = v.Elem() | ||
vk = v.Type().Kind() | ||
} | ||
switch vk { | ||
case reflect.Float64, reflect.Float32: | ||
//Prevents rounding errors seen with floats like 0.01*41 which is 0.41000000000000003 ... | ||
//See https://floating-point-gui.de/ | ||
buf.WriteString(fmt.Sprintf("%0.6f", v.Float())) | ||
continue | ||
case reflect.Map: | ||
t := reflect.TypeOf(v) | ||
if v.Type().Key().Kind() != reflect.String { | ||
return fmt.Errorf("cannot encode map with keys of type %v", t) | ||
} | ||
if v.IsNil() { | ||
continue | ||
} | ||
if err := encodeMap(buf, v); err != nil { | ||
return err | ||
} | ||
continue | ||
case reflect.Slice, reflect.Array: | ||
if err := encodeArray(buf, v); err != nil { | ||
return err | ||
} | ||
continue | ||
case reflect.String: | ||
buf.WriteString(fmt.Sprintf("\"%s\"", strings.Replace(v.String(), "\"", "\\\"", -1))) | ||
continue | ||
} | ||
buf.WriteString(fmt.Sprintf(fm, v)) | ||
} | ||
buf.WriteByte('}') | ||
return nil | ||
} | ||
|
||
//MarshalJSON custom JSON marshal function to properly marshall maps containing floats with decimal part equals to 0 | ||
func (v crateMap) MarshalJSON() ([]byte, error) { | ||
res := bytes.Buffer{} | ||
if err := encodeMap(&res, reflect.ValueOf(v)); err != nil { | ||
return nil, err | ||
} | ||
//log.Printf("Result Map : %v", res.String()) | ||
return res.Bytes(), nil | ||
} | ||
|
||
//MarshalJSON custom JSON marshal function to properly handle arrays of floats with decimal part equals to 0 | ||
func (v CrateArray) MarshalJSON() ([]byte, error) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. receiver name v should be consistent with previous receiver name arr for CrateArray |
||
res := bytes.Buffer{} | ||
if err := encodeArray(&res, reflect.ValueOf(v)); err != nil { | ||
return nil, err | ||
} | ||
//log.Printf("Result Array : %v", res.String()) | ||
return res.Bytes(), nil | ||
} | ||
|
||
//CheckNamedValue Convert map, CrateArray, time & GeoPoint arguments to DB format. | ||
func (c *CrateDriver) CheckNamedValue(v *driver.NamedValue) error { | ||
if obj, ok := v.Value.(map[string]interface{}); ok { | ||
v.Value = crateMap(obj) | ||
return nil | ||
} else if ts, ok := v.Value.(time.Time); ok { | ||
if ts.IsZero() { | ||
v.Value = 0 | ||
} else { | ||
v.Value = ts.In(time.UTC).UnixNano() / 1000000 | ||
} | ||
return nil | ||
} else if gp, ok := v.Value.(GeoPoint); ok { | ||
//fmt.Printf("CheckNamedValue for Geopoint (%f,%f) \n", gp.Lon, gp.Lat) | ||
nGp := make([]float64, 2) | ||
nGp[0] = gp.Lon | ||
nGp[1] = gp.Lat | ||
v.Value = &nGp | ||
return nil | ||
} else if _, ok := v.Value.(CrateArray); ok { | ||
return nil | ||
} else if arr, ok := v.Value.([]interface{}); ok { | ||
v.Value = CrateArray(arr) | ||
return nil | ||
} | ||
/*else { | ||
fmt.Printf("CheckNamedValue for %v -> %v\n", v.Name, v.Value) | ||
}*/ | ||
return driver.ErrSkip | ||
} | ||
|
||
// Query the database using prepared statements. | ||
// Read: https://crate.io/docs/stable/sql/rest.html for more information about the returned response. | ||
// Example: crate.Query("SELECT * FROM sys.cluster LIMIT ?", 10) | ||
|
@@ -65,7 +280,7 @@ func (c *CrateDriver) query(stmt string, args []driver.Value) (*endpointResponse | |
Stmt: stmt, | ||
} | ||
|
||
if len(args) > 0 { | ||
if l := len(args); l > 0 { | ||
query.Args = args | ||
} | ||
|
||
|
@@ -74,7 +289,6 @@ func (c *CrateDriver) query(stmt string, args []driver.Value) (*endpointResponse | |
if err != nil { | ||
return nil, err | ||
} | ||
|
||
data := bytes.NewReader(buf) | ||
|
||
resp, err := http.Post(endpoint, "application/json", data) | ||
|
@@ -120,11 +334,21 @@ func (c *CrateDriver) Query(stmt string, args []driver.Value) (driver.Rows, erro | |
|
||
// Rows reader | ||
rows := &Rows{ | ||
columns: res.Cols, | ||
values: res.Rows, | ||
rowcount: res.Rowcount, | ||
columns: res.Cols, | ||
values: res.Rows, | ||
rowcount: res.Rowcount, | ||
isSpecial: make([]int64, len(res.Cols)), | ||
} | ||
tcount := len(res.ColumnTypes) | ||
for i := 0; i < tcount; i++ { | ||
if n, ok := res.ColumnTypes[i].(json.Number); ok { | ||
if t, err := n.Int64(); err == nil { | ||
if t == typeTimestamp || t == typeGeoPoint { | ||
rows.isSpecial[i] = t | ||
} | ||
} | ||
} | ||
} | ||
|
||
return rows, nil | ||
} | ||
|
||
|
@@ -159,10 +383,11 @@ func (r *Result) RowsAffected() (int64, error) { | |
|
||
// Rows reader | ||
type Rows struct { | ||
columns []string | ||
values [][]interface{} | ||
rowcount int64 | ||
pos int64 // index position on the values array | ||
columns []string | ||
values [][]interface{} | ||
isSpecial []int64 //Flags columns to convert to time.Time (type 11) | ||
rowcount int64 | ||
pos int64 // index position on the values array | ||
} | ||
|
||
// Row columns | ||
|
@@ -175,9 +400,37 @@ func (r *Rows) Next(dest []driver.Value) error { | |
if r.pos >= r.rowcount { | ||
return io.EOF | ||
} | ||
|
||
for i := range dest { | ||
dest[i] = r.values[r.pos][i] | ||
if (r.isSpecial[i] != 0) && (r.values[r.pos][i] != nil) { | ||
if r.isSpecial[i] == typeTimestamp { | ||
if val, ok := r.values[r.pos][i].(json.Number); ok { | ||
v, _ := val.Int64() | ||
sec := v / int64(1000) | ||
dest[i] = time.Unix(sec, (v-sec*int64(1000))*int64(1000000)) | ||
} else { | ||
return fmt.Errorf("failed to convert column %s=%T to time", r.columns[i], r.values[r.pos][i]) | ||
} | ||
} else if r.isSpecial[i] == typeGeoPoint { | ||
if psrc, ok := r.values[r.pos][i].([]interface{}); ok && (len(psrc) == 2) { | ||
var p GeoPoint | ||
var err error | ||
if p.Lon, err = psrc[0].(json.Number).Float64(); err != nil { | ||
return fmt.Errorf("failed to convert to latitude %v", psrc[0]) | ||
} | ||
if p.Lat, err = psrc[1].(json.Number).Float64(); err != nil { | ||
return fmt.Errorf("failed to convert to longitude elem %v", psrc[1]) | ||
} | ||
dest[i] = &p | ||
} else if len(psrc) == 0 { | ||
dest[i] = GeoPoint{} | ||
} else { | ||
return fmt.Errorf("failed to convert %v to GeoPoint", r.values[r.pos][i]) | ||
} | ||
} | ||
} else { | ||
dest[i] = r.values[r.pos][i] | ||
//fmt.Printf("Processing column %s : %+v / %s\n", r.columns[i], r.values[r.pos][i], reflect.TypeOf(r.values[r.pos][i])) | ||
} | ||
} | ||
|
||
r.pos++ | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type name will be used as crate.CrateArray by other packages, and that stutters; consider calling this Array