Skip to content

Commit

Permalink
fix(file): scanner dynamic buffer (#2959)
Browse files Browse the repository at this point in the history
Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying committed Jun 28, 2024
1 parent bb15378 commit 47658f9
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 7 deletions.
2 changes: 1 addition & 1 deletion internal/io/file/reader/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (r *CsvReader) Provision(ctx api.StreamContext, props map[string]any) error
return nil
}

func (r *CsvReader) Bind(ctx api.StreamContext, fileStream io.Reader) error {
func (r *CsvReader) Bind(ctx api.StreamContext, fileStream io.Reader, _ int) error {
cr := csv.NewReader(fileStream)
cr.Comma = rune(r.config.Delimiter[0])
cr.TrimLeadingSpace = true
Expand Down
27 changes: 25 additions & 2 deletions internal/io/file/reader/lines.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package reader

import (
Expand All @@ -22,8 +36,14 @@ func (r *LinesReader) Provision(ctx api.StreamContext, props map[string]any) err
return nil
}

func (r *LinesReader) Bind(ctx api.StreamContext, fileStream io.Reader) error {
func (r *LinesReader) Bind(ctx api.StreamContext, fileStream io.Reader, maxSize int) error {
if maxSize <= 0 {
ctx.GetLogger().Errorf("maxSize must be > 0, defaul to 1MB")
// default to 1MB
maxSize = 1 << 20
}
scanner := bufio.NewScanner(fileStream)
scanner.Buffer(nil, maxSize)
scanner.Split(bufio.ScanLines)
r.scanner = scanner
return nil
Expand All @@ -34,7 +54,10 @@ func (r *LinesReader) Read(ctx api.StreamContext) (any, error) {
if !succ {
return nil, io.EOF
}
return r.scanner.Bytes(), nil
b := r.scanner.Bytes()
d := make([]byte, len(b))
copy(d, b)
return d, nil
}

func (r *LinesReader) IsBytesReader() bool {
Expand Down
16 changes: 14 additions & 2 deletions internal/io/file/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,20 @@ func (fs *Source) parseFile(ctx api.StreamContext, file string, ingest api.Tuple
err error
r io.Reader
)
r, err = os.Open(file)
f, err := os.Open(file)
if err != nil {
ctx.GetLogger().Debugf("prepare file %s error: %v", file, err)
ingestError(ctx, err)
}
r = f
// This is the buffer size, 1MB by default
maxSize := 1 << 20
info, err := f.Stat()
if err != nil {
ctx.GetLogger().Debugf("get file info for %s error: %v", file, err)
} else {
maxSize = int(info.Size())
}
if fs.config.IgnoreStartLines > 0 || fs.config.IgnoreEndLines > 0 {
r = ignoreLines(ctx, r, fs.config.IgnoreStartLines, fs.config.IgnoreEndLines)
}
Expand All @@ -231,14 +240,17 @@ func (fs *Source) parseFile(ctx api.StreamContext, file string, ingest api.Tuple
meta := map[string]any{"file": file}
// Read line or read all
if fs.reader != nil {
err = fs.reader.Bind(ctx, r)
err = fs.reader.Bind(ctx, r, maxSize)
if err != nil {
ingestError(ctx, err)
return
}
for {
line, err := fs.reader.Read(ctx)
if err != nil {
if err != io.EOF {
ctx.GetLogger().Errorf("read file %s error: %v", file, err)
}
break
}
rcvTime := timex.GetNow()
Expand Down
2 changes: 1 addition & 1 deletion internal/io/file/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func TestSourceProvision(t *testing.T) {
},
},
{
name: "no decompression for stream typs",
name: "no decompression for stream types",
props: map[string]any{
"datasource": name,
"path": path,
Expand Down
2 changes: 1 addition & 1 deletion pkg/modules/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type FileStreamReader interface {
// Provision Set up the static properties
Provision(ctx api.StreamContext, props map[string]any) error
// Bind set the file stream. Make sure the previous read has done
Bind(ctx api.StreamContext, fileStream io.Reader) error
Bind(ctx api.StreamContext, fileStream io.Reader, maxSize int) error
// Read the next record. Returns EOF when the input has reached its end.
Read(ctx api.StreamContext) (any, error)
// IsBytesReader If is bytes reader, Read must return []byte, otherwise return map or []map
Expand Down

0 comments on commit 47658f9

Please sign in to comment.