diff --git a/.github/workflows/batch-ensembler.yaml b/.github/workflows/pyfunc-ensembler-job.yaml
similarity index 73%
rename from .github/workflows/batch-ensembler.yaml
rename to .github/workflows/pyfunc-ensembler-job.yaml
index caf47613c..914254d83 100644
--- a/.github/workflows/batch-ensembler.yaml
+++ b/.github/workflows/pyfunc-ensembler-job.yaml
@@ -1,16 +1,16 @@
-name: engines/batch-ensembler
+name: engines/pyfunc-ensembler-job
 
 on:
   # Automatically run CI on Release and Pre-Release tags and main branch
   # (only if there are changes to relevant paths)
   push:
     tags:
-      - "batch-ensembler/v[0-9]+.[0-9]+.[0-9]+*"
+      - "pyfunc-ensembler-job/v[0-9]+.[0-9]+.[0-9]+*"
     branches:
       - main
     paths:
-      - ".github/workflows/batch-ensembler.yaml"
-      - "engines/batch-ensembler/**"
+      - ".github/workflows/pyfunc-ensembler-job.yaml"
+      - "engines/pyfunc-ensembler-job/**"
       - "sdk/**"
 
   # Automatically run CI on branches, that have active PR opened
@@ -18,8 +18,8 @@ on:
     branches:
       - main
     paths:
-      - ".github/workflows/batch-ensembler.yaml"
-      - "engines/batch-ensembler/**"
+      - ".github/workflows/pyfunc-ensembler-job.yaml"
+      - "engines/pyfunc-ensembler-job/**"
       - "sdk/**"
 
   # To make it possible to trigger e2e CI workflow for any arbitrary git ref
@@ -50,13 +50,13 @@ jobs:
       - name: Cache Conda environment
         uses: actions/cache@v2
         with:
-          path: engines/batch-ensembler/env
+          path: engines/pyfunc-ensembler-job/env
           key: |
-            conda-${{ hashFiles('engines/batch-ensembler/environment.yaml') }}-${{ hashFiles('engines/batch-ensembler/requirements.txt') }}-${{ hashFiles('engines/batch-ensembler/requirements.dev.txt') }}
+            conda-${{ hashFiles('engines/pyfunc-ensembler-job/environment.yaml') }}-${{ hashFiles('engines/pyfunc-ensembler-job/requirements.txt') }}-${{ hashFiles('engines/pyfunc-ensembler-job/requirements.dev.txt') }}
           restore-keys: conda-
 
       - name: Run Tests
-        working-directory: engines/batch-ensembler
+        working-directory: engines/pyfunc-ensembler-job
         run: |
           make setup
           make test
@@ -70,7 +70,7 @@ jobs:
       - id: release-rules
         uses: ./.github/actions/release-rules
         with:
-          prefix: batch-ensembler/
+          prefix: pyfunc-ensembler-job/
 
   publish:
     # Automatically publish release and pre-release artifacts.
@@ -103,13 +103,13 @@ jobs:
 
       - name: Build Docker Image
         id: build
-        working-directory: engines/batch-ensembler
+        working-directory: engines/pyfunc-ensembler-job
         env:
           DOCKER_REGISTRY: ghcr.io/${{ github.repository }}
         run: |
           set -o pipefail
           make build-image | tee output.log
-          echo "::set-output name=ensembler-image::$(sed -n 's%Building docker image: \(.*\)%\1%p' output.log)"
+          echo "::set-output name=pyfunc-ensembler-job::$(sed -n 's%Building docker image: \(.*\)%\1%p' output.log)"
 
-      - name: Publish Batch Ensembler Docker Image
-        run: docker push ${{ steps.build.outputs.ensembler-image }}
+      - name: Publish Pyfunc Ensembler Job Docker Image
+        run: docker push ${{ steps.build.outputs.pyfunc-ensembler-job }}
diff --git a/.github/workflows/pyfunc-ensembler-service.yaml b/.github/workflows/pyfunc-ensembler-service.yaml
new file mode 100644
index 000000000..0f915b4fe
--- /dev/null
+++ b/.github/workflows/pyfunc-ensembler-service.yaml
@@ -0,0 +1,109 @@
+name: engines/pyfunc-ensembler-service
+
+on:
+  # Automatically run CI on Release and Pre-Release tags and main branch
+  # (only if there are changes to relevant paths)
+  push:
+    tags:
+      - "pyfunc-ensembler-service/v[0-9]+.[0-9]+.[0-9]+*"
+    branches:
+      - main
+    paths:
+      - ".github/workflows/pyfunc-ensembler-service.yaml"
+      - "engines/pyfunc-ensembler-service/**"
+      - "sdk/**"
+
+  # Automatically run CI on branches, that have active PR opened
+  pull_request:
+    branches:
+      - main
+    paths:
+      - ".github/workflows/pyfunc-ensembler-service.yaml"
+      - "engines/pyfunc-ensembler-service/**"
+      - "sdk/**"
+
+  # To make it possible to trigger e2e CI workflow for any arbitrary git ref
+  workflow_dispatch:
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v2
+
+      - name: Setup Python
+        uses: actions/setup-python@v2
+        with:
+          python-version: 3.8
+
+      - name: Setup Conda
+        uses: conda-incubator/setup-miniconda@v2
+        with:
+          auto-update-conda: true
+
+      - name: Cache Conda environment
+        uses: actions/cache@v2
+        with:
+          path: engines/pyfunc-ensembler-service/env
+          key: |
+            conda-${{ hashFiles('engines/pyfunc-ensembler-service/environment.yaml') }}-${{ hashFiles('engines/pyfunc-ensembler-service/requirements.txt') }}-${{ hashFiles('engines/pyfunc-ensembler-service/requirements.dev.txt') }}
+          restore-keys: conda-
+
+      - name: Run Tests
+        working-directory: engines/pyfunc-ensembler-service
+        run: |
+          make setup
+          make test
+
+  release-rules:
+    runs-on: ubuntu-latest
+    outputs:
+      release-type: ${{ steps.release-rules.outputs.release-type }}
+    steps:
+      - uses: actions/checkout@v2
+      - id: release-rules
+        uses: ./.github/actions/release-rules
+        with:
+          prefix: pyfunc-ensembler-service/
+
+  publish:
+    # Automatically publish release and pre-release artifacts.
+    #
+    # As for dev releases, make it possible to publish artifacts
+    # manually by approving 'deployment' in the 'manual' environment.
+    #
+    # Dev build can be released either from the 'main' branch or
+    # by running this workflow manually with `workflow_dispatch` event.
+    if: >-
+      contains('release,pre-release', needs.release-rules.outputs.release-type)
+      || ( github.event_name != 'pull_request' )
+      || ( github.event.pull_request.head.repo.full_name == github.repository )
+    environment: ${{ needs.release-rules.outputs.release-type == 'dev' && 'manual' || '' }}
+    runs-on: ubuntu-latest
+    needs:
+      - release-rules
+      - test
+    steps:
+      - uses: actions/checkout@v2
+        with:
+          fetch-depth: 0
+
+      - name: Log in to the Container registry
+        uses: docker/login-action@v1
+        with:
+          registry: ghcr.io
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+
+      - name: Build Docker Image
+        id: build
+        working-directory: engines/pyfunc-ensembler-service
+        env:
+          DOCKER_REGISTRY: ghcr.io/${{ github.repository }}
+        run: |
+          set -o pipefail
+          make build-image | tee output.log
+          echo "::set-output name=pyfunc-ensembler-service-image::$(sed -n 's%Building docker image: \(.*\)%\1%p' output.log)"
+
+      - name: Publish Pyfunc Ensembler Service Docker Image
+        run: docker push ${{ steps.build.outputs.pyfunc-ensembler-service-image }}
diff --git a/.github/workflows/turing.yaml b/.github/workflows/turing.yaml
index 58de270c5..ec87ece46 100644
--- a/.github/workflows/turing.yaml
+++ b/.github/workflows/turing.yaml
@@ -10,9 +10,11 @@ on:
       - main
     paths-ignore:
       - "docs/**"
