[libbeat][parquet reader] - Added debug logs & improved batch_size tracking #40651

Merged 6 commits on Sep 2, 2024
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
@@ -202,6 +202,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Bump version of elastic/toutoumomoma to remove internal forks of stdlib debug packages. {pull}40325[40325]
- Refactor x-pack/filebeat/input/websocket for generalisation. {pull}40308[40308]
- Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623]
+- Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651]

==== Deprecated

29 changes: 23 additions & 6 deletions x-pack/libbeat/reader/parquet/parquet.go
@@ -14,65 +14,79 @@ import (
    "github.com/apache/arrow/go/v14/parquet"
    "github.com/apache/arrow/go/v14/parquet/file"
    "github.com/apache/arrow/go/v14/parquet/pqarrow"
+
+   "github.com/elastic/elastic-agent-libs/logp"
)

// BufferedReader parses parquet inputs from io streams.
type BufferedReader struct {
    cfg          *Config
    fileReader   *file.Reader
    recordReader pqarrow.RecordReader
+   log          *logp.Logger
}

// NewBufferedReader creates a new reader that can decode parquet data from an io.Reader.
// It will return an error if the parquet data stream cannot be read.
// Note: As io.ReadAll is used, the entire data stream is read into memory, so very large data streams
// may cause memory bottleneck issues.
func NewBufferedReader(r io.Reader, cfg *Config) (*BufferedReader, error) {
-   batchSize := 1
-   if cfg.BatchSize > 1 {
-       batchSize = cfg.BatchSize
+   log := logp.L().Named("reader.parquet")
+
+   if cfg.BatchSize == 0 {
+       cfg.BatchSize = 1
    }
+   log.Debugw("creating parquet reader", "batch_size", cfg.BatchSize)

    // reads the contents of the reader object into a byte slice
    data, err := io.ReadAll(r)
    if err != nil {
        return nil, fmt.Errorf("failed to read data from stream reader: %w", err)
    }
+   log.Debugw("read data from stream reader", "size", len(data))

    // defines a memory allocator for allocating memory for Arrow objects
    pool := memory.NewCheckedAllocator(&memory.GoAllocator{})

    // constructs a parquet file reader object from the byte slice data
    pf, err := file.NewParquetReader(bytes.NewReader(data), file.WithReadProps(parquet.NewReaderProperties(pool)))
    if err != nil {
        return nil, fmt.Errorf("failed to create parquet reader: %w", err)
    }
+   log.Debugw("created parquet reader")

    // constructs a reader for converting to Arrow objects from an existing parquet file reader object
    reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{
        Parallel:  cfg.ProcessParallel,
-       BatchSize: int64(batchSize),
+       BatchSize: int64(cfg.BatchSize),
    }, pool)
    if err != nil {
        return nil, fmt.Errorf("failed to create pqarrow parquet reader: %w", err)
    }
+   log.Debugw("created pqarrow parquet reader")

    // constructs a record reader that is capable of reading entire sets of arrow records
    rr, err := reader.GetRecordReader(context.Background(), nil, nil)
    if err != nil {
        return nil, fmt.Errorf("failed to create parquet record reader: %w", err)
    }
+   log.Debugw("initialization process completed")

    return &BufferedReader{
        cfg:          cfg,
        recordReader: rr,
        fileReader:   pf,
+       log:          log,
    }, nil
}

// Next advances the pointer to point to the next record and returns true if the next record exists.
// It will return false if there are no more records to read.
func (sr *BufferedReader) Next() bool {
-   return sr.recordReader.Next()
+   next := sr.recordReader.Next()
+   if !next {
+       sr.log.Debugw("no more records to read", "next", next)
+   }
+   return next
}

@@ -81,13 +95,16 @@ func (sr *BufferedReader) Next() bool {
// Record reads the current record from the parquet file and returns it as a JSON marshaled byte slice.
func (sr *BufferedReader) Record() ([]byte, error) {
    rec := sr.recordReader.Record()
    if rec == nil {
+       sr.log.Debugw("reached the end of the record reader", "record_reader", rec)
        return nil, io.EOF
    }
    defer rec.Release()
    val, err := rec.MarshalJSON()
    if err != nil {
        return nil, fmt.Errorf("failed to marshal JSON for parquet value: %w", err)
    }
+   sr.log.Debugw("records successfully read", "batch_size", sr.cfg.BatchSize)
+
    return val, nil
}

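For context, a minimal sketch of how a caller might drive this reader end to end with the new debug logs in play. The Config fields (ProcessParallel, BatchSize) and the Next/Record loop come from the diff above; the import path and the sample file name are assumptions for illustration, and any cleanup APIs on the reader are omitted because they are not part of this diff:

package main

import (
    "fmt"
    "io"
    "os"

    // Assumed import path, inferred from the x-pack/libbeat/reader/parquet file location.
    "github.com/elastic/beats/v7/x-pack/libbeat/reader/parquet"
)

func main() {
    f, err := os.Open("events.parquet") // hypothetical input file
    if err != nil {
        panic(err)
    }
    defer f.Close()

    // BatchSize caps how many rows each Arrow record batch holds; the reader
    // now logs it both at creation time and after each successful batch read.
    r, err := parquet.NewBufferedReader(f, &parquet.Config{
        ProcessParallel: true,
        BatchSize:       1000,
    })
    if err != nil {
        panic(err)
    }

    // Next reports whether another batch exists; Record marshals the current
    // batch of rows to JSON, returning io.EOF once the reader is exhausted.
    for r.Next() {
        val, err := r.Record()
        if err == io.EOF {
            break
        }
        if err != nil {
            panic(err)
        }
        fmt.Println(string(val))
    }
}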
4 changes: 4 additions & 0 deletions x-pack/libbeat/reader/parquet/parquet_test.go
@@ -19,6 +19,8 @@ import (
    "github.com/apache/arrow/go/v14/arrow/memory"
    "github.com/apache/arrow/go/v14/parquet/pqarrow"
    "github.com/stretchr/testify/assert"
+
+   "github.com/elastic/elastic-agent-libs/logp"
)

// all test files are read from/stored within the "testdata" directory
@@ -55,6 +57,7 @@ func TestParquetWithRandomData(t *testing.T) {
        },
    }

+   logp.TestingSetup()
    for i, tc := range testCases {
        name := fmt.Sprintf("Test parquet files with rows=%d, and columns=%d", tc.rows, tc.columns)
        t.Run(name, func(t *testing.T) {
@@ -189,6 +192,7 @@ func TestParquetWithFiles(t *testing.T) {
        },
    }

+   logp.TestingSetup()
    for _, tc := range testCases {
        name := fmt.Sprintf("Test parquet files with source file=%s, and target comparison file=%s", tc.parquetFile, tc.jsonFile)
        t.Run(name, func(t *testing.T) {
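The logp.TestingSetup() calls route the global logger to the test output, so the Debugw statements added in parquet.go surface during test runs. To surface only this reader's logs, a selector could be passed in; a sketch, assuming the WithSelectors option from elastic-agent-libs/logp matches the "reader.parquet" logger name used above:

// Enable debug output only for the logger named "reader.parquet".
// WithSelectors is assumed to be available in this logp version.
logp.TestingSetup(logp.WithSelectors("reader.parquet"))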