From 226d5074fa60e39e2711307199e335d25c27ff6d Mon Sep 17 00:00:00 2001 From: Chloe He Date: Thu, 11 Jul 2024 13:17:27 -0700 Subject: [PATCH] docs: update repo link --- .../index/execute-results/html.json | 4 ++-- .../index/execute-results/html.json | 4 ++-- docs/posts/flink-announcement/index.qmd | 20 +++++++++---------- docs/posts/unified-stream-batch/index.qmd | 6 +++--- 4 files changed, 17 insertions(+), 17 deletions(-) diff --git a/docs/_freeze/posts/flink-announcement/index/execute-results/html.json b/docs/_freeze/posts/flink-announcement/index/execute-results/html.json index 6365ae1f8b85..940bdc8dc0b3 100644 --- a/docs/_freeze/posts/flink-announcement/index/execute-results/html.json +++ b/docs/_freeze/posts/flink-announcement/index/execute-results/html.json @@ -1,8 +1,8 @@ { - "hash": "c0db90940b7122a139297aa7276d3e20", + "hash": "a90b2eb61dcf4eeb03d701976ffff0c2", "result": { "engine": "jupyter", - "markdown": "---\ntitle: \"Ibis goes real-time! Introducing the new Flink backend for Ibis\"\nauthor: \"Deepyaman Datta\"\ndate: \"2024-02-12\"\ncategories:\n - blog\n - flink\n - stream processing\n---\n\n## Introduction\n\nIbis 8.0 marks the official release of the Apache Flink backend for Ibis. Ibis\nusers can now manipulate data across streaming and batch contexts using the same\ninterface. Flink is one of the most established stream-processing frameworks out\nthere and a central part of the real-time data infrastructure at companies like\nDoorDash, LinkedIn, Netflix, and Uber. It is commonly applied in use cases such\nas fraud detection, anomaly detection, real-time recommendation, dynamic\npricing, and online advertising. The Flink backend is also the first streaming\nbackend Ibis supports. Follow along as we define and execute a simple streaming\njob using Ibis!\n\n## Installation prerequisites\n\n* **Docker Compose:** This tutorial uses Docker Compose to manage an Apache\n Kafka environment (including sample data generation) and a Flink cluster (for\n [remote execution](#remote-execution)). You can [download and install Docker\n Compose from the official website](https://docs.docker.com/compose/install/).\n* **JDK 11:** Flink requires Java 11. If you don't already have JDK 11\n installed, you can [get the appropriate Eclipse Temurin\n release](https://adoptium.net/temurin/releases/?package=jdk&version=11).\n* **Python:** To follow along, you need Python 3.9 or 3.10.\n\n## Installing the Flink backend for Ibis\n\nWe use a Python client to explore data in Kafka topics. You can install it,\nalongside the Flink backend for Ibis, with `pip`, `conda`, `mamba`, or `pixi`:\n\n::: {.panel-tabset}\n\n## Using `pip`\n\n```bash\npip install ibis-framework apache-flink kafka-python\n```\n\n## Using `conda`\n\n```bash\nconda install -c conda-forge ibis-flink\n```\n\n## Using `mamba`\n\n```bash\nmamba install -c conda-forge ibis-flink\n```\n\n## Using `pixi`\n\n```bash\npixi add ibis-flink\n```\n\n:::\n\n## Spinning up the services using Docker Compose\n\nThe [ibis-project/ibis-flink-example GitHub\nrepository](https://github.com/ibis-project/ibis-flink-example) includes the\nrelevant Docker Compose configuration for this tutorial. 
Clone the repository,\nand run `docker compose up` from the cloned directory to create Kafka topics,\ngenerate sample data, and launch a Flink cluster:\n\n```bash\ngit clone https://github.com/claypotai/ibis-flink-example.git\ncd ibis-flink-example\ndocker compose up\n```\n\n::: {.callout-tip}\nIf you don't intend to try [remote execution](#remote-execution), you can start\nonly the Kafka-related services with `docker compose up kafka init-kafka\ndata-generator`.\n:::\n\nAfter a few seconds, you should see messages indicating your Kafka environment\nis ready:\n\n```bash\nibis-flink-example-init-kafka-1 | Successfully created the following topics:\nibis-flink-example-init-kafka-1 | payment_msg\nibis-flink-example-init-kafka-1 | sink\nibis-flink-example-init-kafka-1 exited with code 0\nibis-flink-example-data-generator-1 | Connected to Kafka\nibis-flink-example-data-generator-1 | Producing 20000 records to Kafka topic payment_msg\n```\n\nThis example uses mock payments data. The `payment_msg` Kafka topic contains\nmessages in the following format:\n\n```json\n{\n \"createTime\": \"2023-09-20 22:19:02.224\",\n \"orderId\": 1695248388,\n \"payAmount\": 88694.71922270155,\n \"payPlatform\": 0,\n \"provinceId\": 6\n}\n```\n\nIn a separate terminal, we can explore what these messages look like:\n\n::: {#04fb6b63 .cell execution_count=1}\n``` {.python .cell-code}\nfrom itertools import islice\n\nfrom kafka import KafkaConsumer\n\nconsumer = KafkaConsumer(\"payment_msg\")\nfor msg in islice(consumer, 3):\n print(msg)\n```\n\n::: {.cell-output .cell-output-stdout}\n```\nConsumerRecord(topic='payment_msg', partition=0, offset=1087, timestamp=1707754299796, timestamp_type=0, key=None, value=b'{\"createTime\": \"2024-02-12 16:11:39.795\", \"orderId\": 1707754842, \"payAmount\": 56028.520857965006, \"payPlatform\": 0, \"provinceId\": 3}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=132, serialized_header_size=-1)\nConsumerRecord(topic='payment_msg', partition=0, offset=1088, timestamp=1707754300298, timestamp_type=0, key=None, value=b'{\"createTime\": \"2024-02-12 16:11:40.298\", \"orderId\": 1707754843, \"payAmount\": 55081.020819104546, \"payPlatform\": 0, \"provinceId\": 4}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=132, serialized_header_size=-1)\nConsumerRecord(topic='payment_msg', partition=0, offset=1089, timestamp=1707754300800, timestamp_type=0, key=None, value=b'{\"createTime\": \"2024-02-12 16:11:40.800\", \"orderId\": 1707754844, \"payAmount\": 30998.71866136802, \"payPlatform\": 0, \"provinceId\": 4}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=131, serialized_header_size=-1)\n```\n:::\n:::\n\n\n## Running the tutorial\n\nThis tutorial uses Ibis with the Flink backend to process the aforementioned\npayment messages. You can choose to either [run it locally](#local-execution) or\n[submit a job to an already-running Flink cluster](#remote-execution).\n\n### Local execution\n\nThe simpler option is to run the example using the Flink mini cluster.\n\n#### Create a table environment\n\nThe [table\nenvironment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/table/table_environment/)\nserves as the main entry point for interacting with the Flink runtime. 
The\n`flink` backend does not create `TableEnvironment` objects; you must create a\n`TableEnvironment` and pass that to\n[`ibis.flink.connect`](../../backends/flink.qmd#ibis.flink.connect):\n\n::: {#56b2d013 .cell execution_count=2}\n``` {.python .cell-code}\nimport ibis\nfrom pyflink.table import EnvironmentSettings, TableEnvironment\n\nenv_settings = EnvironmentSettings.in_streaming_mode()\ntable_env = TableEnvironment.create(env_settings)\ntable_env.get_config().set(\"parallelism.default\", \"1\") # <1>\n\ncon = ibis.flink.connect(table_env)\n```\n:::\n\n\n1. write all the data to one file\n\nFlink’s streaming connectors aren't part of the binary distribution. Link the\n[Kafka\nconnector](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/kafka/)\nfor cluster execution by adding the JAR file from the cloned repository. [Ibis\nexposes the `raw_sql` method for situations like this, where you need to run\narbitrary SQL that cannot be modeled as a table\nexpression](https://ibis-project.org/how-to/extending/sql#backend.raw_sql):\n\n\n\n::: {#f6ddf87a .cell execution_count=4}\n``` {.python .cell-code}\ncon.raw_sql(\"ADD JAR 'flink-sql-connector-kafka-3.0.2-1.18.jar'\")\n```\n:::\n\n\n#### Create the source and sink tables\n\nUse\n[`create_table`](../../backends/flink.qmd#ibis.backends.flink.Backend.create_table)\nto register tables. Notice the new top-level `ibis.watermark` API for\n[specifying a watermark\nstrategy](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/time/#event-time-and-watermarks).\n\n::: {#7018745d .cell execution_count=5}\n``` {.python .cell-code}\nsource_schema = ibis.schema( # <1>\n { # <1>\n \"createTime\": \"timestamp(3)\", # <1>\n \"orderId\": \"int64\", # <1>\n \"payAmount\": \"float64\", # <1>\n \"payPlatform\": \"int32\", # <1>\n \"provinceId\": \"int32\", # <1>\n } # <1>\n) # <1>\n\nsource_configs = { # <1>\n \"connector\": \"kafka\", # <1>\n \"topic\": \"payment_msg\", # <1>\n \"properties.bootstrap.servers\": \"localhost:9092\", # <1>\n \"properties.group.id\": \"test_3\", # <1>\n \"scan.startup.mode\": \"earliest-offset\", # <1>\n \"format\": \"json\", # <1>\n} # <1>\n\nt = con.create_table( # <1>\n \"payment_msg\", # <1>\n schema=source_schema, # <1>\n tbl_properties=source_configs, # <1>\n watermark=ibis.watermark( # <1>\n time_col=\"createTime\", allowed_delay=ibis.interval(seconds=15) # <1>\n ), # <1>\n) # <1>\n\nsink_schema = ibis.schema( # <2>\n { # <2>\n \"province_id\": \"int32\", # <2>\n \"pay_amount\": \"float64\", # <2>\n } # <2>\n) # <2>\n\nsink_configs = { # <2>\n \"connector\": \"kafka\", # <3>\n \"topic\": \"sink\", # <2>\n \"properties.bootstrap.servers\": \"localhost:9092\", # <2>\n \"format\": \"json\", # <2>\n} # <2>\n\ncon.create_table( # <2>\n \"total_amount_by_province_id\", schema=sink_schema, tbl_properties=sink_configs # <2>\n) # <2>\n```\n\n::: {.cell-output .cell-output-display execution_count=5}\n```{=html}\n
DatabaseTable: total_amount_by_province_id\n  province_id int32\n  pay_amount  float64\n
\n```\n:::\n:::\n\n\n1. create source Table\n2. create sink Table\n\n#### Perform calculations\n\nCompute the total pay amount per province in the past 10 seconds (as of each\nmessage, for the province in the incoming message):\n\n::: {#b7bf4f29 .cell execution_count=6}\n``` {.python .cell-code}\nagged = t.select(\n province_id=t.provinceId,\n pay_amount=t.payAmount.sum().over(\n range=(-ibis.interval(seconds=10), 0),\n group_by=t.provinceId,\n order_by=t.createTime,\n ),\n)\n```\n:::\n\n\nFinally, emit the query result to the sink table:\n\n::: {#f50a9b26 .cell execution_count=7}\n``` {.python .cell-code}\ncon.insert(\"total_amount_by_province_id\", agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=7}\n```\n\n```\n:::\n:::\n\n\n### Remote execution\n\nYou can also submit the example to the [remote cluster started using Docker\nCompose](#spinning-up-the-services-using-docker-compose). The\n`window_aggregation.py` file in the cloned repository contains the [same steps\nthat we performed for local execution](#local-execution). We will [use the\nmethod described in the official Flink\ndocumentation](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/cli/#submitting-pyflink-jobs).\n\n::: {.callout-tip}\nYou can find the `./bin/flink` executable with the following command:\n\n```bash\npython -c'from pathlib import Path; import pyflink; print(Path(pyflink.__spec__.origin).parent / \"bin\" / \"flink\")'\n```\n:::\n\nMy full command looks like this:\n\n```bash\n/opt/miniconda3/envs/ibis-dev/lib/python3.10/site-packages/pyflink/bin/flink run --jobmanager localhost:8081 --python window_aggregation.py\n```\n\nThe command will exit after displaying a submission message:\n\n```\nJob has been submitted with JobID b816faaf5ef9126ea5b9b6a37012cf56\n```\n\n## Viewing the results\n\nSimilar to how we viewed messages in the `payment_msg` topic, we can print\nresults from the `sink` topic:\n\n::: {#f2f232b9 .cell execution_count=8}\n``` {.python .cell-code}\nconsumer = KafkaConsumer(\"sink\")\nfor msg in islice(consumer, 10):\n print(msg)\n```\n\n::: {.cell-output .cell-output-stdout}\n```\nConsumerRecord(topic='sink', partition=0, offset=0, timestamp=1707754308001, timestamp_type=0, key=None, value=b'{\"province_id\":0,\"pay_amount\":34598.581794063946}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=1, timestamp=1707754308005, timestamp_type=0, key=None, value=b'{\"province_id\":3,\"pay_amount\":97439.80920527918}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=2, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":5,\"pay_amount\":94291.98939770168}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=3, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":2,\"pay_amount\":67249.40686490892}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=4, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":0,\"pay_amount\":75995.17413984003}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', 
partition=0, offset=5, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":6,\"pay_amount\":73913.82666977578}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=6, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":6,\"pay_amount\":110483.70544553592}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=7, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":3,\"pay_amount\":146010.17295813354}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=8, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":3,\"pay_amount\":200463.85497267012}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=9, timestamp=1707754308007, timestamp_type=0, key=None, value=b'{\"province_id\":3,\"pay_amount\":208752.74642677643}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\n```\n:::\n:::\n\n\nVoilà! You've run your first streaming application using Ibis.\n\n## Shutting down the Compose environment\n\nPress Ctrl+C to stop the Docker Compose containers. Once\nstopped, run `docker compose down` to remove the services created for this\ntutorial.\n\n", + "markdown": "---\ntitle: \"Ibis goes real-time! Introducing the new Flink backend for Ibis\"\nauthor: \"Deepyaman Datta\"\ndate: \"2024-02-12\"\ncategories:\n - blog\n - flink\n - stream processing\n---\n\n## Introduction\n\nIbis 8.0 marks the official release of the Apache Flink backend for Ibis. Ibis\nusers can now manipulate data across streaming and batch contexts using the same\ninterface. Flink is one of the most established stream-processing frameworks out\nthere and a central part of the real-time data infrastructure at companies like\nDoorDash, LinkedIn, Netflix, and Uber. It is commonly applied in use cases such\nas fraud detection, anomaly detection, real-time recommendation, dynamic\npricing, and online advertising. The Flink backend is also the first streaming\nbackend Ibis supports. Follow along as we define and execute a simple streaming\njob using Ibis!\n\n## Installation prerequisites\n\n* **Docker Compose:** This tutorial uses Docker Compose to manage an Apache\n Kafka environment (including sample data generation) and a Flink cluster (for\n [remote execution](#remote-execution)). You can [download and install Docker\n Compose from the official website](https://docs.docker.com/compose/install/).\n* **JDK 11:** Flink requires Java 11. If you don't already have JDK 11\n installed, you can [get the appropriate Eclipse Temurin\n release](https://adoptium.net/temurin/releases/?package=jdk&version=11).\n* **Python:** To follow along, you need Python 3.9 or 3.10.\n\n## Installing the Flink backend for Ibis\n\nWe use a Python client to explore data in Kafka topics. 
You can install it,\nalongside the Flink backend for Ibis, with `pip`, `conda`, `mamba`, or `pixi`:\n\n::: {.panel-tabset}\n\n## Using `pip`\n\n```bash\npip install ibis-framework apache-flink kafka-python\n```\n\n## Using `conda`\n\n```bash\nconda install -c conda-forge ibis-flink\n```\n\n## Using `mamba`\n\n```bash\nmamba install -c conda-forge ibis-flink\n```\n\n## Using `pixi`\n\n```bash\npixi add ibis-flink\n```\n\n:::\n\n## Spinning up the services using Docker Compose\n\nThe [ibis-project/realtime-fraud-detection GitHub\nrepository](https://github.com/ibis-project/realtime-fraud-detection) includes the\nrelevant Docker Compose configuration for this tutorial. Clone the repository,\nand run `docker compose up` from the cloned directory to create Kafka topics,\ngenerate sample data, and launch a Flink cluster:\n\n```bash\ngit clone https://github.com/claypotai/realtime-fraud-detection.git\ncd realtime-fraud-detection\ndocker compose up\n```\n\n::: {.callout-tip}\nIf you don't intend to try [remote execution](#remote-execution), you can start\nonly the Kafka-related services with `docker compose up kafka init-kafka\ndata-generator`.\n:::\n\nAfter a few seconds, you should see messages indicating your Kafka environment\nis ready:\n\n```bash\nrealtime-fraud-detection-init-kafka-1 | Successfully created the following topics:\nrealtime-fraud-detection-init-kafka-1 | payment_msg\nrealtime-fraud-detection-init-kafka-1 | sink\nrealtime-fraud-detection-init-kafka-1 exited with code 0\nrealtime-fraud-detection-data-generator-1 | Connected to Kafka\nrealtime-fraud-detection-data-generator-1 | Producing 20000 records to Kafka topic payment_msg\n```\n\nThis example uses mock payments data. The `payment_msg` Kafka topic contains\nmessages in the following format:\n\n```json\n{\n \"createTime\": \"2023-09-20 22:19:02.224\",\n \"orderId\": 1695248388,\n \"payAmount\": 88694.71922270155,\n \"payPlatform\": 0,\n \"provinceId\": 6\n}\n```\n\nIn a separate terminal, we can explore what these messages look like:\n\n::: {#70bf6cfd .cell execution_count=1}\n``` {.python .cell-code}\nfrom itertools import islice\n\nfrom kafka import KafkaConsumer\n\nconsumer = KafkaConsumer(\"payment_msg\")\nfor msg in islice(consumer, 3):\n print(msg)\n```\n\n::: {.cell-output .cell-output-stdout}\n```\nConsumerRecord(topic='payment_msg', partition=0, offset=9430, timestamp=1720728902139, timestamp_type=0, key=None, value=b'{\"createTime\": \"2024-07-11 20:15:02.139\", \"orderId\": 1720733590, \"payAmount\": 21569.762559328065, \"payPlatform\": 0, \"provinceId\": 5}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=132, serialized_header_size=-1)\nConsumerRecord(topic='payment_msg', partition=0, offset=9431, timestamp=1720728902640, timestamp_type=0, key=None, value=b'{\"createTime\": \"2024-07-11 20:15:02.640\", \"orderId\": 1720733591, \"payAmount\": 81153.99302227204, \"payPlatform\": 0, \"provinceId\": 1}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=131, serialized_header_size=-1)\nConsumerRecord(topic='payment_msg', partition=0, offset=9432, timestamp=1720728903144, timestamp_type=0, key=None, value=b'{\"createTime\": \"2024-07-11 20:15:03.144\", \"orderId\": 1720733592, \"payAmount\": 33284.5361487847, \"payPlatform\": 0, \"provinceId\": 3}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=130, serialized_header_size=-1)\n```\n:::\n:::\n\n\n## Running the tutorial\n\nThis tutorial uses Ibis with the Flink backend to process the 
aforementioned\npayment messages. You can choose to either [run it locally](#local-execution) or\n[submit a job to an already-running Flink cluster](#remote-execution).\n\n### Local execution\n\nThe simpler option is to run the example using the Flink mini cluster.\n\n#### Create a table environment\n\nThe [table\nenvironment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/table/table_environment/)\nserves as the main entry point for interacting with the Flink runtime. The\n`flink` backend does not create `TableEnvironment` objects; you must create a\n`TableEnvironment` and pass that to\n[`ibis.flink.connect`](../../backends/flink.qmd#ibis.flink.connect):\n\n::: {#f4609ace .cell execution_count=2}\n``` {.python .cell-code}\nimport ibis\nfrom pyflink.table import EnvironmentSettings, TableEnvironment\n\nenv_settings = EnvironmentSettings.in_streaming_mode()\ntable_env = TableEnvironment.create(env_settings)\ntable_env.get_config().set(\"parallelism.default\", \"1\") # <1>\n\ncon = ibis.flink.connect(table_env)\n```\n:::\n\n\n1. write all the data to one file\n\nFlink’s streaming connectors aren't part of the binary distribution. Link the\n[Kafka\nconnector](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/kafka/)\nfor cluster execution by adding the JAR file from the cloned repository. [Ibis\nexposes the `raw_sql` method for situations like this, where you need to run\narbitrary SQL that cannot be modeled as a table\nexpression](https://ibis-project.org/how-to/extending/sql#backend.raw_sql):\n\n\n\n::: {#a3557058 .cell execution_count=4}\n``` {.python .cell-code}\ncon.raw_sql(\"ADD JAR 'flink-sql-connector-kafka-3.0.2-1.18.jar'\")\n```\n:::\n\n\n#### Create the source and sink tables\n\nUse\n[`create_table`](../../backends/flink.qmd#ibis.backends.flink.Backend.create_table)\nto register tables. Notice the new top-level `ibis.watermark` API for\n[specifying a watermark\nstrategy](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/time/#event-time-and-watermarks).\n\n::: {#cc49f85d .cell execution_count=5}\n``` {.python .cell-code}\nsource_schema = ibis.schema( # <1>\n { # <1>\n \"createTime\": \"timestamp(3)\", # <1>\n \"orderId\": \"int64\", # <1>\n \"payAmount\": \"float64\", # <1>\n \"payPlatform\": \"int32\", # <1>\n \"provinceId\": \"int32\", # <1>\n } # <1>\n) # <1>\n\nsource_configs = { # <1>\n \"connector\": \"kafka\", # <1>\n \"topic\": \"payment_msg\", # <1>\n \"properties.bootstrap.servers\": \"localhost:9092\", # <1>\n \"properties.group.id\": \"test_3\", # <1>\n \"scan.startup.mode\": \"earliest-offset\", # <1>\n \"format\": \"json\", # <1>\n} # <1>\n\nt = con.create_table( # <1>\n \"payment_msg\", # <1>\n schema=source_schema, # <1>\n tbl_properties=source_configs, # <1>\n watermark=ibis.watermark( # <1>\n time_col=\"createTime\", allowed_delay=ibis.interval(seconds=15) # <1>\n ), # <1>\n) # <1>\n\nsink_schema = ibis.schema( # <2>\n { # <2>\n \"province_id\": \"int32\", # <2>\n \"pay_amount\": \"float64\", # <2>\n } # <2>\n) # <2>\n\nsink_configs = { # <2>\n \"connector\": \"kafka\", # <3>\n \"topic\": \"sink\", # <2>\n \"properties.bootstrap.servers\": \"localhost:9092\", # <2>\n \"format\": \"json\", # <2>\n} # <2>\n\ncon.create_table( # <2>\n \"total_amount_by_province_id\", schema=sink_schema, tbl_properties=sink_configs # <2>\n) # <2>\n```\n\n::: {.cell-output .cell-output-display execution_count=5}\n```{=html}\n
DatabaseTable: total_amount_by_province_id\n  province_id int32\n  pay_amount  float64\n
\n```\n:::\n:::\n\n\n1. create source Table\n2. create sink Table\n\n#### Perform calculations\n\nCompute the total pay amount per province in the past 10 seconds (as of each\nmessage, for the province in the incoming message):\n\n::: {#1dd0161c .cell execution_count=6}\n``` {.python .cell-code}\nagged = t.select(\n province_id=t.provinceId,\n pay_amount=t.payAmount.sum().over(\n range=(-ibis.interval(seconds=10), 0),\n group_by=t.provinceId,\n order_by=t.createTime,\n ),\n)\n```\n:::\n\n\nFinally, emit the query result to the sink table:\n\n::: {#b10dbc0a .cell execution_count=7}\n``` {.python .cell-code}\ncon.insert(\"total_amount_by_province_id\", agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=7}\n```\n\n```\n:::\n:::\n\n\n### Remote execution\n\nYou can also submit the example to the [remote cluster started using Docker\nCompose](#spinning-up-the-services-using-docker-compose). The\n`window_aggregation.py` file in the cloned repository contains the [same steps\nthat we performed for local execution](#local-execution). We will [use the\nmethod described in the official Flink\ndocumentation](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/cli/#submitting-pyflink-jobs).\n\n::: {.callout-tip}\nYou can find the `./bin/flink` executable with the following command:\n\n```bash\npython -c'from pathlib import Path; import pyflink; print(Path(pyflink.__spec__.origin).parent / \"bin\" / \"flink\")'\n```\n:::\n\nMy full command looks like this:\n\n```bash\n/opt/miniconda3/envs/ibis-dev/lib/python3.10/site-packages/pyflink/bin/flink run --jobmanager localhost:8081 --python window_aggregation.py\n```\n\nThe command will exit after displaying a submission message:\n\n```\nJob has been submitted with JobID b816faaf5ef9126ea5b9b6a37012cf56\n```\n\n## Viewing the results\n\nSimilar to how we viewed messages in the `payment_msg` topic, we can print\nresults from the `sink` topic:\n\n::: {#2b10814e .cell execution_count=8}\n``` {.python .cell-code}\nconsumer = KafkaConsumer(\"sink\")\nfor msg in islice(consumer, 10):\n print(msg)\n```\n\n::: {.cell-output .cell-output-stdout}\n```\nConsumerRecord(topic='sink', partition=0, offset=0, timestamp=1720728912679, timestamp_type=0, key=None, value=b'{\"province_id\":2,\"pay_amount\":16364.037210374616}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=1, timestamp=1720728912682, timestamp_type=0, key=None, value=b'{\"province_id\":2,\"pay_amount\":101689.41546504611}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=2, timestamp=1720728912683, timestamp_type=0, key=None, value=b'{\"province_id\":5,\"pay_amount\":32552.15122784454}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=3, timestamp=1720728912684, timestamp_type=0, key=None, value=b'{\"province_id\":5,\"pay_amount\":64135.92290912925}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=4, timestamp=1720728912684, timestamp_type=0, key=None, value=b'{\"province_id\":3,\"pay_amount\":34294.35463951969}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', 
partition=0, offset=5, timestamp=1720728912684, timestamp_type=0, key=None, value=b'{\"province_id\":6,\"pay_amount\":96762.09000335855}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=6, timestamp=1720728912684, timestamp_type=0, key=None, value=b'{\"province_id\":6,\"pay_amount\":154188.41978421973}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=7, timestamp=1720728912684, timestamp_type=0, key=None, value=b'{\"province_id\":6,\"pay_amount\":222023.00049557863}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=8, timestamp=1720728912684, timestamp_type=0, key=None, value=b'{\"province_id\":4,\"pay_amount\":72968.01696673119}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=9, timestamp=1720728912684, timestamp_type=0, key=None, value=b'{\"province_id\":0,\"pay_amount\":81034.40856839989}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\n```\n:::\n:::\n\n\nVoilà! You've run your first streaming application using Ibis.\n\n## Shutting down the Compose environment\n\nPress Ctrl+C to stop the Docker Compose containers. Once\nstopped, run `docker compose down` to remove the services created for this\ntutorial.\n\n", "supporting": [ "index_files/figure-html" ], diff --git a/docs/_freeze/posts/unified-stream-batch/index/execute-results/html.json b/docs/_freeze/posts/unified-stream-batch/index/execute-results/html.json index 17ebdb33b14b..cbddd2149167 100644 --- a/docs/_freeze/posts/unified-stream-batch/index/execute-results/html.json +++ b/docs/_freeze/posts/unified-stream-batch/index/execute-results/html.json @@ -1,8 +1,8 @@ { - "hash": "c05ed046ce2729019b23b5236a28ee74", + "hash": "0bd729e9cb424cd3caabcddd1e2d341d", "result": { "engine": "jupyter", - "markdown": "---\ntitle: \"Stream-batch unification through Ibis\"\nauthor: \"Chloe He\"\ndate: 2024-02-26\ncategories:\n - blog\n - flink\n - risingwave\n - streaming\n---\n\nOne of my focuses in the past 10 months has been to implement the Flink backend\nfor Ibis. I was working with Apache Flink and building a feature engineering\ntool, and we stumbled upon Ibis as we attempted to build our own translation\nlayer that could turn user declarations into relation trees, then optimize and\ndeploy the query plan, all while maintaining the underlying infrastructure for\nthe user. We considered and prototyped with a number of tools and eventually\nchose Ibis. It had already established a position in the batch world and had\nsupport for 10+ of the most popular batch engines (at the time). We loved the\nidea of decoupling the user-facing interface from the execution engine, so that\nusers can swap out the execution engine depending on their needs, without\nhaving to rewrite code. And, of course, it was open-source. It was everything\nwe dreamed of.\n\nA few months later, [we started introducing Apache Flink as the first streaming\nbackend into Ibis](https://github.com/ibis-project/ibis/pull/6408). 
We saw so\nmuch more that Ibis can do when it steps outside of batch.\n\nIbis 8.0 marks the official launch of the first streaming backends in Ibis\n([Apache Flink](https://ibis-project.org/backends/flink) and\n[RisingWave](https://ibis-project.org/backends/risingwave)). This is a very\nsignificant milestone in Ibis development.\n\nYou may be wondering: what does this mean? Why is this such a big deal? I will\nbe answering these questions in this blog post.\n\n## Ibis combines stream and batch into a single framework beyond version 8.0\n\nToday, Ibis provides support for 20+ backends including Dask, DuckDB,\nPostgreSQL, PySpark, Snowflake, and others. However - before the introduction\nof Flink and RisingWave backends - all of the supported backends derive from a\nbatch paradigm (aside from Spark, which does offer support for stream\nprocessing, albeit using micro-batches underneath the hood).\n\nThis means that Ibis is an extremely valuable tool, but it was limited to batch\nworkloads. In the case of streaming workloads, where systems are [designed with\nunbounded data in mind](https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/),\nthe batch-oriented Ibis fell short. To deal with an infinite data stream,\nstreaming data systems operate with unique concepts such as “event time”,\n“processing time”, “watermark”, etc. All of these were missing from Ibis.\n\nAt the same time, streaming systems (Spark Streaming, Apache Flink, RisingWave,\netc) have been gaining popularity. It drove the development of more mature\ntechnologies as well as new approaches to close the gap between batch and\nstreaming worlds. [Flink SQL, for example, was born as a part of such effort\nand, through allowing users to write streaming engines in a SQL-like manner,\nhave been vastly successful in that regard.](https://www.ververica.com/blog/apache-flink-sql-past-present-and-future)\nThe success of Flink SQL both validates the potential of stream and batch\nunification and inspires the community to push for better standards, a vision\nthat Ibis is at a unique and valuable position to help build.\n\n## Why is batch-stream unification significant?\n\nFirstly, large companies that have both batch and streaming workloads often\ndeploy\n[Lambda architecture](https://en.wikipedia.org/wiki/Lambda_architecture).\nIn a Lambda infrastructure, batch and streaming pipelines are separate, which\nrequires two codebases to be set up and maintained. If you’re a platform\nengineer, you have probably found yourself trying to duplicate batch workloads\n“in streaming code” and vice versa. 
If you have backfilled a streaming pipeline\ndue to a bug and needed to reimplement the logic on a batch pipeline, you know\nhow painful that all is :(\n\n[LinkedIn successfully reduced processing time by 94% and resource utilization\nby 50% after switching from a Lambda architecture to unified batch and\nstreaming pipelines.](https://www.linkedin.com/blog/engineering/data-streaming-processing/unified-streaming-and-batch-pipelines-at-linkedin-reducing-proc)\nA unified system also massively increased engineer productivity because they no\nlonger needed to develop and maintain separate codebases for different\nenvironments.\n[Uber](https://www.uber.com/blog/kappa-architecture-data-stream-processing/),\n[Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020),\nand [Lyft](https://beam.apache.org/case-studies/lyft/) have also adopted\nsimilar solutions.\n\nSecondly, in the world of machine learning, it’s common for data scientists to\ndevelop locally and experiment with a sampled, batch dataset in Python. If the\nresults look promising, the features and models would then be deployed into\nproduction. Oftentimes, there is a code handover in this process, and a\ndedicated team of developers would be responsible for rewriting the logic for\nproduction, as a streaming workload.\n\nIn both cases, there is a huge amount of technical overhead. If there is a\nstreamlined architecture, using a unified API, much of this overhead can be\navoided. As a platform engineer, you no longer need to worry about maintaining\ntwo separate architectures and codebases. As a data scientist or a machine\nlearning engineer, you can write one single workload that can execute both on\nbatch and streaming backends. Wouldn’t that be amazing?\n\n## Ibis unifies batch and streaming\n\nEnter Ibis. Ibis unifies batch and streaming with a single API. It decouples\nthe dataframe API from backend execution, so that the logic for defining data\ntransformations is unaffected by implementation discrepancies across backend\nengines. There is also an ongoing effort to further increase interoperability\nacross different languages and systems via a standard query plan intermediate\nrepresentation (IR), using a library called\n[`Substrait`](https://substrait.io/).\n\nWhat does this actually look like? For example, Ibis allows users to define\nwindow aggregations using the [`over()`\nmethod](../../reference/expression-tables.qmd#ibis.expr.types.groupby.GroupedTable.over).\nWhen executed on the Flink backend, this translates into [Flink’s over\naggregation query](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/over-agg/)\nand outputs an aggregated value for every input row over a range of ordered\nrows. On streaming data, aggregation results are continuously computed and\nwritten into data sinks (e.g., Kafka, Redis) as records are received at and\nconsumed from the upstream data source (e.g., Kafka, Change Data Capture). In\npandas, the conceptual analog is [windowing\noperation](https://pandas.pydata.org/docs/user_guide/window.html). Results are\ncomputed by looking back the length of the window from the current observation,\nbut can be computed all at once because batch data is static.\n\nAnother great example is deduplication. 
In Flink SQL, this looks something like this:\n\n```sql\nSELECT [column_list]\nFROM (\n SELECT [column_list],\n ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]\n ORDER BY time_attr [asc|desc]) AS rownum\n FROM table_name)\nWHERE rownum = 1\n```\nIn a database like Postgres, this could be as simple as\n\n```sql\nSELECT DISTINCT t0.`string_col`, t0.`int_col`\nFROM functional_alltypes t0\n```\n\nAnd in `pandas`, you would use the method `drop_duplicates()`:\n\n```python\ndf.drop_duplicates()\n```\n\n::: {.callout-note}\nWe’re working on supporting deduplication via `distinct()` in Flink backend and\nthis feature should be available soon!\n:::\n\nThese underlying discrepancies are abstracted in such a way that you, as an\nIbis user, will no longer find yourself struggling with bugs that are the\nresult of subtleties across different engines and dialects. Need to rewrite\nyour batch workload as a streaming one or vice versa? Rest assured, Ibis has\nyou covered!\n\n## See it in action\n\nNow, let’s walk through a code example together to see how simple this\nexperience is!\n\n::: {.callout-note}\nPrerequisites for running this example:\n\n* Docker Compose: This tutorial uses Docker Compose to manage an Apache Kafka\nenvironment (including sample data generation) and a Flink cluster (for remote\nexecution). You can [download and install Docker Compose from the official\nwebsite](https://docs.docker.com/compose/install/).\n* JDK 11 release: Flink requires Java 11.\n* Python 3.9 or 3.10.\n* Follow [the setup\ntutorial](../../tutorials/open-source-software/apache-flink/0_setup.qmd) to\ninstall the Flink backend for Ibis.\n* Clone the [example repository](https://github.com/ibis-project/ibis-flink-example).\n:::\n\n::: {.callout-note}\nThis example is a hypothetical scenario and we will be using simulated data.\n:::\n\nFirst, spin up the Docker containers by running `docker compose up kafka\ninit-kafka data-generator`. This will set up a mocked Kafka source that\ncontains records that look like the following:\n\n```json\n{\n \"createTime\": \"2023-09-20 22:19:02.224\",\n \"orderId\": 1695248388,\n \"payAmount\": 88694.71922270155,\n \"payPlatform\": 0,\n \"provinceId\": 6,\n}\n```\n\nThis is a streaming data source. Commonly, to experiment with the data, we\nwould extract a chunk of the data and load it in batch:\n\n\n::: {#3892eff1 .cell execution_count=2}\n``` {.python .cell-code}\nfrom kafka import KafkaConsumer\n\nconsumer = KafkaConsumer(\"payment_msg\", auto_offset_reset=\"earliest\")\nrows = []\nfor _, msg in zip(range(100), consumer):\n rows.append(msg)\n```\n:::\n\n\nThis is a tabular dataset and we can convert it into a `pandas` DataFrame:\n\n::: {#3c408f90 .cell execution_count=3}\n``` {.python .cell-code}\nimport json\n\nimport pandas as pd\n\ndf = pd.DataFrame([json.loads(row.value) for row in rows])\ndf[\"createTime\"] = pd.to_datetime(df[\"createTime\"])\ndf\n```\n\n::: {.cell-output .cell-output-display execution_count=3}\n```{=html}\n
    createTime               orderId     payAmount     payPlatform  provinceId
0   2024-02-24 07:15:40.896  1708758941  85955.211753  0            3
1   2024-02-24 07:15:41.402  1708758942  70006.918133  0            3
2   2024-02-24 07:15:41.905  1708758943  4352.885704   0            3
3   2024-02-24 07:15:42.405  1708758944  20850.068597  0            2
4   2024-02-24 07:15:42.909  1708758945  72829.020076  0            4
..  ...                      ...         ...           ...          ...
95  2024-02-24 07:16:28.636  1708759036  70241.647749  0            5
96  2024-02-24 07:16:29.139  1708759037  89075.951048  1            2
97  2024-02-24 07:16:29.653  1708759038  49521.531528  0            5
98  2024-02-24 07:16:30.154  1708759039  9171.094251   0            2
99  2024-02-24 07:16:30.655  1708759040  62199.631597  0            3

100 rows × 5 columns
\n```\n:::\n:::\n\n\nWe can connect to this DataFrame in Ibis in a local execution backend:\n\n::: {#6b3df357 .cell execution_count=4}\n``` {.python .cell-code}\nimport ibis\n\ncon = ibis.get_backend()\ncon.create_table(\"payments\", df)\n```\n\n::: {.cell-output .cell-output-display execution_count=4}\n```{=html}\n
DatabaseTable: payments\n  createTime  timestamp(6)\n  orderId     int64\n  payAmount   float64\n  payPlatform int64\n  provinceId  int64\n
\n```\n:::\n:::\n\n\n::: {.callout-note}\nThe default execution engine for Ibis is DuckDB.\n:::\n\nThis is a series of records of order transactions. At Company Potclay, we have\njust deployed a new ad campaign, which is A/B tested by province, and we’re\ninterested in the effectiveness of this ad campaign by monitoring data\ndistribution shift over time. A crucial feature is the total transaction amount\nover the past minute, stratified by province. We would like to first experiment\nwriting this feature on a smaller set of batch data. After we make sure that\nthe logic looks correct and handles all edge cases appropriately, we want to\ndeploy this as a streaming workload.\n\nIbis allows us to write transformations on top of so-called abstract or unbound\ntables (i.e., tables that are not bound to an actual data source). This\nseparation between transformation logic and the underlying data and execution\nis one of the things that makes Ibis so powerful. It's similar to dependency\ninjection, but in this case the data is the dependency and is injected at\nruntime.\n\nTo write transformations on top of an unbound table, we need to first define an\n`ibis.table()` with a schema. Here is how we would write all of this in Ibis\ncode:\n\n::: {#771fc1be .cell execution_count=5}\n``` {.python .cell-code}\nimport ibis.expr.schema as sch\nimport ibis.expr.datatypes as dt\nfrom ibis import _\n\nschema = sch.Schema(\n {\n \"createTime\": dt.timestamp(scale=3),\n \"orderId\": dt.int64,\n \"payAmount\": dt.float64,\n \"payPlatform\": dt.int32,\n \"provinceId\": dt.int32,\n }\n)\nunbound_table = ibis.table(schema, name=\"payments\")\nunbound_agged = unbound_table[\n \"provinceId\",\n _.payAmount.sum()\n .over(range=(-ibis.interval(seconds=10), 0), order_by=_.createTime)\n .name(\"pay_amount\"),\n]\nunbound_agged\n```\n\n::: {.cell-output .cell-output-display execution_count=5}\n```{=html}\n
r0 := UnboundTable: payments\n  createTime  timestamp(3)\n  orderId     int64\n  payAmount   float64\n  payPlatform int32\n  provinceId  int32\n\nProject[r0]\n  provinceId: r0.provinceId\n  pay_amount: WindowFunction(func=Sum(r0.payAmount), frame=RangeWindowFrame(table=r0, start=WindowBoundary(value=10\ns, preceding=True), end=WindowBoundary(Cast(0, to=interval('s'))), order_by=[asc r0.createTime]))\n
\n```\n:::\n:::\n\n\nCarrying out the computations using the local execution backend that we\nconnected to above is as simple as:\n\n::: {#82ff34c1 .cell execution_count=6}\n``` {.python .cell-code}\ncon.to_pandas(unbound_agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=6}\n```{=html}\n
    provinceId  pay_amount
0   3           8.595521e+04
1   3           1.559621e+05
2   3           1.603150e+05
3   2           1.811651e+05
4   4           2.539941e+05
..  ...         ...
95  5           1.270405e+06
96  2           1.269567e+06
97  5           1.277924e+06
98  2           1.204362e+06
99  3           1.202042e+06

100 rows × 2 columns
\n```\n:::\n:::\n\n\n::: {.callout-note}\nDuckDB is much faster than `pandas`, and using Ibis you don't need to write SQL\nfor it!\n:::\n\nFor local experimentation purposes, this DataFrame only consists of 100 rows,\nso doing this in memory is easy.\n\nThe outputs look correct and we didn’t run into any errors. We are now ready to\ndeploy this as a streaming job in Flink!\n\nFirst, let’s set up the Flink environment and connect to this Kafka source:\n\n::: {.callout-note}\nKafka connector is not part of the binary distribution, so we need to download\nand link it for cluster execution explicitly:\n\n::: {#2350cbf3 .cell execution_count=7}\n``` {.python .cell-code}\n!wget -N https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar\n```\n:::\n\n\n:::\n\n::: {#f0187bee .cell execution_count=8}\n``` {.python .cell-code}\nfrom pyflink.table import EnvironmentSettings, TableEnvironment\nfrom pyflink.common import Configuration\n\nsource_schema = sch.Schema(\n {\n \"createTime\": dt.timestamp(scale=3),\n \"orderId\": dt.int64,\n \"payAmount\": dt.float64,\n \"payPlatform\": dt.int32,\n \"provinceId\": dt.int32,\n }\n)\n\nenv_settings = EnvironmentSettings.in_streaming_mode()\ntable_env = TableEnvironment.create(env_settings)\n\ntable_config = table_env.get_config()\nconfig = Configuration()\nconfig.set_string(\"parallelism.default\", \"1\")\ntable_config.add_configuration(config)\n\nconnection = ibis.flink.connect(table_env)\n\n# add the JAR downloaded above\nconnection.raw_sql(\"ADD JAR 'flink-sql-connector-kafka-1.17.1.jar'\")\n\nsource_configs = {\n \"connector\": \"kafka\",\n \"topic\": \"payment_msg\",\n \"properties.bootstrap.servers\": \"localhost:9092\",\n \"properties.group.id\": \"test_3\",\n \"scan.startup.mode\": \"earliest-offset\",\n \"format\": \"json\",\n}\n\nconnection.create_table(\n \"payments\",\n schema=source_schema,\n tbl_properties=source_configs,\n watermark=ibis.watermark(\n time_col=\"createTime\", allowed_delay=ibis.interval(seconds=15)\n ),\n)\n```\n\n::: {.cell-output .cell-output-display execution_count=8}\n```{=html}\n
DatabaseTable: payments\n  createTime  timestamp(3)\n  orderId     int64\n  payAmount   float64\n  payPlatform int32\n  provinceId  int32\n
\n```\n:::\n:::\n\n\nHow would we write this in Flink SQL? Ibis makes this extremely easy by\nexposing a `compile()` API:\n\n::: {#c2d6be4b .cell execution_count=9}\n``` {.python .cell-code}\nsql = connection.compile(unbound_agged)\nprint(sql)\n```\n\n::: {.cell-output .cell-output-stdout}\n````\nSELECT\n `t0`.`provinceId`,\n SUM(`t0`.`payAmount`) OVER (ORDER BY `t0`.`createTime` ASC NULLS LAST RANGE BETWEEN INTERVAL '10' SECOND(2) preceding AND CURRENT ROW) AS `pay_amount`\nFROM `payments` AS `t0`\n````\n:::\n:::\n\n\nBefore we can execute this query, we need to first define a data sink where the\nresults can be written:\n\n::: {#9cf5a144 .cell execution_count=10}\n``` {.python .cell-code}\nsink_schema = sch.Schema(\n {\n \"province_id\": dt.int32,\n \"pay_amount\": dt.float64,\n }\n)\n\nkafka_sink_configs = {\n \"connector\": \"kafka\",\n \"topic\": \"sink\",\n \"properties.bootstrap.servers\": \"localhost:9092\",\n \"format\": \"json\",\n}\n\nconnection.create_table(\n \"kafka_sink\", schema=sink_schema, tbl_properties=kafka_sink_configs\n)\n```\n\n::: {.cell-output .cell-output-display execution_count=10}\n```{=html}\n
DatabaseTable: kafka_sink\n  province_id int32\n  pay_amount  float64\n
\n```\n:::\n:::\n\n\nNow, let’s write the results into this sink. Note that we can directly reuse\nthe transformation logic that we wrote above for the local execution backend!!\n\n::: {#bb2cd7e7 .cell execution_count=11}\n``` {.python .cell-code}\nconnection.insert(\"kafka_sink\", unbound_agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=11}\n```\n\n```\n:::\n:::\n\n\n::: {.callout-tip}\nYou can examine the results either using the Kafka console consumer CLI or the\n`kafka-python` library.\n:::\n\nHow easy was it to define both batch and streaming workloads using Ibis?\nWithout Ibis, you would have needed to write a `pandas`/DuckDB workload and\nthen convert it into Flink SQL manually.\n\n## Concluding thoughts\n\nWith the introduction of the first streaming backends, Ibis is now both a batch\nand a streaming Python DataFrame API and we’re excited about what’s to come\nnext. We hope that Ibis can close the gap between batch and streaming in such a\nway that we no longer talk about the two separately, but, rather, as two parts\nof the same paradigm. Streaming naturally lends itself to batch: batch is\ntechnically just a special case of streaming, where the unbounded data flow\nstops at some point.\n\nOf course, this is only the beginning. There are still technical challenges to\nbe solved (e.g., backfill, window computations over large windows, GPU\nacceleration), and we'll definitely have more exciting updates to share with\nthe community soon!\n\nCheck out the new [Apache Flink](https://ibis-project.org/backends/flink) and\n[RisingWave](https://ibis-project.org/backends/risingwave) backends and let us\nknow what you think!\n\n", + "markdown": "---\ntitle: \"Stream-batch unification through Ibis\"\nauthor: \"Chloe He\"\ndate: 2024-02-26\ncategories:\n - blog\n - flink\n - risingwave\n - streaming\n---\n\nOne of my focuses in the past 10 months has been to implement the Flink backend\nfor Ibis. I was working with Apache Flink and building a feature engineering\ntool, and we stumbled upon Ibis as we attempted to build our own translation\nlayer that could turn user declarations into relation trees, then optimize and\ndeploy the query plan, all while maintaining the underlying infrastructure for\nthe user. We considered and prototyped with a number of tools and eventually\nchose Ibis. It had already established a position in the batch world and had\nsupport for 10+ of the most popular batch engines (at the time). We loved the\nidea of decoupling the user-facing interface from the execution engine, so that\nusers can swap out the execution engine depending on their needs, without\nhaving to rewrite code. And, of course, it was open-source. It was everything\nwe dreamed of.\n\nA few months later, [we started introducing Apache Flink as the first streaming\nbackend into Ibis](https://github.com/ibis-project/ibis/pull/6408). We saw so\nmuch more that Ibis can do when it steps outside of batch.\n\nIbis 8.0 marks the official launch of the first streaming backends in Ibis\n([Apache Flink](https://ibis-project.org/backends/flink) and\n[RisingWave](https://ibis-project.org/backends/risingwave)). This is a very\nsignificant milestone in Ibis development.\n\nYou may be wondering: what does this mean? Why is this such a big deal? I will\nbe answering these questions in this blog post.\n\n## Ibis combines stream and batch into a single framework beyond version 8.0\n\nToday, Ibis provides support for 20+ backends including Dask, DuckDB,\nPostgreSQL, PySpark, Snowflake, and others. 
However - before the introduction\nof Flink and RisingWave backends - all of the supported backends derive from a\nbatch paradigm (aside from Spark, which does offer support for stream\nprocessing, albeit using micro-batches underneath the hood).\n\nThis means that Ibis is an extremely valuable tool, but it was limited to batch\nworkloads. In the case of streaming workloads, where systems are [designed with\nunbounded data in mind](https://www.oreilly.com/radar/the-world-beyond-batch-streaming-101/),\nthe batch-oriented Ibis fell short. To deal with an infinite data stream,\nstreaming data systems operate with unique concepts such as “event time”,\n“processing time”, “watermark”, etc. All of these were missing from Ibis.\n\nAt the same time, streaming systems (Spark Streaming, Apache Flink, RisingWave,\netc) have been gaining popularity. It drove the development of more mature\ntechnologies as well as new approaches to close the gap between batch and\nstreaming worlds. [Flink SQL, for example, was born as a part of such effort\nand, through allowing users to write streaming engines in a SQL-like manner,\nhave been vastly successful in that regard.](https://www.ververica.com/blog/apache-flink-sql-past-present-and-future)\nThe success of Flink SQL both validates the potential of stream and batch\nunification and inspires the community to push for better standards, a vision\nthat Ibis is at a unique and valuable position to help build.\n\n## Why is batch-stream unification significant?\n\nFirstly, large companies that have both batch and streaming workloads often\ndeploy\n[Lambda architecture](https://en.wikipedia.org/wiki/Lambda_architecture).\nIn a Lambda infrastructure, batch and streaming pipelines are separate, which\nrequires two codebases to be set up and maintained. If you’re a platform\nengineer, you have probably found yourself trying to duplicate batch workloads\n“in streaming code” and vice versa. If you have backfilled a streaming pipeline\ndue to a bug and needed to reimplement the logic on a batch pipeline, you know\nhow painful that all is :(\n\n[LinkedIn successfully reduced processing time by 94% and resource utilization\nby 50% after switching from a Lambda architecture to unified batch and\nstreaming pipelines.](https://www.linkedin.com/blog/engineering/data-streaming-processing/unified-streaming-and-batch-pipelines-at-linkedin-reducing-proc)\nA unified system also massively increased engineer productivity because they no\nlonger needed to develop and maintain separate codebases for different\nenvironments.\n[Uber](https://www.uber.com/blog/kappa-architecture-data-stream-processing/),\n[Alibaba](https://www.ververica.com/blog/apache-flinks-stream-batch-unification-powers-alibabas-11.11-in-2020),\nand [Lyft](https://beam.apache.org/case-studies/lyft/) have also adopted\nsimilar solutions.\n\nSecondly, in the world of machine learning, it’s common for data scientists to\ndevelop locally and experiment with a sampled, batch dataset in Python. If the\nresults look promising, the features and models would then be deployed into\nproduction. Oftentimes, there is a code handover in this process, and a\ndedicated team of developers would be responsible for rewriting the logic for\nproduction, as a streaming workload.\n\nIn both cases, there is a huge amount of technical overhead. If there is a\nstreamlined architecture, using a unified API, much of this overhead can be\navoided. 
As a platform engineer, you no longer need to worry about maintaining\ntwo separate architectures and codebases. As a data scientist or a machine\nlearning engineer, you can write one single workload that can execute both on\nbatch and streaming backends. Wouldn’t that be amazing?\n\n## Ibis unifies batch and streaming\n\nEnter Ibis. Ibis unifies batch and streaming with a single API. It decouples\nthe dataframe API from backend execution, so that the logic for defining data\ntransformations is unaffected by implementation discrepancies across backend\nengines. There is also an ongoing effort to further increase interoperability\nacross different languages and systems via a standard query plan intermediate\nrepresentation (IR), using a library called\n[`Substrait`](https://substrait.io/).\n\nWhat does this actually look like? For example, Ibis allows users to define\nwindow aggregations using the [`over()`\nmethod](../../reference/expression-tables.qmd#ibis.expr.types.groupby.GroupedTable.over).\nWhen executed on the Flink backend, this translates into [Flink’s over\naggregation query](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/over-agg/)\nand outputs an aggregated value for every input row over a range of ordered\nrows. On streaming data, aggregation results are continuously computed and\nwritten into data sinks (e.g., Kafka, Redis) as records are received at and\nconsumed from the upstream data source (e.g., Kafka, Change Data Capture). In\npandas, the conceptual analog is [windowing\noperation](https://pandas.pydata.org/docs/user_guide/window.html). Results are\ncomputed by looking back the length of the window from the current observation,\nbut can be computed all at once because batch data is static.\n\nAnother great example is deduplication. In Flink SQL, this looks something like this:\n\n```sql\nSELECT [column_list]\nFROM (\n SELECT [column_list],\n ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]\n ORDER BY time_attr [asc|desc]) AS rownum\n FROM table_name)\nWHERE rownum = 1\n```\nIn a database like Postgres, this could be as simple as\n\n```sql\nSELECT DISTINCT t0.`string_col`, t0.`int_col`\nFROM functional_alltypes t0\n```\n\nAnd in `pandas`, you would use the method `drop_duplicates()`:\n\n```python\ndf.drop_duplicates()\n```\n\n::: {.callout-note}\nWe’re working on supporting deduplication via `distinct()` in Flink backend and\nthis feature should be available soon!\n:::\n\nThese underlying discrepancies are abstracted in such a way that you, as an\nIbis user, will no longer find yourself struggling with bugs that are the\nresult of subtleties across different engines and dialects. Need to rewrite\nyour batch workload as a streaming one or vice versa? Rest assured, Ibis has\nyou covered!\n\n## See it in action\n\nNow, let’s walk through a code example together to see how simple this\nexperience is!\n\n::: {.callout-note}\nPrerequisites for running this example:\n\n* Docker Compose: This tutorial uses Docker Compose to manage an Apache Kafka\nenvironment (including sample data generation) and a Flink cluster (for remote\nexecution). 
\n## See it in action\n\nNow, let’s walk through a code example together to see how simple this\nexperience is!\n\n::: {.callout-note}\nPrerequisites for running this example:\n\n* Docker Compose: This tutorial uses Docker Compose to manage an Apache Kafka\nenvironment (including sample data generation) and a Flink cluster (for remote\nexecution). You can [download and install Docker Compose from the official\nwebsite](https://docs.docker.com/compose/install/).\n* JDK 11: Flink requires Java 11.\n* Python 3.9 or 3.10.\n* Follow [the setup\ntutorial](../../tutorials/open-source-software/apache-flink/0_setup.qmd) to\ninstall the Flink backend for Ibis.\n* Clone the [example repository](https://github.com/ibis-project/realtime-fraud-detection).\n:::\n\n::: {.callout-note}\nThis example is a hypothetical scenario and we will be using simulated data.\n:::\n\nFirst, spin up the Docker containers by running `docker compose up kafka\ninit-kafka data-generator`. This will set up a mocked Kafka source that\ncontains records that look like the following:\n\n```json\n{\n \"createTime\": \"2023-09-20 22:19:02.224\",\n \"orderId\": 1695248388,\n \"payAmount\": 88694.71922270155,\n \"payPlatform\": 0,\n \"provinceId\": 6\n}\n```\n\nThis is a streaming data source. Commonly, to experiment with the data, we\nwould extract a chunk of it and load it as a batch:\n\n\n::: {#ec406332 .cell execution_count=2}\n``` {.python .cell-code}\nfrom kafka import KafkaConsumer\n\nconsumer = KafkaConsumer(\"payment_msg\", auto_offset_reset=\"earliest\")\nrows = []\nfor _, msg in zip(range(100), consumer):\n rows.append(msg)\n```\n:::\n\n\nThis is a tabular dataset and we can convert it into a `pandas` DataFrame:\n\n::: {#af03d88e .cell execution_count=3}\n``` {.python .cell-code}\nimport json\n\nimport pandas as pd\n\ndf = pd.DataFrame([json.loads(row.value) for row in rows])\ndf[\"createTime\"] = pd.to_datetime(df[\"createTime\"])\ndf\n```\n\n::: {.cell-output .cell-output-display execution_count=3}\n```{=html}\n
<div>\n<table>\n  <thead>\n    <tr>\n      <th></th>\n      <th>createTime</th>\n      <th>orderId</th>\n      <th>payAmount</th>\n      <th>payPlatform</th>\n      <th>provinceId</th>\n    </tr>\n  </thead>\n  <tbody>\n    <tr><th>0</th><td>2024-07-11 18:55:59.385</td><td>1720724160</td><td>16364.037210</td><td>0</td><td>2</td></tr>\n    <tr><th>1</th><td>2024-07-11 18:55:59.889</td><td>1720724161</td><td>85325.378255</td><td>0</td><td>2</td></tr>\n    <tr><th>2</th><td>2024-07-11 18:56:00.392</td><td>1720724162</td><td>32552.151228</td><td>0</td><td>5</td></tr>\n    <tr><th>3</th><td>2024-07-11 18:56:00.897</td><td>1720724163</td><td>31583.771681</td><td>0</td><td>5</td></tr>\n    <tr><th>4</th><td>2024-07-11 18:56:01.401</td><td>1720724164</td><td>34294.354640</td><td>0</td><td>3</td></tr>\n    <tr><th>...</th><td>...</td><td>...</td><td>...</td><td>...</td><td>...</td></tr>\n    <tr><th>95</th><td>2024-07-11 18:56:47.230</td><td>1720724255</td><td>37487.568775</td><td>0</td><td>5</td></tr>\n    <tr><th>96</th><td>2024-07-11 18:56:47.732</td><td>1720724256</td><td>37477.569665</td><td>0</td><td>2</td></tr>\n    <tr><th>97</th><td>2024-07-11 18:56:48.234</td><td>1720724257</td><td>32410.841238</td><td>1</td><td>2</td></tr>\n    <tr><th>98</th><td>2024-07-11 18:56:48.737</td><td>1720724258</td><td>58236.337548</td><td>0</td><td>0</td></tr>\n    <tr><th>99</th><td>2024-07-11 18:56:49.239</td><td>1720724259</td><td>78873.701022</td><td>0</td><td>2</td></tr>\n  </tbody>\n</table>\n<p>100 rows × 5 columns</p>\n</div>
\n```\n:::\n:::\n\n\nWe can connect to this DataFrame from Ibis using a local execution backend:\n\n::: {#4bc51a21 .cell execution_count=4}\n``` {.python .cell-code}\nimport ibis\n\ncon = ibis.get_backend()\ncon.create_table(\"payments\", df)\n```\n\n::: {.cell-output .cell-output-display execution_count=4}\n```{=html}\n
<pre>DatabaseTable: memory.main.payments\n  createTime  timestamp(6)\n  orderId     int64\n  payAmount   float64\n  payPlatform int64\n  provinceId  int64\n</pre>
\n```\n:::\n:::\n\n\n::: {.callout-note}\nThe default execution engine for Ibis is DuckDB.\n:::\n\nThis is a series of records of order transactions. At Company Potclay, we have\njust deployed a new ad campaign, which is A/B tested by province, and we’re\ninterested in measuring the effectiveness of this ad campaign by monitoring the\ndata distribution shift over time. A crucial feature is the total transaction\namount over the past minute, stratified by province. We would like to first\nexperiment with writing this feature on a smaller batch dataset. After we make\nsure that the logic looks correct and handles all edge cases appropriately, we\nwant to deploy this as a streaming workload.\n\nIbis allows us to write transformations on top of so-called abstract or unbound\ntables (i.e., tables that are not bound to an actual data source). This\nseparation between transformation logic and the underlying data and execution\nis one of the things that makes Ibis so powerful. It's similar to dependency\ninjection, but in this case the data is the dependency and is injected at\nruntime.\n\nTo write transformations on top of an unbound table, we need to first define an\n`ibis.table()` with a schema. Here is how we would write all of this in Ibis\ncode:\n\n::: {#462a51d3 .cell execution_count=5}\n``` {.python .cell-code}\nimport ibis.expr.schema as sch\nimport ibis.expr.datatypes as dt\nfrom ibis import _\n\nschema = sch.Schema(\n {\n \"createTime\": dt.timestamp(scale=3),\n \"orderId\": dt.int64,\n \"payAmount\": dt.float64,\n \"payPlatform\": dt.int32,\n \"provinceId\": dt.int32,\n }\n)\nunbound_table = ibis.table(schema, name=\"payments\")\nunbound_agged = unbound_table[\n \"provinceId\",\n _.payAmount.sum()\n .over(range=(-ibis.interval(seconds=10), 0), order_by=_.createTime)\n .name(\"pay_amount\"),\n]\nunbound_agged\n```\n\n::: {.cell-output .cell-output-display execution_count=5}\n```{=html}\n
<pre>r0 := UnboundTable: payments\n  createTime  timestamp(3)\n  orderId     int64\n  payAmount   float64\n  payPlatform int32\n  provinceId  int32\n\nProject[r0]\n  provinceId: r0.provinceId\n  pay_amount: WindowFunction(func=Sum(r0.payAmount), how='range', start=WindowBoundary(value=10 s, preceding=True), end=WindowBoundary(Cast(0, to=interval('s'))), order_by=[asc r0.createTime])\n</pre>
\n```\n:::\n:::\n\n\nCarrying out the computations using the local execution backend that we\nconnected to above is as simple as:\n\n::: {#f3cd6db6 .cell execution_count=6}\n``` {.python .cell-code}\ncon.to_pandas(unbound_agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=6}\n```{=html}\n
<div>\n<table>\n  <thead>\n    <tr>\n      <th></th>\n      <th>provinceId</th>\n      <th>pay_amount</th>\n    </tr>\n  </thead>\n  <tbody>\n    <tr><th>0</th><td>2</td><td>1.636404e+04</td></tr>\n    <tr><th>1</th><td>2</td><td>1.016894e+05</td></tr>\n    <tr><th>2</th><td>5</td><td>1.342416e+05</td></tr>\n    <tr><th>3</th><td>5</td><td>1.658253e+05</td></tr>\n    <tr><th>4</th><td>3</td><td>2.001197e+05</td></tr>\n    <tr><th>...</th><td>...</td><td>...</td></tr>\n    <tr><th>95</th><td>5</td><td>1.127313e+06</td></tr>\n    <tr><th>96</th><td>2</td><td>1.111165e+06</td></tr>\n    <tr><th>97</th><td>2</td><td>1.122423e+06</td></tr>\n    <tr><th>98</th><td>0</td><td>1.088848e+06</td></tr>\n    <tr><th>99</th><td>2</td><td>1.082766e+06</td></tr>\n  </tbody>\n</table>\n<p>100 rows × 2 columns</p>\n</div>
\n```\n:::\n:::\n\n\n::: {.callout-note}\nDuckDB is much faster than `pandas`, and using Ibis you don't need to write SQL\nfor it!\n:::\n\nFor local experimentation purposes, this DataFrame only consists of 100 rows,\nso doing this in memory is easy.\n\nThe outputs look correct and we didn’t run into any errors. We are now ready to\ndeploy this as a streaming job in Flink!\n\nFirst, let’s set up the Flink environment and connect to this Kafka source:\n\n::: {.callout-note}\nKafka connector is not part of the binary distribution, so we need to download\nand link it for cluster execution explicitly:\n\n::: {#07b088d9 .cell execution_count=7}\n``` {.python .cell-code}\n!wget -N https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar\n```\n:::\n\n\n:::\n\n::: {#e92fdc1e .cell execution_count=8}\n``` {.python .cell-code}\nfrom pyflink.table import EnvironmentSettings, TableEnvironment\nfrom pyflink.common import Configuration\n\nsource_schema = sch.Schema(\n {\n \"createTime\": dt.timestamp(scale=3),\n \"orderId\": dt.int64,\n \"payAmount\": dt.float64,\n \"payPlatform\": dt.int32,\n \"provinceId\": dt.int32,\n }\n)\n\nenv_settings = EnvironmentSettings.in_streaming_mode()\ntable_env = TableEnvironment.create(env_settings)\n\ntable_config = table_env.get_config()\nconfig = Configuration()\nconfig.set_string(\"parallelism.default\", \"1\")\ntable_config.add_configuration(config)\n\nconnection = ibis.flink.connect(table_env)\n\n# add the JAR downloaded above\nconnection.raw_sql(\"ADD JAR 'flink-sql-connector-kafka-1.17.1.jar'\")\n\nsource_configs = {\n \"connector\": \"kafka\",\n \"topic\": \"payment_msg\",\n \"properties.bootstrap.servers\": \"localhost:9092\",\n \"properties.group.id\": \"test_3\",\n \"scan.startup.mode\": \"earliest-offset\",\n \"format\": \"json\",\n}\n\nconnection.create_table(\n \"payments\",\n schema=source_schema,\n tbl_properties=source_configs,\n watermark=ibis.watermark(\n time_col=\"createTime\", allowed_delay=ibis.interval(seconds=15)\n ),\n)\n```\n\n::: {.cell-output .cell-output-display execution_count=8}\n```{=html}\n
<pre>DatabaseTable: payments\n  createTime  timestamp(3)\n  orderId     int64\n  payAmount   float64\n  payPlatform int32\n  provinceId  int32\n</pre>
\n```\n:::\n:::\n\n\nHow would we write this in Flink SQL? Ibis makes this extremely easy by\nexposing a `compile()` API:\n\n::: {#87e9bfd5 .cell execution_count=9}\n``` {.python .cell-code}\nsql = connection.compile(unbound_agged)\nprint(sql)\n```\n\n::: {.cell-output .cell-output-stdout}\n```\nSELECT `t0`.`provinceId`, SUM(`t0`.`payAmount`) OVER (ORDER BY `t0`.`createTime` ASC NULLS LAST RANGE BETWEEN INTERVAL '10' SECOND(2) preceding AND CURRENT ROW) AS `pay_amount` FROM `payments` AS `t0`\n```\n:::\n:::\n\n\nBefore we can execute this query, we need to first define a data sink where the\nresults can be written:\n\n::: {#9edeed42 .cell execution_count=10}\n``` {.python .cell-code}\nsink_schema = sch.Schema(\n {\n \"province_id\": dt.int32,\n \"pay_amount\": dt.float64,\n }\n)\n\nkafka_sink_configs = {\n \"connector\": \"kafka\",\n \"topic\": \"sink\",\n \"properties.bootstrap.servers\": \"localhost:9092\",\n \"format\": \"json\",\n}\n\nconnection.create_table(\n \"kafka_sink\", schema=sink_schema, tbl_properties=kafka_sink_configs\n)\n```\n\n::: {.cell-output .cell-output-display execution_count=10}\n```{=html}\n
<pre>DatabaseTable: kafka_sink\n  province_id int32\n  pay_amount  float64\n</pre>
\n```\n:::\n:::\n\n\nNow, let’s write the results into this sink. Note that we can directly reuse\nthe transformation logic that we wrote above for the local execution backend!!\n\n::: {#65df8e0e .cell execution_count=11}\n``` {.python .cell-code}\nconnection.insert(\"kafka_sink\", unbound_agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=11}\n```\n\n```\n:::\n:::\n\n\n::: {.callout-tip}\nYou can examine the results either using the Kafka console consumer CLI or the\n`kafka-python` library.\n:::\n\nHow easy was it to define both batch and streaming workloads using Ibis?\nWithout Ibis, you would have needed to write a `pandas`/DuckDB workload and\nthen convert it into Flink SQL manually.\n\n## Concluding thoughts\n\nWith the introduction of the first streaming backends, Ibis is now both a batch\nand a streaming Python DataFrame API and we’re excited about what’s to come\nnext. We hope that Ibis can close the gap between batch and streaming in such a\nway that we no longer talk about the two separately, but, rather, as two parts\nof the same paradigm. Streaming naturally lends itself to batch: batch is\ntechnically just a special case of streaming, where the unbounded data flow\nstops at some point.\n\nOf course, this is only the beginning. There are still technical challenges to\nbe solved (e.g., backfill, window computations over large windows, GPU\nacceleration), and we'll definitely have more exciting updates to share with\nthe community soon!\n\nCheck out the new [Apache Flink](https://ibis-project.org/backends/flink) and\n[RisingWave](https://ibis-project.org/backends/risingwave) backends and let us\nknow what you think!\n\n", "supporting": [ "index_files/figure-html" ], diff --git a/docs/posts/flink-announcement/index.qmd b/docs/posts/flink-announcement/index.qmd index 980d4dc6899b..26fe28631754 100644 --- a/docs/posts/flink-announcement/index.qmd +++ b/docs/posts/flink-announcement/index.qmd @@ -66,15 +66,15 @@ pixi add ibis-flink ## Spinning up the services using Docker Compose -The [ibis-project/ibis-flink-example GitHub -repository](https://github.com/ibis-project/ibis-flink-example) includes the +The [ibis-project/realtime-fraud-detection GitHub +repository](https://github.com/ibis-project/realtime-fraud-detection) includes the relevant Docker Compose configuration for this tutorial. 
Clone the repository, and run `docker compose up` from the cloned directory to create Kafka topics, generate sample data, and launch a Flink cluster: ```bash -git clone https://github.com/claypotai/ibis-flink-example.git -cd ibis-flink-example +git clone https://github.com/claypotai/realtime-fraud-detection.git +cd realtime-fraud-detection docker compose up ``` @@ -88,12 +88,12 @@ After a few seconds, you should see messages indicating your Kafka environment is ready: ```bash -ibis-flink-example-init-kafka-1 | Successfully created the following topics: -ibis-flink-example-init-kafka-1 | payment_msg -ibis-flink-example-init-kafka-1 | sink -ibis-flink-example-init-kafka-1 exited with code 0 -ibis-flink-example-data-generator-1 | Connected to Kafka -ibis-flink-example-data-generator-1 | Producing 20000 records to Kafka topic payment_msg +realtime-fraud-detection-init-kafka-1 | Successfully created the following topics: +realtime-fraud-detection-init-kafka-1 | payment_msg +realtime-fraud-detection-init-kafka-1 | sink +realtime-fraud-detection-init-kafka-1 exited with code 0 +realtime-fraud-detection-data-generator-1 | Connected to Kafka +realtime-fraud-detection-data-generator-1 | Producing 20000 records to Kafka topic payment_msg ``` This example uses mock payments data. The `payment_msg` Kafka topic contains diff --git a/docs/posts/unified-stream-batch/index.qmd b/docs/posts/unified-stream-batch/index.qmd index 3d38c3894b6c..0851e5254ef5 100644 --- a/docs/posts/unified-stream-batch/index.qmd +++ b/docs/posts/unified-stream-batch/index.qmd @@ -172,7 +172,7 @@ website](https://docs.docker.com/compose/install/). * Follow [the setup tutorial](../../tutorials/open-source-software/apache-flink/0_setup.qmd) to install the Flink backend for Ibis. -* Clone the [example repository](https://github.com/ibis-project/ibis-flink-example). +* Clone the [example repository](https://github.com/ibis-project/realtime-fraud-detection). ::: ::: {.callout-note} @@ -199,8 +199,8 @@ would extract a chunk of the data and load it in batch: # | code-fold: true # | include: false !pip install apache-flink kafka-python -!git clone https://github.com/ibis-project/ibis-flink-example -!cd ibis-flink-example && docker compose up kafka init-kafka data-generator -d && sleep 10 && cd .. +!git clone https://github.com/ibis-project/realtime-fraud-detection +!cd realtime-fraud-detection && docker compose up kafka init-kafka data-generator -d && sleep 10 && cd .. ``` ```{python}