Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: handle compressed timestamp #438

Merged
merged 2 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,10 @@ func (d *Decoder) decodeMessage() error {
return err
}
header := b[0]
if (header & proto.MesgDefinitionMask) == proto.MesgDefinitionMask {

// NOTE: Compressed Timestamp Header Bit 5-6 is the local message type.
// Bit 6 overlap with MesgDefinitionMask; It's a message definition only if Bit 7 is zero.
if (header & (proto.MesgCompressedHeaderMask | proto.MesgDefinitionMask)) == proto.MesgDefinitionMask {
return d.decodeMessageDefinition(header)
}
return d.decodeMessageData(header)
Expand Down
96 changes: 96 additions & 0 deletions decoder/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,102 @@ func TestDecodeFileHeader(t *testing.T) {
}
}

func TestDecodeMessage(t *testing.T) {
now := time.Now()

tt := []struct {
name string
r io.Reader // must consist of mesgDef and mesg
timestampReference uint32
mesgDef *proto.MessageDefinition
mesg proto.Message
err error
}{
{
name: "header with compressed timestamp",
r: bytes.NewBuffer(append(
/* mesgDef */ []byte{67, 0, 0, 21, 0, 3, 3, 4, 134, 0, 1, 0, 1, 1, 0},
/* mesg */ []byte{0b11100000 | byte(datetime.ToUint32(now)&proto.CompressedTimeMask), 0, 0, 0, 0, 0, 0}...,
)),
timestampReference: datetime.ToUint32(now),
mesgDef: &proto.MessageDefinition{
Header: 67,
Reserved: 0,
Architecture: 0,
MesgNum: mesgnum.Event,
FieldDefinitions: []proto.FieldDefinition{
{Num: 3, Size: 4, BaseType: 134},
{Num: 0, Size: 1, BaseType: 0},
{Num: 1, Size: 1, BaseType: 0},
},
},
mesg: proto.Message{
Header: 0b11100000 | byte(datetime.ToUint32(now)&proto.CompressedTimeMask),
Num: mesgnum.Event,
Fields: []proto.Field{
factory.CreateField(mesgnum.Event, fieldnum.EventTimestamp).
WithValue(datetime.ToUint32(now)),
factory.CreateField(mesgnum.Event, fieldnum.EventData).
WithValue(uint32(0)),
factory.CreateField(mesgnum.Event, fieldnum.EventEvent).WithValue(typedef.EventTimer),
factory.CreateField(mesgnum.Event, fieldnum.EventEventType).WithValue(typedef.EventTypeStart),
},
},
},
}

for i, tc := range tt {
t.Run(fmt.Sprintf("[%d] %s", i, tc.name), func(t *testing.T) {
dec := New(tc.r)
dec.timestamp = tc.timestampReference
dec.lastTimeOffset = byte(tc.timestampReference & proto.CompressedTimeMask)
for i := 0; i < 2; i++ {
err := dec.decodeMessage()
if !errors.Is(err, tc.err) {
t.Fatalf("expected error: %v, got: %v", tc.err, err)
}
if err != nil {
return
}
}

var mesgDef *proto.MessageDefinition
for _, v := range dec.localMessageDefinitions {
if v != nil {
mesgDef = v
break
}
}
if diff := cmp.Diff(mesgDef, tc.mesgDef,
cmp.Transformer("MessageDefinition", func(m *proto.MessageDefinition) *proto.MessageDefinition {
if len(m.DeveloperFieldDefinitions) == 0 {
m.DeveloperFieldDefinitions = nil
}
return m
}),
); diff != "" {
t.Fatal(diff)
}

if len(dec.messages) == 0 {
t.Fatalf("no message is decoded")
}

if diff := cmp.Diff(dec.messages[0], tc.mesg,
cmp.Transformer("Message", func(m proto.Message) proto.Message {
if len(m.DeveloperFields) == 0 {
m.DeveloperFields = nil
}
return m
}),
cmp.AllowUnexported(proto.Value{}),
); diff != "" {
t.Fatal(diff)
}
})
}
}

func TestDecodeMessageDefinition(t *testing.T) {
fit, buf := createFitForTest()

Expand Down
2 changes: 1 addition & 1 deletion decoder/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (d *RawDecoder) Decode(r io.Reader, fn func(flag RawFlag, b []byte) error)
}