-      - "engines/batch-ensembler/**"
+      - "engines/pyfunc-ensembler-job/**"
+      - "engines/pyfunc-ensembler-service/**"
       - "sdk/**"
-      - ".github/workflows/batch-ensembler.yaml"
+      - ".github/workflows/pyfunc-ensembler-job.yaml"
+      - ".github/workflows/pyfunc-ensembler-service.yaml"
       - ".github/workflows/sdk.yaml"
       - ".github/workflows/helm-chart.yaml"
       - ".github/workflows/cluster-init.yaml"
@@ -23,9 +25,11 @@ on:
       - main
     paths-ignore:
       - "docs/**"
-      - "engines/batch-ensembler/**"
+      - "engines/pyfunc-ensembler-job/**"
+      - "engines/pyfunc-ensembler-service/**"
       - "sdk/**"
-      - ".github/workflows/batch-ensembler.yaml"
+      - ".github/workflows/pyfunc-ensembler-job.yaml"
+      - ".github/workflows/pyfunc-ensembler-service.yaml"
       - ".github/workflows/sdk.yaml"
       - ".github/workflows/helm-chart.yaml"
 
diff --git a/README.md b/README.md
index f0e58e860..64413c0e0 100644
--- a/README.md
+++ b/README.md
@@ -180,10 +180,10 @@ BatchEnsemblingConfig:
     BuildNamespace: default
     BuildTimeoutDuration: 20m
     DestinationRegistry: ghcr.io
-    BaseImageRef: ghcr.io/gojek/turing/batch-ensembler:latest
+    BaseImageRef: ghcr.io/gojek/turing/pyfunc-ensembler-job:latest
     KanikoConfig:
       BuildContextURI: git://github.com/gojek/turing.git#refs/heads/main
-      DockerfileFilePath: engines/batch-ensembler/app.Dockerfile
+      DockerfileFilePath: engines/pyfunc-ensembler-job/app.Dockerfile
       Image: gcr.io/kaniko-project/executor
       ImageVersion: v1.6.0
       ResourceRequestsLimits:
diff --git a/api/config-dev.yaml b/api/config-dev.yaml
index 4d6abc4d1..7fb85ad25 100644
--- a/api/config-dev.yaml
+++ b/api/config-dev.yaml
@@ -19,10 +19,10 @@ BatchEnsemblingConfig:
     BuildNamespace: default
     BuildTimeoutDuration: 20m
     DestinationRegistry: ghcr.io
-    BaseImageRef: ghcr.io/gojek/turing/batch-ensembler:latest
+    BaseImageRef: ghcr.io/gojek/turing/pyfunc-ensembler-job:latest
     KanikoConfig:
      BuildContextURI: git://github.com/gojek/turing.git#refs/heads/main
-      DockerfileFilePath: engines/batch-ensembler/app.Dockerfile
+      DockerfileFilePath: engines/pyfunc-ensembler-job/app.Dockerfile
       Image: gcr.io/kaniko-project/executor
       ImageVersion: v1.6.0
       ResourceRequestsLimits:
diff --git a/api/turing/config/example.yaml b/api/turing/config/example.yaml
index 28b4afcb2..ea84639c7 100644
--- a/api/turing/config/example.yaml
+++ b/api/turing/config/example.yaml
@@ -42,10 +42,10 @@ BatchEnsemblingConfig:
     BuildNamespace: default
     BuildTimeoutDuration: 20m
     DestinationRegistry: ghcr.io
-    BaseImageRef: ghcr.io/gojek/turing/batch-ensembler:latest
+    BaseImageRef: ghcr.io/gojek/turing/pyfunc-ensembler-job:latest
     KanikoConfig:
       BuildContextURI: git://github.com/gojek/turing.git#refs/heads/main
