Source: files (s3 and local) #653

Merged: 24 commits, Dec 3, 2024

Commits:
c66e2a5 initial commit, try to fix pickling error (tim-quix, Nov 25, 2024)
6389fcd fixed issue around pickling, need to simplify file gathering and part… (tim-quix, Nov 25, 2024)
fa4ad96 created more shared methods and unified classes more (tim-quix, Nov 26, 2024)
9045a00 a few more tweaks (tim-quix, Nov 26, 2024)
6fee37d move everything to file source (tim-quix, Nov 26, 2024)
5dd5d98 remove previous blob_store addition (tim-quix, Nov 26, 2024)
dae3fc6 clean up some things, add pagination handling to azure (tim-quix, Nov 27, 2024)
211b3cd add docs, rename some stuff (tim-quix, Nov 27, 2024)
e23fc57 clean up schema doc section (tim-quix, Nov 27, 2024)
71792df clean up compressor (tim-quix, Nov 27, 2024)
4c3e6c3 tiny doc tweaks (tim-quix, Nov 27, 2024)
8cff599 changes based on code review (tim-quix, Nov 27, 2024)
8a699ac Update docs/connectors/sources/aws-s3-file-source.md (tim-quix, Nov 27, 2024)
c26a217 change bucket variable name (tim-quix, Nov 27, 2024)
bb12e07 update local file docs (tim-quix, Nov 27, 2024)
07af800 add azure docs (tim-quix, Nov 27, 2024)
72cba51 clean up azure stuff and other small tweaks (tim-quix, Nov 28, 2024)
f581e28 clean up after second round of code review (tim-quix, Dec 2, 2024)
4726928 doc inclusions and cleanup (tim-quix, Dec 2, 2024)
5331152 some tweaks for consistency across source and sink (tim-quix, Dec 2, 2024)
bd1bdff adjust replay to only sleep with large enough deltas (tim-quix, Dec 2, 2024)
2c8483b adjust replay speed to accept floats rather than a bool (tim-quix, Dec 3, 2024)
5a2e771 remove azure stuff (tim-quix, Dec 3, 2024)
e248f0c fix a doc ref (tim-quix, Dec 3, 2024)
2 changes: 2 additions & 0 deletions docs/build/build.py
```diff
@@ -145,6 +145,8 @@
     "quixstreams.sources.core.kafka.quix",
     "quixstreams.sources.community.file.file",
     "quixstreams.sources.community.file.compressions.gzip",
+    "quixstreams.sources.community.file.origins.local",
+    "quixstreams.sources.community.file.origins.s3",
     "quixstreams.sources.community.file.formats.json",
     "quixstreams.sources.community.file.formats.parquet",
     "quixstreams.sources.community.kinesis.kinesis",
```
89 changes: 0 additions & 89 deletions docs/connectors/sources/file-source.md

This file was deleted.

171 changes: 171 additions & 0 deletions docs/connectors/sources/local-file-source.md
@@ -0,0 +1,171 @@
# Local File Source

!!! info

    This is a **Community** connector. Test it before using in production.

    To learn more about differences between Core and Community connectors, see the [Community and Core Connectors](../community-and-core.md) page.

This source reads records from files in a local directory and produces
them as messages to a Kafka topic, ready for any desired `StreamingDataFrame`-based transformations.

The resulting messages can be produced in "replay" mode, where the time between producing
each record matches the original timing as closely as possible (per topic partition only).


## How To Install

Simply install Quix Streams; no options are required:

```bash
pip install quixstreams
```

## How It Works

`FileSource` steps through each folder within the provided path and dumps each record
contained in each file as a message to a Kafka topic. Folders are navigated in
lexicographical order.

Records are read in a streaming fashion and committed after every file, offering
[at-least-once guarantees](#processingdelivery-guarantees).

It can handle one file type at a time (ex: JSONlines or Parquet), and also
supports file decompression.

You can learn more details about the [expected Kafka message format](#message-data-formatschema) below.

## How To Use

Local File Source is the default configuration of the `FileSource` connector.

Simply hand the configured `FileSource` (without an `origin`) to your `SDF`
(`app.dataframe(source=<SOURCE>)`).

For more details around various settings, see [configuration](#configuration).

```python
from quixstreams import Application
from quixstreams.sources.community.file import FileSource

app = Application(broker_address="localhost:9092")
source = FileSource(
    directory="/path/to/my/topic_folder",
    format="json",
    compression="gzip",
    replay_speed=1.0,
)
sdf = app.dataframe(source=source).print(metadata=True)
# YOUR LOGIC HERE!

if __name__ == "__main__":
    app.run()
```

## Configuration

Here are some important configurations to be aware of (see [File Source API](../../api-reference/sources.md#filesource) for all parameters).

### Required:

- `directory`: a directory to recursively read through (exclude bucket name).
**Note**: If using alongside `FileSink`, provide the path to the topic name folder (ex: `"path/to/topic_a/"`).

### Optional:

- `format`: what format the message files are in (ex: `"json"`, `"parquet"`).
**Advanced**: can optionally provide a `Format` instance (`compression` will then be ignored).
**Default**: `"json"`
- `compression`: what compression is used on the given files, if any (ex: `"gzip"`).
**Default**: `None`
- `replay_speed`: a multiplier applied to the original time delay between producing
each message; see the sketch after this list.
Use any `float` `>= 0.0`, where `0.0` means no delay and `1.0` reproduces the original timing.
**Note**: Time delay will only be accurate _per partition_, NOT overall.
**Default**: `1.0`
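
For example, a minimal sketch (reusing the hypothetical folder path from the example above)
that ingests all files as fast as possible, with no artificial delay between messages:

```python
from quixstreams.sources.community.file import FileSource

# replay_speed=0.0 disables the inter-message delay entirely,
# so records are produced as fast as they can be read.
source = FileSource(
    directory="/path/to/my/topic_folder",
    format="json",
    replay_speed=0.0,
)
```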


## File hierarchy/structure

The File Source expects a folder structure like so:

```
my_sinked_topics/
├── topic_a/              # topic name (use this path to File Source!)
│   ├── 0/                # topic partition number
│   │   ├── 0000.ext      # formatted offset files (ex: JSON)
│   │   └── 0011.ext
│   └── 1/
│       ├── 0003.ext
│       └── 0016.ext
└── topic_b/
    └── etc...
```

This is the default structure generated by the File Sink.

## Message Data Format/Schema

The expected file schema largely depends on the chosen
file format.

For easiest use (especially alongside [`FileSink`](../sinks/file-sink.md)),
you can follow these patterns:

### Row-based Formats (ex: JSON)

Files should have records with the following fields, with `_value` being a
JSON-deserializable item:

- `_key`
- `_value`
- `_timestamp`


This will result in the following Kafka message format for `Application`:

- Message `key` will be the record `_key` as `bytes`.
- Message `value` will be the record `_value` as a `json`/`dict`.
- Message `timestamp` will be the record `_timestamp` (ms).
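
As an illustration, here is a minimal sketch (with hypothetical keys, values, and paths)
of writing a JSONlines file that matches this schema, one record per line:

```python
import json

# Hypothetical records following the expected schema;
# "_value" must be JSON-deserializable, "_timestamp" is in milliseconds.
records = [
    {"_key": "sensor-1", "_value": {"temperature": 21.5}, "_timestamp": 1733200000000},
    {"_key": "sensor-1", "_value": {"temperature": 21.7}, "_timestamp": 1733200001000},
]

with open("path/to/topic_a/0/0000.jsonl", "w") as f:
    for record in records:
        f.write(json.dumps(record) + "\n")
```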

### Columnar Formats (ex: Parquet)

These do not expect an explicit `value` field; instead, all columns should be included
individually, alongside `_key` and `_timestamp`:

- `_key`
- `_timestamp`
- `field_a`
- `field_b`
etc...


This will result in the following Kafka message format for `Application`:

- Message `key` will be the record `_key` as `bytes`.
- Message `value` will be every record field except `_key` and `_timestamp`, packed as a `json`/`dict`.
- Message `timestamp` will be the record `_timestamp` (ms).
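
As an illustration, a minimal sketch (with hypothetical fields and paths, and assuming
`pandas` plus a Parquet engine such as `pyarrow` are installed) of writing a matching Parquet file:

```python
import pandas as pd

# "_key" and "_timestamp" (ms) become the Kafka key and timestamp;
# every other column is packed into the message value.
df = pd.DataFrame(
    {
        "_key": ["sensor-1", "sensor-1"],
        "_timestamp": [1733200000000, 1733200001000],
        "field_a": [21.5, 21.7],
        "field_b": ["ok", "ok"],
    }
)
df.to_parquet("path/to/topic_a/0/0000.parquet")
```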


### Custom Schemas (Advanced)

If the original files are not formatted as expected, custom loaders can be configured
on some `Format` classes (ex: `JsonFormat`), which can then be handed to `FileSource(format=<Format>)`.

Formats can be imported from `quixstreams.sources.community.file.formats`.
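
As a rough sketch only: the class name (`JSONFormat` here) and the loader parameter
(`loads`, assumed to take one raw line and return a record `dict`) should be verified
against the [File Source API](../../api-reference/sources.md#filesource):

```python
import json

from quixstreams.sources.community.file import FileSource
from quixstreams.sources.community.file.formats import JSONFormat

# ASSUMPTION: a custom loader remaps non-standard field names
# to the expected "_key"/"_value"/"_timestamp" schema.
def remap_record(raw: str) -> dict:
    record = json.loads(raw)
    return {
        "_key": record["id"],
        "_value": record["payload"],
        "_timestamp": record["event_time_ms"],
    }

source = FileSource(
    directory="path/to/topic_a/",
    format=JSONFormat(loads=remap_record),
)
```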

## Processing/Delivery Guarantees

This Source offers "at-least-once" guarantees with message delivery: messages are
guaranteed to be committed when a file is finished processing.

However, it does not save any state/position: an unhandled exception will cause the
`Application` to fail, and rerunning the `Application` will begin processing from the
beginning (reproducing all previously processed messages).

## Topic

The default topic will have a partition count matching the number of partition folders
found within the provided topic's folder structure.

The default topic name the `Application` produces to is derived from the last folder name of
the `FileSource` `directory`, as: `source__<last folder name>`
(ex: `directory="path/to/topic_a/"` yields a topic named `source__topic_a`).
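
To produce to a different topic instead, a `Topic` can be passed to `app.dataframe()`
alongside the source; a minimal sketch (the topic name is hypothetical):

```python
from quixstreams import Application
from quixstreams.sources.community.file import FileSource

app = Application(broker_address="localhost:9092")
source = FileSource(directory="path/to/topic_a/", format="json")

# Override the default "source__topic_a" topic with a custom one.
topic = app.topic(name="my_custom_topic")
sdf = app.dataframe(topic=topic, source=source)

if __name__ == "__main__":
    app.run()
```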