Skip to content

Commit

Permalink
[faker] decouple stream state (#20492)
Browse files Browse the repository at this point in the history
* [faker] decouple stream state

* add PR #

* commit Stream instantiate changes

* fixup expected record

* skip backward test for this version too

* Apply suggestions from code review

Co-authored-by: Augustin <augustin@airbyte.io>

* lint

* Create realistic datasets of 10GB, 100GB, and 1TB in size (#20558)

* Faker CSV Streaming utilities

* readme

* don't do a final pipe to jq or you will run out or ram

* doc

* Faker gets 250% faster (#20741)

* Faker is 250% faster

* threads in spec + lint

* pass tests

* revert changes to record helper

* cleanup

* update expected_records

* bump default records-per-slice to 1k

* enforce unique email addresses

* cleanup

* more comments

* `parallelism` and pass tests

* update expected records

* cleanup notes

* update readme

* update expected records

* auto-bump connector version

Co-authored-by: Augustin <augustin@airbyte.io>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
3 people authored Jan 3, 2023
1 parent 7033b1e commit 2022f7d
Show file tree
Hide file tree
Showing 19 changed files with 564 additions and 289 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -467,7 +467,7 @@
- name: Sample Data (Faker)
sourceDefinitionId: dfd88b22-b603-4c3d-aad7-3701784586b1
dockerRepository: airbyte/source-faker
dockerImageTag: 1.0.0
dockerImageTag: 2.0.0
documentationUrl: https://docs.airbyte.com/integrations/sources/faker
sourceType: api
releaseStage: alpha
Expand Down
13 changes: 11 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3866,7 +3866,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-faker:1.0.0"
- dockerImage: "airbyte/source-faker:2.0.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/faker"
connectionSpecification:
Expand Down Expand Up @@ -3907,8 +3907,17 @@
\ before a state message is emitted?"
type: "integer"
minimum: 1
default: 100
default: 1000
order: 3
parallelism:
title: "Parallelism"
description: "How many parallel workers should we use to generate fake data?\
\ Choose a value equal to the number of CPUs you will allocate to this\
\ source."
type: "integer"
minimum: 1
default: 4
order: 4
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-faker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,5 @@ COPY source_faker ./source_faker
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.version=2.0.0
LABEL io.airbyte.name=airbyte/source-faker
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ acceptance_tests:
tests:
- config_path: secrets/config.json
backward_compatibility_tests_config:
disable_for_version: "0.2.1"
disable_for_version: "1.0.0" # We changed the cursor field of the Purchases stream in 2.0.0
basic_read:
tests:
- config_path: secrets/config.json
Expand Down
76 changes: 76 additions & 0 deletions airbyte-integrations/connectors/source-faker/csv_export/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Python Source CSV Export

This collection of tools is used to run the source and capture it's AirbyteMessages and convert them into CSV files. This is useful if you want to manually inspect this data or load it into a database manually.

To be fast, we make use of parallel processing per-stream and only using command-line tools. This works by the main file (`main.sh`) running the source via python and tee-ing the output of RECORDS to sub-scripts which use `jq` to convert the records into CSV-delimited output, which we finally write to disk.

As we read the connector config files, e.g. `--config secrets/config.json --state secrets/state.json --catalog integration_tests/configured_catalog.json`, you can manually step forward your sync if you need to read and store the input in chunks.

## The road to 1TB of faker data

There's commentary on this at https://github.com/airbytehq/airbyte/pull/20558, along with some cool SQL tricks.

- 2 Billion faker users for 1TB: `10,000,000*(1024/5.02) = 2,039,840,637`
- 200 Million faker users for 100GB: `10,000,000*(100/5.02) = 199,203,187`
- 20 Million faker users for 10GB: `10,000,000*(10/5.02) = 19,920,318`

But let's assume we don't have 1TB of local hard disk. So, we want to make 10 chunks of data, each around 100GB in size.

**`config.json`**

```json
{
"count": 2039840637,
"seed": 0,
"records_per_sync": 203984064
}
```

**`state.json`**

At the end of every sync, increment the `id` in the users stream and the `user_id` in the purchases stream by `203984064`, the `records_per_sync` chunk size

```json
[
{
"type": "STREAM",
"stream": {
"stream_state": {
"id": 0
},
"stream_descriptor": {
"name": "users"
}
}
},
{
"type": "STREAM",
"stream": {
"stream_state": {
"id": 0,
"user_id": 0
},
"stream_descriptor": {
"name": "purchases"
}
}
},
{
"type": "STREAM",
"stream": {
"stream_state": {
"id": 0
},
"stream_descriptor": {
"name": "products"
}
}
}
]
```

Finally, ensure that you've opted-into all the streams in `integration_tests/configured_catalog.json`

## TODO

- This is currently set up very manually, in that we build bash scripts for each stream and manually populate the header information. This information all already lives in the connector's catalog. We probably could build these bash files on-demand with a python script...
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
#!/usr/bin/env bash

cd "$(dirname "$0")"
cd ".."

mkdir -p /tmp/csv

python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json --state secrets/state.json \
| tee >(./csv_export/purchases.sh) >(./csv_export/products.sh) >(./csv_export/users.sh)
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

cd "$(dirname "$0")"

FILE="/tmp/csv/products.csv"

rm -rf $FILE

echo "make, model, price, created_at" >> $FILE

jq -c 'select((.type | contains("RECORD")) and (.record.stream | contains("products"))) .record.data' \
| jq -r '[.make, .model, .price, .created_at] | @csv' \
>> $FILE
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

cd "$(dirname "$0")"

FILE="/tmp/csv/purchases.csv"

rm -rf $FILE

echo "id, product_id, user_id, added_to_cart_at, purchased_at, returned_at" >> $FILE

jq -c 'select((.type | contains("RECORD")) and (.record.stream | contains("purchases"))) .record.data' \
| jq -r '[.id, .product_id, .user_id, .added_to_cart_at, .purchased_at, .returned_at] | @csv' \
>> $FILE
13 changes: 13 additions & 0 deletions airbyte-integrations/connectors/source-faker/csv_export/users.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/usr/bin/env bash

cd "$(dirname "$0")"

FILE="/tmp/csv/users.csv"

rm -rf $FILE

echo "id, created_at, updated_at, name, title, age, email, telephone, gender, language, academic_degree, nationality, occupation, height, blood_type, weight" >> $FILE

jq -c 'select((.type | contains("RECORD")) and (.record.stream | contains("users"))) .record.data' \
| jq -r '[.id, .created_at, .updated_at, .name, .title, .age, .email, .telephone, .gender, .language, .academic_degree, .nationality, .occupation, .height, .blood_type, .weight] | @csv' \
>> $FILE
Loading

0 comments on commit 2022f7d

Please sign in to comment.