Skip to content

Commit

Permalink
perf: improve fitconv (#62)
Browse files Browse the repository at this point in the history
* perf: improve fitconv cli

* docs: fix typo
  • Loading branch information
muktihari committed Dec 22, 2023
1 parent 5cc889c commit 77d9f23
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 99 deletions.
157 changes: 88 additions & 69 deletions cmd/fitconv/fitcsv/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package fitcsv

import (
"bufio"
"bytes"
"io"
"os"
Expand All @@ -13,9 +14,8 @@ import (

"github.com/muktihari/fit/factory"
"github.com/muktihari/fit/kit/scaleoffset"
"github.com/muktihari/fit/kit/typeconv"
"github.com/muktihari/fit/listener"
"github.com/muktihari/fit/profile/untyped/fieldnum"
"github.com/muktihari/fit/profile/mesgdef"
"github.com/muktihari/fit/profile/untyped/mesgnum"
"github.com/muktihari/fit/proto"
)
Expand All @@ -31,8 +31,9 @@ type FitToCsvConv struct {

// Temporary writers that can be used to write the csv data (without header). Default: inmem.
// They are needed because the correct header can only be written after all data is retrieved.
inmem *bytes.Buffer // Temporary writer in memory
ondisk *os.File // Temporary writer on disk
inmem *bytes.Buffer // Temporary writer in memory
ondisk *os.File // Temporary writer on disk
ondiskBufferWriter *bufio.Writer // Ondisk buffer writer to reduce syscall on write

buf *bytes.Buffer // Buffer for writing bytes
err error // Error occurred while receiving messages. It's up to the handler whether to stop or continue.
Expand All @@ -41,11 +42,11 @@ type FitToCsvConv struct {

maxFields int // Since we write messages as they arrive, we don't know the exact number of headers we need, so we create an approximate size.

developerDataIdMessages []proto.Message
fieldDesciptionMessages []proto.Message
developerDataIds []*mesgdef.DeveloperDataId
fieldDescriptions []*mesgdef.FieldDescription

eventch chan any // This buffered event channel can accept either proto.Message or proto.MessageDefinition maintaining the order of arrival.
done chan struct{} // Tells that all messages have been completely processed.
mesgc chan any // This buffered event channel can accept either proto.Message or proto.MessageDefinition maintaining the order of arrival.
done chan struct{} // Tells that all messages have been completely processed.
}

type Option interface{ apply(o *options) }
Expand All @@ -55,6 +56,7 @@ type options struct {
rawValue bool // Print scaled value as is in the binary form (sint, uint, etc.) than in its representation.
unknownNumber bool // Print 'unknown(68)' instead of 'unknown'.
useDisk bool // Write temporary data in disk instead of in memory.
ondiskWriteBuffer int // Buffer on read/write in disk when useDisk is specified.
}

func defaultOptions() *options {
Expand All @@ -63,6 +65,7 @@ func defaultOptions() *options {
rawValue: false,
unknownNumber: false,
useDisk: false,
ondiskWriteBuffer: 4 << 10,
}
}

Expand All @@ -86,8 +89,13 @@ func WithUnknownNumber() Option {
return fnApply(func(o *options) { o.unknownNumber = true })
}

func WithUseDisk() Option {
return fnApply(func(o *options) { o.useDisk = true })
// WithUseDisk makes the converter write its temporary data on disk instead of in memory.
// writeBuffer is the size used for the on-disk write buffer; a value that is
// zero or negative leaves the default buffer size untouched.
func WithUseDisk(writeBuffer int) Option {
	enableDisk := func(o *options) {
		o.useDisk = true
		if writeBuffer <= 0 {
			return // keep the default buffer size
		}
		o.ondiskWriteBuffer = writeBuffer
	}
	return fnApply(enableDisk)
}

// NewConverter creates a new fit to csv converter.
Expand All @@ -100,15 +108,20 @@ func NewConverter(w io.Writer, opts ...Option) *FitToCsvConv {

c := &FitToCsvConv{
w: w,
inmem: bytes.NewBuffer(nil),
buf: new(bytes.Buffer),
options: options,
eventch: make(chan any, options.channelBufferSize),
mesgc: make(chan any, options.channelBufferSize),
done: make(chan struct{}),
}

if options.useDisk {
c.ondisk, c.err = os.CreateTemp(".", "fitconv-fit-to-csv-temp-file")
if c.err != nil {
return c
}
c.ondiskBufferWriter = bufio.NewWriterSize(c.ondisk, options.ondiskWriteBuffer)
} else {
c.inmem = bytes.NewBuffer(nil)
}

go c.handleEvent() // spawn only once.
Expand All @@ -120,34 +133,34 @@ func NewConverter(w io.Writer, opts ...Option) *FitToCsvConv {
func (c *FitToCsvConv) Err() error { return c.err }

// OnMesgDef receives a message definition from the broadcaster.
func (c *FitToCsvConv) OnMesgDef(mesgDef proto.MessageDefinition) { c.eventch <- mesgDef }
func (c *FitToCsvConv) OnMesgDef(mesgDef proto.MessageDefinition) { c.mesgc <- mesgDef }

// OnMesg receives a message from the broadcaster.
func (c *FitToCsvConv) OnMesg(mesg proto.Message) { c.eventch <- mesg }
func (c *FitToCsvConv) OnMesg(mesg proto.Message) { c.mesgc <- mesg }

// handleEvent processes events from a buffered channel, in order of arrival, until the
// channel is closed. Message definitions are passed to writeMesgDef and messages to writeMesg.
// It should not be concurrently spawned multiple times, as it relies on maintaining event order.
func (c *FitToCsvConv) handleEvent() {
for event := range c.eventch {
switch data := event.(type) {
for event := range c.mesgc {
switch mesg := event.(type) {
case proto.MessageDefinition:
c.writeMesgDef(data)
c.writeMesgDef(mesg)
case proto.Message:
// Developer data messages are retained in addition to being written.
// Field descriptions are later read by devFieldName to resolve developer field names.
// NOTE(review): the consumer of the developer data ids is not visible in this excerpt.
switch data.Num {
switch mesg.Num {
case mesgnum.DeveloperDataId:
c.developerDataIdMessages = append(c.developerDataIdMessages, data)
c.developerDataIds = append(c.developerDataIds, mesgdef.NewDeveloperDataId(&mesg))
case mesgnum.FieldDescription:
c.fieldDesciptionMessages = append(c.fieldDesciptionMessages, data)
c.fieldDescriptions = append(c.fieldDescriptions, mesgdef.NewFieldDescription(&mesg))
}
c.writeMesg(data)
c.writeMesg(mesg)
}
}
// The channel has been closed and drained: signal Wait that processing is complete.
close(c.done)
}

// Wait closes the buffered channel, waits until all pending events have been handled,
// and then finalizes the data.
//
// It must be called only once, and only after the last OnMesgDef/OnMesg call:
// closing an already closed channel, or sending on a closed channel, panics.
func (c *FitToCsvConv) Wait() {
close(c.eventch)
close(c.mesgc)
// Blocks until handleEvent has drained the channel and closed c.done.
<-c.done
c.finalize()
}
Expand All @@ -170,11 +183,15 @@ func (c *FitToCsvConv) finalize() {
}

if c.options.useDisk {
c.err = c.ondiskBufferWriter.Flush() // flush remaining buffer
if c.err != nil {
return
}
_, c.err = c.ondisk.Seek(0, io.SeekStart)
if c.err != nil {
return
}
_, c.err = io.Copy(c.w, c.ondisk)
_, c.err = io.Copy(c.w, c.ondisk) // do not wrap with bufio.Reader
} else {
_, c.err = io.Copy(c.w, c.inmem)
}
Expand All @@ -188,9 +205,14 @@ func (c *FitToCsvConv) printHeader() {
c.buf.WriteString("Type,Local Number,Message")
for i := 0; i < c.maxFields*3; i += 3 {
num := strconv.Itoa((i / 3) + 1)
c.buf.WriteString(",Field " + num)
c.buf.WriteString(",Value " + num)
c.buf.WriteString(",Units " + num)
c.buf.WriteString(",Field ")
c.buf.WriteString(num)

c.buf.WriteString(",Value ")
c.buf.WriteString(num)

c.buf.WriteString(",Units ")
c.buf.WriteString(num)
}
c.buf.WriteByte('\n') // line break

Expand All @@ -208,34 +230,49 @@ func (c *FitToCsvConv) writeMesgDef(mesgDef proto.MessageDefinition) {
}

c.buf.WriteString("Definition,")
c.buf.WriteString(strconv.Itoa(int(proto.LocalMesgNum(mesgDef.Header))) + ",")
c.buf.WriteString(strconv.Itoa(int(proto.LocalMesgNum(mesgDef.Header))))
c.buf.WriteByte(',')

mesgName := mesgDef.MesgNum.String()
if strings.Contains(strings.ToLower(mesgName), "invalid") {
if strings.HasPrefix(mesgName, "MesgNumInvalid") {
mesgName = factory.NameUnknown
if c.options.unknownNumber {
mesgName = formatUnknown(int(mesgDef.MesgNum))
}
}
c.buf.WriteString(mesgName)
c.buf.WriteRune(',')
c.buf.WriteByte(',')

for i := range mesgDef.FieldDefinitions {
fieldDef := mesgDef.FieldDefinitions[i]
field := factory.CreateField(mesgDef.MesgNum, fieldDef.Num)

name := field.Name
if c.options.unknownNumber && name == factory.NameUnknown {
name = formatUnknown(int(field.Num))
}
c.buf.WriteString(name + ",")
c.buf.WriteString(strconv.Itoa(int(fieldDef.Size/fieldDef.BaseType.Size())) + ",")
c.buf.WriteString(",")

c.buf.WriteString(name)
c.buf.WriteByte(',')

c.buf.WriteString(strconv.Itoa(int(fieldDef.Size / fieldDef.BaseType.Size())))
c.buf.WriteByte(',')

// empty column
c.buf.WriteByte(',')
}

for i := range mesgDef.DeveloperFieldDefinitions {
devFieldDef := &mesgDef.DeveloperFieldDefinitions[i]
c.buf.WriteString(c.devFieldName(devFieldDef) + ",")
c.buf.WriteString(strconv.Itoa(int(devFieldDef.Size)) + ",")
c.buf.WriteString(",")

c.buf.WriteString(c.devFieldName(devFieldDef))
c.buf.WriteByte(',')

c.buf.WriteString(strconv.Itoa(int(devFieldDef.Size)))
c.buf.WriteByte(',')

// empty column
c.buf.WriteByte(',')
}

size := len(mesgDef.FieldDefinitions) + len(mesgDef.DeveloperFieldDefinitions)
Expand All @@ -246,51 +283,32 @@ func (c *FitToCsvConv) writeMesgDef(mesgDef proto.MessageDefinition) {
c.buf.WriteByte('\n') // line break

if c.options.useDisk {
_, c.err = c.ondisk.Write(c.buf.Bytes())
_, c.err = c.ondiskBufferWriter.Write(c.buf.Bytes())
} else {
_, c.err = c.inmem.Write(c.buf.Bytes())
}
c.buf.Reset()
}

// devFieldName resolves the name to print for a developer field.
//
// It searches the field descriptions received so far for the one whose developer data index
// and field definition number both match devFieldDef, and returns its field name; when the
// name holds multiple values they are joined with '|'.
// If no description matches, it returns formatUnknown(devFieldDef.Num) when the unknownNumber
// option is set, and factory.NameUnknown otherwise.
func (c *FitToCsvConv) devFieldName(devFieldDef *proto.DeveloperFieldDefinition) string {
for i := range c.fieldDesciptionMessages {
fieldDescMesg := &c.fieldDesciptionMessages[i]
devDataIndex := fieldDescMesg.FieldByNum(fieldnum.FieldDescriptionDeveloperDataIndex)
if devDataIndex == nil {
var fieldDescription *mesgdef.FieldDescription
for i := range c.fieldDescriptions {
fieldDef := c.fieldDescriptions[i]
if fieldDef.DeveloperDataIndex != devFieldDef.DeveloperDataIndex {
continue
}
fieldDefNum := fieldDescMesg.FieldByNum(fieldnum.FieldDescriptionFieldDefinitionNumber)
if fieldDefNum == nil {
if fieldDef.FieldDefinitionNumber != devFieldDef.Num {
continue
}

if typeconv.ToByte[byte](devDataIndex.Value) == devFieldDef.DeveloperDataIndex &&
typeconv.ToByte[byte](fieldDefNum.Value) == devFieldDef.Num {
fieldName := fieldDescMesg.FieldByNum(fieldnum.FieldDescriptionFieldName)
if fieldName == nil {
break
}

strbuf := new(strings.Builder)
if vals, ok := sliceAny(fieldName.Value); ok {
for i := range vals {
strbuf.WriteString(format(vals[i]))
if i < len(vals)-1 {
strbuf.WriteByte('|')
}
}
return strbuf.String()
} else {
return format(fieldName.Value)
}
}
// First matching description wins; stop searching.
fieldDescription = fieldDef
break
}
if fieldDescription != nil {
return strings.Join(fieldDescription.FieldName, "|")
}

// No matching field description: fall back to an "unknown" name.
if c.options.unknownNumber {
return formatUnknown(int(devFieldDef.Num))
}

return factory.NameUnknown
}

Expand All @@ -300,16 +318,17 @@ func (c *FitToCsvConv) writeMesg(mesg proto.Message) {
}

c.buf.WriteString("Data,")
c.buf.WriteString(strconv.Itoa(int(proto.LocalMesgNum(mesg.Header))) + ",")
c.buf.WriteString(strconv.Itoa(int(proto.LocalMesgNum(mesg.Header))))
c.buf.WriteByte(',')
mesgName := mesg.Num.String()
if strings.Contains(strings.ToLower(mesgName), "invalid") {
if strings.HasPrefix(mesgName, "MesgNumInvalid") {
mesgName = factory.NameUnknown
if c.options.unknownNumber {
mesgName = formatUnknown(int(mesg.Num))
}
}
c.buf.WriteString(mesgName)
c.buf.WriteRune(',')
c.buf.WriteByte(',')

var fieldCounter int
for i := range mesg.Fields {
Expand Down Expand Up @@ -391,7 +410,7 @@ func (c *FitToCsvConv) writeMesg(mesg proto.Message) {
c.buf.WriteByte('\n') // line break

if c.options.useDisk {
_, c.err = c.ondisk.Write(c.buf.Bytes())
_, c.err = c.ondiskBufferWriter.Write(c.buf.Bytes())
} else {
_, c.err = c.inmem.Write(c.buf.Bytes())
}
Expand Down
9 changes: 5 additions & 4 deletions cmd/fitconv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (

var version = "dev"

const blockSize = 8 << 10

func main() {
var flagVersion bool
flag.BoolVar(&flagVersion, "v", false, "Show version")
Expand Down Expand Up @@ -54,7 +56,7 @@ func main() {
fitToCsvOptions = append(fitToCsvOptions, fitcsv.WithUnknownNumber())
}
if flagFitToCsvUseDisk {
fitToCsvOptions = append(fitToCsvOptions, fitcsv.WithUseDisk())
fitToCsvOptions = append(fitToCsvOptions, fitcsv.WithUseDisk(blockSize))
}

paths := flag.Args()
Expand Down Expand Up @@ -101,11 +103,10 @@ func fitToCsv(path string, opts ...fitcsv.Option) error {
}
defer cf.Close()

const bsize = 1000 << 10 // 1 MB
bw := bufio.NewWriterSize(cf, bsize)
bw := bufio.NewWriterSize(cf, blockSize)
conv := fitcsv.NewConverter(bw, opts...)

dec := decoder.New(bufio.NewReaderSize(ff, bsize),
dec := decoder.New(bufio.NewReaderSize(ff, blockSize),
decoder.WithMesgDefListener(conv),
decoder.WithMesgListener(conv),
decoder.WithBroadcastOnly(),
Expand Down
2 changes: 1 addition & 1 deletion docs/usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ Example decoding FIT file into common file `Activity File`, edit the manufacture
specified multiple local message typedef. By default, the Encoder uses local message type 0.
This option allows users to specify values between 0-15 (while entering zero is equivalent to using
the default option, nothing is changed). Using multiple local message types optimizes file size by
avoiding the need to interleave different message typedef.
avoiding the need to interleave different message definitions.

Note: To minimize the required RAM for decoding, it's recommended to use a minimal number of
local message types in a file. For instance, embedded devices may only support decoding data
Expand Down
Loading

0 comments on commit 77d9f23

Please sign in to comment.