Skip to content

Commit

Permalink
perf: simplify encoder's lru (#65)
Browse files Browse the repository at this point in the history
  • Loading branch information
muktihari committed Dec 25, 2023
1 parent ec124c4 commit 9774837
Show file tree
Hide file tree
Showing 4 changed files with 410 additions and 299 deletions.
139 changes: 26 additions & 113 deletions encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package encoder

import (
"bytes"
"container/list"
"context"
"encoding/binary"
"errors"
Expand All @@ -20,7 +19,6 @@ import (
"github.com/muktihari/fit/profile/untyped/fieldnum"
"github.com/muktihari/fit/profile/untyped/mesgnum"
"github.com/muktihari/fit/proto"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -70,15 +68,7 @@ type Encoder struct {
dataSize uint32 // Data size of messages in bytes for a single Fit file.
crc16 hash.Hash16 // Calculate the CRC-16 checksum for ensuring header and message integrity.

// Tracks whether a message definition has been previously written.
// The list size is determined by the number of desirable multiple local message types specified by WithNormalHeader(n).
// Default size is 1 to accommodate local message type zero (0).
localMesgDefinitions *list.List

// Tracks the Least Recently Used (LRU) element in the localMesgDefinitions list.
// Each element in this list is a pointer to an element in localMesgDefinitions.
// This helps determine which element should be removed when localMesgDefinitions is full.
localMesgDefinitionsLRU *list.List
localMesgNumLRU *lru // LRU cache for writing local message definition

// This timestamp reference is retrieved from the first message containing a valid timestamp field.
// It is initialized only if the 'compressedTimestamp' option is applied and reset when encoding is completed.
Expand Down Expand Up @@ -185,14 +175,18 @@ func New(w io.Writer, opts ...Option) *Encoder {
opt.apply(options)
}

var lruCapacity byte = 1
if options.headerOption == headerOptionNormal && options.multipleLocalMessageType > 0 {
lruCapacity = options.multipleLocalMessageType + 1
}

return &Encoder{
w: w,
options: options,
crc16: crc16.New(crc16.MakeFitTable()),
protocolValidator: proto.NewValidator(options.protocolVersion),
messageValidator: options.messageValidator,
localMesgDefinitions: list.New(),
localMesgDefinitionsLRU: list.New(),
w: w,
options: options,
crc16: crc16.New(crc16.MakeFitTable()),
protocolValidator: proto.NewValidator(options.protocolVersion),
messageValidator: options.messageValidator,
localMesgNumLRU: newLRU(lruCapacity),
defaultFileHeader: proto.FileHeader{
Size: proto.DefaultFileHeaderSize,
ProtocolVersion: byte(options.protocolVersion),
Expand Down Expand Up @@ -356,17 +350,14 @@ func (e *Encoder) encodeHeader(header *proto.FileHeader) error {
header.ProtocolVersion = byte(e.options.protocolVersion)

b, _ := header.MarshalBinary()
if header.Size == 12 {
if header.Size < 14 {
n, err := e.w.Write(b[:header.Size])
e.n += int64(n)
return err
}

_, _ = e.crc16.Write(b[:header.Size-2])

var crc = make([]byte, 2)
binary.LittleEndian.PutUint16(crc, e.crc16.Sum16())
copy(b[header.Size-2:], crc)
binary.LittleEndian.PutUint16(b[header.Size-2:], e.crc16.Sum16())
header.CRC = e.crc16.Sum16()

e.crc16.Reset() // this hash will be re-used for calculating data integrity.
Expand Down Expand Up @@ -405,9 +396,7 @@ func (e *Encoder) encodeMessage(w io.Writer, mesg *proto.Message) error {
return fmt.Errorf("message validation failed: %w", err)
}

var b []byte
var localMesgNum byte
var writeable bool
var isNewMesgDef bool
e.buf.Reset()

// Writing strategy based on the selected header option:
Expand All @@ -419,31 +408,27 @@ func (e *Encoder) encodeMessage(w io.Writer, mesg *proto.Message) error {
}

_, _ = mesgDef.WriteTo(e.buf)
b = e.buf.Bytes() // Should be copied before put on LRU.
if e.options.multipleLocalMessageType == 0 {
writeable = e.isMesgDefinitionWriteable(b) // Local Message Type Zero
} else {
localMesgNum, writeable = e.redefineLocalMesgNum(b) // Multiple Local Message Type
b[0] = (b[0] &^ proto.LocalMesgNumMask) | localMesgNum // localMesgNum redefined, update the message definition header.
}
b := e.buf.Bytes()

var localMesgNum byte
localMesgNum, isNewMesgDef = e.localMesgNumLRU.Put(b)
b[0] = (b[0] &^ proto.LocalMesgNumMask) | localMesgNum // update the message definition header.

mesg.Header = (mesg.Header &^ proto.LocalMesgNumMask) | localMesgNum
case headerOptionCompressedTimestamp:
e.compressTimestampIntoHeader(mesg)

// Timestamp field might be omitted, update the message definition to match the resulting message.
mesgDef := proto.CreateMessageDefinition(mesg)
if err := e.protocolValidator.ValidateMessageDefinition(&mesgDef); err != nil {
return err
}
_, _ = mesgDef.WriteTo(e.buf)
b = e.buf.Bytes() // Should be copied before putting it to LRU.

writeable = e.isMesgDefinitionWriteable(b)
_, _ = mesgDef.WriteTo(e.buf)
_, isNewMesgDef = e.localMesgNumLRU.Put(e.buf.Bytes())
}

if writeable {
if err := e.writeMessage(w, b); err != nil {
if isNewMesgDef {
if err := e.writeMessage(w, e.buf.Bytes()); err != nil {
return fmt.Errorf("write message definition failed: %w", err)
}
}
Expand All @@ -453,69 +438,12 @@ func (e *Encoder) encodeMessage(w io.Writer, mesg *proto.Message) error {
if err != nil {
return fmt.Errorf("marshal failed: %w", err)
}
b = e.buf.Bytes()
if err := e.writeMessage(w, b); err != nil {
if err := e.writeMessage(w, e.buf.Bytes()); err != nil {
return fmt.Errorf("write message failed: %w", err)
}
return nil
}

// isMesgDefinitionWriteable reports whether the message definition b needs to be written.
//
// Only the front element of e.localMesgDefinitions is consulted: when it already holds bytes
// equal to b, nothing changes and false is returned. Otherwise the front element (if any) is
// replaced by a clone of b and true is returned. b is cloned so the caller may keep reusing
// its buffer.
func (e *Encoder) isMesgDefinitionWriteable(b []byte) bool {
	if front := e.localMesgDefinitions.Front(); front != nil {
		if isEqual(front.Value.([]byte), b) {
			return false // same definition as last time, no need to write it again.
		}
		e.localMesgDefinitions.Remove(front)
	}

	e.localMesgDefinitions.PushFront(slices.Clone(b))
	return true
}

// redefineLocalMesgNum assigns a local message number to the message definition b and reports
// whether the definition must be written (writeable) because it is not currently tracked.
//
// Definitions are compared while ignoring their first byte (the header), since the local message
// number stored there changes whenever a definition is reassigned. The position of a definition
// in e.localMesgDefinitions is its local message number. At most
// e.options.multipleLocalMessageType+1 definitions are tracked; once that many are stored, the
// least recently used definition is replaced in place, so the new definition takes over its
// local message number.
//
// b is cloned before it is stored, so the caller may keep reusing its buffer.
func (e *Encoder) redefineLocalMesgNum(b []byte) (newLocalMesgNum byte, writeable bool) {
	var index byte
	for elem := e.localMesgDefinitions.Front(); elem != nil; elem = elem.Next() {
		if isEqual(elem.Value.([]byte)[1:], b[1:]) { // ignore header since localMesgNum in header is ever-changing.
			// localMesgDefinitionsLRU has its own elements whose values point at elements of
			// localMesgDefinitions. Passing elem itself to MoveToBack is silently ignored by
			// container/list because elem belongs to a different list, so the matching
			// tracking element must be located first.
			for lruElem := e.localMesgDefinitionsLRU.Front(); lruElem != nil; lruElem = lruElem.Next() {
				if lruElem.Value.(*list.Element) == elem {
					e.localMesgDefinitionsLRU.MoveToBack(lruElem) // recently used items are moved to the back.
					break
				}
			}
			return index, false
		}
		index++
	}

	b = slices.Clone(b)

	// There is still room: append the definition, its position is the new local message number.
	maxLocalMesgNum := e.options.multipleLocalMessageType
	if size := byte(e.localMesgDefinitions.Len()); size <= maxLocalMesgNum {
		e.localMesgDefinitionsLRU.PushBack(e.localMesgDefinitions.PushBack(b))
		return size, true
	}

	// The list is full: the front of the LRU list points at the definition to evict.
	lruFront := e.localMesgDefinitionsLRU.Front()
	victim := lruFront.Value.(*list.Element)

	// The victim's position in localMesgDefinitions is the local message number to reuse.
	index = 0
	for elem := e.localMesgDefinitions.Front(); elem != nil && elem != victim; elem = elem.Next() {
		index++
	}

	// Insert the new definition at the victim's position, then drop the victim from both lists.
	replacement := e.localMesgDefinitions.InsertAfter(b, victim)
	e.localMesgDefinitions.Remove(victim)

	e.localMesgDefinitionsLRU.Remove(lruFront)
	e.localMesgDefinitionsLRU.PushBack(replacement)

	return index, true
}

func (e *Encoder) compressTimestampIntoHeader(mesg *proto.Message) {
field := mesg.FieldByNum(fieldnum.TimestampCorrelationTimestamp)
if field == nil {
Expand Down Expand Up @@ -587,21 +515,6 @@ func (e *Encoder) encodeCRC(crc *uint16) error {
// reset clears the encoder's accumulated state: it zeroes the data size and the timestamp
// reference, resets the CRC-16 hash, replaces both local message definition lists with new
// empty lists and resets the local message number LRU.
//
// NOTE(review): both the list-based trackers and localMesgNumLRU are cleared here — confirm
// that all of them are still fields of Encoder in the revision being built.
func (e *Encoder) reset() {
e.dataSize = 0
e.crc16.Reset()
e.localMesgDefinitions = list.New()
e.localMesgDefinitionsLRU = list.New()
e.localMesgNumLRU.Reset()
e.timestampReference = 0
}

// List of private functions should be declared after methods:

// isEqual reports whether x and y have the same length and contain the same bytes.
// A nil slice is considered equal to an empty slice.
func isEqual(x, y []byte) bool {
	return bytes.Equal(x, y)
}
Loading

0 comments on commit 9774837

Please sign in to comment.