-      DockerfileFilePath: engines/batch-ensembler/app.Dockerfile
+      DockerfileFilePath: engines/pyfunc-ensembler-job/app.Dockerfile
       Image: gcr.io/kaniko-project/executor
       ImageVersion: v1.6.0
       ResourceRequestsLimits:
diff --git a/engines/batch-ensembler/.dockerignore b/engines/pyfunc-ensembler-job/.dockerignore
similarity index 100%
rename from engines/batch-ensembler/.dockerignore
rename to engines/pyfunc-ensembler-job/.dockerignore
diff --git a/engines/batch-ensembler/.gitignore b/engines/pyfunc-ensembler-job/.gitignore
similarity index 100%
rename from engines/batch-ensembler/.gitignore
rename to engines/pyfunc-ensembler-job/.gitignore
diff --git a/engines/batch-ensembler/Dockerfile b/engines/pyfunc-ensembler-job/Dockerfile
similarity index 100%
rename from engines/batch-ensembler/Dockerfile
rename to engines/pyfunc-ensembler-job/Dockerfile
diff --git a/engines/batch-ensembler/Makefile b/engines/pyfunc-ensembler-job/Makefile
similarity index 92%
rename from engines/batch-ensembler/Makefile
rename to engines/pyfunc-ensembler-job/Makefile
index 0ca95c056..85f17f450 100644
--- a/engines/batch-ensembler/Makefile
+++ b/engines/pyfunc-ensembler-job/Makefile
@@ -1,6 +1,6 @@
 SHELL := /bin/bash
 
-APP_NAME := batch-ensembler
+APP_NAME := pyfunc-ensembler-job
 CONDA_ENV_NAME ?= $(APP_NAME)
 ACTIVATE_ENV = source $$(conda info --base)/etc/profile.d/conda.sh ; conda activate ./env/$(CONDA_ENV_NAME)
 
@@ -39,4 +39,4 @@ build-image: version
 .PHONY: version
 version:
     $(eval VERSION=$(if $(OVERWRITE_VERSION),$(OVERWRITE_VERSION),v$(shell ../../scripts/vertagen/vertagen.sh -p ${APP_NAME}/)))
-    @echo "turing-batch-ensembler version:" $(VERSION)
+    @echo "turing-pyfunc-ensembler-job version:" $(VERSION)
diff --git a/engines/batch-ensembler/README.md b/engines/pyfunc-ensembler-job/README.md
similarity index 100%
rename from engines/batch-ensembler/README.md
rename to engines/pyfunc-ensembler-job/README.md
diff --git a/engines/batch-ensembler/app.Dockerfile b/engines/pyfunc-ensembler-job/app.Dockerfile
similarity index 100%
rename from engines/batch-ensembler/app.Dockerfile
rename to engines/pyfunc-ensembler-job/app.Dockerfile
diff --git a/engines/batch-ensembler/ensembler/__init__.py b/engines/pyfunc-ensembler-job/ensembler/__init__.py
similarity index 100%
rename from engines/batch-ensembler/ensembler/__init__.py
rename to engines/pyfunc-ensembler-job/ensembler/__init__.py
diff --git a/engines/batch-ensembler/ensembler/dataset.py b/engines/pyfunc-ensembler-job/ensembler/dataset.py
similarity index 100%
rename from engines/batch-ensembler/ensembler/dataset.py
rename to engines/pyfunc-ensembler-job/ensembler/dataset.py
diff --git a/engines/batch-ensembler/ensembler/ensembler.py b/engines/pyfunc-ensembler-job/ensembler/ensembler.py
similarity index 100%
rename from engines/batch-ensembler/ensembler/ensembler.py
rename to engines/pyfunc-ensembler-job/ensembler/ensembler.py
diff --git a/engines/batch-ensembler/ensembler/job.py b/engines/pyfunc-ensembler-job/ensembler/job.py
similarity index 100%
rename from engines/batch-ensembler/ensembler/job.py
rename to engines/pyfunc-ensembler-job/ensembler/job.py
diff --git a/engines/batch-ensembler/ensembler/sink.py b/engines/pyfunc-ensembler-job/ensembler/sink.py
similarity index 100%
rename from engines/batch-ensembler/ensembler/sink.py
rename to engines/pyfunc-ensembler-job/ensembler/sink.py
diff --git a/engines/batch-ensembler/ensembler/source.py b/engines/pyfunc-ensembler-job/ensembler/source.py
similarity index 100%
rename from engines/batch-ensembler/ensembler/source.py
rename to engines/pyfunc-ensembler-job/ensembler/source.py
diff --git a/engines/batch-ensembler/ensembler/sql/bq_join.sql.jinja2 b/engines/pyfunc-ensembler-job/ensembler/sql/bq_join.sql.jinja2
similarity index 100%
rename from engines/batch-ensembler/ensembler/sql/bq_join.sql.jinja2
rename to engines/pyfunc-ensembler-job/ensembler/sql/bq_join.sql.jinja2
diff --git a/engines/batch-ensembler/ensembler/sql/bq_select.sql.jinja2 b/engines/pyfunc-ensembler-job/ensembler/sql/bq_select.sql.jinja2
similarity index 100%
rename from engines/batch-ensembler/ensembler/sql/bq_select.sql.jinja2
rename to engines/pyfunc-ensembler-job/ensembler/sql/bq_select.sql.jinja2
diff --git a/engines/batch-ensembler/entrypoint.sh b/engines/pyfunc-ensembler-job/entrypoint.sh
similarity index 100%
rename from engines/batch-ensembler/entrypoint.sh
rename to engines/pyfunc-ensembler-job/entrypoint.sh
diff --git a/engines/batch-ensembler/environment.yaml b/engines/pyfunc-ensembler-job/environment.yaml
similarity index 86%
rename from engines/batch-ensembler/environment.yaml
rename to engines/pyfunc-ensembler-job/environment.yaml
index 5b4aba5a1..976f083ea 100644
--- a/engines/batch-ensembler/environment.yaml
+++ b/engines/pyfunc-ensembler-job/environment.yaml
@@ -1,4 +1,4 @@
-name: batch-ensembler
+name: pyfunc-ensembler-job
 dependencies:
   - python=3.8
   - pip=21.0.1
