Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Latest commit

 

History

History
333 lines (264 loc) · 9.98 KB

tutorials.mdx

File metadata and controls

333 lines (264 loc) · 9.98 KB
title description icon
Tutorials
This tutorial walks you through how to use Wyvern step by step toward setting up pipeline like the one from [Quickstart](/quickstart). There are more explanations of what each component does. Each section builds on the previous ones but it is structured to separate topics so that you can go directly to any section and find information that you need
code

Install Wyvern

pip install wyvern-ai
poetry add wyvern-ai

This will also install the wyvern CLI

Define schema

Let's define the schema of the product and the schema of the ranking request:

from wyvern import ProductEntity


class Product(ProductEntity):
    brand: str | None = None
    category: str | None = None
    categories: list[str] = []
    title: str | None = None
    description: str | None = None
    price: float | None = None
    number_in_stock: int | None = None
    search_score: float | None = None


class MyRankingRequest(RankingRequest[Product]):
    candidates: list[Product]

Here we're using the default RankingRequest with the Product entity we just defined. This actually translates to the following API request schema:

{
  "user_page_size": 0,
  "user_page": 0,
  "candidate_page_size": 0,
  "candidate_page": 0,
  "request_id": "string",
  "include_events": false,
  "query": {
    "query": "string"
  },
  "candidates": [
    {
      "product_id": "string",
      "brand": "string",
      "category": "string",
      "categories": [],
      "title": "string",
      "description": "string",
      "price": 0,
      "number_in_stock": 0,
      "search_score": 0
    }
  ]
}

Define Realtime Features

For a ML feature, there is usually an entity or a composite of entities that the feature is associated to.

Wyvern has the RealtimeFeatureComponent which allows you to inherit from and define the your feature group. It has three potential inputs: the primary entity, the secondary entity and the request entity.

In the ranking example, there are five features. For each feature, we need to define the entity of the feature.

The entity for the search score of the product is obviously the "product". Here's the definition for this feature:

from wyvern import (
    FeatureData,
    RealtimeFeatureComponent,
    WyvernFeature,
)


class RealtimeProductFeature(RealtimeFeatureComponent[Product, Any, Any]):
    def __init__(self):
        super().__init__(
            output_feature_names={
                "search_score",
            },
        )

    async def compute_features(
        self,
        primary_entity: Product,
        request: Any,
    ) -> Optional[FeatureData]:
        features: dict[str, WyvernFeature] = {
            "search_score": primary_entity.search_score or 0.0,
        }

        return FeatureData(
            identifier=primary_entity.identifier,
            features=features,
        )

For the reset of features, they're all about similarity between the query and a product feature. This is what we call the "CompositeEntity" where the feature is the composite of two features of two different entities. Here's the definition of the composite feature group:

from wyvern import QueryEntity


class RealtimeProductQueryFeature(
    RealtimeFeatureComponent[Product, QueryEntity, Any],
):
    def __init__(self):
        super().__init__(
            output_feature_names={
                "search_score",
                "query_category_similarity",
                "query_brand_similarity",
                "query_title_similarity",
                "query_description_similarity",
            },
        )

    async def compute_composite_features(
        self,
        primary_entity: Product,
        secondary_entity: QueryEntity,
        request: Any,
    ) -> Optional[FeatureData]:
        # compute the potential categories for the query.
        # In another word, given a query, what are the possible categories
        #   that the query is referring to with top 3 probability.
        # For example, if the query is "iphone", the top 3 categories could be "iphone", "iphone case", "iphone charger"

        features: dict[str, WyvernFeature] = {
            "query_category_similarity": random.random(),
            "query_brand_similarity": random.random(),
            "query_title_similarity": random.random(),
            "query_description_similarity": random.random(),
        }

        return FeatureData(
            identifier=CompositeIdentifier(
                primary_entity.identifier,
                secondary_entity.identifier,
            ),
            features=features,
        )

Register Realtime Features

Now we go back to the pipelines/main.py and register the realtime features with Wyvern service:

from wyvern import WyvernService

from pipelines.product_ranking.realtime_features import (
    RealtimeProductFeature,
    RealtimeProductQueryFeature,
)

app = WyvernService.generate_app(
    realtime_feature_components=[RealtimeProductFeature, RealtimeProductQueryFeature],
)

