diff --git a/docs/sphinx/source/examples/feed_performance_cloud.ipynb b/docs/sphinx/source/examples/feed_performance_cloud.ipynb new file mode 100644 index 00000000..85fc8e6e --- /dev/null +++ b/docs/sphinx/source/examples/feed_performance_cloud.ipynb @@ -0,0 +1,3430 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "given-adoption", + "metadata": {}, + "source": [ + "\n", + " \n", + " \n", + " \"#Vespa\"\n", + "\n", + "\n", + "# Feeding to Vespa Cloud\n", + "\n", + "Our [previous notebook](https://pyvespa.readthedocs.io/en/latest/examples/feed_performance.html), we demonstrated one way of benchmarking feed performance to a local Vespa instance running in Docker.\n", + "In this notebook, we will llok at the same methods, but how feeding to [Vespa Cloud](https://cloud.vespa.ai) affects performance of the different methods.\n", + "\n", + "The key difference between feeding to a local Vespa instance and a Vespa Cloud instance is the network latency.\n", + "Additionally, we will introduce embedding in Vespa at feed time, which is a realistic scenario for many use-cases.\n", + "\n", + "We will look at these 3 different methods:\n", + "\n", + "1. Using `feed_iterable()` - which uses threading to parallelize the feed operation. Best for CPU-bound operations.\n", + "2. Using `feed_async_iterable()` - which uses asyncio to parallelize the feed operation. Also uses `httpx` with HTTP/2-support. Performs best for IO-bound operations.\n", + "3. Using [Vespa CLI](https://docs.vespa.ai/en/vespa-cli).\n" + ] + }, + { + "cell_type": "markdown", + "id": "8c967bd2", + "metadata": {}, + "source": [ + "
\n", + " Refer to troubleshooting\n", + " for any problem when running this guide.\n", + "
\n" + ] + }, + { + "cell_type": "markdown", + "id": "8345b2fe", + "metadata": {}, + "source": [ + "Install [Vespa CLI](https://docs.vespa.ai/en/vespa-cli.html).\n", + "The `vespacli` python package is just a thin wrapper, allowing for installation through pypi.\n", + "\n", + "> Do NOT install if you already have the Vespa CLI installed.\n" + ] + }, + { + "cell_type": "markdown", + "id": "5acb52d8", + "metadata": {}, + "source": [ + "[Install pyvespa](https://pyvespa.readthedocs.io/), and other dependencies.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "03f3d0f2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "zsh:1: 5.20 not found\n" + ] + } + ], + "source": [ + "!pip3 install vespacli pyvespa datasets plotly>=5.20" + ] + }, + { + "cell_type": "markdown", + "id": "db637322", + "metadata": {}, + "source": [ + "## Create an application package\n", + "\n", + "The [application package](https://pyvespa.readthedocs.io/en/latest/reference-api.html#vespa.package.ApplicationPackage)\n", + "has all the Vespa configuration files.\n", + "\n", + "For this demo, we will use a simple application package\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "bd5c2629", + "metadata": {}, + "outputs": [], + "source": [ + "from vespa.package import (\n", + " ApplicationPackage,\n", + " Field,\n", + " Schema,\n", + " Document,\n", + " FieldSet,\n", + " HNSW,\n", + ")\n", + "\n", + "# Define the application name (can NOT contain `_` or `-`)\n", + "\n", + "application = \"feedperformancecloud\"\n", + "\n", + "\n", + "package = ApplicationPackage(\n", + " name=application,\n", + " schema=[\n", + " Schema(\n", + " name=\"doc\",\n", + " document=Document(\n", + " fields=[\n", + " Field(name=\"id\", type=\"string\", indexing=[\"summary\"]),\n", + " Field(name=\"text\", type=\"string\", indexing=[\"index\", \"summary\"]),\n", + " Field(\n", + " name=\"embedding\",\n", + " type=\"tensor(x[1024])\",\n", + " # Note that we are NOT embedding with a vespa model here, but that is also possible.\n", + " indexing=[\"summary\", \"attribute\", \"index\"],\n", + " ann=HNSW(distance_metric=\"angular\"),\n", + " ),\n", + " ]\n", + " ),\n", + " fieldsets=[FieldSet(name=\"default\", fields=[\"text\"])],\n", + " )\n", + " ],\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "2c5e2943", + "metadata": {}, + "source": [ + "Note that the `ApplicationPackage` name cannot have `-` or `_`.\n" + ] + }, + { + "cell_type": "markdown", + "id": "careful-savage", + "metadata": {}, + "source": [ + "## Deploy the Vespa application\n", + "\n", + "Deploy `package` on the local machine using Docker,\n", + "without leaving the notebook, by creating an instance of\n", + "[VespaDocker](https://pyvespa.readthedocs.io/en/latest/reference-api.html#vespa.deployment.VespaDocker). `VespaDocker` connects\n", + "to the local Docker daemon socket and starts the [Vespa docker image](https://hub.docker.com/r/vespaengine/vespa/).\n", + "\n", + "If this step fails, please check\n", + "that the Docker daemon is running, and that the Docker daemon socket can be used by clients (Configurable under advanced settings in Docker Desktop).\n" + ] + }, + { + "cell_type": "markdown", + "id": "6f74324a", + "metadata": {}, + "source": [ + "Follow the instrauctions from the output above and add the control-plane key in the console at `https://console.vespa-cloud.com/tenant/TENANT_NAME/account/keys`\n", + "(replace TENANT_NAME with your tenant name).\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "canadian-blood", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Setting application...\n", + "Running: vespa config set application vespa-team.feedperformancecloud\n", + "Setting target cloud...\n", + "Running: vespa config set target cloud\n", + "\n", + "Api-key found for control plane access. Using api-key.\n" + ] + } + ], + "source": [ + "from vespa.deployment import VespaCloud\n", + "from vespa.application import Vespa\n", + "import os\n", + "\n", + "\n", + "def read_secret():\n", + " \"\"\"Read the API key from the environment variable. This is\n", + " only used for CI/CD purposes.\"\"\"\n", + " t = os.getenv(\"VESPA_TEAM_API_KEY\")\n", + " if t:\n", + " return t.replace(r\"\\n\", \"\\n\")\n", + " else:\n", + " return t\n", + "\n", + "\n", + "vespa_cloud = VespaCloud(\n", + " tenant=\"vespa-team\",\n", + " application=application,\n", + " key_content=read_secret()\n", + " if read_secret()\n", + " else None, # Can removed this for interactive control-plane login\n", + " application_package=package,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "aaae2f91", + "metadata": {}, + "source": [ + "`app` now holds a reference to a [VespaCloud](https://pyvespa.readthedocs.io/en/latest/reference-api.html#vespa.deployment.VespaCloud) instance.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "471c2da7", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Deployment started in run 3 of dev-aws-us-east-1c for vespa-team.feedperformancecloud. This may take a few minutes the first time.\n", + "INFO [08:04:48] Deploying platform version 8.387.10 and application dev build 3 for dev-aws-us-east-1c of default ...\n", + "INFO [08:04:48] Using CA signed certificate version 1\n", + "INFO [08:04:49] Using 1 nodes in container cluster 'feedperformancecloud_container'\n", + "WARNING [08:04:50] Auto-overriding validation which would be disallowed in production: certificate-removal: Data plane certificate(s) from cluster 'feedperformancecloud_container' is removed (removed certificates: [CN=cloud.vespa.example]) This can cause client connection issues.. To allow this add certificate-removal to validation-overrides.xml, see https://docs.vespa.ai/en/reference/validation-overrides.html\n", + "INFO [08:04:50] Using 1 nodes in container cluster 'feedperformancecloud_container'\n", + "WARNING [08:04:53] Auto-overriding validation which would be disallowed in production: certificate-removal: Data plane certificate(s) from cluster 'feedperformancecloud_container' is removed (removed certificates: [CN=cloud.vespa.example]) This can cause client connection issues.. To allow this add certificate-removal to validation-overrides.xml, see https://docs.vespa.ai/en/reference/validation-overrides.html\n", + "INFO [08:04:55] Session 303878 for tenant 'vespa-team' prepared and activated.\n", + "INFO [08:04:55] ######## Details for all nodes ########\n", + "INFO [08:04:55] h95731a.dev.aws-us-east-1c.vespa-external.aws.oath.cloud: expected to be UP\n", + "INFO [08:04:55] --- platform vespa/cloud-tenant-rhel8:8.387.10\n", + "INFO [08:04:55] --- container on port 4080 has not started \n", + "INFO [08:04:55] --- metricsproxy-container on port 19092 has config generation 303870, wanted is 303878\n", + "INFO [08:04:55] h95729b.dev.aws-us-east-1c.vespa-external.aws.oath.cloud: expected to be UP\n", + "INFO [08:04:55] --- platform vespa/cloud-tenant-rhel8:8.387.10\n", + "INFO [08:04:55] --- storagenode on port 19102 has config generation 303870, wanted is 303878\n", + "INFO [08:04:55] --- searchnode on port 19107 has config generation 303878, wanted is 303878\n", + "INFO [08:04:55] --- distributor on port 19111 has config generation 303878, wanted is 303878\n", + "INFO [08:04:55] --- metricsproxy-container on port 19092 has config generation 303878, wanted is 303878\n", + "INFO [08:04:55] h93272g.dev.aws-us-east-1c.vespa-external.aws.oath.cloud: expected to be UP\n", + "INFO [08:04:55] --- platform vespa/cloud-tenant-rhel8:8.387.10\n", + "INFO [08:04:55] --- logserver-container on port 4080 has config generation 303878, wanted is 303878\n", + "INFO [08:04:55] --- metricsproxy-container on port 19092 has config generation 303878, wanted is 303878\n", + "INFO [08:04:55] h93272h.dev.aws-us-east-1c.vespa-external.aws.oath.cloud: expected to be UP\n", + "INFO [08:04:55] --- platform vespa/cloud-tenant-rhel8:8.387.10\n", + "INFO [08:04:55] --- container-clustercontroller on port 19050 has config generation 303878, wanted is 303878\n", + "INFO [08:04:55] --- metricsproxy-container on port 19092 has config generation 303878, wanted is 303878\n", + "INFO [08:05:03] Found endpoints:\n", + "INFO [08:05:03] - dev.aws-us-east-1c\n", + "INFO [08:05:03] |-- https://b48e8812.bc737822.z.vespa-app.cloud/ (cluster 'feedperformancecloud_container')\n", + "INFO [08:05:04] Deployment of new application complete!\n", + "Found mtls endpoint for feedperformancecloud_container\n", + "URL: https://b48e8812.bc737822.z.vespa-app.cloud/\n", + "Connecting to https://b48e8812.bc737822.z.vespa-app.cloud/\n", + "Using mtls_key_cert Authentication against endpoint https://b48e8812.bc737822.z.vespa-app.cloud//ApplicationStatus\n", + "Application is up!\n", + "Finished deployment.\n" + ] + } + ], + "source": [ + "app: Vespa = vespa_cloud.deploy()" + ] + }, + { + "cell_type": "markdown", + "id": "570cfbd3", + "metadata": {}, + "source": [ + "Note that if you already have a Vespa Cloud instance running, the recommended way to initialize a `Vespa` instance is directly, by passing the `endpoint` and `tenant` parameters to the `Vespa` constructor, along with either:\n", + "\n", + "1. Key/cert for dataplane authentication (generated as part of deployment, copied into the application package, in `/security/clients.pem`, and `~/.vespa/mytenant.myapplication/data-plane-public-cert.pem` and `~/.vespa/mytenant.myapplication/data-plane-private-key.pem`).\n", + "\n", + "```python\n", + "from vespa.application import Vespa\n", + "\n", + "app: Vespa = Vespa(\n", + " url=\"https://my-endpoint.z.vespa-app.cloud\",\n", + " tenant=\"my-tenant\",\n", + " key_file=\"path/to/private-key.pem\",\n", + " cert_file=\"path/to/certificate.pem\",\n", + ")\n", + "```\n", + "\n", + "2. Using a token (must be generated in [Vespa Cloud Console](https://console.vespa-cloud.com/) and defined in the application package, see https://cloud.vespa.ai/en/security/guide.\n", + "\n", + "```python\n", + "from vespa.application import Vespa\n", + "import os\n", + "\n", + "app: Vespa = Vespa(\n", + " url=\"https://my-endpoint.z.vespa-app.cloud\",\n", + " tenant=\"my-tenant\",\n", + " vespa_cloud_secret_token=os.getenv(\"VESPA_CLOUD_SECRET_TOKEN\"),\n", + ")\n", + "```\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "3bdbbb47", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Using mtls_key_cert Authentication against endpoint https://b48e8812.bc737822.z.vespa-app.cloud//ApplicationStatus\n" + ] + }, + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "app.get_application_status()" + ] + }, + { + "cell_type": "markdown", + "id": "sealed-mustang", + "metadata": {}, + "source": [ + "## Preparing the data\n", + "\n", + "In this example we use [HF Datasets](https://huggingface.co/docs/datasets/index) library to stream the\n", + "[\"Cohere/wikipedia-2023-11-embed-multilingual-v3\"](https://huggingface.co/datasets/Cohere/wikipedia-2023-11-embed-multilingual-v3) dataset and index in our newly deployed Vespa instance.\n", + "\n", + "The dataset contains wikipedia-pages, and their corresponding embeddings.\n", + "\n", + "> For this exploration we will use the `id` , `text` and `embedding`-fields\n", + "\n", + "The following uses the [stream](https://huggingface.co/docs/datasets/stream) option of datasets to stream the data without\n", + "downloading all the contents locally.\n", + "\n", + "The `map` functionality allows us to convert the\n", + "dataset fields into the expected feed format for `pyvespa` which expects a dict with the keys `id` and `fields`:\n", + "\n", + "`{ \"id\": \"vespa-document-id\", \"fields\": {\"vespa_field\": \"vespa-field-value\"}}`\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "e9d3facd", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/Users/thomas/.pyenv/versions/3.9.19/envs/pyvespa-dev/lib/python3.9/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" + ] + } + ], + "source": [ + "from datasets import load_dataset" + ] + }, + { + "cell_type": "markdown", + "id": "e2b68592", + "metadata": {}, + "source": [ + "## Utility function to create dataset with different number of documents\n" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "60772727", + "metadata": {}, + "outputs": [], + "source": [ + "def get_dataset(n_docs: int = 1000):\n", + " dataset = load_dataset(\n", + " \"Cohere/wikipedia-2023-11-embed-multilingual-v3\",\n", + " \"simple\",\n", + " split=f\"train[:{n_docs}]\",\n", + " )\n", + " dataset = dataset.map(\n", + " lambda x: {\n", + " \"id\": x[\"_id\"] + \"-iter\",\n", + " \"fields\": {\"text\": x[\"text\"], \"embedding\": x[\"emb\"]},\n", + " }\n", + " ).select_columns([\"id\", \"fields\"])\n", + " return dataset" + ] + }, + { + "cell_type": "markdown", + "id": "5e3f0d0f", + "metadata": {}, + "source": [ + "### A dataclass to store the parameters and results of the different feeding methods\n" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "b6ab7b70", + "metadata": {}, + "outputs": [], + "source": [ + "from dataclasses import dataclass\n", + "from typing import Callable, Optional, Iterable, Dict\n", + "\n", + "\n", + "@dataclass\n", + "class FeedParams:\n", + " name: str\n", + " num_docs: int\n", + " max_connections: int\n", + " function_name: str\n", + " max_workers: Optional[int] = None\n", + " max_queue_size: Optional[int] = None\n", + "\n", + "\n", + "@dataclass\n", + "class FeedResult(FeedParams):\n", + " feed_time: Optional[float] = None" + ] + }, + { + "cell_type": "markdown", + "id": "f865e5c7", + "metadata": {}, + "source": [ + "### A common callback function to notify if something goes wrong\n" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "ab4c02b9", + "metadata": {}, + "outputs": [], + "source": [ + "from vespa.io import VespaResponse\n", + "\n", + "\n", + "def callback(response: VespaResponse, id: str):\n", + " if not response.is_successful():\n", + " print(\n", + " f\"Failed to feed document {id} with status code {response.status_code}: Reason {response.get_json()}\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "aa3e99e3", + "metadata": {}, + "source": [ + "### Defining our feeding functions\n" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "9b70bde7", + "metadata": {}, + "outputs": [], + "source": [ + "import time\n", + "import asyncio\n", + "from vespa.application import Vespa" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "1fe15cec", + "metadata": {}, + "outputs": [], + "source": [ + "def feed_iterable(app: Vespa, params: FeedParams, data: Iterable[Dict]) -> FeedResult:\n", + " start = time.time()\n", + " app.feed_iterable(\n", + " data,\n", + " schema=\"doc\",\n", + " namespace=\"pyvespa-feed\",\n", + " operation_type=\"feed\",\n", + " max_queue_size=params.max_queue_size,\n", + " max_workers=params.max_workers,\n", + " max_connections=params.max_connections,\n", + " callback=callback,\n", + " )\n", + " end = time.time()\n", + " sync_feed_time = end - start\n", + " return FeedResult(\n", + " **params.__dict__,\n", + " feed_time=sync_feed_time,\n", + " )\n", + "\n", + "\n", + "def feed_async_iterable(\n", + " app: Vespa, params: FeedParams, data: Iterable[Dict]\n", + ") -> FeedResult:\n", + " start = time.time()\n", + " app.feed_async_iterable(\n", + " data,\n", + " schema=\"doc\",\n", + " namespace=\"pyvespa-feed\",\n", + " operation_type=\"feed\",\n", + " max_queue_size=params.max_queue_size,\n", + " max_workers=params.max_workers,\n", + " max_connections=params.max_connections,\n", + " callback=callback,\n", + " )\n", + " end = time.time()\n", + " sync_feed_time = end - start\n", + " return FeedResult(\n", + " **params.__dict__,\n", + " feed_time=sync_feed_time,\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "43614eb0", + "metadata": {}, + "source": [ + "## Defining our hyperparameters\n" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "a22fe87e", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Function: feed_async_iterable\n", + "{'num_docs': 1000, 'max_connections': 1, 'max_workers': 64, 'max_queue_size': 2500}\n", + "{'num_docs': 5000, 'max_connections': 1, 'max_workers': 64, 'max_queue_size': 2500}\n", + "{'num_docs': 10000, 'max_connections': 1, 'max_workers': 64, 'max_queue_size': 2500}\n", + "\n", + "\n", + "Function: feed_iterable\n", + "{'num_docs': 1000, 'max_connections': 64, 'max_workers': 64, 'max_queue_size': 2500}\n", + "{'num_docs': 5000, 'max_connections': 64, 'max_workers': 64, 'max_queue_size': 2500}\n", + "{'num_docs': 10000, 'max_connections': 64, 'max_workers': 64, 'max_queue_size': 2500}\n", + "\n", + "\n" + ] + } + ], + "source": [ + "from itertools import product\n", + "\n", + "# We will only run for up to 10 000 documents here as notebook is run as part of CI.\n", + "\n", + "num_docs = [\n", + " 1000,\n", + " 5_000,\n", + " 10_000,\n", + "]\n", + "params_by_function = {\n", + " \"feed_async_iterable\": {\n", + " \"num_docs\": num_docs,\n", + " \"max_connections\": [1],\n", + " \"max_workers\": [64],\n", + " \"max_queue_size\": [2500],\n", + " },\n", + " \"feed_iterable\": {\n", + " \"num_docs\": num_docs,\n", + " \"max_connections\": [64],\n", + " \"max_workers\": [64],\n", + " \"max_queue_size\": [2500],\n", + " },\n", + "}\n", + "\n", + "feed_params = []\n", + "# Create one FeedParams instance of each permutation\n", + "for func, parameters in params_by_function.items():\n", + " print(f\"Function: {func}\")\n", + " keys, values = zip(*parameters.items())\n", + " for combination in product(*values):\n", + " settings = dict(zip(keys, combination))\n", + " print(settings)\n", + " feed_params.append(\n", + " FeedParams(\n", + " name=f\"{settings['num_docs']}_{settings['max_connections']}_{settings.get('max_workers', 0)}_{func}\",\n", + " function_name=func,\n", + " **settings,\n", + " )\n", + " )\n", + " print(\"\\n\") # Just to add space between different functions" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "2b3f067c", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Total number of feed_params: 6\n" + ] + } + ], + "source": [ + "print(f\"Total number of feed_params: {len(feed_params)}\")" + ] + }, + { + "cell_type": "markdown", + "id": "15648d56", + "metadata": {}, + "source": [ + "Now, we will need a way to retrieve the callable function from the function name.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "22044170", + "metadata": {}, + "outputs": [], + "source": [ + "# Get reference to function from string name\n", + "def get_func_from_str(func_name: str) -> Callable:\n", + " return globals()[func_name]" + ] + }, + { + "cell_type": "markdown", + "id": "79f3f550", + "metadata": {}, + "source": [ + "### Function to clean up after each feed\n", + "\n", + "For a fair comparison, we will delete the data before feeding it again.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "1da9d3f9", + "metadata": {}, + "outputs": [], + "source": [ + "from typing import Iterable, Dict\n", + "from vespa.application import Vespa\n", + "\n", + "\n", + "def delete_data(app: Vespa, data: Iterable[Dict]):\n", + " app.feed_iterable(\n", + " iter=data,\n", + " schema=\"doc\",\n", + " namespace=\"pyvespa-feed\",\n", + " operation_type=\"delete\",\n", + " callback=callback,\n", + " max_workers=16,\n", + " max_connections=16,\n", + " )" + ] + }, + { + "cell_type": "markdown", + "id": "e081bf94", + "metadata": {}, + "source": [ + "## Main experiment loop\n" + ] + }, + { + "cell_type": "markdown", + "id": "87c8700c", + "metadata": {}, + "source": [ + "The line below is used to make the code run in Jupyter, as it is already running an event loop\n" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "aaa8f920", + "metadata": {}, + "outputs": [], + "source": [ + "import nest_asyncio\n", + "\n", + "nest_asyncio.apply()" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "7a55e1c9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "--------------------------------------------------\n", + "Starting feed with params:\n", + "FeedParams(name='1000_1_64_feed_async_iterable', num_docs=1000, max_connections=1, function_name='feed_async_iterable', max_workers=64, max_queue_size=2500)\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Using mtls_key_cert Authentication against endpoint https://b48e8812.bc737822.z.vespa-app.cloud//ApplicationStatus\n", + "9.478203773498535\n", + "Deleting data\n", + "--------------------------------------------------\n", + "Starting feed with params:\n", + "FeedParams(name='5000_1_64_feed_async_iterable', num_docs=5000, max_connections=1, function_name='feed_async_iterable', max_workers=64, max_queue_size=2500)\n", + "32.890751123428345\n", + "Deleting data\n", + "--------------------------------------------------\n", + "Starting feed with params:\n", + "FeedParams(name='10000_1_64_feed_async_iterable', num_docs=10000, max_connections=1, function_name='feed_async_iterable', max_workers=64, max_queue_size=2500)\n", + "77.85460019111633\n", + "Deleting data\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Exception in thread Thread-7:\n", + "Traceback (most recent call last):\n", + " File \"/Users/thomas/.pyenv/versions/3.9.19/lib/python3.9/threading.py\", line 980, in _bootstrap_inner\n", + " self.run()\n", + " File \"/Users/thomas/.pyenv/versions/3.9.19/envs/pyvespa-dev/lib/python3.9/site-packages/ipykernel/ipkernel.py\", line 766, in run_closure\n", + " _threading_Thread_run(self)\n", + " File \"/Users/thomas/.pyenv/versions/3.9.19/lib/python3.9/threading.py\", line 917, in run\n", + " self._target(*self._args, **self._kwargs)\n", + " File \"/Users/thomas/Repos/pyvespa/vespa/application.py\", line 480, in _consumer\n", + " future: Future = executor.submit(_submit, doc, sync_session)\n", + " File \"/Users/thomas/.pyenv/versions/3.9.19/lib/python3.9/concurrent/futures/thread.py\", line 167, in submit\n", + " raise RuntimeError('cannot schedule new futures after shutdown')\n", + "RuntimeError: cannot schedule new futures after shutdown\n" + ] + }, + { + "ename": "KeyboardInterrupt", + "evalue": "", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mKeyboardInterrupt\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[17], line 22\u001b[0m\n\u001b[1;32m 20\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mDeleting data\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[1;32m 21\u001b[0m time\u001b[38;5;241m.\u001b[39msleep(\u001b[38;5;241m3\u001b[39m)\n\u001b[0;32m---> 22\u001b[0m \u001b[43mdelete_data\u001b[49m\u001b[43m(\u001b[49m\u001b[43mapp\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mdata\u001b[49m\u001b[43m)\u001b[49m\n", + "Cell \u001b[0;32mIn[15], line 6\u001b[0m, in \u001b[0;36mdelete_data\u001b[0;34m(app, data)\u001b[0m\n\u001b[1;32m 5\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mdelete_data\u001b[39m(app: Vespa, data: Iterable[Dict]):\n\u001b[0;32m----> 6\u001b[0m \u001b[43mapp\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mfeed_iterable\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 7\u001b[0m \u001b[43m \u001b[49m\u001b[38;5;28;43miter\u001b[39;49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mdata\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 8\u001b[0m \u001b[43m \u001b[49m\u001b[43mschema\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mdoc\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\n\u001b[1;32m 9\u001b[0m \u001b[43m \u001b[49m\u001b[43mnamespace\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mpyvespa-feed\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\n\u001b[1;32m 10\u001b[0m \u001b[43m \u001b[49m\u001b[43moperation_type\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mdelete\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\n\u001b[1;32m 11\u001b[0m \u001b[43m \u001b[49m\u001b[43mcallback\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mcallback\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 12\u001b[0m \u001b[43m \u001b[49m\u001b[43mmax_workers\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;241;43m16\u001b[39;49m\u001b[43m,\u001b[49m\n\u001b[1;32m 13\u001b[0m \u001b[43m \u001b[49m\u001b[43mmax_connections\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;241;43m16\u001b[39;49m\u001b[43m,\u001b[49m\n\u001b[1;32m 14\u001b[0m \u001b[43m \u001b[49m\u001b[43m)\u001b[49m\n", + "File \u001b[0;32m~/Repos/pyvespa/vespa/application.py:579\u001b[0m, in \u001b[0;36mVespa.feed_iterable\u001b[0;34m(self, iter, schema, namespace, callback, operation_type, max_queue_size, max_workers, max_connections, **kwargs)\u001b[0m\n\u001b[1;32m 577\u001b[0m consumer_thread\u001b[38;5;241m.\u001b[39mstart()\n\u001b[1;32m 578\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m doc \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28miter\u001b[39m:\n\u001b[0;32m--> 579\u001b[0m \u001b[43mqueue\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mput\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdoc\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mblock\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mTrue\u001b[39;49;00m\u001b[43m)\u001b[49m\n\u001b[1;32m 580\u001b[0m queue\u001b[38;5;241m.\u001b[39mput(\u001b[38;5;28;01mNone\u001b[39;00m, block\u001b[38;5;241m=\u001b[39m\u001b[38;5;28;01mTrue\u001b[39;00m)\n\u001b[1;32m 581\u001b[0m queue\u001b[38;5;241m.\u001b[39mjoin()\n", + "File \u001b[0;32m~/.pyenv/versions/3.9.19/lib/python3.9/queue.py:140\u001b[0m, in \u001b[0;36mQueue.put\u001b[0;34m(self, item, block, timeout)\u001b[0m\n\u001b[1;32m 138\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m timeout \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[1;32m 139\u001b[0m \u001b[38;5;28;01mwhile\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_qsize() \u001b[38;5;241m>\u001b[39m\u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39mmaxsize:\n\u001b[0;32m--> 140\u001b[0m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mnot_full\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mwait\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 141\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m timeout \u001b[38;5;241m<\u001b[39m \u001b[38;5;241m0\u001b[39m:\n\u001b[1;32m 142\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124mtimeout\u001b[39m\u001b[38;5;124m'\u001b[39m\u001b[38;5;124m must be a non-negative number\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n", + "File \u001b[0;32m~/.pyenv/versions/3.9.19/lib/python3.9/threading.py:312\u001b[0m, in \u001b[0;36mCondition.wait\u001b[0;34m(self, timeout)\u001b[0m\n\u001b[1;32m 310\u001b[0m \u001b[38;5;28;01mtry\u001b[39;00m: \u001b[38;5;66;03m# restore state no matter what (e.g., KeyboardInterrupt)\u001b[39;00m\n\u001b[1;32m 311\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m timeout \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m:\n\u001b[0;32m--> 312\u001b[0m \u001b[43mwaiter\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43macquire\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 313\u001b[0m gotit \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;01mTrue\u001b[39;00m\n\u001b[1;32m 314\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n", + "\u001b[0;31mKeyboardInterrupt\u001b[0m: " + ] + } + ], + "source": [ + "results = []\n", + "for params in feed_params:\n", + " print(\"-\" * 50)\n", + " print(\"Starting feed with params:\")\n", + " print(params)\n", + " data = get_dataset(params.num_docs)\n", + " if \"xxx\" not in params.function_name:\n", + " if \"feed_sync\" in params.function_name:\n", + " print(\"Skipping feed_sync\")\n", + " continue\n", + " feed_result = get_func_from_str(params.function_name)(\n", + " app=app, params=params, data=data\n", + " )\n", + " else:\n", + " feed_result = asyncio.run(\n", + " get_func_from_str(params.function_name)(app=app, params=params, data=data)\n", + " )\n", + " print(feed_result.feed_time)\n", + " results.append(feed_result)\n", + " print(\"Deleting data\")\n", + " time.sleep(3)\n", + " delete_data(app, data)" + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "e4b7f1a4", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
namenum_docsmax_connectionsfunction_namemax_workersmax_queue_sizefeed_timerequests_per_second
01000_1_64_feed_async_iterable10001feed_async_iterable6425009.478204105.505223
15000_1_64_feed_async_iterable50001feed_async_iterable64250032.890751152.018419
210000_1_64_feed_async_iterable100001feed_async_iterable64250077.854600128.444562
\n", + "
" + ], + "text/plain": [ + " name num_docs max_connections \\\n", + "0 1000_1_64_feed_async_iterable 1000 1 \n", + "1 5000_1_64_feed_async_iterable 5000 1 \n", + "2 10000_1_64_feed_async_iterable 10000 1 \n", + "\n", + " function_name max_workers max_queue_size feed_time \\\n", + "0 feed_async_iterable 64 2500 9.478204 \n", + "1 feed_async_iterable 64 2500 32.890751 \n", + "2 feed_async_iterable 64 2500 77.854600 \n", + "\n", + " requests_per_second \n", + "0 105.505223 \n", + "1 152.018419 \n", + "2 128.444562 " + ] + }, + "execution_count": 18, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Create a pandas DataFrame with the results\n", + "import pandas as pd\n", + "\n", + "df = pd.DataFrame([result.__dict__ for result in results])\n", + "df[\"requests_per_second\"] = df[\"num_docs\"] / df[\"feed_time\"]\n", + "df" + ] + }, + { + "cell_type": "markdown", + "id": "0882a32a", + "metadata": {}, + "source": [ + "## Plotting the results\n", + "\n", + "Let's plot the results to see how the different methods compare.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 28, + "id": "b94ef835", + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.plotly.v1+json": { + "config": { + "plotlyServerURL": "https://plot.ly" + }, + "data": [ + { + "customdata": [ + [ + 64, + 2500, + 1 + ], + [ + 64, + 2500, + 1 + ], + [ + 64, + 2500, + 1 + ] + ], + "hovertemplate": "function_name=feed_async_iterable
Number of Documents=%{x}
Requests per Second=%{y}
max_workers=%{customdata[0]}
max_queue_size=%{customdata[1]}
max_connections=%{customdata[2]}", + "legendgroup": "feed_async_iterable", + "marker": { + "color": "#636efa", + "opacity": 0.7, + "size": 12, + "symbol": "circle" + }, + "mode": "markers", + "name": "feed_async_iterable", + "orientation": "v", + "showlegend": true, + "type": "scatter", + "x": [ + 1000, + 5000, + 10000 + ], + "xaxis": "x", + "y": [ + 105.5052227085519, + 152.01841944066945, + 128.44456172727297 + ], + "yaxis": "y" + } + ], + "layout": { + "font": { + "size": 16 + }, + "legend": { + "title": { + "font": { + "size": 16 + }, + "text": "Function Details" + }, + "tracegroupgap": 0, + "x": 800, + "xanchor": "auto", + "y": 1, + "yanchor": "auto" + }, + "template": { + "data": { + "bar": [ + { + "error_x": { + "color": "#2a3f5f" + }, + "error_y": { + "color": "#2a3f5f" + }, + "marker": { + "line": { + "color": "white", + "width": 0.5 + }, + "pattern": { + "fillmode": "overlay", + "size": 10, + "solidity": 0.2 + } + }, + "type": "bar" + } + ], + "barpolar": [ + { + "marker": { + "line": { + "color": "white", + "width": 0.5 + }, + "pattern": { + "fillmode": "overlay", + "size": 10, + "solidity": 0.2 + } + }, + "type": "barpolar" + } + ], + "carpet": [ + { + "aaxis": { + "endlinecolor": "#2a3f5f", + "gridcolor": "#C8D4E3", + "linecolor": "#C8D4E3", + "minorgridcolor": "#C8D4E3", + "startlinecolor": "#2a3f5f" + }, + "baxis": { + "endlinecolor": "#2a3f5f", + "gridcolor": "#C8D4E3", + "linecolor": "#C8D4E3", + "minorgridcolor": "#C8D4E3", + "startlinecolor": "#2a3f5f" + }, + "type": "carpet" + } + ], + "choropleth": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "type": "choropleth" + } + ], + "contour": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "colorscale": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "type": "contour" + } + ], + "contourcarpet": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "type": "contourcarpet" + } + ], + "heatmap": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "colorscale": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "type": "heatmap" + } + ], + "heatmapgl": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "colorscale": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "type": "heatmapgl" + } + ], + "histogram": [ + { + "marker": { + "pattern": { + "fillmode": "overlay", + "size": 10, + "solidity": 0.2 + } + }, + "type": "histogram" + } + ], + "histogram2d": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "colorscale": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "type": "histogram2d" + } + ], + "histogram2dcontour": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "colorscale": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "type": "histogram2dcontour" + } + ], + "mesh3d": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "type": "mesh3d" + } + ], + "parcoords": [ + { + "line": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "parcoords" + } + ], + "pie": [ + { + "automargin": true, + "type": "pie" + } + ], + "scatter": [ + { + "fillpattern": { + "fillmode": "overlay", + "size": 10, + "solidity": 0.2 + }, + "type": "scatter" + } + ], + "scatter3d": [ + { + "line": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scatter3d" + } + ], + "scattercarpet": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scattercarpet" + } + ], + "scattergeo": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scattergeo" + } + ], + "scattergl": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scattergl" + } + ], + "scattermapbox": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scattermapbox" + } + ], + "scatterpolar": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scatterpolar" + } + ], + "scatterpolargl": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scatterpolargl" + } + ], + "scatterternary": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scatterternary" + } + ], + "surface": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "colorscale": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "type": "surface" + } + ], + "table": [ + { + "cells": { + "fill": { + "color": "#EBF0F8" + }, + "line": { + "color": "white" + } + }, + "header": { + "fill": { + "color": "#C8D4E3" + }, + "line": { + "color": "white" + } + }, + "type": "table" + } + ] + }, + "layout": { + "annotationdefaults": { + "arrowcolor": "#2a3f5f", + "arrowhead": 0, + "arrowwidth": 1 + }, + "autotypenumbers": "strict", + "coloraxis": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "colorscale": { + "diverging": [ + [ + 0, + "#8e0152" + ], + [ + 0.1, + "#c51b7d" + ], + [ + 0.2, + "#de77ae" + ], + [ + 0.3, + "#f1b6da" + ], + [ + 0.4, + "#fde0ef" + ], + [ + 0.5, + "#f7f7f7" + ], + [ + 0.6, + "#e6f5d0" + ], + [ + 0.7, + "#b8e186" + ], + [ + 0.8, + "#7fbc41" + ], + [ + 0.9, + "#4d9221" + ], + [ + 1, + "#276419" + ] + ], + "sequential": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "sequentialminus": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ] + }, + "colorway": [ + "#636efa", + "#EF553B", + "#00cc96", + "#ab63fa", + "#FFA15A", + "#19d3f3", + "#FF6692", + "#B6E880", + "#FF97FF", + "#FECB52" + ], + "font": { + "color": "#2a3f5f" + }, + "geo": { + "bgcolor": "white", + "lakecolor": "white", + "landcolor": "white", + "showlakes": true, + "showland": true, + "subunitcolor": "#C8D4E3" + }, + "hoverlabel": { + "align": "left" + }, + "hovermode": "closest", + "mapbox": { + "style": "light" + }, + "paper_bgcolor": "white", + "plot_bgcolor": "white", + "polar": { + "angularaxis": { + "gridcolor": "#EBF0F8", + "linecolor": "#EBF0F8", + "ticks": "" + }, + "bgcolor": "white", + "radialaxis": { + "gridcolor": "#EBF0F8", + "linecolor": "#EBF0F8", + "ticks": "" + } + }, + "scene": { + "xaxis": { + "backgroundcolor": "white", + "gridcolor": "#DFE8F3", + "gridwidth": 2, + "linecolor": "#EBF0F8", + "showbackground": true, + "ticks": "", + "zerolinecolor": "#EBF0F8" + }, + "yaxis": { + "backgroundcolor": "white", + "gridcolor": "#DFE8F3", + "gridwidth": 2, + "linecolor": "#EBF0F8", + "showbackground": true, + "ticks": "", + "zerolinecolor": "#EBF0F8" + }, + "zaxis": { + "backgroundcolor": "white", + "gridcolor": "#DFE8F3", + "gridwidth": 2, + "linecolor": "#EBF0F8", + "showbackground": true, + "ticks": "", + "zerolinecolor": "#EBF0F8" + } + }, + "shapedefaults": { + "line": { + "color": "#2a3f5f" + } + }, + "ternary": { + "aaxis": { + "gridcolor": "#DFE8F3", + "linecolor": "#A2B1C6", + "ticks": "" + }, + "baxis": { + "gridcolor": "#DFE8F3", + "linecolor": "#A2B1C6", + "ticks": "" + }, + "bgcolor": "white", + "caxis": { + "gridcolor": "#DFE8F3", + "linecolor": "#A2B1C6", + "ticks": "" + } + }, + "title": { + "x": 0.05 + }, + "xaxis": { + "automargin": true, + "gridcolor": "#EBF0F8", + "linecolor": "#EBF0F8", + "ticks": "", + "title": { + "standoff": 15 + }, + "zerolinecolor": "#EBF0F8", + "zerolinewidth": 2 + }, + "yaxis": { + "automargin": true, + "gridcolor": "#EBF0F8", + "linecolor": "#EBF0F8", + "ticks": "", + "title": { + "standoff": 15 + }, + "zerolinecolor": "#EBF0F8", + "zerolinewidth": 2 + } + } + }, + "title": { + "text": "Performance: Requests per Second vs. Number of Documents" + }, + "width": 800, + "xaxis": { + "anchor": "y", + "domain": [ + 0, + 1 + ], + "ticktext": [ + "1k", + "5k", + "10k" + ], + "tickvals": [ + 1000, + 5000, + 10000 + ], + "title": { + "text": "Number of Documents" + }, + "type": "log" + }, + "yaxis": { + "anchor": "x", + "domain": [ + 0, + 1 + ], + "title": { + "text": "Requests per Second" + } + } + } + } + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "import plotly.express as px\n", + "\n", + "\n", + "def plot_performance(df: pd.DataFrame):\n", + " # Create a scatter plot with logarithmic scale for both axes using Plotly Express\n", + " fig = px.scatter(\n", + " df,\n", + " x=\"num_docs\",\n", + " y=\"requests_per_second\",\n", + " color=\"function_name\", # Defines color based on different functions\n", + " log_x=True, # Set x-axis to logarithmic scale\n", + " log_y=False, # If you also want the y-axis in logarithmic scale, set this to True\n", + " title=\"Performance: Requests per Second vs. Number of Documents\",\n", + " labels={ # Customizing axis labels\n", + " \"num_docs\": \"Number of Documents\",\n", + " \"requests_per_second\": \"Requests per Second\",\n", + " \"max_workers\": \"max_workers\",\n", + " \"max_queue_size\": \"max_queue_size\",\n", + " },\n", + " template=\"plotly_white\", # This sets the style to a white background, adhering to Tufte's minimalist principles\n", + " hover_data=[\n", + " \"max_workers\",\n", + " \"max_queue_size\",\n", + " \"max_connections\",\n", + " ], # Additional information to show on hover\n", + " )\n", + "\n", + " # Update layout for better readability, similar to 'talk' context in Seaborn\n", + " fig.update_layout(\n", + " font=dict(\n", + " size=16, # Adjusting font size for better visibility, similar to 'talk' context\n", + " ),\n", + " legend_title_text=\"Function Details\", # Custom legend title\n", + " legend=dict(\n", + " title_font_size=16,\n", + " x=800, # Adjusting legend position similar to bbox_to_anchor in Matplotlib\n", + " xanchor=\"auto\",\n", + " y=1,\n", + " yanchor=\"auto\",\n", + " ),\n", + " width=800, # Adjusting width of the plot\n", + " )\n", + " fig.update_xaxes(\n", + " tickvals=[1000, 5000, 10000], # Set specific tick values\n", + " ticktext=[\"1k\", \"5k\", \"10k\"], # Set corresponding tick labels\n", + " )\n", + "\n", + " fig.update_traces(\n", + " marker=dict(size=12, opacity=0.7)\n", + " ) # Adjust marker size and opacity\n", + " # Show plot\n", + " fig.show()\n", + " # Save plot as HTML file\n", + " fig.write_html(\"performance.html\")\n", + "\n", + "\n", + "plot_performance(df)" + ] + }, + { + "cell_type": "markdown", + "id": "46a28331", + "metadata": {}, + "source": [ + "Interesting. Let's try to summarize the insights we got from this experiment:\n", + "\n", + "- The `feed_async_iterable` method is approximately 3x faster than the `feed_iterable` method.\n", + "- Note that this will vary depending on the network latency between the client and the Vespa instance.\n" + ] + }, + { + "cell_type": "markdown", + "id": "5630e540", + "metadata": {}, + "source": [ + "## Feeding with Vespa CLI\n", + "\n", + "[Vespa CLI](https://docs.vespa.ai/en/vespa-cli) is a command-line interface for interacting with Vespa.\n", + "\n", + "Among many useful features are a `vespa feed` command that is the recommended way of feeding large datasets into Vespa.\n", + "This is optimized for high feeding performance, and it will be interesting to get a feel for how performant feeding to a local Vespa instance is using the CLI.\n", + "\n", + "Note that comparing feeding with the CLI is not entirely fair, as the CLI relies on prepared data files, while the pyvespa methods are streaming the data before feeding it.\n" + ] + }, + { + "cell_type": "markdown", + "id": "f82d48fd", + "metadata": {}, + "source": [ + "## Prepare the data for Vespa CLI\n", + "\n", + "Vespa CLI can feed data from either many .json files or a single .jsonl file with many documents.\n", + "The json format needs to be in the following format:\n", + "\n", + "```json\n", + "{\n", + " \"put\": \"id:namespace:document-type::document-id\",\n", + " \"fields\": {\n", + " \"field1\": \"value1\",\n", + " \"field2\": \"value2\"\n", + " }\n", + "}\n", + "```\n", + "\n", + "Where, `put` is the document operation in this case. Other allowed operations are `get`, `update` and `remove`.\n", + "\n", + "For reference, see https://docs.vespa.ai/en/vespa-cli#cheat-sheet\n", + "\n", + "### Getting the datasets as .jsonl files\n", + "\n", + "Now, let`s save the dataset to 3 different jsonl files of 1k, 5k, and 10k documents.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "9b377ee3", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Getting dataset with 1000 docs...\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Creating json from Arrow format: 100%|██████████| 1/1 [00:00<00:00, 164.05ba/s]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Getting dataset with 5000 docs...\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Creating json from Arrow format: 100%|██████████| 5/5 [00:00<00:00, 301.13ba/s]\n" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Getting dataset with 10000 docs...\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Creating json from Arrow format: 100%|██████████| 10/10 [00:00<00:00, 302.73ba/s]\n" + ] + } + ], + "source": [ + "for n in num_docs:\n", + " print(f\"Getting dataset with {n} docs...\")\n", + " # First, let's load the dataset in non-streaming mode this time, as we want to save it to a jsonl file\n", + " dataset_cli = load_dataset(\n", + " \"Cohere/wikipedia-2023-11-embed-multilingual-v3\",\n", + " \"simple\",\n", + " split=f\"train[:{n}]\", # Notice the slicing here, see https://huggingface.co/docs/datasets/loading#slice-splits\n", + " streaming=False,\n", + " )\n", + " # Map to the format expected by the CLI.\n", + " # Note that this differs a little bit from the format expected by the Python API.\n", + " dataset_cli = dataset_cli.map(\n", + " lambda x: {\n", + " \"put\": f\"id:pyvespa-feed:doc::{x['_id']}-json\",\n", + " \"fields\": {\"text\": x[\"text\"]},\n", + " }\n", + " ).select_columns([\"put\", \"fields\"])\n", + " # Save to a jsonl file\n", + " assert len(dataset_cli) == n\n", + " dataset_cli.to_json(f\"vespa_feed-{n}.json\", orient=\"records\", lines=True)" + ] + }, + { + "cell_type": "markdown", + "id": "fe173828", + "metadata": {}, + "source": [ + "Let's look at the first line of one of the saved files to verify the format.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "id": "cd95d014", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'fields': {'text': 'April (Apr.) is the fourth month of the year in the '\n", + " 'Julian and Gregorian calendars, and comes between March '\n", + " 'and May. It is one of the four months to have 30 days.'},\n", + " 'put': 'id:pyvespa-feed:doc::20231101.simple_1_0-json'}\n" + ] + } + ], + "source": [ + "from pprint import pprint\n", + "import json\n", + "\n", + "with open(\"vespa_feed-1000.json\", \"r\") as f:\n", + " sample = f.readline()\n", + " pprint(json.loads(sample))" + ] + }, + { + "cell_type": "markdown", + "id": "68374774", + "metadata": {}, + "source": [ + "Ok, now we are ready to feed the data using Vespa CLI.\n", + "We also want to capture the output of feed statistics for each file.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 24, + "id": "2ea17d30", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Feeding 1000 docs...\n", + "{'feeder.error.count': 0,\n", + " 'feeder.inflight.count': 0,\n", + " 'feeder.ok.count': 1000,\n", + " 'feeder.ok.rate': 196.932,\n", + " 'feeder.operation.count': 1000,\n", + " 'feeder.seconds': 5.078,\n", + " 'http.exception.count': 0,\n", + " 'http.request.MBps': 0.058,\n", + " 'http.request.bytes': 293011,\n", + " 'http.request.count': 1000,\n", + " 'http.response.MBps': 0.025,\n", + " 'http.response.bytes': 129388,\n", + " 'http.response.code.counts': {'200': 1000},\n", + " 'http.response.count': 1000,\n", + " 'http.response.error.count': 0,\n", + " 'http.response.latency.millis.avg': 141,\n", + " 'http.response.latency.millis.max': 624,\n", + " 'http.response.latency.millis.min': 127}\n", + "Feeding 5000 docs...\n", + "{'feeder.error.count': 0,\n", + " 'feeder.inflight.count': 0,\n", + " 'feeder.ok.count': 5000,\n", + " 'feeder.ok.rate': 302.62,\n", + " 'feeder.operation.count': 5000,\n", + " 'feeder.seconds': 16.522,\n", + " 'http.exception.count': 0,\n", + " 'http.request.MBps': 0.088,\n", + " 'http.request.bytes': 1450480,\n", + " 'http.request.count': 5000,\n", + " 'http.response.MBps': 0.04,\n", + " 'http.response.bytes': 652778,\n", + " 'http.response.code.counts': {'200': 5000},\n", + " 'http.response.count': 5000,\n", + " 'http.response.error.count': 0,\n", + " 'http.response.latency.millis.avg': 135,\n", + " 'http.response.latency.millis.max': 532,\n", + " 'http.response.latency.millis.min': 125}\n", + "Feeding 10000 docs...\n", + "{'feeder.error.count': 0,\n", + " 'feeder.inflight.count': 0,\n", + " 'feeder.ok.count': 10000,\n", + " 'feeder.ok.rate': 660.536,\n", + " 'feeder.operation.count': 10000,\n", + " 'feeder.seconds': 15.139,\n", + " 'http.exception.count': 0,\n", + " 'http.request.MBps': 0.192,\n", + " 'http.request.bytes': 2905878,\n", + " 'http.request.count': 10000,\n", + " 'http.response.MBps': 0.087,\n", + " 'http.response.bytes': 1317226,\n", + " 'http.response.code.counts': {'200': 10000},\n", + " 'http.response.count': 10000,\n", + " 'http.response.error.count': 0,\n", + " 'http.response.latency.millis.avg': 139,\n", + " 'http.response.latency.millis.max': 563,\n", + " 'http.response.latency.millis.min': 125}\n" + ] + } + ], + "source": [ + "import subprocess\n", + "\n", + "cli_results = {}\n", + "for n in num_docs:\n", + " print(f\"Feeding {n} docs...\")\n", + " # Run the CLI command to feed the data\n", + " command = f\"vespa feed vespa_feed-{n}.json\"\n", + " results = subprocess.run(command, shell=True, capture_output=True, text=True)\n", + " result_dict = json.loads(results.stdout)\n", + " pprint(result_dict)\n", + " cli_results[n] = result_dict" + ] + }, + { + "cell_type": "code", + "execution_count": 25, + "id": "be2c18ae", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{1000: {'feeder.operation.count': 1000,\n", + " 'feeder.seconds': 5.078,\n", + " 'feeder.ok.count': 1000,\n", + " 'feeder.ok.rate': 196.932,\n", + " 'feeder.error.count': 0,\n", + " 'feeder.inflight.count': 0,\n", + " 'http.request.count': 1000,\n", + " 'http.request.bytes': 293011,\n", + " 'http.request.MBps': 0.058,\n", + " 'http.exception.count': 0,\n", + " 'http.response.count': 1000,\n", + " 'http.response.bytes': 129388,\n", + " 'http.response.MBps': 0.025,\n", + " 'http.response.error.count': 0,\n", + " 'http.response.latency.millis.min': 127,\n", + " 'http.response.latency.millis.avg': 141,\n", + " 'http.response.latency.millis.max': 624,\n", + " 'http.response.code.counts': {'200': 1000}},\n", + " 5000: {'feeder.operation.count': 5000,\n", + " 'feeder.seconds': 16.522,\n", + " 'feeder.ok.count': 5000,\n", + " 'feeder.ok.rate': 302.62,\n", + " 'feeder.error.count': 0,\n", + " 'feeder.inflight.count': 0,\n", + " 'http.request.count': 5000,\n", + " 'http.request.bytes': 1450480,\n", + " 'http.request.MBps': 0.088,\n", + " 'http.exception.count': 0,\n", + " 'http.response.count': 5000,\n", + " 'http.response.bytes': 652778,\n", + " 'http.response.MBps': 0.04,\n", + " 'http.response.error.count': 0,\n", + " 'http.response.latency.millis.min': 125,\n", + " 'http.response.latency.millis.avg': 135,\n", + " 'http.response.latency.millis.max': 532,\n", + " 'http.response.code.counts': {'200': 5000}},\n", + " 10000: {'feeder.operation.count': 10000,\n", + " 'feeder.seconds': 15.139,\n", + " 'feeder.ok.count': 10000,\n", + " 'feeder.ok.rate': 660.536,\n", + " 'feeder.error.count': 0,\n", + " 'feeder.inflight.count': 0,\n", + " 'http.request.count': 10000,\n", + " 'http.request.bytes': 2905878,\n", + " 'http.request.MBps': 0.192,\n", + " 'http.exception.count': 0,\n", + " 'http.response.count': 10000,\n", + " 'http.response.bytes': 1317226,\n", + " 'http.response.MBps': 0.087,\n", + " 'http.response.error.count': 0,\n", + " 'http.response.latency.millis.min': 125,\n", + " 'http.response.latency.millis.avg': 139,\n", + " 'http.response.latency.millis.max': 563,\n", + " 'http.response.code.counts': {'200': 10000}}}" + ] + }, + "execution_count": 25, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "cli_results" + ] + }, + { + "cell_type": "code", + "execution_count": 26, + "id": "812bac07", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
namenum_docsmax_connectionsfunction_namemax_workersmax_queue_sizefeed_timerequests_per_second
01000_cli1000unknowncliunknownn/a5.078196.927924
15000_cli5000unknowncliunknownn/a16.522302.626801
210000_cli10000unknowncliunknownn/a15.139660.545611
\n", + "
" + ], + "text/plain": [ + " name num_docs max_connections function_name max_workers \\\n", + "0 1000_cli 1000 unknown cli unknown \n", + "1 5000_cli 5000 unknown cli unknown \n", + "2 10000_cli 10000 unknown cli unknown \n", + "\n", + " max_queue_size feed_time requests_per_second \n", + "0 n/a 5.078 196.927924 \n", + "1 n/a 16.522 302.626801 \n", + "2 n/a 15.139 660.545611 " + ] + }, + "execution_count": 26, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "# Let's add the CLI results to the DataFrame\n", + "df_cli = pd.DataFrame(\n", + " [\n", + " {\n", + " \"name\": f\"{n}_cli\",\n", + " \"num_docs\": n,\n", + " \"max_connections\": \"unknown\",\n", + " \"function_name\": \"cli\",\n", + " \"max_workers\": \"unknown\",\n", + " \"max_queue_size\": \"n/a\",\n", + " \"feed_time\": result[\"feeder.seconds\"],\n", + " }\n", + " for n, result in cli_results.items()\n", + " ]\n", + ")\n", + "df_cli[\"requests_per_second\"] = df_cli[\"num_docs\"] / df_cli[\"feed_time\"]\n", + "df_cli" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "id": "b3395710", + "metadata": {}, + "outputs": [ + { + "data": { + "application/vnd.plotly.v1+json": { + "config": { + "plotlyServerURL": "https://plot.ly" + }, + "data": [ + { + "customdata": [ + [ + 64, + 2500, + 1 + ], + [ + 64, + 2500, + 1 + ], + [ + 64, + 2500, + 1 + ] + ], + "hovertemplate": "function_name=feed_async_iterable
Number of Documents=%{x}
Requests per Second=%{y}
max_workers=%{customdata[0]}
max_queue_size=%{customdata[1]}
max_connections=%{customdata[2]}", + "legendgroup": "feed_async_iterable", + "marker": { + "color": "#636efa", + "opacity": 0.7, + "size": 12, + "symbol": "circle" + }, + "mode": "markers", + "name": "feed_async_iterable", + "orientation": "v", + "showlegend": true, + "type": "scatter", + "x": [ + 1000, + 5000, + 10000 + ], + "xaxis": "x", + "y": [ + 105.5052227085519, + 152.01841944066945, + 128.44456172727297 + ], + "yaxis": "y" + }, + { + "customdata": [ + [ + "unknown", + "n/a", + "unknown" + ], + [ + "unknown", + "n/a", + "unknown" + ], + [ + "unknown", + "n/a", + "unknown" + ] + ], + "hovertemplate": "function_name=cli
Number of Documents=%{x}
Requests per Second=%{y}
max_workers=%{customdata[0]}
max_queue_size=%{customdata[1]}
max_connections=%{customdata[2]}", + "legendgroup": "cli", + "marker": { + "color": "#EF553B", + "opacity": 0.7, + "size": 12, + "symbol": "circle" + }, + "mode": "markers", + "name": "cli", + "orientation": "v", + "showlegend": true, + "type": "scatter", + "x": [ + 1000, + 5000, + 10000 + ], + "xaxis": "x", + "y": [ + 196.92792437967702, + 302.6268006294638, + 660.545610674417 + ], + "yaxis": "y" + } + ], + "layout": { + "font": { + "size": 16 + }, + "legend": { + "title": { + "font": { + "size": 16 + }, + "text": "Function Details" + }, + "tracegroupgap": 0, + "x": 800, + "xanchor": "auto", + "y": 1, + "yanchor": "auto" + }, + "template": { + "data": { + "bar": [ + { + "error_x": { + "color": "#2a3f5f" + }, + "error_y": { + "color": "#2a3f5f" + }, + "marker": { + "line": { + "color": "white", + "width": 0.5 + }, + "pattern": { + "fillmode": "overlay", + "size": 10, + "solidity": 0.2 + } + }, + "type": "bar" + } + ], + "barpolar": [ + { + "marker": { + "line": { + "color": "white", + "width": 0.5 + }, + "pattern": { + "fillmode": "overlay", + "size": 10, + "solidity": 0.2 + } + }, + "type": "barpolar" + } + ], + "carpet": [ + { + "aaxis": { + "endlinecolor": "#2a3f5f", + "gridcolor": "#C8D4E3", + "linecolor": "#C8D4E3", + "minorgridcolor": "#C8D4E3", + "startlinecolor": "#2a3f5f" + }, + "baxis": { + "endlinecolor": "#2a3f5f", + "gridcolor": "#C8D4E3", + "linecolor": "#C8D4E3", + "minorgridcolor": "#C8D4E3", + "startlinecolor": "#2a3f5f" + }, + "type": "carpet" + } + ], + "choropleth": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "type": "choropleth" + } + ], + "contour": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "colorscale": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "type": "contour" + } + ], + "contourcarpet": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "type": "contourcarpet" + } + ], + "heatmap": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "colorscale": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "type": "heatmap" + } + ], + "heatmapgl": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "colorscale": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "type": "heatmapgl" + } + ], + "histogram": [ + { + "marker": { + "pattern": { + "fillmode": "overlay", + "size": 10, + "solidity": 0.2 + } + }, + "type": "histogram" + } + ], + "histogram2d": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "colorscale": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "type": "histogram2d" + } + ], + "histogram2dcontour": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "colorscale": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "type": "histogram2dcontour" + } + ], + "mesh3d": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "type": "mesh3d" + } + ], + "parcoords": [ + { + "line": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "parcoords" + } + ], + "pie": [ + { + "automargin": true, + "type": "pie" + } + ], + "scatter": [ + { + "fillpattern": { + "fillmode": "overlay", + "size": 10, + "solidity": 0.2 + }, + "type": "scatter" + } + ], + "scatter3d": [ + { + "line": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scatter3d" + } + ], + "scattercarpet": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scattercarpet" + } + ], + "scattergeo": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scattergeo" + } + ], + "scattergl": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scattergl" + } + ], + "scattermapbox": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scattermapbox" + } + ], + "scatterpolar": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scatterpolar" + } + ], + "scatterpolargl": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scatterpolargl" + } + ], + "scatterternary": [ + { + "marker": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "type": "scatterternary" + } + ], + "surface": [ + { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + }, + "colorscale": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "type": "surface" + } + ], + "table": [ + { + "cells": { + "fill": { + "color": "#EBF0F8" + }, + "line": { + "color": "white" + } + }, + "header": { + "fill": { + "color": "#C8D4E3" + }, + "line": { + "color": "white" + } + }, + "type": "table" + } + ] + }, + "layout": { + "annotationdefaults": { + "arrowcolor": "#2a3f5f", + "arrowhead": 0, + "arrowwidth": 1 + }, + "autotypenumbers": "strict", + "coloraxis": { + "colorbar": { + "outlinewidth": 0, + "ticks": "" + } + }, + "colorscale": { + "diverging": [ + [ + 0, + "#8e0152" + ], + [ + 0.1, + "#c51b7d" + ], + [ + 0.2, + "#de77ae" + ], + [ + 0.3, + "#f1b6da" + ], + [ + 0.4, + "#fde0ef" + ], + [ + 0.5, + "#f7f7f7" + ], + [ + 0.6, + "#e6f5d0" + ], + [ + 0.7, + "#b8e186" + ], + [ + 0.8, + "#7fbc41" + ], + [ + 0.9, + "#4d9221" + ], + [ + 1, + "#276419" + ] + ], + "sequential": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ], + "sequentialminus": [ + [ + 0, + "#0d0887" + ], + [ + 0.1111111111111111, + "#46039f" + ], + [ + 0.2222222222222222, + "#7201a8" + ], + [ + 0.3333333333333333, + "#9c179e" + ], + [ + 0.4444444444444444, + "#bd3786" + ], + [ + 0.5555555555555556, + "#d8576b" + ], + [ + 0.6666666666666666, + "#ed7953" + ], + [ + 0.7777777777777778, + "#fb9f3a" + ], + [ + 0.8888888888888888, + "#fdca26" + ], + [ + 1, + "#f0f921" + ] + ] + }, + "colorway": [ + "#636efa", + "#EF553B", + "#00cc96", + "#ab63fa", + "#FFA15A", + "#19d3f3", + "#FF6692", + "#B6E880", + "#FF97FF", + "#FECB52" + ], + "font": { + "color": "#2a3f5f" + }, + "geo": { + "bgcolor": "white", + "lakecolor": "white", + "landcolor": "white", + "showlakes": true, + "showland": true, + "subunitcolor": "#C8D4E3" + }, + "hoverlabel": { + "align": "left" + }, + "hovermode": "closest", + "mapbox": { + "style": "light" + }, + "paper_bgcolor": "white", + "plot_bgcolor": "white", + "polar": { + "angularaxis": { + "gridcolor": "#EBF0F8", + "linecolor": "#EBF0F8", + "ticks": "" + }, + "bgcolor": "white", + "radialaxis": { + "gridcolor": "#EBF0F8", + "linecolor": "#EBF0F8", + "ticks": "" + } + }, + "scene": { + "xaxis": { + "backgroundcolor": "white", + "gridcolor": "#DFE8F3", + "gridwidth": 2, + "linecolor": "#EBF0F8", + "showbackground": true, + "ticks": "", + "zerolinecolor": "#EBF0F8" + }, + "yaxis": { + "backgroundcolor": "white", + "gridcolor": "#DFE8F3", + "gridwidth": 2, + "linecolor": "#EBF0F8", + "showbackground": true, + "ticks": "", + "zerolinecolor": "#EBF0F8" + }, + "zaxis": { + "backgroundcolor": "white", + "gridcolor": "#DFE8F3", + "gridwidth": 2, + "linecolor": "#EBF0F8", + "showbackground": true, + "ticks": "", + "zerolinecolor": "#EBF0F8" + } + }, + "shapedefaults": { + "line": { + "color": "#2a3f5f" + } + }, + "ternary": { + "aaxis": { + "gridcolor": "#DFE8F3", + "linecolor": "#A2B1C6", + "ticks": "" + }, + "baxis": { + "gridcolor": "#DFE8F3", + "linecolor": "#A2B1C6", + "ticks": "" + }, + "bgcolor": "white", + "caxis": { + "gridcolor": "#DFE8F3", + "linecolor": "#A2B1C6", + "ticks": "" + } + }, + "title": { + "x": 0.05 + }, + "xaxis": { + "automargin": true, + "gridcolor": "#EBF0F8", + "linecolor": "#EBF0F8", + "ticks": "", + "title": { + "standoff": 15 + }, + "zerolinecolor": "#EBF0F8", + "zerolinewidth": 2 + }, + "yaxis": { + "automargin": true, + "gridcolor": "#EBF0F8", + "linecolor": "#EBF0F8", + "ticks": "", + "title": { + "standoff": 15 + }, + "zerolinecolor": "#EBF0F8", + "zerolinewidth": 2 + } + } + }, + "title": { + "text": "Performance: Requests per Second vs. Number of Documents" + }, + "width": 800, + "xaxis": { + "anchor": "y", + "domain": [ + 0, + 1 + ], + "ticktext": [ + "1k", + "5k", + "10k" + ], + "tickvals": [ + 1000, + 5000, + 10000 + ], + "title": { + "text": "Number of Documents" + }, + "type": "log" + }, + "yaxis": { + "anchor": "x", + "domain": [ + 0, + 1 + ], + "title": { + "text": "Requests per Second" + } + } + } + } + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "df_total = pd.concat([df, df_cli])\n", + "\n", + "plot_performance(df_total)" + ] + }, + { + "cell_type": "markdown", + "id": "1f745d73", + "metadata": {}, + "source": [ + "As you can tell, the CLI is still almost 2x faster than the `feed_async_iterable` method.\n", + "\n", + "We might improve the performance of the `feed_async_iterable` method by introducing parallelism (threading) for that method as well.\n" + ] + }, + { + "cell_type": "markdown", + "id": "18fc6282", + "metadata": {}, + "source": [ + "## Conclusion\n" + ] + }, + { + "cell_type": "markdown", + "id": "2d91581b", + "metadata": {}, + "source": [ + "- Prefer to use the CLI if you care about performance. 🚀\n", + "- If you want to use pyvespa, prefer the `feed_async_iterable`- method, if you are I/O-bound.\n" + ] + }, + { + "cell_type": "markdown", + "id": "28591491", + "metadata": {}, + "source": [ + "## Cleanup\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e5064bd2", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Deactivated vespa-team.feedperformancecloud in dev.aws-us-east-1c\n", + "Deleted instance vespa-team.feedperformancecloud.default\n" + ] + } + ], + "source": [ + "vespa_cloud.delete()" + ] + }, + { + "cell_type": "markdown", + "id": "d1872b31", + "metadata": {}, + "source": [ + "## Next steps\n", + "\n", + "Check out some of the other\n", + "[examples](https://pyvespa.readthedocs.io/en/latest/examples.html) in the documentation.\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.19" + }, + "vscode": { + "interpreter": { + "hash": "b0fa6594d8f4cbf19f97940f81e996739fb7646882a419484c72d19e05852a7e" + } + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/pyproject.toml b/pyproject.toml index b06781a7..01a553fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "jinja2", "cryptography", "aiohttp", + "httpx[http2]", "tenacity>=8.4.1", "typing_extensions", "python-dateutil", diff --git a/tests/integration/test_integration_docker.py b/tests/integration/test_integration_docker.py index 37b52a82..81a5855a 100644 --- a/tests/integration/test_integration_docker.py +++ b/tests/integration/test_integration_docker.py @@ -7,7 +7,7 @@ import json from typing import List, Dict, Optional -from vespa.io import VespaResponse +from vespa.io import VespaResponse, VespaQueryResponse from vespa.resources import get_resource_path from vespa.package import ( HNSW, @@ -262,6 +262,27 @@ class TestApplicationCommon(unittest.TestCase): # Set maxDiff to None to see full diff maxDiff = None + async def handle_longlived_connection(self, app, n_seconds=10): + # Test that the connection can live for at least n_seconds + async with app.asyncio(connections=1) as async_app: + response = await async_app.httpx_client.get( + app.end_point + "/ApplicationStatus" + ) + self.assertEqual(response.status_code, 200) + await asyncio.sleep(n_seconds) + response = await async_app.httpx_client.get( + app.end_point + "/ApplicationStatus" + ) + self.assertEqual(response.status_code, 200) + + async def async_is_http2_client(self, app): + async with app.asyncio() as async_app: + response = await async_app.httpx_client.get( + app.end_point + "/ApplicationStatus" + ) + self.assertEqual(response.status_code, 200) + self.assertEqual(response.http_version, "HTTP/2") + def execute_data_operations( self, app, @@ -922,6 +943,12 @@ def setUp(self) -> None: ] self.queries_first_hit = ["this is title 1", "this is title 2"] + def test_is_using_http2_client(self): + asyncio.run(self.async_is_http2_client(app=self.app)) + + def test_handle_longlived_connection(self): + asyncio.run(self.handle_longlived_connection(app=self.app)) + def test_model_endpoints_when_no_model_is_available(self): self.get_model_endpoints_when_no_model_is_available( app=self.app, @@ -1049,6 +1076,32 @@ def test_execute_async_data_operations(self): ) ) + def test_feed_async_iterable(self): + def sentence_to_doc(sentences): + for sentence in sentences: + yield { + "id": sentence["id"], + "fields": {k: v for k, v in sentence.items() if k != "id"}, + } + + self.app.feed_async_iterable( + sentence_to_doc(self.fields_to_send_sentence), + schema="sentence", + operation_type="feed", + ) + # check doc count + total_docs = [] + for doc_slice in self.app.visit( + schema="sentence", content_cluster_name="qa_content", selection="true" + ): + for response in doc_slice: + total_docs.extend(response.documents) + self.assertEqual( + len(total_docs), + len(self.fields_to_send_sentence), + ) + self.app.delete_all_docs(content_cluster_name="qa_content", schema="sentence") + def tearDown(self) -> None: self.vespa_docker.container.stop(timeout=CONTAINER_STOP_TIMEOUT) self.vespa_docker.container.remove() @@ -1115,7 +1168,6 @@ def callback(response: VespaResponse, id: str): print("Id " + id + " + failed : " + response.json) self.app.feed_iterable(docs, schema="mail", namespace="test", callback=callback) - from vespa.io import VespaQueryResponse response: VespaQueryResponse = self.app.query( yql="select * from sources * where title contains 'title'", @@ -1395,7 +1447,7 @@ def doc_generator(self, num_docs: int): }, } - def test_retry(self): + def test_retries_sync(self): num_docs = 10 num_429 = 0 @@ -1421,6 +1473,43 @@ def callback(response: VespaResponse, id: str): for response in doc_slice: total_docs.extend(response.documents) self.assertEqual(len(total_docs), num_docs) + self.app.delete_all_docs( + content_cluster_name="retryapplication_content", + schema="retryapplication", + namespace="retryapplication", + ) + + def test_retries_async(self): + num_docs = 10 + num_429 = 0 + + def callback(response: VespaResponse, id: str): + nonlocal num_429 + if response.status_code == 429: + print(f"429 response for id {id}") + num_429 += 1 + + self.app.feed_async_iterable( + self.doc_generator(num_docs), + schema="retryapplication", + callback=callback, + ) + self.assertEqual(num_429, 0) + total_docs = [] + for doc_slice in self.app.visit( + content_cluster_name="retryapplication_content", + schema="retryapplication", + namespace="retryapplication", + selection="true", + ): + for response in doc_slice: + total_docs.extend(response.documents) + self.assertEqual(len(total_docs), num_docs) + self.app.delete_all_docs( + content_cluster_name="retryapplication_content", + schema="retryapplication", + namespace="retryapplication", + ) def tearDown(self) -> None: self.vespa_docker.container.stop(timeout=CONTAINER_STOP_TIMEOUT) diff --git a/tests/integration/test_integration_vespa_cloud.py b/tests/integration/test_integration_vespa_cloud.py index cb7e46e9..81ffc924 100644 --- a/tests/integration/test_integration_vespa_cloud.py +++ b/tests/integration/test_integration_vespa_cloud.py @@ -125,6 +125,12 @@ def setUp(self) -> None: for i in range(10) ] + def test_is_using_http2_client(self): + asyncio.run(self.async_is_http2_client(app=self.app)) + + def test_handle_longlived_connection(self): + asyncio.run(self.handle_longlived_connection(app=self.app)) + def test_prediction_when_model_not_defined(self): self.get_stateless_prediction_when_model_not_defined( app=self.app, application_package=self.app_package diff --git a/tests/unit/test_application.py b/tests/unit/test_application.py index 91a638ae..4fe9bdde 100644 --- a/tests/unit/test_application.py +++ b/tests/unit/test_application.py @@ -5,6 +5,8 @@ import pytest from unittest.mock import PropertyMock, patch +from unittest.mock import MagicMock, AsyncMock + from requests.models import HTTPError, Response from vespa.package import ApplicationPackage, Schema, Document @@ -488,3 +490,109 @@ def setUp(self) -> None: ], } } + + +class TestFeedAsyncIterable(unittest.TestCase): + def setUp(self): + self.mock_session = AsyncMock() + self.mock_asyncio_patcher = patch("vespa.application.VespaAsync") + self.mock_asyncio = self.mock_asyncio_patcher.start() + self.mock_asyncio.return_value.__aenter__.return_value = self.mock_session + + self.vespa = Vespa(url="http://localhost", port=8080) + + def tearDown(self): + self.mock_asyncio_patcher.stop() + + def test_feed_async_iterable_happy_path(self): + # Arrange + iter_data = [ + {"id": "doc1", "fields": {"title": "Document 1"}}, + {"id": "doc2", "fields": {"title": "Document 2"}}, + ] + callback = MagicMock() + + # Act + self.vespa.feed_async_iterable( + iter=iter_data, + schema="test_schema", + namespace="test_namespace", + callback=callback, + max_queue_size=2, + max_workers=2, + max_connections=2, + ) + + # Assert + self.mock_session.feed_data_point.assert_has_calls( + [ + unittest.mock.call( + schema="test_schema", + namespace="test_namespace", + groupname=None, + data_id="doc1", + fields={"title": "Document 1"}, + ), + unittest.mock.call( + schema="test_schema", + namespace="test_namespace", + groupname=None, + data_id="doc2", + fields={"title": "Document 2"}, + ), + ], + any_order=True, + ) + self.assertEqual(callback.call_count, 2) + + def test_feed_async_iterable_missing_id(self): + # Arrange + iter_data = [ + {"fields": {"title": "Document 1"}}, + ] + callback = MagicMock() + + # Act + self.vespa.feed_async_iterable( + iter=iter_data, + schema="test_schema", + namespace="test_namespace", + callback=callback, + max_queue_size=1, + max_workers=1, + max_connections=1, + ) + + # Assert + self.mock_session.feed_data_point.assert_not_called() + callback.assert_called_once_with(unittest.mock.ANY, None) + self.assertEqual(callback.call_args[0][0].status_code, 499) + self.assertEqual( + callback.call_args[0][0].json["message"], "Missing id in input dict" + ) + + def test_feed_async_iterable_missing_fields(self): + # Arrange + iter_data = [ + {"id": "doc1"}, + ] + callback = MagicMock() + + # Act + self.vespa.feed_async_iterable( + iter=iter_data, + schema="test_schema", + namespace="test_namespace", + callback=callback, + max_queue_size=1, + max_workers=1, + max_connections=1, + ) + + # Assert + self.mock_session.feed_data_point.assert_not_called() + callback.assert_called_once_with(unittest.mock.ANY, "doc1") + self.assertEqual(callback.call_args[0][0].status_code, 499) + self.assertEqual( + callback.call_args[0][0].json["message"], "Missing fields in input dict" + ) diff --git a/vespa/application.py b/vespa/application.py index 0056338a..9fbfbd7b 100644 --- a/vespa/application.py +++ b/vespa/application.py @@ -1,8 +1,6 @@ # Copyright Vespa.ai. Licensed under the terms of the Apache 2.0 license. See LICENSE in the project root. import sys -import ssl -import aiohttp import asyncio import requests import traceback @@ -36,6 +34,7 @@ from vespa.exceptions import VespaError from vespa.io import VespaQueryResponse, VespaResponse, VespaVisitResponse from vespa.package import ApplicationPackage +import httpx VESPA_CLOUD_SECRET_TOKEN: str = "VESPA_CLOUD_SECRET_TOKEN" @@ -582,6 +581,157 @@ def _handle_result_callback( queue.join() consumer_thread.join() + def feed_async_iterable( + self, + iter: Iterable[Dict], + schema: Optional[str] = None, + namespace: Optional[str] = None, + callback: Optional[Callable[[VespaResponse, str], None]] = None, + operation_type: Optional[str] = "feed", + max_queue_size: int = 1000, + max_workers: int = 64, + max_connections: int = 1, + **kwargs, + ): + """ + Feed data asynchronously using httpx.AsyncClient with HTTP/2. Feed from an Iterable of Dict with the keys 'id' and 'fields' to be used in the :func:`feed_data_point`. + The result of each operation is forwarded to the user provided callback function that can process the returned `VespaResponse`. + Prefer using this method over :func:`feed_iterable` when the operation is I/O bound from the client side. + + Example usage:: + + app = Vespa(url="localhost", port=8080) + data = [ + {"id": "1", "fields": {"field1": "value1"}}, + {"id": "2", "fields": {"field1": "value2"}}, + ] + async def callback(response, id): + print(f"Response for id {id}: {response.status_code}") + app.feed_async_iterable(data, schema="schema_name", callback=callback) + + + :param iter: An iterable of Dict containing the keys 'id' and 'fields' to be used in the :func:`feed_data_point`. Note that this 'id' is only the last part of the full document id, that will be generated automatically by pyvespa. + :param schema: The Vespa schema name that we are sending data to. + :param namespace: The Vespa document id namespace. If no namespace is provided the schema is used. + :param callback: A callback function to be called on each result. Signature `callback(response:VespaResponse, id:str)` + :param operation_type: The operation to perform. Default to `feed`. Valid are `feed`, `update` or `delete`. + :param max_queue_size: The maximum number of tasks waiting to be processed. Useful to limit memory usage. Default is 1000. + :param max_workers: Maximum number of concurrent requests to have in-flight, bound by an asyncio.Semaphore, that needs to be acquired by a submit task. Increase if the server is scaled to handle more requests. + :param max_connections: The maximum number of connections passed to httpx.AsyncClient to the Vespa endpoint. As HTTP/2 is used, only one connection is needed. + :param kwargs: Additional parameters are passed to the respective operation type specific :func:`_data_point`. + """ + + if operation_type not in ["feed", "update", "delete"]: + raise ValueError( + "Invalid operation type. Valid are `feed`, `update` or `delete`." + ) + + if namespace is None: + namespace = schema + if not schema: + try: + schema = self._infer_schema_name() + except ValueError: + raise ValueError( + "Not possible to infer schema name. Specify schema parameter." + ) + + async def handle_result(task: asyncio.Task, id: str): + # Wrapper around the task to handle exceptions and call the user callback + try: + response = await task + except Exception as e: + response = VespaResponse( + status_code=599, + json={ + "Exception": str(e), + "id": id, + "message": "Exception during feed_data_point", + }, + url="n/a", + operation_type=operation_type, + ) + if callback is not None: + try: + callback(response, id) + except Exception as e: + print(f"Exception in user callback for id {id}", file=sys.stderr) + traceback.print_exception( + type(e), e, e.__traceback__, file=sys.stderr + ) + + # Wrapping in async function to be able to use asyncio.run, and avoid that the feed_async_iterable have to be async + async def run(): + async with self.asyncio(connections=max_connections) as async_session: + semaphore = asyncio.Semaphore(max_workers) + tasks = [] + for doc in iter: + id = doc.get("id") + fields = doc.get("fields") + groupname = doc.get("groupname") + + if id is None: + response = VespaResponse( + status_code=499, + json={"id": id, "message": "Missing id in input dict"}, + url="n/a", + operation_type=operation_type, + ) + if callback is not None: + callback(response, id) + continue + if fields is None and operation_type != "delete": + response = VespaResponse( + status_code=499, + json={"id": id, "message": "Missing fields in input dict"}, + url="n/a", + operation_type=operation_type, + ) + if callback is not None: + callback(response, id) + continue + + async with semaphore: + if operation_type == "feed": + task = async_session.feed_data_point( + schema=schema, + namespace=namespace, + groupname=groupname, + data_id=id, + fields=fields, + **kwargs, + ) + elif operation_type == "update": + task = async_session.update_data( + schema=schema, + namespace=namespace, + groupname=groupname, + data_id=id, + fields=fields, + **kwargs, + ) + elif operation_type == "delete": + task = async_session.delete_data( + schema=schema, + namespace=namespace, + data_id=id, + groupname=groupname, + **kwargs, + ) + + tasks.append(handle_result(asyncio.create_task(task), id)) + + # Control the number of in-flight tasks + if len(tasks) >= max_queue_size: + await asyncio.gather(*tasks) + tasks = [] + + if tasks: + await asyncio.gather(*tasks) + + asyncio.run(run()) + return + def delete_data( self, schema: str, @@ -1260,10 +1410,19 @@ def update_data( class VespaAsync(object): def __init__( - self, app: Vespa, connections: Optional[int] = 10, total_timeout: int = 180 + self, app: Vespa, connections: Optional[int] = 1, total_timeout: int = 10 ) -> None: + """ + Class to handle async requests to Vespa. + Uses httpx as the async http client, and HTTP/2 by default. + + Args: + app (Vespa): Vespa application object. + connections (Optional[int], optional): number of connections. Defaults to 1 as HTTP/2 is multiplexed. + total_timeout (int, optional): timeout for each individual request in seconds. Defaults to 10. + """ self.app = app - self.aiohttp_session = None + self.httpx_client = None self.connections = connections self.total_timeout = total_timeout self.app.auth_method = self.app._get_valid_auth_method() @@ -1277,46 +1436,45 @@ def __init__( ) async def __aenter__(self): - self._open_aiohttp_session() + self._open_httpx_client() return self async def __aexit__(self, exc_type, exc_val, exc_tb): - await self._close_aiohttp_session() + await self._close_httpx_client() - def _open_aiohttp_session(self): - if self.aiohttp_session is not None and not self.aiohttp_session.closed: + def _open_httpx_client(self): + if self.httpx_client is not None: return - sslcontext = False + limits = httpx.Limits( + max_keepalive_connections=self.connections, + max_connections=self.connections, + keepalive_expiry=10, # This should NOT exceed the keepalive_timeout on the Server, otherwise we will get ConnectionTerminated errors. + ) + timeout = httpx.Timeout(pool=5, connect=5, read=5, write=5) if self.app.cert is not None: - sslcontext = ssl.create_default_context() - sslcontext.load_cert_chain(self.app.cert, self.app.key) - conn = aiohttp.TCPConnector(ssl=sslcontext, limit=self.connections) - if self.app.vespa_cloud_secret_token: - self.aiohttp_session = aiohttp.ClientSession( - connector=conn, - timeout=aiohttp.ClientTimeout(total=self.total_timeout), - headers=self.headers, - ) + sslcontext = httpx.create_ssl_context(cert=(self.app.cert, self.app.key)) else: - self.aiohttp_session = aiohttp.ClientSession( - connector=conn, - timeout=aiohttp.ClientTimeout(total=self.total_timeout), - headers={"User-Agent": "pyvespa asyncio client"}, - ) - return self.aiohttp_session + sslcontext = False + self.httpx_client = httpx.AsyncClient( + timeout=timeout, + headers=self.headers, + verify=sslcontext, + http2=True, # HTTP/2 by default + http1=False, + limits=limits, + ) + return self.httpx_client - async def _close_aiohttp_session(self): - if self.aiohttp_session is None: + async def _close_httpx_client(self): + if self.httpx_client is None: return - return await self.aiohttp_session.close() + await self.httpx_client.aclose() - @staticmethod async def _wait(f, args, **kwargs): tasks = [asyncio.create_task(f(*arg, **kwargs)) for arg in args] await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED) return [result for result in map(lambda task: task.result(), tasks)] - @staticmethod def callback_docv1(state: RetryCallState) -> VespaResponse: if state.outcome.failed: raise state.outcome.exception() @@ -1328,11 +1486,11 @@ async def query( ) -> VespaQueryResponse: if groupname: kwargs["streaming.groupname"] = groupname - r = await self.aiohttp_session.post( + r = await self.httpx_client.post( self.app.search_end_point, json=body, params=kwargs ) return VespaQueryResponse( - json=await r.json(), status_code=r.status, url=str(r.url) + json=r.json(), status_code=r.status_code, url=str(r.url) ) @retry( @@ -1345,7 +1503,7 @@ async def query( retry_error_callback=callback_docv1, ) @retry( - wait=wait_random_exponential(multiplier=1, max=10), + wait=wait_random_exponential(multiplier=1, max=3), retry=retry_if_result(lambda x: x.get_status_code() == 429), ) async def feed_data_point( @@ -1355,6 +1513,7 @@ async def feed_data_point( fields: Dict, namespace: str = None, groupname: str = None, + semaphore: asyncio.Semaphore = None, **kwargs, ) -> VespaResponse: path = self.app.get_document_v1_path( @@ -1362,12 +1521,18 @@ async def feed_data_point( ) end_point = "{}{}".format(self.app.end_point, path) vespa_format = {"fields": fields} - response = await self.aiohttp_session.post( - end_point, json=vespa_format, params=kwargs - ) + if semaphore: + async with semaphore: + response = await self.httpx_client.post( + end_point, json=vespa_format, params=kwargs + ) + else: + response = await self.httpx_client.post( + end_point, json=vespa_format, params=kwargs + ) return VespaResponse( - json=await response.json(), - status_code=response.status, + json=response.json(), + status_code=response.status_code, url=str(response.url), operation_type="feed", ) @@ -1391,16 +1556,21 @@ async def delete_data( data_id: str, namespace: str = None, groupname: str = None, + semaphore: asyncio.Semaphore = None, **kwargs, ) -> VespaResponse: path = self.app.get_document_v1_path( id=data_id, schema=schema, namespace=namespace, group=groupname ) end_point = "{}{}".format(self.app.end_point, path) - response = await self.aiohttp_session.delete(end_point, params=kwargs) + if semaphore: + async with semaphore: + response = await self.httpx_client.delete(end_point, params=kwargs) + else: + response = await self.httpx_client.delete(end_point, params=kwargs) return VespaResponse( - json=await response.json(), - status_code=response.status, + json=response.json(), + status_code=response.status_code, url=str(response.url), operation_type="delete", ) @@ -1424,16 +1594,21 @@ async def get_data( data_id: str, namespace: str = None, groupname: str = None, + semaphore: asyncio.Semaphore = None, **kwargs, ) -> VespaResponse: path = self.app.get_document_v1_path( id=data_id, schema=schema, namespace=namespace, group=groupname ) end_point = "{}{}".format(self.app.end_point, path) - response = await self.aiohttp_session.get(end_point, params=kwargs) + if semaphore: + async with semaphore: + response = await self.httpx_client.get(end_point, params=kwargs) + else: + response = await self.httpx_client.get(end_point, params=kwargs) return VespaResponse( - json=await response.json(), - status_code=response.status, + json=response.json(), + status_code=response.status_code, url=str(response.url), operation_type="get", ) @@ -1460,6 +1635,7 @@ async def update_data( auto_assign: bool = True, namespace: str = None, groupname: str = None, + semaphore: asyncio.Semaphore = None, **kwargs, ) -> VespaResponse: path = self.app.get_document_v1_path( @@ -1473,12 +1649,18 @@ async def update_data( else: # Can not send 'id' in fields for partial update vespa_format = {"fields": {k: v for k, v in fields.items() if k != "id"}} - response = await self.aiohttp_session.put( - end_point, json=vespa_format, params=kwargs - ) + if semaphore: + async with semaphore: + response = await self.httpx_client.put( + end_point, json=vespa_format, params=kwargs + ) + else: + response = await self.httpx_client.put( + end_point, json=vespa_format, params=kwargs + ) return VespaResponse( - json=await response.json(), - status_code=response.status, + json=response.json(), + status_code=response.status_code, url=str(response.url), operation_type="update", )