Skip to content

Commit

Permalink
fix: handle compressed timestamp (#438)
Browse files Browse the repository at this point in the history
* fix: decoder on check message's header by mask

* fix: encoder on compressing timestamp into header
  • Loading branch information
muktihari authored Sep 16, 2024
1 parent 6369cf5 commit c5bf0c3
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 26 deletions.
5 changes: 4 additions & 1 deletion decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,10 @@ func (d *Decoder) decodeMessage() error {
return err
}
header := b[0]
if (header & proto.MesgDefinitionMask) == proto.MesgDefinitionMask {

// NOTE: Compressed Timestamp Header Bit 5-6 is the local message type.
// Bit 6 overlap with MesgDefinitionMask; It's a message definition only if Bit 7 is zero.
if (header & (proto.MesgCompressedHeaderMask | proto.MesgDefinitionMask)) == proto.MesgDefinitionMask {
return d.decodeMessageDefinition(header)
}
return d.decodeMessageData(header)
Expand Down
96 changes: 96 additions & 0 deletions decoder/decoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1263,6 +1263,102 @@ func TestDecodeFileHeader(t *testing.T) {
}
}

func TestDecodeMessage(t *testing.T) {
now := time.Now()

tt := []struct {
name string
r io.Reader // must consist of mesgDef and mesg
timestampReference uint32
mesgDef *proto.MessageDefinition
mesg proto.Message
err error
}{
{
name: "header with compressed timestamp",
r: bytes.NewBuffer(append(
/* mesgDef */ []byte{67, 0, 0, 21, 0, 3, 3, 4, 134, 0, 1, 0, 1, 1, 0},
/* mesg */ []byte{0b11100000 | byte(datetime.ToUint32(now)&proto.CompressedTimeMask), 0, 0, 0, 0, 0, 0}...,
)),
timestampReference: datetime.ToUint32(now),
mesgDef: &proto.MessageDefinition{
Header: 67,
Reserved: 0,
Architecture: 0,
MesgNum: mesgnum.Event,
FieldDefinitions: []proto.FieldDefinition{
{Num: 3, Size: 4, BaseType: 134},
{Num: 0, Size: 1, BaseType: 0},
{Num: 1, Size: 1, BaseType: 0},
},
},
mesg: proto.Message{
Header: 0b11100000 | byte(datetime.ToUint32(now)&proto.CompressedTimeMask),
Num: mesgnum.Event,
Fields: []proto.Field{
factory.CreateField(mesgnum.Event, fieldnum.EventTimestamp).
WithValue(datetime.ToUint32(now)),
factory.CreateField(mesgnum.Event, fieldnum.EventData).
WithValue(uint32(0)),
factory.CreateField(mesgnum.Event, fieldnum.EventEvent).WithValue(typedef.EventTimer),
factory.CreateField(mesgnum.Event, fieldnum.EventEventType).WithValue(typedef.EventTypeStart),
},
},
},
}

for i, tc := range tt {
t.Run(fmt.Sprintf("[%d] %s", i, tc.name), func(t *testing.T) {
dec := New(tc.r)
dec.timestamp = tc.timestampReference
dec.lastTimeOffset = byte(tc.timestampReference & proto.CompressedTimeMask)
for i := 0; i < 2; i++ {
err := dec.decodeMessage()
if !errors.Is(err, tc.err) {
t.Fatalf("expected error: %v, got: %v", tc.err, err)
}
if err != nil {
return
}
}

var mesgDef *proto.MessageDefinition
for _, v := range dec.localMessageDefinitions {
if v != nil {
mesgDef = v
break
}
}
if diff := cmp.Diff(mesgDef, tc.mesgDef,
cmp.Transformer("MessageDefinition", func(m *proto.MessageDefinition) *proto.MessageDefinition {
if len(m.DeveloperFieldDefinitions) == 0 {
m.DeveloperFieldDefinitions = nil
}
return m
}),
); diff != "" {
t.Fatal(diff)
}

if len(dec.messages) == 0 {
t.Fatalf("no message is decoded")
}

if diff := cmp.Diff(dec.messages[0], tc.mesg,
cmp.Transformer("Message", func(m proto.Message) proto.Message {
if len(m.DeveloperFields) == 0 {
m.DeveloperFields = nil
}
return m
}),
cmp.AllowUnexported(proto.Value{}),
); diff != "" {
t.Fatal(diff)
}
})
}
}

func TestDecodeMessageDefinition(t *testing.T) {
fit, buf := createFitForTest()

Expand Down
2 changes: 1 addition & 1 deletion decoder/raw.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (d *RawDecoder) Decode(r io.Reader, fn func(flag RawFlag, b []byte) error)
}

// 2. a. Decode Message Definition
if (d.BytesArray[0] & proto.MesgDefinitionMask) == proto.MesgDefinitionMask {
if (d.BytesArray[0] & (proto.MesgCompressedHeaderMask | proto.MesgDefinitionMask)) == proto.MesgDefinitionMask {
const fixedSize = uint16(6) // Header + Reserved + Architecture + MesgNum (2 bytes) + n Fields
nr, err = io.ReadFull(r, d.BytesArray[1:fixedSize])
n += int64(nr)
Expand Down
25 changes: 14 additions & 11 deletions encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) {
return fmt.Errorf("message validation failed: %w", err)
}

var compressed bool
if e.options.headerOption == headerOptionCompressedTimestamp {
if e.w == io.Discard {
// NOTE: Only for calculating data size (Early Check Strategy)
Expand All @@ -451,7 +452,7 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) {
}
}
prevLen := len(mesg.Fields)
e.compressTimestampIntoHeader(mesg)
compressed = e.compressTimestampIntoHeader(mesg)
if prevLen > len(mesg.Fields) {
defer func() { // Revert: put timestamp field back at original index
mesg.Fields = mesg.Fields[:prevLen]
Expand All @@ -460,7 +461,7 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) {
}()
}
} else {
e.compressTimestampIntoHeader(mesg)
compressed = e.compressTimestampIntoHeader(mesg)
}
}

