Skip to content

Commit

Permalink
Support new data types (#107)
Browse files Browse the repository at this point in the history
  • Loading branch information
HTHou authored Sep 27, 2024
1 parent bb812b8 commit 4341368
Show file tree
Hide file tree
Showing 12 changed files with 551 additions and 172 deletions.
6 changes: 6 additions & 0 deletions client/field.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package client

import "time"

type Field struct {
dataType TSDataType
name string
Expand Down Expand Up @@ -89,6 +91,10 @@ func (f *Field) GetText() string {
return float64ToString(f.value.(float64))
case string:
return f.value.(string)
case []byte:
return bytesToHexString(f.value.([]byte))
case time.Time:
return f.value.(time.Time).Format("2006-01-02")
}
return ""
}
89 changes: 89 additions & 0 deletions client/field_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package client
import (
"reflect"
"testing"
"time"
)

func TestField_IsNull(t *testing.T) {
Expand Down Expand Up @@ -126,6 +127,38 @@ func TestField_GetDataType(t *testing.T) {
value: nil,
},
want: TEXT,
}, {
name: "GetDataType-STRING",
fields: fields{
dataType: STRING,
name: "",
value: nil,
},
want: STRING,
}, {
name: "GetDataType-BLOB",
fields: fields{
dataType: BLOB,
name: "",
value: nil,
},
want: BLOB,
}, {
name: "GetDataType-TIMESTAMP",
fields: fields{
dataType: TIMESTAMP,
name: "",
value: nil,
},
want: TIMESTAMP,
}, {
name: "GetDataType-DATE",
fields: fields{
dataType: DATE,
name: "",
value: nil,
},
want: DATE,
},
}
for _, tt := range tests {
Expand Down Expand Up @@ -201,6 +234,38 @@ func TestField_GetValue(t *testing.T) {
value: "TEXT",
},
want: "TEXT",
}, {
name: "GetValue-STRING",
fields: fields{
dataType: STRING,
name: "",
value: "STRING",
},
want: "STRING",
}, {
name: "GetValue-BLOB",
fields: fields{
dataType: BLOB,
name: "",
value: []byte("BLOB"),
},
want: []byte("BLOB"),
}, {
name: "GetValue-TIMESTAMP",
fields: fields{
dataType: TIMESTAMP,
name: "",
value: int64(65535),
},
want: int64(65535),
}, {
name: "GetValue-DATE",
fields: fields{
dataType: DATE,
name: "",
value: time.Date(2024, time.Month(4), 1, 0, 0, 0, 0, time.UTC),
},
want: time.Date(2024, time.Month(4), 1, 0, 0, 0, 0, time.UTC),
},
}
for _, tt := range tests {
Expand Down Expand Up @@ -408,6 +473,30 @@ func TestField_GetText(t *testing.T) {
value: int32(1),
},
want: "1",
}, {
name: "GetText-04",
fields: fields{
dataType: STRING,
name: "",
value: "STRING",
},
want: "STRING",
}, {
name: "GetText-05",
fields: fields{
dataType: BLOB,
name: "",
value: []byte("BLOB"),
},
want: "0x424c4f42",
}, {
name: "GetText-06",
fields: fields{
dataType: DATE,
name: "",
value: time.Date(2024, time.Month(4), 1, 0, 0, 0, 0, time.UTC),
},
want: "2024-04-01",
},
}
for _, tt := range tests {
Expand Down
54 changes: 27 additions & 27 deletions client/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,42 +26,42 @@ type TSEncoding uint8
type TSCompressionType uint8

const (
UNKNOWN TSDataType = -1
BOOLEAN TSDataType = 0
INT32 TSDataType = 1
INT64 TSDataType = 2
FLOAT TSDataType = 3
DOUBLE TSDataType = 4
TEXT TSDataType = 5
UNKNOWN TSDataType = -1
BOOLEAN TSDataType = 0
INT32 TSDataType = 1
INT64 TSDataType = 2
FLOAT TSDataType = 3
DOUBLE TSDataType = 4
TEXT TSDataType = 5
TIMESTAMP TSDataType = 8
DATE TSDataType = 9
BLOB TSDataType = 10
STRING TSDataType = 11
)

const (
PLAIN TSEncoding = 0
PLAIN_DICTIONARY TSEncoding = 1
RLE TSEncoding = 2
DIFF TSEncoding = 3
TS_2DIFF TSEncoding = 4
BITMAP TSEncoding = 5
GORILLA_V1 TSEncoding = 6
REGULAR TSEncoding = 7
GORILLA TSEncoding = 8
ZIGZAG TSEncoding = 9
FREQ TSEncoding = 10
CHIMP TSEncoding = 11
SPRINTZ TSEncoding = 12
RLBE TSEncoding = 13
PLAIN TSEncoding = 0
DICTIONARY TSEncoding = 1
RLE TSEncoding = 2
DIFF TSEncoding = 3
TS_2DIFF TSEncoding = 4
BITMAP TSEncoding = 5
GORILLA_V1 TSEncoding = 6
REGULAR TSEncoding = 7
GORILLA TSEncoding = 8
ZIGZAG TSEncoding = 9
FREQ TSEncoding = 10
CHIMP TSEncoding = 11
SPRINTZ TSEncoding = 12
RLBE TSEncoding = 13
)

const (
UNCOMPRESSED TSCompressionType = 0
SNAPPY TSCompressionType = 1
GZIP TSCompressionType = 2
LZO TSCompressionType = 3
SDT TSCompressionType = 4
PAA TSCompressionType = 5
PLA TSCompressionType = 6
LZ4 TSCompressionType = 7
ZSTD TSCompressionType = 8
ZSTD TSCompressionType = 8
LZMA2 TSCompressionType = 9
)

Expand Down Expand Up @@ -201,4 +201,4 @@ const (
CqAlreadyActive int32 = 1401
CqAlreadyExist int32 = 1402
CqUpdateLastExecTimeError int32 = 1403
)
)
83 changes: 64 additions & 19 deletions client/rpcdataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ const (
var (
errClosed error = errors.New("DataSet is Closed")
tsTypeMap map[string]TSDataType = map[string]TSDataType{
"BOOLEAN": BOOLEAN,
"INT32": INT32,
"INT64": INT64,
"FLOAT": FLOAT,
"DOUBLE": DOUBLE,
"TEXT": TEXT,
"BOOLEAN": BOOLEAN,
"INT32": INT32,
"INT64": INT64,
"FLOAT": FLOAT,
"DOUBLE": DOUBLE,
"TEXT": TEXT,
"TIMESTAMP": TIMESTAMP,
"DATE": DATE,
"BLOB": BLOB,
"STRING": STRING,
}
)

Expand Down Expand Up @@ -116,10 +120,10 @@ func (s *IoTDBRpcDataSet) constructOneRow() error {
case BOOLEAN:
s.values[i] = valueBuffer[:1]
s.queryDataSet.ValueList[i] = valueBuffer[1:]
case INT32:
case INT32, DATE:
s.values[i] = valueBuffer[:4]
s.queryDataSet.ValueList[i] = valueBuffer[4:]
case INT64:
case INT64, TIMESTAMP:
s.values[i] = valueBuffer[:8]
s.queryDataSet.ValueList[i] = valueBuffer[8:]
case FLOAT:
Expand All @@ -128,7 +132,7 @@ func (s *IoTDBRpcDataSet) constructOneRow() error {
case DOUBLE:
s.values[i] = valueBuffer[:8]
s.queryDataSet.ValueList[i] = valueBuffer[8:]
case TEXT:
case TEXT, BLOB, STRING:
length := bytesToInt32(valueBuffer[:4])
s.values[i] = valueBuffer[4 : 4+length]
s.queryDataSet.ValueList[i] = valueBuffer[4+length:]
Expand Down Expand Up @@ -178,16 +182,24 @@ func (s *IoTDBRpcDataSet) getString(columnIndex int, dataType TSDataType) string
return "false"
case INT32:
return int32ToString(bytesToInt32(valueBytes))
case INT64:
case INT64, TIMESTAMP:
return int64ToString(bytesToInt64(valueBytes))
case FLOAT:
bits := binary.BigEndian.Uint32(valueBytes)
return float32ToString(math.Float32frombits(bits))
case DOUBLE:
bits := binary.BigEndian.Uint64(valueBytes)
return float64ToString(math.Float64frombits(bits))
case TEXT:
case TEXT, STRING:
return string(valueBytes)
case BLOB:
return bytesToHexString(valueBytes)
case DATE:
date, err := bytesToDate(valueBytes)
if err != nil {
return ""
}
return date.Format("2006-01-02")
default:
return ""
}
Expand All @@ -206,19 +218,27 @@ func (s *IoTDBRpcDataSet) getValue(columnName string) interface{} {
valueBytes := s.values[columnIndex]
switch dataType {
case BOOLEAN:
return bool(valueBytes[0] != 0)
return valueBytes[0] != 0
case INT32:
return bytesToInt32(valueBytes)
case INT64:
case INT64, TIMESTAMP:
return bytesToInt64(valueBytes)
case FLOAT:
bits := binary.BigEndian.Uint32(valueBytes)
return math.Float32frombits(bits)
case DOUBLE:
bits := binary.BigEndian.Uint64(valueBytes)
return math.Float64frombits(bits)
case TEXT:
case TEXT, STRING:
return string(valueBytes)
case BLOB:
return valueBytes
case DATE:
date, err := bytesToDate(valueBytes)
if err != nil {
return nil
}
return date
default:
return nil
}
Expand Down Expand Up @@ -281,7 +301,7 @@ func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error {
case BOOLEAN:
switch t := d.(type) {
case *bool:
*t = bool(valueBytes[0] != 0)
*t = valueBytes[0] != 0
case *string:
if valueBytes[0] != 0 {
*t = "true"
Expand All @@ -301,7 +321,7 @@ func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error {
default:
return fmt.Errorf("dest[%d] types must be *int32 or *string", i)
}
case INT64:
case INT64, TIMESTAMP:
switch t := d.(type) {
case *int64:
*t = bytesToInt64(valueBytes)
Expand Down Expand Up @@ -332,12 +352,37 @@ func (s *IoTDBRpcDataSet) scan(dest ...interface{}) error {
default:
return fmt.Errorf("dest[%d] types must be *float64 or *string", i)
}
case TEXT:
case TEXT, STRING:
switch t := d.(type) {
case *[]byte:
*t = valueBytes
case *string:
*t = string(valueBytes)
default:
return fmt.Errorf("dest[%d] types must be *string", i)
return fmt.Errorf("dest[%d] types must be *[]byte or *string", i)
}
case BLOB:
switch t := d.(type) {
case *[]byte:
*t = valueBytes
case *string:
*t = bytesToHexString(valueBytes)
default:
return fmt.Errorf("dest[%d] types must be *[]byte or *string", i)
}
case DATE:
switch t := d.(type) {
case *time.Time:
*t, _ = bytesToDate(valueBytes)
case *string:
*t = int32ToString(bytesToInt32(valueBytes))
date, err := bytesToDate(valueBytes)
if err != nil {
*t = ""
}
*t = date.Format("2006-01-02")
default:
return fmt.Errorf("dest[%d] types must be *time.Time or *string", i)
}
default:
return nil
Expand Down Expand Up @@ -412,7 +457,7 @@ func (s *IoTDBRpcDataSet) hasCachedResults() bool {
if s.closed {
return false
}
return (s.queryDataSet != nil && len(s.queryDataSet.Time) > 0)
return s.queryDataSet != nil && len(s.queryDataSet.Time) > 0
}

func (s *IoTDBRpcDataSet) next() (bool, error) {
Expand Down
Loading

0 comments on commit 4341368

Please sign in to comment.