diff --git a/engines/batch-ensembler/main.py b/engines/pyfunc-ensembler-job/main.py
similarity index 100%
rename from engines/batch-ensembler/main.py
rename to engines/pyfunc-ensembler-job/main.py
diff --git a/engines/batch-ensembler/requirements.dev.txt b/engines/pyfunc-ensembler-job/requirements.dev.txt
similarity index 100%
rename from engines/batch-ensembler/requirements.dev.txt
rename to engines/pyfunc-ensembler-job/requirements.dev.txt
diff --git a/engines/batch-ensembler/requirements.txt b/engines/pyfunc-ensembler-job/requirements.txt
similarity index 100%
rename from engines/batch-ensembler/requirements.txt
rename to engines/pyfunc-ensembler-job/requirements.txt
diff --git a/engines/batch-ensembler/setup.py b/engines/pyfunc-ensembler-job/setup.py
similarity index 95%
rename from engines/batch-ensembler/setup.py
rename to engines/pyfunc-ensembler-job/setup.py
index 7dcb32909..a52ab91ef 100644
--- a/engines/batch-ensembler/setup.py
+++ b/engines/pyfunc-ensembler-job/setup.py
@@ -18,7 +18,7 @@
 ]
 
 setuptools.setup(
-    name='batch-ensembler',
+    name='pyfunc-ensembler-job',
     packages=setuptools.find_packages(),
     install_requires=requirements,
     dev_requirements=dev_requirements,
diff --git a/engines/batch-ensembler/tests/__init__.py b/engines/pyfunc-ensembler-job/tests/__init__.py
similarity index 100%
rename from engines/batch-ensembler/tests/__init__.py
rename to engines/pyfunc-ensembler-job/tests/__init__.py
diff --git a/engines/batch-ensembler/tests/conftest.py b/engines/pyfunc-ensembler-job/tests/conftest.py
similarity index 100%
rename from engines/batch-ensembler/tests/conftest.py
rename to engines/pyfunc-ensembler-job/tests/conftest.py
diff --git a/engines/batch-ensembler/tests/dataset_test.py b/engines/pyfunc-ensembler-job/tests/dataset_test.py
similarity index 100%
rename from engines/batch-ensembler/tests/dataset_test.py
rename to engines/pyfunc-ensembler-job/tests/dataset_test.py
diff --git a/engines/batch-ensembler/tests/ensembler_test.py b/engines/pyfunc-ensembler-job/tests/ensembler_test.py
similarity index 100%
rename from engines/batch-ensembler/tests/ensembler_test.py
rename to engines/pyfunc-ensembler-job/tests/ensembler_test.py
diff --git a/engines/batch-ensembler/tests/sink_test.py b/engines/pyfunc-ensembler-job/tests/sink_test.py
similarity index 100%
rename from engines/batch-ensembler/tests/sink_test.py
rename to engines/pyfunc-ensembler-job/tests/sink_test.py
diff --git a/engines/batch-ensembler/tests/utils/__init__.py b/engines/pyfunc-ensembler-job/tests/utils/__init__.py
similarity index 100%
rename from engines/batch-ensembler/tests/utils/__init__.py
rename to engines/pyfunc-ensembler-job/tests/utils/__init__.py
diff --git a/engines/batch-ensembler/tests/utils/openapi_utils.py b/engines/pyfunc-ensembler-job/tests/utils/openapi_utils.py
similarity index 100%
rename from engines/batch-ensembler/tests/utils/openapi_utils.py
rename to engines/pyfunc-ensembler-job/tests/utils/openapi_utils.py
diff --git a/engines/pyfunc-ensembler-service/.dockerignore b/engines/pyfunc-ensembler-service/.dockerignore
new file mode 100644
index 000000000..c5c012bf2
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/.dockerignore
@@ -0,0 +1,8 @@
+.gitignore
+.dockerignore
+
+env/
+tests/
+
+.mypy_cache/
+.pytest_cache/
diff --git a/engines/pyfunc-ensembler-service/.gitignore b/engines/pyfunc-ensembler-service/.gitignore
new file mode 100644
index 000000000..6d3fc40b1
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/.gitignore
@@ -0,0 +1,6 @@
+env/
+.coverage
+**/mlruns/
+**/__pycache__
+
+ensembler/*
diff --git a/engines/pyfunc-ensembler-service/Dockerfile b/engines/pyfunc-ensembler-service/Dockerfile
new file mode 100644
index 000000000..87f8180ab
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/Dockerfile
@@ -0,0 +1,16 @@
+FROM continuumio/miniconda3 AS builder
+
+RUN wget -qO- https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-367.0.0-linux-x86_64.tar.gz | tar xzf -
+ENV PATH=$PATH:/google-cloud-sdk/bin
+ENV CONDA_ENV_NAME=${CONDA_ENV_NAME}
+ENV APP_NAME=${APP_NAME}
+
+COPY . .
+COPY ./temp-deps/sdk ./../../sdk
+
+RUN conda env create -f ./environment.yaml && \
+    conda env update --name ${CONDA_ENV_NAME} --file /ensembler/conda.yaml && \
+    rm -rf /root/.cache
+
+# Install conda-pack:
+RUN conda install -c conda-forge conda-pack
\ No newline at end of file
diff --git a/engines/pyfunc-ensembler-service/Makefile b/engines/pyfunc-ensembler-service/Makefile
new file mode 100644
index 000000000..9991d2399
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/Makefile
@@ -0,0 +1,33 @@
+SHELL := /bin/bash
+
+APP_NAME := pyfunc-ensembler-service
+CONDA_ENV_NAME ?= $(APP_NAME)
+ACTIVATE_ENV = source $$(conda info --base)/etc/profile.d/conda.sh ; conda activate $(CONDA_ENV_NAME)
+
+.PHONY: setup
+setup: $(CONDA_ENV_NAME)
+$(CONDA_ENV_NAME):
+    @conda env update -f environment.yaml --prune
+    $(ACTIVATE_ENV) && pip install -r requirements.dev.txt
+
+.PHONY: test
+test:
+    @$(ACTIVATE_ENV) && \
+    python -m pytest \
+        --cov=pyfunc_ensembler_runner \
+        --cov-report term-missing \
+        -W ignore
+
+.PHONY: build-image
+build-image: version
+    @mkdir -p temp-deps
+    @cp -r ../../sdk temp-deps/
+    @$(eval IMAGE_TAG = $(if $(DOCKER_REGISTRY),$(DOCKER_REGISTRY)/,)${APP_NAME}:${VERSION})
+    @echo "Building docker image: ${IMAGE_TAG}"
+    @docker build . --tag ${IMAGE_TAG}
+    @rm -rf temp-deps
+
+.PHONY: version
+version:
+    $(eval VERSION=$(if $(OVERWRITE_VERSION),$(OVERWRITE_VERSION),v$(shell ../../scripts/vertagen/vertagen.sh -p ${APP_NAME}/)))
+    @echo "turing-pyfunc-ensembler-service version:" $(VERSION)
diff --git a/engines/pyfunc-ensembler-service/README.md b/engines/pyfunc-ensembler-service/README.md
new file mode 100644
index 000000000..f8d3aec28
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/README.md
@@ -0,0 +1,27 @@
+# PyFuncEnsembler Server for Real-Time Experiments
+
+PyFuncEnsemblerRunner is a tool for deploying user-defined ensemblers (for use with Turing routers), written in
+MLflow's `pyfunc` flavour.
+
+## Usage
+To run the ensembler as a webservice:
+```bash
+python -m pyfunc_ensembler_runner --mlflow_ensembler_dir $ENSEMBLER_DIR [-l {DEBUG,INFO,WARNING,ERROR,CRITICAL}]
+
+arguments:
+  --mlflow_ensembler_dir  Path to the ensembler folder containing the mlflow files
+  --log-level             Set the logging level
+  -h, --help              Show this help message and exit
+```
+
+## Docker Image Building
+
+To create a docker image locally, you'll need to first download the model artifacts from the MLflow's model registry:
+```bash
+gsutil cp -r gs://[bucket-name]/mlflow/[project_id]/[run_id]/artifacts/ensembler .
+```
+
+To build the docker image, run the following:
+```bash
+make build-image
+```
diff --git a/engines/pyfunc-ensembler-service/app.Dockerfile b/engines/pyfunc-ensembler-service/app.Dockerfile
new file mode 100644
index 000000000..4b953e1b9
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/app.Dockerfile
@@ -0,0 +1,32 @@
+ARG BASE_IMAGE
+
+FROM ${BASE_IMAGE} as builder
+
+ARG MODEL_URL
+ARG FOLDER_NAME
+
+RUN gsutil -m cp -r ${MODEL_URL} .
+
+# Install dependencies required by the user-defined ensembler
+RUN /bin/bash -c ". activate ${CONDA_ENV_NAME} && conda env update --name ${CONDA_ENV_NAME} --file /${HOME}/${FOLDER_NAME}/conda.yaml"
+
+# Use conda-pack to create a standalone environment
+# in /venv:
+RUN conda-pack -n ${CONDA_ENV_NAME} -o /tmp/env.tar && \
+    mkdir /venv && cd /venv && tar xf /tmp/env.tar && \
+    rm /tmp/env.tar
+
+RUN /venv/bin/conda-unpack
+
+FROM debian:bullseye-slim
+
+COPY --from=builder /ensembler ./ensembler
+COPY --from=builder /pyfunc_ensembler_runner ./pyfunc_ensembler_runner
+COPY --from=builder /run.sh /run.sh
+COPY --from=builder /venv /venv
+
+RUN /bin/bash -c ". /venv/bin/activate & \
+    python -m ${APP_NAME} --mlflow_ensembler_dir /ensembler --dry_run" \
+
+RUN /bin/bash -c ". /venv/bin/activate & \
+    python -m ${APP_NAME} --mlflow_ensembler_dir /ensembler -l INFO"
diff --git a/engines/pyfunc-ensembler-service/environment.yaml b/engines/pyfunc-ensembler-service/environment.yaml
new file mode 100644
index 000000000..753f9b19a
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/environment.yaml
@@ -0,0 +1,8 @@
+name: pyfunc-ensembler-service
+dependencies:
+  - python=3.8
+  - pip=21.0.1
+  - pip:
+      - -r file:requirements.txt
+      - --extra-index-url=https://test.pypi.org/simple
+      - --trusted-host=test.pypi.org
\ No newline at end of file
diff --git a/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/__init__.py b/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/__init__.py
new file mode 100644
index 000000000..59c48fb22
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/__init__.py
@@ -0,0 +1 @@
+from .ensembler_runner import PyFuncEnsemblerRunner
diff --git a/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/__main__.py b/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/__main__.py
new file mode 100644
index 000000000..1fe452dbc
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/__main__.py
@@ -0,0 +1,46 @@
+import argparse
+import logging
+import traceback
+
+import tornado.ioloop
+
+from pyfunc_ensembler_runner.server import PyFuncEnsemblerServer
+from pyfunc_ensembler_runner import PyFuncEnsemblerRunner
+
+
+parser = argparse.ArgumentParser()
+parser.add_argument('--mlflow_ensembler_dir', required=True, help='A dir pointing to the saved Mlflow Pyfunc ensembler')
+parser.add_argument('--dry_run', default=False, action='store_true', required=False,
+                    help="Dry run pyfunc ensembler by loading the specified ensembler "
+                         "in --mlflow_ensembler_dir without starting webserver")
+parser.add_argument('-l', '--log-level', dest='log_level',
+                    choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
+                    help='Set the logging level', default=logging.DEBUG)
+
+args, _ = parser.parse_known_args()
+
+
+if __name__ == "__main__":
+    logging.basicConfig(level=args.log_level)
+    logging.info(
+        'Called with arguments:\n%s\n',
+        '\n'.join([f'{k}: {v}' for k, v in vars(args).items()])
+    )
+
+    ensembler = PyFuncEnsemblerRunner(args.mlflow_ensembler_dir)
+
+    try:
+        ensembler.load()
+    except Exception as e:
+        logging.error("Unable to initialise PyFuncEnsemblerRunner from the MLflow directory provided.")
+        logging.error(traceback.format_exc())
+        exit(1)
+
+    if args.dry_run:
+        logging.info("Dry run success")
+        exit(0)
+
+    app = PyFuncEnsemblerServer(ensembler).create_application()
+    logging.info("Ensembler ready to serve requests!")
+    app.listen(8080)
+    tornado.ioloop.IOLoop.current().start()
diff --git a/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/ensembler_runner.py b/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/ensembler_runner.py
new file mode 100644
index 000000000..ecd60b8b6
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/ensembler_runner.py
@@ -0,0 +1,23 @@
+import logging
+
+from typing import Dict, List, Any
+from mlflow import pyfunc
+
+
+class PyFuncEnsemblerRunner:
+    """
+    PyFunc ensembler runner used for real-time outputs
+    """
+
+    def __init__(self, artifact_dir: str):
+        self.artifact_dir = artifact_dir
+        self._ensembler = None
+
+    def load(self):
+        self._ensembler = pyfunc.load_model(self.artifact_dir)
+
+    def predict(self, inputs: Dict[str, Any]) -> List[Any]:
+        logging.info(f"Input request payload: {inputs}")
+        output = self._ensembler.predict(inputs)
+        logging.info(f"Output response: {output}")
+        return output
diff --git a/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/handler.py b/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/handler.py
new file mode 100644
index 000000000..adf449a78
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/handler.py
@@ -0,0 +1,31 @@
+import tornado.web
+import orjson
+import json
+
+from http import HTTPStatus
+from typing import Any, Dict
+from pyfunc_ensembler_runner.ensembler_runner import PyFuncEnsemblerRunner
+
+
+class EnsemblerHandler(tornado.web.RequestHandler):
+    def initialize(self, ensembler: PyFuncEnsemblerRunner):
+        self.ensembler = ensembler
+
+    def post(self):
+        request = EnsemblerHandler.validate_request(self.request)
+        response = self.ensembler.predict(request)
+
+        response_json = orjson.dumps(response)
+        self.write(response_json)
+        self.set_header("Content-Type", "application/json; charset=UTF-8")
+
+    @staticmethod
+    def validate_request(request: Any) -> Dict[str, Any]:
+        try:
+            body = orjson.loads(request.body)
+        except json.decoder.JSONDecodeError as e:
+            raise tornado.web.HTTPError(
+                status_code=HTTPStatus.BAD_REQUEST,
+                reason="Unrecognized request format: %s" % e
+            )
+        return body
diff --git a/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/server.py b/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/server.py
new file mode 100644
index 000000000..e1af17f9f
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/pyfunc_ensembler_runner/server.py
@@ -0,0 +1,14 @@
+import tornado.web
+
+from pyfunc_ensembler_runner.handler import EnsemblerHandler
+from pyfunc_ensembler_runner.ensembler_runner import PyFuncEnsemblerRunner
+
+
+class PyFuncEnsemblerServer:
+    def __init__(self, ensembler: PyFuncEnsemblerRunner):
+        self.ensembler = ensembler
+
+    def create_application(self):
+        return tornado.web.Application([
+            (r"/ensemble", EnsemblerHandler, dict(ensembler=self.ensembler))
+        ])
diff --git a/engines/pyfunc-ensembler-service/requirements.dev.txt b/engines/pyfunc-ensembler-service/requirements.dev.txt
new file mode 100644
index 000000000..0fb45c749
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/requirements.dev.txt
@@ -0,0 +1,3 @@
+pytest
+pytest-cov
+pylint
\ No newline at end of file
diff --git a/engines/pyfunc-ensembler-service/requirements.txt b/engines/pyfunc-ensembler-service/requirements.txt
new file mode 100644
index 000000000..85a3cc1b2
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/requirements.txt
@@ -0,0 +1,5 @@
+argparse>=1.4.0
+cloudpickle==1.2.2
+orjson==2.6.8
+tornado==6.1
+file:../../sdk
diff --git a/engines/pyfunc-ensembler-service/setup.py b/engines/pyfunc-ensembler-service/setup.py
new file mode 100644
index 000000000..e2a5f6a06
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/setup.py
@@ -0,0 +1,26 @@
+import setuptools
+import pathlib
+import pkg_resources
+
+
+with pathlib.Path('requirements.txt').open() as requirements_txt:
+    requirements = [
+        str(requirement)
+        for requirement
+        in pkg_resources.parse_requirements(requirements_txt)
+    ]
+
+with pathlib.Path('requirements.dev.txt').open() as dev_requirements_test:
+    dev_requirements = [
+        str(requirement)
+        for requirement
+        in pkg_resources.parse_requirements(dev_requirements_test)
+    ]
+
+setuptools.setup(
+    name='pyfunc-ensembler-service',
+    packages=setuptools.find_packages(),
+    install_requires=requirements,
+    dev_requirements=dev_requirements,
+    python_requires='>=3.8',
+)
diff --git a/engines/pyfunc-ensembler-service/tests/__init__.py b/engines/pyfunc-ensembler-service/tests/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/engines/pyfunc-ensembler-service/tests/conftest.py b/engines/pyfunc-ensembler-service/tests/conftest.py
new file mode 100644
index 000000000..94020fb93
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/tests/conftest.py
@@ -0,0 +1,47 @@
+import pytest
+
+from typing import Any, Dict
+from turing.ensembler import PyFunc
+
+
+class TestEnsembler(PyFunc):
+
+    def initialize(self, artifacts: dict):
+        pass
+
+    def ensemble(
+            self,
+            features: Dict,
+            predictions: Dict,
+            treatment_config: Dict) -> Any:
+        route_name_to_id = TestEnsembler.get_route_name_to_id_mapping(predictions)
+        if treatment_config['configuration']['name'] == "choose_the_control":
+            return predictions[route_name_to_id['control']]['data']['predictions']
+        else:
+            return predictions[0]['data']['predictions']
+
+    @staticmethod
+    def get_route_name_to_id_mapping(predictions):
+        """
+        Helper function to look through the predictions returned from the various routes and to map their names to
+        their id numbers (the order in which they are found in the payload.
+        """
+        route_name_to_id = {}
+        for i, pred in enumerate(predictions):
+            route_name_to_id[pred['route']] = i
+        return route_name_to_id
+
+
+@pytest.fixture
+def simple_ensembler_uri():
+    import os
+    import mlflow
+    from mlflow.pyfunc import log_model
+    log_model(
+        artifact_path='ensembler',
+        python_model=TestEnsembler(),
+        code_path=[os.path.join(os.path.dirname(__file__), '../pyfunc_ensembler_runner')])
+
+    ensembler_path = os.path.join(mlflow.get_artifact_uri(), 'ensembler')
+
+    return ensembler_path
diff --git a/engines/pyfunc-ensembler-service/tests/test_ensembler_runner.py b/engines/pyfunc-ensembler-service/tests/test_ensembler_runner.py
new file mode 100644
index 000000000..a9983d868
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/tests/test_ensembler_runner.py
@@ -0,0 +1,67 @@
+import os
+import orjson
+import pytest
+
+from tornado.testing import AsyncHTTPTestCase
+from tornado.httpclient import HTTPError
+from pyfunc_ensembler_runner.ensembler_runner import PyFuncEnsemblerRunner
+from pyfunc_ensembler_runner.server import PyFuncEnsemblerServer
+
+
+data_dir = os.path.join(os.path.dirname(__file__), "./testdata")
+with open(os.path.join(data_dir, "request_short.json")) as f:
+    dummy_short_request = f.read()
+
+with open(os.path.join(data_dir, "request_long.json")) as f:
+    dummy_long_request = f.read()
+
+with open(os.path.join(data_dir, "request_invalid.json")) as f:
+    dummy_invalid_request = f.read()
+
+
+@pytest.mark.parametrize(
+    "inputs,expected", [
+        pytest.param(
+            dummy_long_request,
+            [
+                296.15732,
+                0
+            ]
+        )
+    ])
+def test_ensembler_prediction(simple_ensembler_uri, inputs, expected):
+    ensembler = PyFuncEnsemblerRunner(simple_ensembler_uri)
+    ensembler.load()
+    actual = ensembler.predict(orjson.loads(inputs))
+    assert actual == expected
+
+
+def test_create_ensembler_server(simple_ensembler_uri):
+    ensembler = PyFuncEnsemblerRunner(simple_ensembler_uri)
+    app = PyFuncEnsemblerServer(ensembler)
+
+    assert app.ensembler == ensembler
+
+
+# e2e test for real-time ensembler web server and handler
+@pytest.mark.usefixtures("simple_ensembler_uri")
+class TestEnsemblerService(AsyncHTTPTestCase):
+    def get_app(self):
+        ensembler = PyFuncEnsemblerRunner(self.ensembler)
+        ensembler.load()
+        return PyFuncEnsemblerServer(ensembler).create_application()
+
+    def test_valid_request(self):
+        response = self.fetch('/ensemble', method="POST", body=dummy_long_request)
+        self.assertEqual(response.code, 200)
+        self.assertEqual(orjson.loads(response.body), [296.15732, 0])
+
+    def test_invalid_request(self):
+        response = self.fetch('/ensemble', method="POST", body=dummy_invalid_request)
+        self.assertEqual(response.code, 400)
+        self.assertEqual(type(response.error), HTTPError)
+
+    @pytest.fixture(autouse=True)
+    def _get_ensembler(self, simple_ensembler_uri):
+        self.ensembler = simple_ensembler_uri
+
diff --git a/engines/pyfunc-ensembler-service/tests/testdata/request_invalid.json b/engines/pyfunc-ensembler-service/tests/testdata/request_invalid.json
new file mode 100644
index 000000000..dba0bb2c9
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/tests/testdata/request_invalid.json
@@ -0,0 +1,83 @@
+{
+  "request": {
+    "type": 1,
+    "search_results": [
+      {
+        "place_id": "place_1",
+        "gates": [
+          {
+            "place_id": "place_1",
+            "location_history": null,
+            "saved_address": null
+          }
+        ],
+        "location_history": null,
+        "saved_address": null
+      },
+      {
+        "place_id": "place_2",
+        "gates": null,
+        "location_history": null,
+        "saved_address": null
+      }
+    ],
+    "country": 1
+  },
+  "response": {
+    "route_responses": [
+      {
+        "route": "model_2",
+        "data": {
+          "predictions": [
+            20947,
+            16272.597648460218
+          ]
+        },
+        "is_default": false
+      },
+      {
+        "route": "model_3",
+        "data": {
+          "predictions": [
+            0.9989534700460942,
+            0.051541458539085795
+          ]
+        },
+        "is_default": false
+      },
+      {
+        "route": "control",
+        "data": {
+          "predictions": [
+            296.15732,
+            0
+          ]
+        },
+        "is_default": true
+      },
+      {
+        "route": "model_1",
+        "data": {
+          "predictions": [
+            0.9989534700460942,
+            0.051541458539085795
+          ]
+        },
+        "is_default": false
+      },
+      {
+        "route": "model_0",
+        "data": {
+          "predictions": [
+            20947,
+            16272.597648460218
+          ]
+        },
+        "is_default": false
+      }
+    ],
+    "experiment": {
+      "configurat
+    }
+  }
+}
\ No newline at end of file
diff --git a/engines/pyfunc-ensembler-service/tests/testdata/request_long.json b/engines/pyfunc-ensembler-service/tests/testdata/request_long.json
new file mode 100644
index 000000000..96f697b56
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/tests/testdata/request_long.json
@@ -0,0 +1,91 @@
+{
+  "request": {
+    "type": 1,
+    "search_results": [
+      {
+        "place_id": "place_1",
+        "gates": [
+          {
+            "place_id": "place_1",
+            "location_history": null,
+            "saved_address": null
+          }
+        ],
+        "location_history": null,
+        "saved_address": null
+      },
+      {
+        "place_id": "place_2",
+        "gates": null,
+        "location_history": null,
+        "saved_address": null
+      }
+    ],
+    "country": 1
+  },
+  "response": {
+    "route_responses": [
+      {
+        "route": "model_2",
+        "data": {
+          "predictions": [
+            20947,
+            16272.597648460218
+          ]
+        },
+        "is_default": false
+      },
+      {
+        "route": "model_3",
+        "data": {
+          "predictions": [
+            0.9989534700460942,
+            0.051541458539085795
+          ]
+        },
+        "is_default": false
+      },
+      {
+        "route": "control",
+        "data": {
+          "predictions": [
+            296.15732,
+            0
+          ]
+        },
+        "is_default": true
+      },
+      {
+        "route": "model_1",
+        "data": {
+          "predictions": [
+            0.9989534700460942,
+            0.051541458539085795
+          ]
+        },
+        "is_default": false
+      },
+      {
+        "route": "model_0",
+        "data": {
+          "predictions": [
+            20947,
+            16272.597648460218
+          ]
+        },
+        "is_default": false
+      }
+    ],
+    "experiment": {
+      "configuration": {
+        "name": "choose_the_control",
+        "version": 8,
+        "properties": {
+          "in_experiment": "true",
+          "route": "control"
+        }
+      },
+      "error": ""
+    }
+  }
+}
\ No newline at end of file
diff --git a/engines/pyfunc-ensembler-service/tests/testdata/request_short.json b/engines/pyfunc-ensembler-service/tests/testdata/request_short.json
new file mode 100644
index 000000000..d63e42d79
--- /dev/null
+++ b/engines/pyfunc-ensembler-service/tests/testdata/request_short.json
@@ -0,0 +1,30 @@
+{
+  "request": {
+    "feature_0": 0.12,
+    "feature_1": "a good feature"
+  },
+  "response": {
+    "route_responses": [
+      {
+        "route": "control",
+        "data": {
+          "predictions": 213
+        },
+        "is_default": true
+      },
+      {
+        "route": "route_1",
+        "data": {
+          "predictions": "a bad prediction"
+        },
+        "is_default": false
+      }
+    ],
+    "experiment": {
+      "configuration": {
+        "name": "configuration_0"
+      },
+      "error": ""
+    }
+  }
+}
\ No newline at end of file
diff --git a/sdk/turing/ensembler.py b/sdk/turing/ensembler.py
index 708cd057f..f452b04dd 100644
--- a/sdk/turing/ensembler.py
+++ b/sdk/turing/ensembler.py
@@ -55,21 +55,40 @@ def initialize(self, artifacts: dict):
         """
         pass
 
-    def predict(self, context, model_input: pandas.DataFrame) -> \
-            Union[numpy.ndarray, pandas.Series, pandas.DataFrame]:
+    def predict(self, context, model_input: Union[pandas.DataFrame, Dict[str, Any]]) -> \
+            Union[numpy.ndarray, pandas.Series, pandas.DataFrame, Any]:
+        if isinstance(model_input, pandas.DataFrame):  # method called from a pyfunc ensembler job (batch ensembling)
+            return self._ensemble_batch(model_input)
+        elif isinstance(model_input, dict):  # method called from a pyfunc ensembler service (real-time ensembling)
+            return self._ensemble_request(model_input)
+
+    def _ensemble_batch(self, model_input: pandas.DataFrame) -> Union[numpy.ndarray, pandas.Series, pandas.DataFrame]:
+        """
+        Helper function to ensemble batches; works only on DataFrame arguments and has to output DataFrame objects in
+        order to fulfil the mlflow.pyfunc.spark_udf requirements that gets called in the pyfunc ensembler job engine
+        """
         prediction_columns = PyFunc._get_columns_with_prefix(model_input, PyFunc.PREDICTION_COLUMN_PREFIX)
-        treatment_config_columns = PyFunc._get_columns_with_prefix(model_input, PyFunc.TREATMENT_CONFIG_COLUMN_PREFIX)
 
         return model_input \
             .rename(columns=prediction_columns) \
-            .rename(columns=treatment_config_columns) \
             .apply(lambda row: self.ensemble(
-                features=row.drop(prediction_columns.values()).drop(treatment_config_columns.values()),
+                features=row.drop(prediction_columns.values()),
                 predictions=row[prediction_columns.values()],
-                treatment_config=row[treatment_config_columns.values()]
+                treatment_config=None
             ), axis=1, result_type='expand')
 
+    def _ensemble_request(self, model_input: Dict[str, Any]) -> Any:
+        """
+        Helper function to ensemble single requests; works on dictionary input in a single request made to the pyfunc
+        ensembler service (run by the pyfunc ensembler service engine)
+        """
+        return self.ensemble(
+            features=model_input['request'],
+            predictions=model_input['response']['route_responses'],
+            treatment_config=model_input['response']['experiment']
+        )
+
     @staticmethod
     def _get_columns_with_prefix(df: pandas.DataFrame, prefix: str):
         selected_columns = {