Expand All @@ -471,9 +472,12 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) {

b, _ := mesgDef.MarshalAppend(e.buf[:0])
localMesgNum, isNewMesgDef := e.localMesgNumLRU.Put(b) // This might alloc memory since we need to copy the item.
if e.options.headerOption == headerOptionNormal {
b[0] = (b[0] &^ proto.LocalMesgNumMask) | localMesgNum // Update the message definition header.
mesg.Header = (mesg.Header &^ proto.LocalMesgNumMask) | localMesgNum

b[0] |= localMesgNum // Update the message definition header.
if compressed {
// TODO: implement compressed timestamp with multiple local messages type.
} else {
mesg.Header |= localMesgNum
}

var n int
Expand Down Expand Up @@ -502,28 +506,27 @@ func (e *Encoder) encodeMessage(mesg *proto.Message) (err error) {
return nil
}

func (e *Encoder) compressTimestampIntoHeader(mesg *proto.Message) {
func (e *Encoder) compressTimestampIntoHeader(mesg *proto.Message) (ok bool) {
timestamp := mesg.FieldValueByNum(proto.FieldNumTimestamp).Uint32()
if timestamp == basetype.Uint32Invalid {
return // not supported
return false // not supported
}

if timestamp < uint32(typedef.DateTimeMin) {
return
return false
}

// The 5-bit time offset rolls over every 32 seconds, it is necessary that the difference
// between timestamp and timestamp reference be measured less than 32 seconds apart.
if (timestamp - e.timestampReference) > proto.CompressedTimeMask {
e.timestampReference = timestamp
return // Rollover event occurs, keep it as it is.
return false // Rollover event occurs, keep it as it is.
}

e.timestampReference = timestamp

timeOffset := byte(timestamp & proto.CompressedTimeMask)
mesg.Header |= proto.MesgCompressedHeaderMask | timeOffset
mesg.RemoveFieldByNum(proto.FieldNumTimestamp)
return true
}

func (e *Encoder) newMessageDefinition(mesg *proto.Message) *proto.MessageDefinition {
Expand Down
74 changes: 61 additions & 13 deletions encoder/encoder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1178,9 +1178,12 @@ func TestCompressTimestampInHeader(t *testing.T) {
now := time.Now()
offset := byte(datetime.ToUint32(now) & proto.CompressedTimeMask)
tt := []struct {
name string
mesgs []proto.Message
headers []byte
name string
mesgs []proto.Message
headers []byte
lenFields []int
compresseds []bool
timestampReferences []uint32
}{
{
name: "compress timestamp in header happy flow",
Expand All @@ -1207,7 +1210,16 @@ func TestCompressTimestampInHeader(t *testing.T) {
proto.MesgNormalHeaderMask, // record: the message containing timestamp reference prior to the use of compressed header.
proto.MesgCompressedHeaderMask | (offset+1)&proto.CompressedTimeMask,
proto.MesgCompressedHeaderMask | (offset+2)&proto.CompressedTimeMask,
proto.MesgCompressedHeaderMask | (offset+32)&proto.CompressedTimeMask,
proto.MesgNormalHeaderMask,
},
lenFields: []int{2, 1, 0, 0, 1},
compresseds: []bool{false, false, true, true, false},
timestampReferences: []uint32{
0,
datetime.ToUint32(now),
datetime.ToUint32(now),
datetime.ToUint32(now),
datetime.ToUint32(now.Add(32 * time.Second)),
},
},
{
Expand All @@ -1233,6 +1245,14 @@ func TestCompressTimestampInHeader(t *testing.T) {
proto.MesgNormalHeaderMask, // record: roll over has occurred, the timestamp is used new timestamp reference.
proto.MesgCompressedHeaderMask | (offset+1)&proto.CompressedTimeMask,
},
lenFields: []int{2, 1, 1, 0},
compresseds: []bool{false, false, false, true},
timestampReferences: []uint32{
0,
datetime.ToUint32(now),
datetime.ToUint32(now.Add(32 * time.Second)),
datetime.ToUint32(now.Add(32 * time.Second)), // same as prev timestamp
},
},
{
name: "timestamp less than DateTimeMin",
Expand All @@ -1249,9 +1269,15 @@ func TestCompressTimestampInHeader(t *testing.T) {
proto.MesgNormalHeaderMask,
proto.MesgNormalHeaderMask,
},
lenFields: []int{2, 1},
compresseds: []bool{false, false},
timestampReferences: []uint32{
0,
0, // less than DateTimeMin do not change timestampReference
},
},
{
name: "timestamp wrong type not uint32 or typedef.DateTime",
name: "timestamp type typedef.DateTime",
mesgs: []proto.Message{
{Num: mesgnum.FileId, Fields: []proto.Field{
factory.CreateField(mesgnum.FileId, fieldnum.FileIdManufacturer).WithValue(typedef.ManufacturerGarmin),
Expand All @@ -1265,9 +1291,15 @@ func TestCompressTimestampInHeader(t *testing.T) {
proto.MesgNormalHeaderMask,
proto.MesgNormalHeaderMask,
},
lenFields: []int{2, 1},
compresseds: []bool{false, false},
timestampReferences: []uint32{
0,
datetime.ToUint32(now), // typedef.Datetime will be converted into uint32 in proto.Value
},
},
{
name: "timestamp wrong type not uint32 or typedef.DateTime",
name: "timestamp wrong type not uint32 or typedef.DateTime: time.Time",
mesgs: []proto.Message{
{Num: mesgnum.FileId, Fields: []proto.Field{
factory.CreateField(mesgnum.FileId, fieldnum.FileIdManufacturer).WithValue(typedef.ManufacturerGarmin),
Expand All @@ -1281,19 +1313,35 @@ func TestCompressTimestampInHeader(t *testing.T) {
proto.MesgNormalHeaderMask,
proto.MesgNormalHeaderMask,
},
lenFields: []int{2, 1},
compresseds: []bool{false, false},
timestampReferences: []uint32{
0,
0,
},
},
}

for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
for i, tc := range tt {
t.Run(fmt.Sprintf("[%d] %s", i, tc.name), func(t *testing.T) {
enc := New(nil)
for i := range tc.mesgs {
enc.compressTimestampIntoHeader(&tc.mesgs[i])
for j := range tc.mesgs {
compressed := enc.compressTimestampIntoHeader(&tc.mesgs[j])
if compressed != tc.compresseds[j] {
t.Errorf("index: %d: expected compressed: %t, got: %t", j, tc.compresseds[j], compressed)
}
if enc.timestampReference != tc.timestampReferences[j] {
t.Errorf("index: %d: expected timestampReference: %d, got: %d",
j, tc.timestampReferences[j], enc.timestampReference)
}
}
// Now that all message have been processed let's check the header
for i := range tc.mesgs {
if diff := cmp.Diff(tc.mesgs[i].Header, tc.headers[i]); diff != "" {
t.Errorf("index: %d: %s", i, diff)
for j := range tc.mesgs {
if diff := cmp.Diff(tc.mesgs[j].Header, tc.headers[j]); diff != "" {
t.Errorf("index: %d: %s", j, diff)
}
if l := len(tc.mesgs[j].Fields); l != tc.lenFields[j] {
t.Errorf("index: %d: expected len fields: %d, got: %d", j, l, tc.lenFields[j])
}
}
})
Expand Down

0 comments on commit c5bf0c3

Please sign in to comment.