diff --git a/docs/_freeze/posts/flink-announcement/index/execute-results/html.json b/docs/_freeze/posts/flink-announcement/index/execute-results/html.json index 8d9ae93f6d39..6365ae1f8b85 100644 --- a/docs/_freeze/posts/flink-announcement/index/execute-results/html.json +++ b/docs/_freeze/posts/flink-announcement/index/execute-results/html.json @@ -1,10 +1,10 @@ { - "hash": "2ace8eafe6bfd928bfd0bde3afc91b0c", + "hash": "c0db90940b7122a139297aa7276d3e20", "result": { "engine": "jupyter", - "markdown": "---\ntitle: \"Ibis goes real-time! Introducing the new Flink backend for Ibis\"\nauthor: \"Deepyaman Datta\"\ndate: \"2024-02-12\"\ncategories:\n - blog\n - flink\n - stream processing\n---\n\n## Introduction\n\nIbis 8.0 marks the official release of the Apache Flink backend for Ibis. Flink\nis one of the most established stream-processing frameworks out there and a\ncentral part of the real-time data infrastructure at companies like DoorDash,\nLinkedIn, Netflix, and Uber. It is commonly applied in use cases such as fraud\ndetection, anomaly detection, real-time recommendation, dynamic pricing, and\nonline advertising. The Flink backend is also the first streaming backend Ibis\nsupports. Follow along as we define and execute a simple streaming job using\nIbis!\n\n## Installation prerequisites\n\n* **Docker Compose:** This tutorial uses Docker Compose to manage an Apache\n Kafka environment (including sample data generation) and a Flink cluster (for\n [remote execution](#remote-execution)). You can [download and install Docker\n Compose from the official website](https://docs.docker.com/compose/install/).\n* **JDK 11:** Flink requires Java 11. If you don't already have JDK 11\n installed, you can [get the appropriate Eclipse Temurin\n release](https://adoptium.net/temurin/releases/?package=jdk&version=11).\n* **Python:** To follow along, you need Python 3.9 or 3.10.\n\n## Installing the Flink backend for Ibis\n\nWe use a Python client to explore data in Kafka topics. 
You can install it,\nalongside the Flink backend for Ibis, with `pip`, `conda`, `mamba`, or `pixi`:\n\n::: {.panel-tabset}\n\n## Using `pip`\n\n```bash\npip install ibis-framework apache-flink kafka-python\n```\n\n## Using `conda`\n\n```bash\nconda install -c conda-forge ibis-flink\n```\n\n## Using `mamba`\n\n```bash\nmamba install -c conda-forge ibis-flink\n```\n\n## Using `pixi`\n\n```bash\npixi add ibis-flink\n```\n\n:::\n\n## Spinning up the services using Docker Compose\n\nThe [ibis-project/ibis-flink-example GitHub\nrepository](https://github.com/ibis-project/ibis-flink-example) includes the\nrelevant Docker Compose configuration for this tutorial. Clone the repository,\nand run `docker compose up` from the cloned directory to create Kafka topics,\ngenerate sample data, and launch a Flink cluster:\n\n```bash\ngit clone https://github.com/claypotai/ibis-flink-example.git\ncd ibis-flink-example\ndocker compose up\n```\n\n::: {.callout-tip}\nIf you don't intend to try [remote execution](#remote-execution), you can start\nonly the Kafka-related services with `docker compose up kafka init-kafka\ndata-generator`.\n:::\n\nAfter a few seconds, you should see messages indicating your Kafka environment\nis ready:\n\n```bash\nibis-flink-example-init-kafka-1 | Successfully created the following topics:\nibis-flink-example-init-kafka-1 | payment_msg\nibis-flink-example-init-kafka-1 | sink\nibis-flink-example-init-kafka-1 exited with code 0\nibis-flink-example-data-generator-1 | Connected to Kafka\nibis-flink-example-data-generator-1 | Producing 20000 records to Kafka topic payment_msg\n```\n\nThis example uses mock payments data. 
The `payment_msg` Kafka topic contains\nmessages in the following format:\n\n```json\n{\n \"createTime\": \"2023-09-20 22:19:02.224\",\n \"orderId\": 1695248388,\n \"payAmount\": 88694.71922270155,\n \"payPlatform\": 0,\n \"provinceId\": 6\n}\n```\n\nIn a separate terminal, we can explore what these messages look like:\n\n::: {#ec8ba792 .cell execution_count=1}\n``` {.python .cell-code}\nfrom itertools import islice\n\nfrom kafka import KafkaConsumer\n\nconsumer = KafkaConsumer(\"payment_msg\")\nfor msg in islice(consumer, 3):\n print(msg)\n```\n\n::: {.cell-output .cell-output-stdout}\n```\nConsumerRecord(topic='payment_msg', partition=0, offset=134, timestamp=1707750873865, timestamp_type=0, key=None, value=b'{\"createTime\": \"2024-02-12 15:14:33.865\", \"orderId\": 1707750941, \"payAmount\": 43204.147361188414, \"payPlatform\": 0, \"provinceId\": 2}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=132, serialized_header_size=-1)\nConsumerRecord(topic='payment_msg', partition=0, offset=135, timestamp=1707750874366, timestamp_type=0, key=None, value=b'{\"createTime\": \"2024-02-12 15:14:34.365\", \"orderId\": 1707750942, \"payAmount\": 27694.73284964067, \"payPlatform\": 0, \"provinceId\": 4}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=131, serialized_header_size=-1)\nConsumerRecord(topic='payment_msg', partition=0, offset=136, timestamp=1707750874866, timestamp_type=0, key=None, value=b'{\"createTime\": \"2024-02-12 15:14:34.866\", \"orderId\": 1707750943, \"payAmount\": 55767.91816225589, \"payPlatform\": 0, \"provinceId\": 0}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=131, serialized_header_size=-1)\n```\n:::\n:::\n\n\n## Running the tutorial\n\nThis tutorial uses Ibis with the Flink backend to process the aforementioned\npayment messages. 
You can choose to either [run it locally](#local-execution) or\n[submit a job to an already-running Flink cluster](#remote-execution).\n\n### Local execution\n\nThe simpler option is to run the example using the Flink mini cluster.\n\n#### Create a table environment\n\nThe [table\nenvironment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/table/table_environment/)\nserves as the main entry point for interacting with the Flink runtime. The\n`flink` backend does not create `TableEnvironment` objects; you must create a\n`TableEnvironment` and pass that to\n[`ibis.flink.connect`](../../backends/flink.qmd#ibis.flink.connect):\n\n::: {#28a7f742 .cell execution_count=2}\n``` {.python .cell-code}\nimport ibis\nfrom pyflink.table import EnvironmentSettings, TableEnvironment\n\nenv_settings = EnvironmentSettings.in_streaming_mode()\ntable_env = TableEnvironment.create(env_settings)\ntable_env.get_config().set(\"parallelism.default\", \"1\") # <1>\n\ncon = ibis.flink.connect(table_env)\n```\n:::\n\n\n1. write all the data to one file\n\nFlink’s streaming connectors aren't part of the binary distribution. Link the\n[Kafka\nconnector](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/kafka/)\nfor cluster execution by adding the JAR file from the cloned repository. [Ibis\nexposes the `raw_sql` method for situations like this, where you need to run\narbitrary SQL that cannot be modeled as a table\nexpression](https://ibis-project.org/how-to/extending/sql#backend.raw_sql):\n\n\n\n::: {#0eb4a7a7 .cell execution_count=4}\n``` {.python .cell-code}\ncon.raw_sql(\"ADD JAR 'flink-sql-connector-kafka-3.0.2-1.18.jar'\")\n```\n:::\n\n\n#### Create the source and sink tables\n\nUse\n[`create_table`](../../backends/flink.qmd#ibis.backends.flink.Backend.create_table)\nto register tables. 
Notice the new top-level `ibis.watermark` API for\n[specifying a watermark\nstrategy](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/time/#event-time-and-watermarks).\n\n::: {#3242685f .cell execution_count=5}\n``` {.python .cell-code}\nsource_schema = ibis.schema( # <1>\n { # <1>\n \"createTime\": \"timestamp(3)\", # <1>\n \"orderId\": \"int64\", # <1>\n \"payAmount\": \"float64\", # <1>\n \"payPlatform\": \"int32\", # <1>\n \"provinceId\": \"int32\", # <1>\n } # <1>\n) # <1>\n\nsource_configs = { # <1>\n \"connector\": \"kafka\", # <1>\n \"topic\": \"payment_msg\", # <1>\n \"properties.bootstrap.servers\": \"localhost:9092\", # <1>\n \"properties.group.id\": \"test_3\", # <1>\n \"scan.startup.mode\": \"earliest-offset\", # <1>\n \"format\": \"json\", # <1>\n} # <1>\n\nt = con.create_table( # <1>\n \"payment_msg\", # <1>\n schema=source_schema, # <1>\n tbl_properties=source_configs, # <1>\n watermark=ibis.watermark( # <1>\n time_col=\"createTime\", allowed_delay=ibis.interval(seconds=15) # <1>\n ), # <1>\n) # <1>\n\nsink_schema = ibis.schema( # <2>\n { # <2>\n \"province_id\": \"int32\", # <2>\n \"pay_amount\": \"float64\", # <2>\n } # <2>\n) # <2>\n\nsink_configs = { # <2>\n \"connector\": \"kafka\", # <3>\n \"topic\": \"sink\", # <2>\n \"properties.bootstrap.servers\": \"localhost:9092\", # <2>\n \"format\": \"json\", # <2>\n} # <2>\n\ncon.create_table( # <2>\n \"total_amount_by_province_id\", schema=sink_schema, tbl_properties=sink_configs # <2>\n) # <2>\n```\n\n::: {.cell-output .cell-output-display execution_count=5}\n```{=html}\n
DatabaseTable: total_amount_by_province_id\n  province_id int32\n  pay_amount  float64\n
\n```\n:::\n:::\n\n\n1. create source Table\n2. create sink Table\n\n#### Perform calculations\n\nCompute the total pay amount per province in the past 10 seconds (as of each\nmessage, for the province in the incoming message):\n\n::: {#3c270646 .cell execution_count=6}\n``` {.python .cell-code}\nagged = t.select(\n province_id=t.provinceId,\n pay_amount=t.payAmount.sum().over(\n range=(-ibis.interval(seconds=10), 0),\n group_by=t.provinceId,\n order_by=t.createTime,\n ),\n)\n```\n:::\n\n\nFinally, emit the query result to the sink table:\n\n::: {#b31b7cd0 .cell execution_count=7}\n``` {.python .cell-code}\ncon.insert(\"total_amount_by_province_id\", agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=7}\n```\n\n```\n:::\n:::\n\n\n### Remote execution\n\nYou can also submit the example to the [remote cluster started using Docker\nCompose](#spinning-up-the-services-using-docker-compose). The\n`window_aggregation.py` file in the cloned repository contains the [same steps\nthat we performed for local execution](#local-execution). 
We will [use the\nmethod described in the official Flink\ndocumentation](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/cli/#submitting-pyflink-jobs).\n\n::: {.callout-tip}\nYou can find the `./bin/flink` executable with the following command:\n\n```bash\npython -c'from pathlib import Path; import pyflink; print(Path(pyflink.__spec__.origin).parent / \"bin\" / \"flink\")'\n```\n:::\n\nMy full command looks like this:\n\n```bash\n/opt/miniconda3/envs/ibis-dev/lib/python3.10/site-packages/pyflink/bin/flink run --jobmanager localhost:8081 --python window_aggregation.py\n```\n\nThe command will exit after displaying a submission message:\n\n```\nJob has been submitted with JobID b816faaf5ef9126ea5b9b6a37012cf56\n```\n\n## Viewing the results\n\nSimilar to how we viewed messages in the `payment_msg` topic, we can print\nresults from the `sink` topic:\n\n::: {#2d8c65f5 .cell execution_count=8}\n``` {.python .cell-code}\nconsumer = KafkaConsumer(\"sink\")\nfor msg in islice(consumer, 10):\n print(msg)\n```\n\n::: {.cell-output .cell-output-stdout}\n```\nConsumerRecord(topic='sink', partition=0, offset=0, timestamp=1707750885285, timestamp_type=0, key=None, value=b'{\"province_id\":3,\"pay_amount\":26224.618434788914}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=1, timestamp=1707750885289, timestamp_type=0, key=None, value=b'{\"province_id\":2,\"pay_amount\":91000.13002428309}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=2, timestamp=1707750885291, timestamp_type=0, key=None, value=b'{\"province_id\":4,\"pay_amount\":32221.942267624105}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=3, timestamp=1707750885291, 
timestamp_type=0, key=None, value=b'{\"province_id\":2,\"pay_amount\":114055.85452988454}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=4, timestamp=1707750885291, timestamp_type=0, key=None, value=b'{\"province_id\":3,\"pay_amount\":40205.43467470664}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=5, timestamp=1707750885291, timestamp_type=0, key=None, value=b'{\"province_id\":6,\"pay_amount\":28991.968635615594}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=6, timestamp=1707750885291, timestamp_type=0, key=None, value=b'{\"province_id\":0,\"pay_amount\":66764.9494772887}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=47, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=7, timestamp=1707750885291, timestamp_type=0, key=None, value=b'{\"province_id\":2,\"pay_amount\":141685.624808002}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=47, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=8, timestamp=1707750885291, timestamp_type=0, key=None, value=b'{\"province_id\":5,\"pay_amount\":78201.29007867258}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=9, timestamp=1707750885291, timestamp_type=0, key=None, value=b'{\"province_id\":2,\"pay_amount\":209122.34275356008}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\n```\n:::\n:::\n\n\nVoilà! 
You've run your first streaming application using Ibis.\n\n## Shutting down the Compose environment\n\nPress Ctrl+C to stop the Docker Compose containers. Once\nstopped, run `docker compose down` to remove the services created for this\ntutorial.\n\n", + "markdown": "---\ntitle: \"Ibis goes real-time! Introducing the new Flink backend for Ibis\"\nauthor: \"Deepyaman Datta\"\ndate: \"2024-02-12\"\ncategories:\n - blog\n - flink\n - stream processing\n---\n\n## Introduction\n\nIbis 8.0 marks the official release of the Apache Flink backend for Ibis. Ibis\nusers can now manipulate data across streaming and batch contexts using the same\ninterface. Flink is one of the most established stream-processing frameworks out\nthere and a central part of the real-time data infrastructure at companies like\nDoorDash, LinkedIn, Netflix, and Uber. It is commonly applied in use cases such\nas fraud detection, anomaly detection, real-time recommendation, dynamic\npricing, and online advertising. The Flink backend is also the first streaming\nbackend Ibis supports. Follow along as we define and execute a simple streaming\njob using Ibis!\n\n## Installation prerequisites\n\n* **Docker Compose:** This tutorial uses Docker Compose to manage an Apache\n Kafka environment (including sample data generation) and a Flink cluster (for\n [remote execution](#remote-execution)). You can [download and install Docker\n Compose from the official website](https://docs.docker.com/compose/install/).\n* **JDK 11:** Flink requires Java 11. If you don't already have JDK 11\n installed, you can [get the appropriate Eclipse Temurin\n release](https://adoptium.net/temurin/releases/?package=jdk&version=11).\n* **Python:** To follow along, you need Python 3.9 or 3.10.\n\n## Installing the Flink backend for Ibis\n\nWe use a Python client to explore data in Kafka topics. 
You can install it,\nalongside the Flink backend for Ibis, with `pip`, `conda`, `mamba`, or `pixi`:\n\n::: {.panel-tabset}\n\n## Using `pip`\n\n```bash\npip install ibis-framework apache-flink kafka-python\n```\n\n## Using `conda`\n\n```bash\nconda install -c conda-forge ibis-flink\n```\n\n## Using `mamba`\n\n```bash\nmamba install -c conda-forge ibis-flink\n```\n\n## Using `pixi`\n\n```bash\npixi add ibis-flink\n```\n\n:::\n\n## Spinning up the services using Docker Compose\n\nThe [ibis-project/ibis-flink-example GitHub\nrepository](https://github.com/ibis-project/ibis-flink-example) includes the\nrelevant Docker Compose configuration for this tutorial. Clone the repository,\nand run `docker compose up` from the cloned directory to create Kafka topics,\ngenerate sample data, and launch a Flink cluster:\n\n```bash\ngit clone https://github.com/ibis-project/ibis-flink-example.git\ncd ibis-flink-example\ndocker compose up\n```\n\n::: {.callout-tip}\nIf you don't intend to try [remote execution](#remote-execution), you can start\nonly the Kafka-related services with `docker compose up kafka init-kafka\ndata-generator`.\n:::\n\nAfter a few seconds, you should see messages indicating your Kafka environment\nis ready:\n\n```bash\nibis-flink-example-init-kafka-1 | Successfully created the following topics:\nibis-flink-example-init-kafka-1 | payment_msg\nibis-flink-example-init-kafka-1 | sink\nibis-flink-example-init-kafka-1 exited with code 0\nibis-flink-example-data-generator-1 | Connected to Kafka\nibis-flink-example-data-generator-1 | Producing 20000 records to Kafka topic payment_msg\n```\n\nThis example uses mock payments data. 
The `payment_msg` Kafka topic contains\nmessages in the following format:\n\n```json\n{\n \"createTime\": \"2023-09-20 22:19:02.224\",\n \"orderId\": 1695248388,\n \"payAmount\": 88694.71922270155,\n \"payPlatform\": 0,\n \"provinceId\": 6\n}\n```\n\nIn a separate terminal, we can explore what these messages look like:\n\n::: {#04fb6b63 .cell execution_count=1}\n``` {.python .cell-code}\nfrom itertools import islice\n\nfrom kafka import KafkaConsumer\n\nconsumer = KafkaConsumer(\"payment_msg\")\nfor msg in islice(consumer, 3):\n print(msg)\n```\n\n::: {.cell-output .cell-output-stdout}\n```\nConsumerRecord(topic='payment_msg', partition=0, offset=1087, timestamp=1707754299796, timestamp_type=0, key=None, value=b'{\"createTime\": \"2024-02-12 16:11:39.795\", \"orderId\": 1707754842, \"payAmount\": 56028.520857965006, \"payPlatform\": 0, \"provinceId\": 3}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=132, serialized_header_size=-1)\nConsumerRecord(topic='payment_msg', partition=0, offset=1088, timestamp=1707754300298, timestamp_type=0, key=None, value=b'{\"createTime\": \"2024-02-12 16:11:40.298\", \"orderId\": 1707754843, \"payAmount\": 55081.020819104546, \"payPlatform\": 0, \"provinceId\": 4}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=132, serialized_header_size=-1)\nConsumerRecord(topic='payment_msg', partition=0, offset=1089, timestamp=1707754300800, timestamp_type=0, key=None, value=b'{\"createTime\": \"2024-02-12 16:11:40.800\", \"orderId\": 1707754844, \"payAmount\": 30998.71866136802, \"payPlatform\": 0, \"provinceId\": 4}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=131, serialized_header_size=-1)\n```\n:::\n:::\n\n\n## Running the tutorial\n\nThis tutorial uses Ibis with the Flink backend to process the aforementioned\npayment messages. 
You can choose to either [run it locally](#local-execution) or\n[submit a job to an already-running Flink cluster](#remote-execution).\n\n### Local execution\n\nThe simpler option is to run the example using the Flink mini cluster.\n\n#### Create a table environment\n\nThe [table\nenvironment](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/python/table/table_environment/)\nserves as the main entry point for interacting with the Flink runtime. The\n`flink` backend does not create `TableEnvironment` objects; you must create a\n`TableEnvironment` and pass that to\n[`ibis.flink.connect`](../../backends/flink.qmd#ibis.flink.connect):\n\n::: {#56b2d013 .cell execution_count=2}\n``` {.python .cell-code}\nimport ibis\nfrom pyflink.table import EnvironmentSettings, TableEnvironment\n\nenv_settings = EnvironmentSettings.in_streaming_mode()\ntable_env = TableEnvironment.create(env_settings)\ntable_env.get_config().set(\"parallelism.default\", \"1\") # <1>\n\ncon = ibis.flink.connect(table_env)\n```\n:::\n\n\n1. write all the data to one file\n\nFlink’s streaming connectors aren't part of the binary distribution. Link the\n[Kafka\nconnector](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/connectors/table/kafka/)\nfor cluster execution by adding the JAR file from the cloned repository. [Ibis\nexposes the `raw_sql` method for situations like this, where you need to run\narbitrary SQL that cannot be modeled as a table\nexpression](https://ibis-project.org/how-to/extending/sql#backend.raw_sql):\n\n\n\n::: {#f6ddf87a .cell execution_count=4}\n``` {.python .cell-code}\ncon.raw_sql(\"ADD JAR 'flink-sql-connector-kafka-3.0.2-1.18.jar'\")\n```\n:::\n\n\n#### Create the source and sink tables\n\nUse\n[`create_table`](../../backends/flink.qmd#ibis.backends.flink.Backend.create_table)\nto register tables. 
Notice the new top-level `ibis.watermark` API for\n[specifying a watermark\nstrategy](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/concepts/time/#event-time-and-watermarks).\n\n::: {#7018745d .cell execution_count=5}\n``` {.python .cell-code}\nsource_schema = ibis.schema( # <1>\n { # <1>\n \"createTime\": \"timestamp(3)\", # <1>\n \"orderId\": \"int64\", # <1>\n \"payAmount\": \"float64\", # <1>\n \"payPlatform\": \"int32\", # <1>\n \"provinceId\": \"int32\", # <1>\n } # <1>\n) # <1>\n\nsource_configs = { # <1>\n \"connector\": \"kafka\", # <1>\n \"topic\": \"payment_msg\", # <1>\n \"properties.bootstrap.servers\": \"localhost:9092\", # <1>\n \"properties.group.id\": \"test_3\", # <1>\n \"scan.startup.mode\": \"earliest-offset\", # <1>\n \"format\": \"json\", # <1>\n} # <1>\n\nt = con.create_table( # <1>\n \"payment_msg\", # <1>\n schema=source_schema, # <1>\n tbl_properties=source_configs, # <1>\n watermark=ibis.watermark( # <1>\n time_col=\"createTime\", allowed_delay=ibis.interval(seconds=15) # <1>\n ), # <1>\n) # <1>\n\nsink_schema = ibis.schema( # <2>\n { # <2>\n \"province_id\": \"int32\", # <2>\n \"pay_amount\": \"float64\", # <2>\n } # <2>\n) # <2>\n\nsink_configs = { # <2>\n \"connector\": \"kafka\", # <2>\n \"topic\": \"sink\", # <2>\n \"properties.bootstrap.servers\": \"localhost:9092\", # <2>\n \"format\": \"json\", # <2>\n} # <2>\n\ncon.create_table( # <2>\n \"total_amount_by_province_id\", schema=sink_schema, tbl_properties=sink_configs # <2>\n) # <2>\n```\n\n::: {.cell-output .cell-output-display execution_count=5}\n```{=html}\n
DatabaseTable: total_amount_by_province_id\n  province_id int32\n  pay_amount  float64\n
\n```\n:::\n:::\n\n\n1. create source Table\n2. create sink Table\n\n#### Perform calculations\n\nCompute the total pay amount per province in the past 10 seconds (as of each\nmessage, for the province in the incoming message):\n\n::: {#b7bf4f29 .cell execution_count=6}\n``` {.python .cell-code}\nagged = t.select(\n province_id=t.provinceId,\n pay_amount=t.payAmount.sum().over(\n range=(-ibis.interval(seconds=10), 0),\n group_by=t.provinceId,\n order_by=t.createTime,\n ),\n)\n```\n:::\n\n\nFinally, emit the query result to the sink table:\n\n::: {#f50a9b26 .cell execution_count=7}\n``` {.python .cell-code}\ncon.insert(\"total_amount_by_province_id\", agged)\n```\n\n::: {.cell-output .cell-output-display execution_count=7}\n```\n\n```\n:::\n:::\n\n\n### Remote execution\n\nYou can also submit the example to the [remote cluster started using Docker\nCompose](#spinning-up-the-services-using-docker-compose). The\n`window_aggregation.py` file in the cloned repository contains the [same steps\nthat we performed for local execution](#local-execution). 
We will [use the\nmethod described in the official Flink\ndocumentation](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/cli/#submitting-pyflink-jobs).\n\n::: {.callout-tip}\nYou can find the `./bin/flink` executable with the following command:\n\n```bash\npython -c'from pathlib import Path; import pyflink; print(Path(pyflink.__spec__.origin).parent / \"bin\" / \"flink\")'\n```\n:::\n\nMy full command looks like this:\n\n```bash\n/opt/miniconda3/envs/ibis-dev/lib/python3.10/site-packages/pyflink/bin/flink run --jobmanager localhost:8081 --python window_aggregation.py\n```\n\nThe command will exit after displaying a submission message:\n\n```\nJob has been submitted with JobID b816faaf5ef9126ea5b9b6a37012cf56\n```\n\n## Viewing the results\n\nSimilar to how we viewed messages in the `payment_msg` topic, we can print\nresults from the `sink` topic:\n\n::: {#f2f232b9 .cell execution_count=8}\n``` {.python .cell-code}\nconsumer = KafkaConsumer(\"sink\")\nfor msg in islice(consumer, 10):\n print(msg)\n```\n\n::: {.cell-output .cell-output-stdout}\n```\nConsumerRecord(topic='sink', partition=0, offset=0, timestamp=1707754308001, timestamp_type=0, key=None, value=b'{\"province_id\":0,\"pay_amount\":34598.581794063946}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=1, timestamp=1707754308005, timestamp_type=0, key=None, value=b'{\"province_id\":3,\"pay_amount\":97439.80920527918}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=2, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":5,\"pay_amount\":94291.98939770168}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=3, timestamp=1707754308006, 
timestamp_type=0, key=None, value=b'{\"province_id\":2,\"pay_amount\":67249.40686490892}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=4, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":0,\"pay_amount\":75995.17413984003}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=5, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":6,\"pay_amount\":73913.82666977578}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=48, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=6, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":6,\"pay_amount\":110483.70544553592}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=7, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":3,\"pay_amount\":146010.17295813354}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=8, timestamp=1707754308006, timestamp_type=0, key=None, value=b'{\"province_id\":3,\"pay_amount\":200463.85497267012}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\nConsumerRecord(topic='sink', partition=0, offset=9, timestamp=1707754308007, timestamp_type=0, key=None, value=b'{\"province_id\":3,\"pay_amount\":208752.74642677643}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=49, serialized_header_size=-1)\n```\n:::\n:::\n\n\nVoilà! 
You've run your first streaming application using Ibis.\n\n## Shutting down the Compose environment\n\nPress Ctrl+C to stop the Docker Compose containers. Once\nstopped, run `docker compose down` to remove the services created for this\ntutorial.\n\n", "supporting": [ - "index_files" + "index_files/figure-html" ], "filters": [], "includes": { diff --git a/docs/posts/flink-announcement/index.qmd b/docs/posts/flink-announcement/index.qmd index e9e006d5374e..980d4dc6899b 100644 --- a/docs/posts/flink-announcement/index.qmd +++ b/docs/posts/flink-announcement/index.qmd @@ -10,14 +10,15 @@ categories: ## Introduction -Ibis 8.0 marks the official release of the Apache Flink backend for Ibis. Flink -is one of the most established stream-processing frameworks out there and a -central part of the real-time data infrastructure at companies like DoorDash, -LinkedIn, Netflix, and Uber. It is commonly applied in use cases such as fraud -detection, anomaly detection, real-time recommendation, dynamic pricing, and -online advertising. The Flink backend is also the first streaming backend Ibis -supports. Follow along as we define and execute a simple streaming job using -Ibis! +Ibis 8.0 marks the official release of the Apache Flink backend for Ibis. Ibis +users can now manipulate data across streaming and batch contexts using the same +interface. Flink is one of the most established stream-processing frameworks out +there and a central part of the real-time data infrastructure at companies like +DoorDash, LinkedIn, Netflix, and Uber. It is commonly applied in use cases such +as fraud detection, anomaly detection, real-time recommendation, dynamic +pricing, and online advertising. The Flink backend is also the first streaming +backend Ibis supports. Follow along as we define and execute a simple streaming +job using Ibis! ## Installation prerequisites