To register realtime features, pass the realtime_feature_components, containing the list of your realtime features, to WyvernService.generate_app. This basically tells the Wyvern service that these are available realtime features ready.

Define Model

import asyncio
from functools import cached_property
from typing import List, Set

from wyvern import (
    ModelComponent,
    ModelInput,
    ModelOutput,
    RankingRequest,
    CompositeIdentifier,
)

from pipelines.product_ranking.schemas import Product


class RankingModel(
    ModelComponent[
        ModelInput[
            Product,
            RankingRequest[Product],
        ],
        ModelOutput[float],
    ],
):
    @cached_property
    def manifest_feature_names(self) -> Set[str]:
        return {
            "RealtimeProductFeature:search_score",
            "RealtimeProductQueryFeature:query_category_similarity",
            "RealtimeProductQueryFeature:query_brand_similarity",
            "RealtimeProductQueryFeature:query_title_similarity",
            "RealtimeProductQueryFeature:query_description_similarity",
        }

    async def batch_inference(
        self,
        request: RankingRequest[Product],
        entities: List[Product],
        **kwargs,
    ) -> List[float]:
        return await asyncio.gather(
            *[self._inference_helper(request, entity) for entity in entities]
        )

    async def _inference_helper(
        self,
        request: RankingRequest[Product],
        entity: Product,
    ) -> float:
        score = 0.0
        composite_identifier = CompositeIdentifier(
            primary_identifier=entity.identifier,
            secondary_identifier=request.query.identifier,
        )
        search_score = self.get_feature(
            entity.identifier,
            "RealtimeProductFeature:search_score",
        )
        query_category_similarity = self.get_feature(
            composite_identifier,
            "RealtimeProductQueryFeature:query_category_similarity",
        )
        query_brand_similarity = self.get_feature(
            composite_identifier,
            "RealtimeProductQueryFeature:query_brand_similarity",
        )
        query_title_similarity = self.get_feature(
            composite_identifier,
            "RealtimeProductQueryFeature:query_title_similarity",
        )
        query_description_similarity = self.get_feature(
            composite_identifier,
            "RealtimeProductQueryFeature:query_description_similarity",
        )
        if search_score:
            score += 2 * search_score
        if query_category_similarity:
            score += 10 * query_category_similarity
        if query_brand_similarity:
            score += 20 * query_brand_similarity
        if query_title_similarity:
            score += 22 * query_title_similarity
        if query_description_similarity:
            score += 13 * query_description_similarity

        return score

Define Pipeline

A Wyvern pipeline is the API route that defines the logistics of your ML application pipeline and it defines the API endpoint of yours. There are three inputs:

  • request schema: the MyRankingRequest we defined in pipelines/product_ranking/schemas.py
  • response schema: the default RankingResponse
  • the machine learning model: the RankingModel we defined in pipelines/product_ranking/models.py

Now let's define a PipelineComponent under pipelines/product_ranking/ranking_pipeline.py:

from wyvern import (
    ModelComponent,
    RankingPipeline,
    RankingResponse,
)

from pipelines.product_ranking.models import RankingModel
from pipelines.product_ranking.schemas import MyRankingRequest, Product


class MyRankingPipeline(RankingPipeline[Product]):
    REQUEST_SCHEMA_CLASS = MyRankingRequest
    RESPONSE_SCHEMA_CLASS = RankingResponse

    def get_model(self) -> ModelComponent:
        return RankingModel()

Register Pipelines

Similar to registering realtime features, register the pipeline with the Wyvern service in pipelines/main.py:

from wyvern import WyvernService

from pipelines.product_ranking.ranking_pipeline import MyRankingPipeline
from pipelines.product_ranking.realtime_features import (
    RealtimeProductFeature,
    RealtimeProductQueryFeature,
)

app = WyvernService.generate_app(
    route_components=[MyRankingPipeline],
    realtime_feature_components=[RealtimeProductFeature, RealtimeProductQueryFeature],
)

To register the pipeline, pass the the route_components as a list, containing your pipelines, to WyvernService.generate_app. This basically register the /api/v1/ranking route in the Wyvern service.

Now if you do wyvern run, your ranking endpoint is ready to serve.

Define Business Logics

Coming soon.

Writing Tests

Coming soon.