From 77d9f239e21a6c97aefe94ef987b7f73895f311b Mon Sep 17 00:00:00 2001 From: Hari Mukti Date: Fri, 22 Dec 2023 22:08:03 +0700 Subject: [PATCH] perf: improve fitconv (#62) * perf: improve fitconv cli * docs: fix typo --- cmd/fitconv/fitcsv/converter.go | 157 ++++++++++++++++------------- cmd/fitconv/main.go | 9 +- docs/usage.md | 2 +- internal/cmd/fitconv-debug/main.go | 90 ++++++++++++----- 4 files changed, 159 insertions(+), 99 deletions(-) diff --git a/cmd/fitconv/fitcsv/converter.go b/cmd/fitconv/fitcsv/converter.go index 649d546..cf99303 100644 --- a/cmd/fitconv/fitcsv/converter.go +++ b/cmd/fitconv/fitcsv/converter.go @@ -5,6 +5,7 @@ package fitcsv import ( + "bufio" "bytes" "io" "os" @@ -13,9 +14,8 @@ import ( "github.com/muktihari/fit/factory" "github.com/muktihari/fit/kit/scaleoffset" - "github.com/muktihari/fit/kit/typeconv" "github.com/muktihari/fit/listener" - "github.com/muktihari/fit/profile/untyped/fieldnum" + "github.com/muktihari/fit/profile/mesgdef" "github.com/muktihari/fit/profile/untyped/mesgnum" "github.com/muktihari/fit/proto" ) @@ -31,8 +31,9 @@ type FitToCsvConv struct { // Temporary writers that can be used to write the csv data (without header). Default: immem. // Since we can only write the correct header after all data is retrieved. - inmem *bytes.Buffer // Temporary writer in memory - ondisk *os.File // Temporary writer on disk + inmem *bytes.Buffer // Temporary writer in memory + ondisk *os.File // Temporary writer on disk + ondiskBufferWriter *bufio.Writer // Ondisk buffer writer to reduce syscall on write buf *bytes.Buffer // Buffer for writing bytes err error // Error occurred while receiving messages. It's up to the handler whether to stop or continue. @@ -41,11 +42,11 @@ type FitToCsvConv struct { maxFields int // Since we write messages as they arrive, we don't know the exact number of headers we need, so we create an approximate size. - developerDataIdMessages []proto.Message - fieldDesciptionMessages []proto.Message + developerDataIds []*mesgdef.DeveloperDataId + fieldDescriptions []*mesgdef.FieldDescription - eventch chan any // This buffered event channel can accept either proto.Message or proto.MessageDefinition maintaining the order of arrival. - done chan struct{} // Tells that all messages have been completely processed. + mesgc chan any // This buffered event channel can accept either proto.Message or proto.MessageDefinition maintaining the order of arrival. + done chan struct{} // Tells that all messages have been completely processed. } type Option interface{ apply(o *options) } @@ -55,6 +56,7 @@ type options struct { rawValue bool // Print scaled value as is in the binary form (sint, uint, etc.) than in its representation. unknownNumber bool // Print 'unknown(68)' instead of 'unknown'. useDisk bool // Write temporary data in disk instead of in memory. + ondiskWriteBuffer int // Buffer on read/write in disk when useDisk is specified. } func defaultOptions() *options { @@ -63,6 +65,7 @@ func defaultOptions() *options { rawValue: false, unknownNumber: false, useDisk: false, + ondiskWriteBuffer: 4 << 10, } } @@ -86,8 +89,13 @@ func WithUnknownNumber() Option { return fnApply(func(o *options) { o.unknownNumber = true }) } -func WithUseDisk() Option { - return fnApply(func(o *options) { o.useDisk = true }) +func WithUseDisk(writeBuffer int) Option { + return fnApply(func(o *options) { + o.useDisk = true + if writeBuffer > 0 { + o.ondiskWriteBuffer = writeBuffer + } + }) } // NewConverter creates a new fit to csv converter. @@ -100,15 +108,20 @@ func NewConverter(w io.Writer, opts ...Option) *FitToCsvConv { c := &FitToCsvConv{ w: w, - inmem: bytes.NewBuffer(nil), buf: new(bytes.Buffer), options: options, - eventch: make(chan any, options.channelBufferSize), + mesgc: make(chan any, options.channelBufferSize), done: make(chan struct{}), } if options.useDisk { c.ondisk, c.err = os.CreateTemp(".", "fitconv-fit-to-csv-temp-file") + if c.err != nil { + return c + } + c.ondiskBufferWriter = bufio.NewWriterSize(c.ondisk, options.ondiskWriteBuffer) + } else { + c.inmem = bytes.NewBuffer(nil) } go c.handleEvent() // spawn only once. @@ -120,26 +133,26 @@ func NewConverter(w io.Writer, opts ...Option) *FitToCsvConv { func (c *FitToCsvConv) Err() error { return c.err } // OnMesgDef receive message definition from broadcaster -func (c *FitToCsvConv) OnMesgDef(mesgDef proto.MessageDefinition) { c.eventch <- mesgDef } +func (c *FitToCsvConv) OnMesgDef(mesgDef proto.MessageDefinition) { c.mesgc <- mesgDef } // OnMesgDef receive message from broadcaster -func (c *FitToCsvConv) OnMesg(mesg proto.Message) { c.eventch <- mesg } +func (c *FitToCsvConv) OnMesg(mesg proto.Message) { c.mesgc <- mesg } // handleEvent processes events from a buffered channel. // It should not be concurrently spawned multiple times, as it relies on maintaining event order. func (c *FitToCsvConv) handleEvent() { - for event := range c.eventch { - switch data := event.(type) { + for event := range c.mesgc { + switch mesg := event.(type) { case proto.MessageDefinition: - c.writeMesgDef(data) + c.writeMesgDef(mesg) case proto.Message: - switch data.Num { + switch mesg.Num { case mesgnum.DeveloperDataId: - c.developerDataIdMessages = append(c.developerDataIdMessages, data) + c.developerDataIds = append(c.developerDataIds, mesgdef.NewDeveloperDataId(&mesg)) case mesgnum.FieldDescription: - c.fieldDesciptionMessages = append(c.fieldDesciptionMessages, data) + c.fieldDescriptions = append(c.fieldDescriptions, mesgdef.NewFieldDescription(&mesg)) } - c.writeMesg(data) + c.writeMesg(mesg) } } close(c.done) @@ -147,7 +160,7 @@ func (c *FitToCsvConv) handleEvent() { // Wait closes the buffered channel and wait until all event handling is completed and finalize the data. func (c *FitToCsvConv) Wait() { - close(c.eventch) + close(c.mesgc) <-c.done c.finalize() } @@ -170,11 +183,15 @@ func (c *FitToCsvConv) finalize() { } if c.options.useDisk { + c.err = c.ondiskBufferWriter.Flush() // flush remaining buffer + if c.err != nil { + return + } _, c.err = c.ondisk.Seek(0, io.SeekStart) if c.err != nil { return } - _, c.err = io.Copy(c.w, c.ondisk) + _, c.err = io.Copy(c.w, c.ondisk) // do not wrap with bufio.Reader } else { _, c.err = io.Copy(c.w, c.inmem) } @@ -188,9 +205,14 @@ func (c *FitToCsvConv) printHeader() { c.buf.WriteString("Type,Local Number,Message") for i := 0; i < c.maxFields*3; i += 3 { num := strconv.Itoa((i / 3) + 1) - c.buf.WriteString(",Field " + num) - c.buf.WriteString(",Value " + num) - c.buf.WriteString(",Units " + num) + c.buf.WriteString(",Field ") + c.buf.WriteString(num) + + c.buf.WriteString(",Value ") + c.buf.WriteString(num) + + c.buf.WriteString(",Units ") + c.buf.WriteString(num) } c.buf.WriteByte('\n') // line break @@ -208,34 +230,49 @@ func (c *FitToCsvConv) writeMesgDef(mesgDef proto.MessageDefinition) { } c.buf.WriteString("Definition,") - c.buf.WriteString(strconv.Itoa(int(proto.LocalMesgNum(mesgDef.Header))) + ",") + c.buf.WriteString(strconv.Itoa(int(proto.LocalMesgNum(mesgDef.Header)))) + c.buf.WriteByte(',') + mesgName := mesgDef.MesgNum.String() - if strings.Contains(strings.ToLower(mesgName), "invalid") { + if strings.HasPrefix(mesgName, "MesgNumInvalid") { mesgName = factory.NameUnknown if c.options.unknownNumber { mesgName = formatUnknown(int(mesgDef.MesgNum)) } } c.buf.WriteString(mesgName) - c.buf.WriteRune(',') + c.buf.WriteByte(',') for i := range mesgDef.FieldDefinitions { fieldDef := mesgDef.FieldDefinitions[i] field := factory.CreateField(mesgDef.MesgNum, fieldDef.Num) + name := field.Name if c.options.unknownNumber && name == factory.NameUnknown { name = formatUnknown(int(field.Num)) } - c.buf.WriteString(name + ",") - c.buf.WriteString(strconv.Itoa(int(fieldDef.Size/fieldDef.BaseType.Size())) + ",") - c.buf.WriteString(",") + + c.buf.WriteString(name) + c.buf.WriteByte(',') + + c.buf.WriteString(strconv.Itoa(int(fieldDef.Size / fieldDef.BaseType.Size()))) + c.buf.WriteByte(',') + + // empty column + c.buf.WriteByte(',') } for i := range mesgDef.DeveloperFieldDefinitions { devFieldDef := &mesgDef.DeveloperFieldDefinitions[i] - c.buf.WriteString(c.devFieldName(devFieldDef) + ",") - c.buf.WriteString(strconv.Itoa(int(devFieldDef.Size)) + ",") - c.buf.WriteString(",") + + c.buf.WriteString(c.devFieldName(devFieldDef)) + c.buf.WriteByte(',') + + c.buf.WriteString(strconv.Itoa(int(devFieldDef.Size))) + c.buf.WriteByte(',') + + // empty column + c.buf.WriteByte(',') } size := len(mesgDef.FieldDefinitions) + len(mesgDef.DeveloperFieldDefinitions) @@ -246,7 +283,7 @@ func (c *FitToCsvConv) writeMesgDef(mesgDef proto.MessageDefinition) { c.buf.WriteByte('\n') // line break if c.options.useDisk { - _, c.err = c.ondisk.Write(c.buf.Bytes()) + _, c.err = c.ondiskBufferWriter.Write(c.buf.Bytes()) } else { _, c.err = c.inmem.Write(c.buf.Bytes()) } @@ -254,43 +291,24 @@ func (c *FitToCsvConv) writeMesgDef(mesgDef proto.MessageDefinition) { } func (c *FitToCsvConv) devFieldName(devFieldDef *proto.DeveloperFieldDefinition) string { - for i := range c.fieldDesciptionMessages { - fieldDescMesg := &c.fieldDesciptionMessages[i] - devDataIndex := fieldDescMesg.FieldByNum(fieldnum.FieldDescriptionDeveloperDataIndex) - if devDataIndex == nil { + var fieldDescription *mesgdef.FieldDescription + for i := range c.fieldDescriptions { + fieldDef := c.fieldDescriptions[i] + if fieldDef.DeveloperDataIndex != devFieldDef.DeveloperDataIndex { continue } - fieldDefNum := fieldDescMesg.FieldByNum(fieldnum.FieldDescriptionFieldDefinitionNumber) - if fieldDefNum == nil { + if fieldDef.FieldDefinitionNumber != devFieldDef.Num { continue } - - if typeconv.ToByte[byte](devDataIndex.Value) == devFieldDef.DeveloperDataIndex && - typeconv.ToByte[byte](fieldDefNum.Value) == devFieldDef.Num { - fieldName := fieldDescMesg.FieldByNum(fieldnum.FieldDescriptionFieldName) - if fieldName == nil { - break - } - - strbuf := new(strings.Builder) - if vals, ok := sliceAny(fieldName.Value); ok { - for i := range vals { - strbuf.WriteString(format(vals[i])) - if i < len(vals)-1 { - strbuf.WriteByte('|') - } - } - return strbuf.String() - } else { - return format(fieldName.Value) - } - } + fieldDescription = fieldDef + break + } + if fieldDescription != nil { + return strings.Join(fieldDescription.FieldName, "|") } - if c.options.unknownNumber { return formatUnknown(int(devFieldDef.Num)) } - return factory.NameUnknown } @@ -300,16 +318,17 @@ func (c *FitToCsvConv) writeMesg(mesg proto.Message) { } c.buf.WriteString("Data,") - c.buf.WriteString(strconv.Itoa(int(proto.LocalMesgNum(mesg.Header))) + ",") + c.buf.WriteString(strconv.Itoa(int(proto.LocalMesgNum(mesg.Header)))) + c.buf.WriteByte(',') mesgName := mesg.Num.String() - if strings.Contains(strings.ToLower(mesgName), "invalid") { + if strings.HasPrefix(mesgName, "MesgNumInvalid") { mesgName = factory.NameUnknown if c.options.unknownNumber { mesgName = formatUnknown(int(mesg.Num)) } } c.buf.WriteString(mesgName) - c.buf.WriteRune(',') + c.buf.WriteByte(',') var fieldCounter int for i := range mesg.Fields { @@ -391,7 +410,7 @@ func (c *FitToCsvConv) writeMesg(mesg proto.Message) { c.buf.WriteByte('\n') // line break if c.options.useDisk { - _, c.err = c.ondisk.Write(c.buf.Bytes()) + _, c.err = c.ondiskBufferWriter.Write(c.buf.Bytes()) } else { _, c.err = c.inmem.Write(c.buf.Bytes()) } diff --git a/cmd/fitconv/main.go b/cmd/fitconv/main.go index b5f6cad..b7f970e 100644 --- a/cmd/fitconv/main.go +++ b/cmd/fitconv/main.go @@ -17,6 +17,8 @@ import ( var version = "dev" +const blockSize = 8 << 10 + func main() { var flagVersion bool flag.BoolVar(&flagVersion, "v", false, "Show version") @@ -54,7 +56,7 @@ func main() { fitToCsvOptions = append(fitToCsvOptions, fitcsv.WithUnknownNumber()) } if flagFitToCsvUseDisk { - fitToCsvOptions = append(fitToCsvOptions, fitcsv.WithUseDisk()) + fitToCsvOptions = append(fitToCsvOptions, fitcsv.WithUseDisk(blockSize)) } paths := flag.Args() @@ -101,11 +103,10 @@ func fitToCsv(path string, opts ...fitcsv.Option) error { } defer cf.Close() - const bsize = 1000 << 10 // 1 MB - bw := bufio.NewWriterSize(cf, bsize) + bw := bufio.NewWriterSize(cf, blockSize) conv := fitcsv.NewConverter(bw, opts...) - dec := decoder.New(bufio.NewReaderSize(ff, bsize), + dec := decoder.New(bufio.NewReaderSize(ff, blockSize), decoder.WithMesgDefListener(conv), decoder.WithMesgListener(conv), decoder.WithBroadcastOnly(), diff --git a/docs/usage.md b/docs/usage.md index 2f30f56..5b38db1 100644 --- a/docs/usage.md +++ b/docs/usage.md @@ -544,7 +544,7 @@ Example decoding FIT file into common file `Activity File`, edit the manufacture specified multiple local message typedef. By default, the Encoder uses local message type 0. This option allows users to specify values between 0-15 (while entering zero is equivalent to using the default option, nothing is changed). Using multiple local message types optimizes file size by - avoiding the need to interleave different message typedef. + avoiding the need to interleave different message definition. Note: To minimize the required RAM for decoding, it's recommended to use a minimal number of local message types in a file. For instance, embedded devices may only support decoding data diff --git a/internal/cmd/fitconv-debug/main.go b/internal/cmd/fitconv-debug/main.go index 6d503da..0695562 100644 --- a/internal/cmd/fitconv-debug/main.go +++ b/internal/cmd/fitconv-debug/main.go @@ -17,18 +17,52 @@ import ( "github.com/pkg/profile" ) -func main() { - flag.Usage = func() { - fmt.Printf("Usage Example: \n" + - " $ fitconv file1.fit\n" + - " $ fitconv file1.fit file2.fit\n", - ) - } +const version = "dev" +const blockSize = 8 << 10 +func main() { var opt string flag.StringVar(&opt, "opt", "", "") + var flagVersion bool + flag.BoolVar(&flagVersion, "v", false, "Show version") + + // Commands + var flagFitToCsv bool + flag.BoolVar(&flagFitToCsv, "f2c", false, "Convert FIT to CSV (default if not specified)") + + // FitToCsv Options + var flagFitToCsvRawValue bool + flag.BoolVar(&flagFitToCsvRawValue, "f2c-raw-value", false, "Use raw value instead of scaled value") + + var flagFitToCsvUseDisk bool + flag.BoolVar(&flagFitToCsvUseDisk, "f2c-use-disk", false, "Use disk instead of load everything in memory") + + var flagFitToCsvUnknownNumber bool + flag.BoolVar(&flagFitToCsvUnknownNumber, "f2c-unknown-number", false, "Print 'unknown(68)' instead of 'unknown'") + flag.Parse() + + if flagVersion { + fmt.Println(version) + return + } + + if !flagFitToCsv { + flagFitToCsv = true // default + } + + var fitToCsvOptions = []fitcsv.Option{fitcsv.WithChannelBufferSize(2000)} + if flagFitToCsvRawValue { + fitToCsvOptions = append(fitToCsvOptions, fitcsv.WithRawValue()) + } + if flagFitToCsvUnknownNumber { + fitToCsvOptions = append(fitToCsvOptions, fitcsv.WithUnknownNumber()) + } + if flagFitToCsvUseDisk { + fitToCsvOptions = append(fitToCsvOptions, fitcsv.WithUseDisk(1*blockSize)) + } + paths := flag.Args() if len(paths) == 0 { @@ -55,13 +89,15 @@ func main() { } for _, path := range paths { - if err := convertCSV(path); err != nil { - fmt.Fprintf(os.Stderr, "could not convert %q to csv: %v\n", path, err) + if flagFitToCsv { + if err := fitToCsv(path, fitToCsvOptions...); err != nil { + fmt.Fprintf(os.Stderr, "could not convert %q to csv: %v\n", path, err) + } } } } -func convertCSV(path string) error { +func fitToCsv(path string, opts ...fitcsv.Option) error { ff, err := os.Open(path) if err != nil { return fmt.Errorf("could not open file: %s: %w", path, err) @@ -72,8 +108,8 @@ func convertCSV(path string) error { dir := filepath.Dir(path) ext := filepath.Ext(path) - if ext == ".csv" { - return fmt.Errorf("expected *.fit, got *.csv") + if ext != ".fit" { + return fmt.Errorf("expected *.fit, got %s", ext) } name := base @@ -84,38 +120,42 @@ func convertCSV(path string) error { namecsv := name + ".csv" pathcsv := filepath.Join(dir, namecsv) - sequenceCompleted := 0 - defer func() { - if sequenceCompleted == 0 { - _ = os.Remove(pathcsv) - } - }() - cf, err := os.OpenFile(pathcsv, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) if err != nil { return fmt.Errorf("could not open file: %s: %w", pathcsv, err) } defer cf.Close() - const bsize = 1000 << 10 // 1 MB + const bsize = 1 * blockSize bw := bufio.NewWriterSize(cf, bsize) - csvconv := fitcsv.NewConverter(bw) + conv := fitcsv.NewConverter(bw, opts...) dec := decoder.New(bufio.NewReaderSize(ff, bsize), - decoder.WithMesgDefListener(csvconv), - decoder.WithMesgListener(csvconv), + decoder.WithMesgDefListener(conv), + decoder.WithMesgListener(conv), decoder.WithBroadcastOnly(), ) + var sequenceCounter int + defer func() { + if sequenceCounter == 0 { + _ = os.Remove(pathcsv) + } + }() + for dec.Next() { _, err = dec.Decode() if err != nil { return fmt.Errorf("decode failed: %w", err) } - sequenceCompleted++ + sequenceCounter++ } - csvconv.Wait() + conv.Wait() + + if err := conv.Err(); err != nil { + return fmt.Errorf("convert done with error: %v", err) + } if err := bw.Flush(); err != nil { return fmt.Errorf("could not flush buffered data: %w", err)