// 2. a. Decode Message Definition
if (d.BytesArray[0] & proto.MesgDefinitionMask) == proto.MesgDefinitionMask {
if (d.BytesArray[0] & (proto.MesgCompressedHeaderMask | proto.MesgDefinitionMask)) == proto.MesgDefinitionMask {
const fixedSize = uint16(6) // Header + Reserved + Architecture + MesgNum (2 bytes) + n Fields
nr, err = io.ReadFull(r, d.BytesArray[1:fixedSize])
n += int64(nr)
Expand Down
25 changes: 14 additions & 11 deletions encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) {
return fmt.Errorf("message validation failed: %w", err)
}

var compressed bool
if e.options.headerOption == headerOptionCompressedTimestamp {
if e.w == io.Discard {
// NOTE: Only for calculating data size (Early Check Strategy)
Expand All @@ -451,7 +452,7 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) {
}
}
prevLen := len(mesg.Fields)
e.compressTimestampIntoHeader(mesg)
compressed = e.compressTimestampIntoHeader(mesg)
if prevLen > len(mesg.Fields) {
defer func() { // Revert: put timestamp field back at original index
mesg.Fields = mesg.Fields[:prevLen]
Expand All @@ -460,7 +461,7 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) {
}()
}
} else {
e.compressTimestampIntoHeader(mesg)
compressed = e.compressTimestampIntoHeader(mesg)
}
}

