Skip to content

Commit

Permalink
[libbeat][parquet reader] - Added debug logs & improved batch_size tr…
Browse files Browse the repository at this point in the history
…acking (#40651) (#40673)

(cherry picked from commit 7f317fd)

Co-authored-by: ShourieG <105607378+ShourieG@users.noreply.github.com>
  • Loading branch information
mergify[bot] and ShourieG authored Sep 2, 2024
1 parent 883886f commit 88cc526
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 6 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Move x-pack/filebeat/input/salesforce jwt import to v5. {pull}39823[39823]
- Drop x-pack/filebeat/input dependency on github.com/lestrrat-go/jwx/v2. {pull}39968[39968]
- Added `ignore_empty_values` flag in `decode_cef` Filebeat processor. {pull}40268[40268]
- Bump version of elastic/toutoumomoma to remove internal forks of stdlib debug packages. {pull}40325[40325]
- Refactor x-pack/filebeat/input/websocket for generalisation. {pull}40308[40308]
- Add a configuration option for TCP/UDP network type. {issue}40407[40407] {pull}40623[40623]
- Added debug logging to parquet reader in x-pack/libbeat/reader. {pull}40651[40651]

==== Deprecated

Expand Down
29 changes: 23 additions & 6 deletions x-pack/libbeat/reader/parquet/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,65 +14,79 @@ import (
"github.com/apache/arrow/go/v14/parquet"
"github.com/apache/arrow/go/v14/parquet/file"
"github.com/apache/arrow/go/v14/parquet/pqarrow"

"github.com/elastic/elastic-agent-libs/logp"
)

// BufferedReader parses parquet inputs from io streams.
type BufferedReader struct {
cfg *Config
fileReader *file.Reader
recordReader pqarrow.RecordReader
log *logp.Logger
}

// NewBufferedReader creates a new reader that can decode parquet data from an io.Reader.
// It will return an error if the parquet data stream cannot be read.
// Note: As io.ReadAll is used, the entire data stream would be read into memory, so very large data streams
// may cause memory bottleneck issues.
func NewBufferedReader(r io.Reader, cfg *Config) (*BufferedReader, error) {
batchSize := 1
if cfg.BatchSize > 1 {
batchSize = cfg.BatchSize
log := logp.L().Named("reader.parquet")

if cfg.BatchSize == 0 {
cfg.BatchSize = 1
}
log.Debugw("creating parquet reader", "batch_size", cfg.BatchSize)

// reads the contents of the reader object into a byte slice
data, err := io.ReadAll(r)
if err != nil {
return nil, fmt.Errorf("failed to read data from stream reader: %w", err)
}
log.Debugw("read data from stream reader", "size", len(data))

// defines a memory allocator for allocating memory for Arrow objects
pool := memory.NewCheckedAllocator(&memory.GoAllocator{})

// constructs a parquet file reader object from the byte slice data
pf, err := file.NewParquetReader(bytes.NewReader(data), file.WithReadProps(parquet.NewReaderProperties(pool)))
if err != nil {
return nil, fmt.Errorf("failed to create parquet reader: %w", err)
}
log.Debugw("created parquet reader")

// constructs a reader for converting to Arrow objects from an existing parquet file reader object
reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{
Parallel: cfg.ProcessParallel,
BatchSize: int64(batchSize),
BatchSize: int64(cfg.BatchSize),
}, pool)
if err != nil {
return nil, fmt.Errorf("failed to create pqarrow parquet reader: %w", err)
}
log.Debugw("created pqarrow parquet reader")

// constructs a record reader that is capable of reding entire sets of arrow records
rr, err := reader.GetRecordReader(context.Background(), nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to create parquet record reader: %w", err)
}
log.Debugw("initialization process completed")

return &BufferedReader{
cfg: cfg,
recordReader: rr,
fileReader: pf,
log: log,
}, nil
}

// Next advances the pointer to point to the next record and returns true if the next record exists.
// It will return false if there are no more records to read.
func (sr *BufferedReader) Next() bool {
return sr.recordReader.Next()
next := sr.recordReader.Next()
if !next {
sr.log.Debugw("no more records to read", "next", next)
}
return next
}

// Record reads the current record from the parquet file and returns it as a JSON marshaled byte slice.
Expand All @@ -81,13 +95,16 @@ func (sr *BufferedReader) Next() bool {
func (sr *BufferedReader) Record() ([]byte, error) {
rec := sr.recordReader.Record()
if rec == nil {
sr.log.Debugw("reached the end of the record reader", "record_reader", rec)
return nil, io.EOF
}
defer rec.Release()
val, err := rec.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to marshal JSON for parquet value: %w", err)
}
sr.log.Debugw("records successfully read", "batch_size", sr.cfg.BatchSize)

return val, nil
}

Expand Down
4 changes: 4 additions & 0 deletions x-pack/libbeat/reader/parquet/parquet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"github.com/apache/arrow/go/v14/arrow/memory"
"github.com/apache/arrow/go/v14/parquet/pqarrow"
"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"
)

// all test files are read from/stored within the "testdata" directory
Expand Down Expand Up @@ -55,6 +57,7 @@ func TestParquetWithRandomData(t *testing.T) {
},
}

logp.TestingSetup()
for i, tc := range testCases {
name := fmt.Sprintf("Test parquet files with rows=%d, and columns=%d", tc.rows, tc.columns)
t.Run(name, func(t *testing.T) {
Expand Down Expand Up @@ -189,6 +192,7 @@ func TestParquetWithFiles(t *testing.T) {
},
}

logp.TestingSetup()
for _, tc := range testCases {
name := fmt.Sprintf("Test parquet files with source file=%s, and target comparison file=%s", tc.parquetFile, tc.jsonFile)
t.Run(name, func(t *testing.T) {
Expand Down

0 comments on commit 88cc526

Please sign in to comment.