Support file-like inputs for RecordReader #59

MaxGroot
Contributor

  • Peek into the file to find the right adapter
  • Add tests for avro
  • Add avro to testenv
  • Add avro to extras in setup.py


codecov bot commented Feb 28, 2023

Codecov Report

Merging #59 (60b0b07) into main (6358ba3) will increase coverage by 0.17%.
The diff coverage is 91.59%.

@@            Coverage Diff             @@
##             main      #59      +/-   ##
==========================================
+ Coverage   79.16%   79.34%   +0.17%     
==========================================
  Files          32       32              
  Lines        2894     2924      +30     
==========================================
+ Hits         2291     2320      +29     
- Misses        603      604       +1     
Flag Coverage Δ
unittests 79.34% <91.59%> (+0.17%) ⬆️

Files Changed Coverage Δ
flow/record/__init__.py 51.51% <ø> (ø)
flow/record/base.py 90.59% <89.01%> (+0.29%) ⬆️
flow/record/adapter/avro.py 79.27% <100.00%> (+0.38%) ⬆️
flow/record/adapter/stream.py 100.00% <100.00%> (ø)
flow/record/exceptions.py 100.00% <100.00%> (ø)
flow/record/stream.py 94.84% <100.00%> (-0.03%) ⬇️

Member

@yunzheng yunzheng left a comment

I added some feedback

Previously, rdump would assume a RecordStream for stdin input.
Also implemented some (but not all) of the code review suggestions.
Have to revisit the peeking logic before this is again ready for review.
@MaxGroot
Contributor Author

I adopted the suggested changes. One thing I wasn't sure about is whether this 'peeking' logic should also apply when a path is opened. For example, both

cat records.avro.gz | gunzip | rdump

cat records.avro.gz | rdump

now work, because the RecordAdapter peeks into the stream and, if the stream is compressed, transparently decompresses it. However, when you run

rdump records.avro.gz

it won't work. The file pointer is correctly wrapped in a gzip decompressor, but the adapter is chosen based on the file extension, not on the contents of the file. The RecordStream adapter is therefore assumed, which doesn't work for Avro files. Of course, when you use

rdump avro://records.avro.gz

it does work, because you explicitly specify which adapter should be used.
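To make the stdin behaviour described above concrete, here is a minimal sketch of content-based detection. It is not the code from this PR: `guess_adapter` and the adapter names it returns are assumptions for illustration. Only the gzip (`1f 8b`) and Avro object container (`Obj\x01`) magic bytes are standard.

```python
import gzip
import io

GZIP_MAGIC = b"\x1f\x8b"
AVRO_MAGIC = b"Obj\x01"


def guess_adapter(fp):
    """Return (adapter_name, stream) for a binary file-like object.

    Hypothetical helper for illustration, not flow.record's API.
    """
    # BufferedReader.peek() returns buffered bytes without advancing the position.
    if not hasattr(fp, "peek"):
        fp = io.BufferedReader(fp)

    # Step 1: if the stream is gzip-compressed, decompress it transparently.
    if fp.peek(len(GZIP_MAGIC))[: len(GZIP_MAGIC)] == GZIP_MAGIC:
        fp = io.BufferedReader(gzip.GzipFile(fileobj=fp, mode="rb"))

    # Step 2: inspect the (now decompressed) contents to pick an adapter.
    # Note: peek() may return fewer bytes than requested on a slow pipe;
    # a robust version would need to handle that case.
    if fp.peek(len(AVRO_MAGIC))[: len(AVRO_MAGIC)] == AVRO_MAGIC:
        return "avro", fp

    # Assumed fallback name for the RecordStream adapter.
    return "stream", fp
```

Because the bytes are only peeked at, the returned stream still starts at the first byte, so the chosen adapter can read the header itself.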

I am not sure which API design is desirable here. While I quite like the approach of peeking into files, I understand it doesn't scale to all file formats. That said, it feels inconsistent that the RecordAdapter automatically switches to the correct RecordReader in some cases (piped input) but not in others (a path). Maybe the dictionary below should be extended with some logic to check for compression?

    # Guess adapter based on extension
    ext_to_adapter = {
        ".avro": "avro",
        ".json": "jsonfile",
        ".jsonl": "jsonfile",
        ".csv": "csvfile",
    }
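One way the extension lookup could account for compression is to strip a trailing compression suffix before consulting the dictionary. The sketch below is only an illustration of that idea: `adapter_from_path`, the `COMPRESSION_EXTS` set, and the `"stream"` default are assumptions, not part of this PR.

```python
from pathlib import PurePath

# Same mapping as in the snippet above.
EXT_TO_ADAPTER = {
    ".avro": "avro",
    ".json": "jsonfile",
    ".jsonl": "jsonfile",
    ".csv": "csvfile",
}

# Assumed set of compression suffixes; adjust to whatever the reader can decompress.
COMPRESSION_EXTS = {".gz", ".bz2", ".lz4", ".zst"}


def adapter_from_path(path, default="stream"):
    """Guess an adapter name from a path, ignoring one trailing compression suffix."""
    suffixes = [s.lower() for s in PurePath(path).suffixes]
    # "records.avro.gz" has suffixes [".avro", ".gz"]; drop the ".gz" first.
    if suffixes and suffixes[-1] in COMPRESSION_EXTS:
        suffixes.pop()
    if suffixes:
        return EXT_TO_ADAPTER.get(suffixes[-1], default)
    return default
```

With this, `adapter_from_path("records.avro.gz")` would resolve to the same adapter as `records.avro`, while a bare `records.gz` still falls back to the default.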

@MaxGroot MaxGroot requested a review from yunzheng August 1, 2023 07:30
Member

@yunzheng yunzheng left a comment

Some small suggestions and feedback

Member

@yunzheng yunzheng left a comment

Small unit test issue, then it looks good to go

MaxGroot and others added 2 commits August 29, 2023 18:30
Co-authored-by: Yun Zheng Hu <hu@fox-it.com>
flow/record/base.py
path = str(path)
if isinstance(path, str):
    return open_path(path, mode, clobber)
elif isinstance(path, Peekable):
Member

I haven't followed all code paths, but because we exit early here for a Peekable and don't call open_stream, I figured that if you somehow were to have a Peekable of a (still) compressed stream, we wouldn't do compression stream detection on it. This is probably not a realistic code path though.
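For illustration, the concern could be avoided by running the compression check regardless of whether the input is already peekable. The sketch below uses a hypothetical stand-in `Peekable` class and a hypothetical `open_stream` function; neither is the implementation in flow/record/base.py, and only gzip is handled.

```python
import gzip
import io


class Peekable:
    """Hypothetical stand-in: wraps a stream and lets callers look ahead."""

    def __init__(self, fp):
        self.fp = fp
        self.buffer = b""

    def peek(self, size):
        # Read only what is missing and remember it for later read() calls.
        if len(self.buffer) < size:
            self.buffer += self.fp.read(size - len(self.buffer))
        return self.buffer[:size]

    def read(self, size=-1):
        if size is None or size < 0:
            data, self.buffer = self.buffer + self.fp.read(), b""
            return data
        data, self.buffer = self.buffer[:size], self.buffer[size:]
        if len(data) < size:
            data += self.fp.read(size - len(data))
        return data


def open_stream(fp):
    """Run gzip detection whether or not fp is already a Peekable."""
    if not isinstance(fp, Peekable):
        fp = Peekable(fp)
    if fp.peek(2) == b"\x1f\x8b":
        # Re-wrap so the decompressed bytes can be peeked at as well.
        fp = Peekable(gzip.GzipFile(fileobj=fp, mode="rb"))
    return fp
```

In this sketch a Peekable that still holds compressed data is decompressed just like a raw stream, at the cost of one extra two-byte peek on every call.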

Also implement latest code review suggestions
Co-authored-by: Erik Schamper <1254028+Schamper@users.noreply.github.com>
Member

@yunzheng yunzheng left a comment

LGTM

@yunzheng yunzheng merged commit 2e2eb62 into fox-it:main Sep 13, 2023
15 checks passed
yunzheng added a commit that referenced this pull request Oct 11, 2023
Peek into the file to find the right adapter by checking the file magic

---------

Co-authored-by: Max Groot <max.groot@fox-it.com>
Co-authored-by: Yun Zheng Hu <hu@fox-it.com>
Co-authored-by: Erik Schamper <1254028+Schamper@users.noreply.github.com>