Expand All @@ -471,9 +472,12 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) {

b, _ := mesgDef.MarshalAppend(e.buf[:0])
localMesgNum, isNewMesgDef := e.localMesgNumLRU.Put(b) // This might alloc memory since we need to copy the item.
if e.options.headerOption == headerOptionNormal {
b[0] = (b[0] &^ proto.LocalMesgNumMask) | localMesgNum // Update the message definition header.
mesg.Header = (mesg.Header &^ proto.LocalMesgNumMask) | localMesgNum

b[0] |= localMesgNum // Update the message definition header.
if compressed {
// TODO: implement compressed timestamp with multiple local messages type.
} else {
mesg.Header |= localMesgNum
}

var n int
Expand Down Expand Up @@ -502,28 +506,27 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) {
return nil
}

func (e *Encoder) compressTimestampIntoHeader(mesg *proto.Message) {
func (e *Encoder) compressTimestampIntoHeader(mesg *proto.Message) (ok bool) {
timestamp := mesg.FieldValueByNum(proto.FieldNumTimestamp).Uint32()
if timestamp == basetype.Uint32Invalid {
return // not supported
return false // not supported
}

if timestamp < uint32(typedef.DateTimeMin) {
return
return false
}

// The 5-bit time offset rolls over every 32 seconds, it is necessary that the difference
// between timestamp and timestamp reference be measured less than 32 seconds apart.
if (timestamp - e.timestampReference) > proto.CompressedTimeMask {
e.timestampReference = timestamp
return // Rollover event occurs, keep it as it is.
return false // Rollover event occurs, keep it as it is.
}

e.timestampReference = timestamp

timeOffset := byte(timestamp & proto.CompressedTimeMask)
mesg.Header |= proto.MesgCompressedHeaderMask | timeOffset
mesg.RemoveFieldByNum(proto.FieldNumTimestamp)
return true
}

func (e *Encoder) newMessageDefinition(mesg *proto.Message) *proto.MessageDefinition {
Expand Down
74 changes: 61 additions & 13 deletions encoder/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,9 +1178,12 @@ func TestCompressTimestampInHeader(t *testing.T) {
now := time.Now()
offset := byte(datetime.ToUint32(now) & proto.CompressedTimeMask)
tt := []struct {
name string
mesgs []proto.Message
headers []byte
name string
mesgs []proto.Message
headers []byte
lenFields []int
compresseds []bool
timestampReferences []uint32
}{
{
name: "compress timestamp in header happy flow",
Expand All @@ -1207,7 +1210,16 @@ func TestCompressTimestampInHeader(t *testing.T) {
proto.MesgNormalHeaderMask, // record: the message containing timestamp reference prior to the use of compressed header.
proto.MesgCompressedHeaderMask | (offset+1)&proto.CompressedTimeMask,
proto.MesgCompressedHeaderMask | (offset+2)&proto.CompressedTimeMask,
proto.MesgCompressedHeaderMask | (offset+32)&proto.CompressedTimeMask,
proto.MesgNormalHeaderMask,
},
lenFields: []int{2, 1, 0, 0, 1},
compresseds: []bool{false, false, true, true, false},
timestampReferences: []uint32{
0,
datetime.ToUint32(now),
datetime.ToUint32(now),
datetime.ToUint32(now),
datetime.ToUint32(now.Add(32 * time.Second)),
},
},
{
Expand All @@ -1233,6 +1245,14 @@ func TestCompressTimestampInHeader(t *testing.T) {
proto.MesgNormalHeaderMask, // record: roll over has occurred, the timestamp is used new timestamp reference.
proto.MesgCompressedHeaderMask | (offset+1)&proto.CompressedTimeMask,
},
lenFields: []int{2, 1, 1, 0},
compresseds: []bool{false, false, false, true},
timestampReferences: []uint32{
0,
datetime.ToUint32(now),
datetime.ToUint32(now.Add(32 * time.Second)),
datetime.ToUint32(now.Add(32 * time.Second)), // same as prev timestamp
},
},
{
name: "timestamp less than DateTimeMin",
Expand All @@ -1249,9 +1269,15 @@ func TestCompressTimestampInHeader(t *testing.T) {
proto.MesgNormalHeaderMask,
proto.MesgNormalHeaderMask,
},
lenFields: []int{2, 1},
compresseds: []bool{false, false},
timestampReferences: []uint32{
0,
0, // less than DateTimeMin do not change timestampReference
},
},
{
name: "timestamp wrong type not uint32 or typedef.DateTime",
name: "timestamp type typedef.DateTime",
mesgs: []proto.Message{
{Num: mesgnum.FileId, Fields: []proto.Field{
factory.CreateField(mesgnum.FileId, fieldnum.FileIdManufacturer).WithValue(typedef.ManufacturerGarmin),
Expand All @@ -1265,9 +1291,15 @@ func TestCompressTimestampInHeader(t *testing.T) {
proto.MesgNormalHeaderMask,
proto.MesgNormalHeaderMask,
},
lenFields: []int{2, 1},
compresseds: []bool{false, false},
timestampReferences: []uint32{
0,
datetime.ToUint32(now), // typedef.Datetime will be converted into uint32 in proto.Value
},
},
{
name: "timestamp wrong type not uint32 or typedef.DateTime",
name: "timestamp wrong type not uint32 or typedef.DateTime: time.Time",
mesgs: []proto.Message{
{Num: mesgnum.FileId, Fields: []proto.Field{
factory.CreateField(mesgnum.FileId, fieldnum.FileIdManufacturer).WithValue(typedef.ManufacturerGarmin),
Expand All @@ -1281,19 +1313,35 @@ func TestCompressTimestampInHeader(t *testing.T) {
proto.MesgNormalHeaderMask,
proto.MesgNormalHeaderMask,
},
lenFields: []int{2, 1},
compresseds: []bool{false, false},
timestampReferences: []uint32{
0,
0,
},
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
for i, tc := range tt {
t.Run(fmt.Sprintf("[%d] %s", i, tc.name), func(t *testing.T) {
enc := New(nil)
for i := range tc.mesgs {
enc.compressTimestampIntoHeader(&tc.mesgs[i])
for j := range tc.mesgs {
compressed := enc.compressTimestampIntoHeader(&tc.mesgs[j])
if compressed != tc.compresseds[j] {
t.Errorf("index: %d: expected compressed: %t, got: %t", j, tc.compresseds[j], compressed)
}
if enc.timestampReference != tc.timestampReferences[j] {
t.Errorf("index: %d: expected timestampReference: %d, got: %d",
j, tc.timestampReferences[j], enc.timestampReference)
}
}
// Now that all message have been processed let's check the header
for i := range tc.mesgs {
if diff := cmp.Diff(tc.mesgs[i].Header, tc.headers[i]); diff != "" {
t.Errorf("index: %d: %s", i, diff)
for j := range tc.mesgs {
if diff := cmp.Diff(tc.mesgs[j].Header, tc.headers[j]); diff != "" {
t.Errorf("index: %d: %s", j, diff)
}
if l := len(tc.mesgs[j].Fields); l != tc.lenFields[j] {
t.Errorf("index: %d: expected len fields: %d, got: %d", j, l, tc.lenFields[j])
}
}
})
Expand Down