Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: simplify encoder's LRU #65

Merged
merged 1 commit into from
Dec 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 26 additions & 113 deletions encoder/encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package encoder

import (
"bytes"
"container/list"
"context"
"encoding/binary"
"errors"
Expand All @@ -20,7 +19,6 @@ import (
"github.com/muktihari/fit/profile/untyped/fieldnum"
"github.com/muktihari/fit/profile/untyped/mesgnum"
"github.com/muktihari/fit/proto"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -70,15 +68,7 @@ type Encoder struct {
dataSize uint32 // Data size of messages in bytes for a single Fit file.
crc16 hash.Hash16 // Calculate the CRC-16 checksum for ensuring header and message integrity.

// Tracks whether a message definition has been previously written.
// The list size is determined by the number of desirable multiple local message types specified by WithNormalHeader(n).
// Default size is 1 to accommodate local message type zero (0).
localMesgDefinitions *list.List

// Tracks the Least Recently Used (LRU) element in the localMesgDefinitions list.
// Each element in this list is a pointer to an element in localMesgDefinitions.
// This helps determine which element should be removed when localMesgDefinitions is full.
localMesgDefinitionsLRU *list.List
localMesgNumLRU *lru // LRU cache for writing local message definition

// This timestamp reference is retrieved from the first message containing a valid timestamp field.
// It is initialized only if the 'compressedTimestamp' option is applied and reset when encoding is completed.
Expand Down Expand Up @@ -185,14 +175,18 @@ func New(w io.Writer, opts ...Option) *Encoder {
opt.apply(options)
}

var lruCapacity byte = 1
if options.headerOption == headerOptionNormal && options.multipleLocalMessageType > 0 {
lruCapacity = options.multipleLocalMessageType + 1
}

return &Encoder{
w: w,
options: options,
crc16: crc16.New(crc16.MakeFitTable()),
protocolValidator: proto.NewValidator(options.protocolVersion),
messageValidator: options.messageValidator,
localMesgDefinitions: list.New(),
localMesgDefinitionsLRU: list.New(),
w: w,
options: options,
crc16: crc16.New(crc16.MakeFitTable()),
protocolValidator: proto.NewValidator(options.protocolVersion),
messageValidator: options.messageValidator,
localMesgNumLRU: newLRU(lruCapacity),
defaultFileHeader: proto.FileHeader{
Size: proto.DefaultFileHeaderSize,
ProtocolVersion: byte(options.protocolVersion),
Expand Down Expand Up @@ -356,17 +350,14 @@ func (e *Encoder) encodeHeader(header *proto.FileHeader) error {
header.ProtocolVersion = byte(e.options.protocolVersion)

b, _ := header.MarshalBinary()
if header.Size == 12 {
if header.Size < 14 {
n, err := e.w.Write(b[:header.Size])
e.n += int64(n)
return err
}

_, _ = e.crc16.Write(b[:header.Size-2])

var crc = make([]byte, 2)
binary.LittleEndian.PutUint16(crc, e.crc16.Sum16())
copy(b[header.Size-2:], crc)
binary.LittleEndian.PutUint16(b[header.Size-2:], e.crc16.Sum16())
header.CRC = e.crc16.Sum16()

e.crc16.Reset() // this hash will be re-used for calculating data integrity.
Expand Down Expand Up @@ -405,9 +396,7 @@ func (e *Encoder) encodeMessage(w io.Writer, mesg *proto.Message) error {
return fmt.Errorf("message validation failed: %w", err)
}

var b []byte
var localMesgNum byte
var writeable bool
var isNewMesgDef bool
e.buf.Reset()

// Writing strategy based on the selected header option:
Expand All @@ -419,31 +408,27 @@ func (e *Encoder) encodeMessage(w io.Writer, mesg *proto.Message) error {
}

_, _ = mesgDef.WriteTo(e.buf)
b = e.buf.Bytes() // Should be copied before put on LRU.
if e.options.multipleLocalMessageType == 0 {
writeable = e.isMesgDefinitionWriteable(b) // Local Message Type Zero
} else {
localMesgNum, writeable = e.redefineLocalMesgNum(b) // Multiple Local Message Type
b[0] = (b[0] &^ proto.LocalMesgNumMask) | localMesgNum // localMesgNum redefined, update the message definition header.
}
b := e.buf.Bytes()

var localMesgNum byte
localMesgNum, isNewMesgDef = e.localMesgNumLRU.Put(b)
b[0] = (b[0] &^ proto.LocalMesgNumMask) | localMesgNum // update the message definition header.

mesg.Header = (mesg.Header &^ proto.LocalMesgNumMask) | localMesgNum
case headerOptionCompressedTimestamp:
e.compressTimestampIntoHeader(mesg)

// Timestamp field might be omitted, update the message definition to match the resulting message.
mesgDef := proto.CreateMessageDefinition(mesg)
if err := e.protocolValidator.ValidateMessageDefinition(&mesgDef); err != nil {
return err
}
_, _ = mesgDef.WriteTo(e.buf)
b = e.buf.Bytes() // Should be copied before putting it to LRU.

writeable = e.isMesgDefinitionWriteable(b)
_, _ = mesgDef.WriteTo(e.buf)
_, isNewMesgDef = e.localMesgNumLRU.Put(e.buf.Bytes())
}

if writeable {
if err := e.writeMessage(w, b); err != nil {
if isNewMesgDef {
if err := e.writeMessage(w, e.buf.Bytes()); err != nil {
return fmt.Errorf("write message definition failed: %w", err)
}
}
Expand All @@ -453,69 +438,12 @@ func (e *Encoder) encodeMessage(w io.Writer, mesg *proto.Message) error {
if err != nil {
return fmt.Errorf("marshal failed: %w", err)
}
b = e.buf.Bytes()
if err := e.writeMessage(w, b); err != nil {
if err := e.writeMessage(w, e.buf.Bytes()); err != nil {
return fmt.Errorf("write message failed: %w", err)
}
return nil
}

// isMesgDefinitionWriteable reports whether the message definition b must be written.
//
// Only a single definition is tracked, at the front of localMesgDefinitions. When b
// equals the tracked definition nothing needs to be written and false is returned.
// Otherwise a copy of b replaces the tracked definition (or becomes the first one)
// and true is returned.
func (e *Encoder) isMesgDefinitionWriteable(b []byte) bool {
	if current := e.localMesgDefinitions.Front(); current != nil {
		if isEqual(current.Value.([]byte), b) {
			return false
		}
		// A different definition takes over the single slot.
		e.localMesgDefinitions.Remove(current)
	}

	// Store a copy: the caller's buffer is reused for subsequent writes.
	e.localMesgDefinitions.PushFront(slices.Clone(b))
	return true
}

// redefineLocalMesgNum assigns a local message number to the message definition b and
// reports whether that definition needs to be written.
//
// The position of a definition inside localMesgDefinitions is its local message number.
// Definitions are compared without their first byte (the header), since the local
// message number stored there changes whenever a slot is reassigned.
//
//   - If b is already tracked, its position is returned with writeable == false and
//     the entry is marked as the most recently used one.
//   - If the list holds at most multipleLocalMessageType entries, a copy of b is
//     appended and its new position is returned with writeable == true.
//   - Otherwise the least recently used entry is replaced in place by a copy of b and
//     the reused position is returned with writeable == true.
func (e *Encoder) redefineLocalMesgNum(b []byte) (newLocalMesgNum byte, writeable bool) {
	maxLocalMesgNum := e.options.multipleLocalMessageType

	var index byte
	for elem := e.localMesgDefinitions.Front(); elem != nil; elem = elem.Next() {
		// Ignore the header since the localMesgNum in the header is ever-changing.
		if isEqual(elem.Value.([]byte)[1:], b[1:]) {
			// Recently used items are moved to the back of the LRU list. The LRU list
			// stores *list.Element values pointing into localMesgDefinitions, so the
			// wrapping LRU entry must be moved: passing elem itself to MoveToBack
			// would be a silent no-op because elem belongs to a different list.
			for lruElem := e.localMesgDefinitionsLRU.Front(); lruElem != nil; lruElem = lruElem.Next() {
				if lruElem.Value.(*list.Element) == elem {
					e.localMesgDefinitionsLRU.MoveToBack(lruElem)
					break
				}
			}
			return index, false
		}
		index++
	}

	// Store a copy: the caller's buffer is reused for subsequent writes.
	b = slices.Clone(b)

	// There is still a free slot: append and use the next local message number.
	if n := byte(e.localMesgDefinitions.Len()); n <= maxLocalMesgNum {
		e.localMesgDefinitionsLRU.PushBack(e.localMesgDefinitions.PushBack(b))
		return n, true
	}

	// The list is full: locate the least recently used definition. Its position is the
	// local message number that will be reused. The position is always smaller than
	// the list length, so no wrap-around is needed.
	lruFront := e.localMesgDefinitionsLRU.Front()
	victim := lruFront.Value.(*list.Element)

	index = 0
	for elem := e.localMesgDefinitions.Front(); elem != nil && elem != victim; elem = elem.Next() {
		index++
	}

	// Replace the victim in place so every other definition keeps its position.
	replacement := e.localMesgDefinitions.InsertAfter(b, victim)
	e.localMesgDefinitions.Remove(victim)

	// The new definition is now the most recently used one.
	e.localMesgDefinitionsLRU.Remove(lruFront)
	e.localMesgDefinitionsLRU.PushBack(replacement)

	return index, true
}

func (e *Encoder) compressTimestampIntoHeader(mesg *proto.Message) {
field := mesg.FieldByNum(fieldnum.TimestampCorrelationTimestamp)
if field == nil {
Expand Down Expand Up @@ -587,21 +515,6 @@ func (e *Encoder) encodeCRC(crc *uint16) error {
// reset restores the Encoder's per-file bookkeeping to its initial values:
// the accumulated data size, the CRC-16 hash, the local message definition
// tracking, and the timestamp reference.
func (e *Encoder) reset() {
e.dataSize = 0
e.crc16.Reset()
// Replace both tracking lists with fresh, empty lists.
e.localMesgDefinitions = list.New()
e.localMesgDefinitionsLRU = list.New()
// NOTE(review): presumably clears the cached local message definitions; confirm against the lru type.
e.localMesgNumLRU.Reset()
e.timestampReference = 0
}

// List of private functions should be declared after methods:

// isEqual reports whether x and y have the same length and contain the same bytes.
// A nil slice and an empty slice are considered equal.
func isEqual(x, y []byte) bool {
	return bytes.Equal(x, y)
}
Loading