Skip to content

Commit

Permalink
AIP-84 Get Pool (apache#43221)
Browse files Browse the repository at this point in the history
  • Loading branch information
pierrejeambrun authored and PaulKobow7536 committed Oct 24, 2024
1 parent e33afd1 commit f70c9d0
Show file tree
Hide file tree
Showing 12 changed files with 413 additions and 1 deletion.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/pool_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def delete_pool(*, pool_name: str, session: Session = NEW_SESSION) -> APIRespons
return Response(status=HTTPStatus.NO_CONTENT)


@mark_fastapi_migration_done
@security.requires_access_pool("GET")
@provide_session
def get_pool(*, pool_name: str, session: Session = NEW_SESSION) -> APIResponse:
Expand Down
92 changes: 92 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1119,6 +1119,50 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
get:
tags:
- Pool
summary: Get Pool
description: Get a pool.
operationId: get_pool
parameters:
- name: pool_name
in: path
required: true
schema:
type: string
title: Pool Name
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/PoolResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/providers/:
get:
tags:
Expand Down Expand Up @@ -1916,6 +1960,54 @@ components:
- task_instance_states
title: HistoricalMetricDataResponse
description: Historical Metric Data serializer for responses.
PoolResponse:
properties:
name:
type: string
title: Name
slots:
type: integer
title: Slots
description:
anyOf:
- type: string
- type: 'null'
title: Description
include_deferred:
type: boolean
title: Include Deferred
occupied_slots:
type: integer
title: Occupied Slots
running_slots:
type: integer
title: Running Slots
queued_slots:
type: integer
title: Queued Slots
scheduled_slots:
type: integer
title: Scheduled Slots
open_slots:
type: integer
title: Open Slots
deferred_slots:
type: integer
title: Deferred Slots
type: object
required:
- name
- slots
- description
- include_deferred
- occupied_slots
- running_slots
- queued_slots
- scheduled_slots
- open_slots
- deferred_slots
title: PoolResponse
description: Pool serializer for responses.
ProviderCollectionResponse:
properties:
providers:
Expand Down
19 changes: 18 additions & 1 deletion airflow/api_fastapi/core_api/routes/public/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,14 @@
from __future__ import annotations

from fastapi import Depends, HTTPException
from sqlalchemy import delete
from sqlalchemy import delete, select
from sqlalchemy.orm import Session
from typing_extensions import Annotated

from airflow.api_fastapi.common.db.common import get_session
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.serializers.pools import PoolResponse
from airflow.models.pool import Pool

pools_router = AirflowRouter(tags=["Pool"], prefix="/pools")
Expand All @@ -46,3 +47,19 @@ async def delete_pool(

if affected_count == 0:
raise HTTPException(404, f"The Pool with name: `{pool_name}` was not found")


@pools_router.get(
"/{pool_name}",
responses=create_openapi_http_exception_doc([401, 403, 404]),
)
async def get_pool(
pool_name: str,
session: Annotated[Session, Depends(get_session)],
) -> PoolResponse:
"""Get a pool."""
pool = session.scalar(select(Pool).where(Pool.pool == pool_name))
if pool is None:
raise HTTPException(404, f"The Pool with name: `{pool_name}` was not found")

return PoolResponse.model_validate(pool, from_attributes=True)
47 changes: 47 additions & 0 deletions airflow/api_fastapi/core_api/serializers/pools.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import annotations

from typing import Annotated, Callable

from pydantic import BaseModel, BeforeValidator, Field


def _call_function(function: Callable[[], int]) -> int:
"""
Call the given function.
Used for the BeforeValidator to get the actual values from the bound method.
"""
return function()


class PoolResponse(BaseModel):
"""Pool serializer for responses."""

pool: str = Field(serialization_alias="name")
slots: int
description: str | None
include_deferred: bool

occupied_slots: Annotated[int, BeforeValidator(_call_function)]
running_slots: Annotated[int, BeforeValidator(_call_function)]
queued_slots: Annotated[int, BeforeValidator(_call_function)]
scheduled_slots: Annotated[int, BeforeValidator(_call_function)]
open_slots: Annotated[int, BeforeValidator(_call_function)]
deferred_slots: Annotated[int, BeforeValidator(_call_function)]
16 changes: 16 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,22 @@ export const UseMonitorServiceGetHealthKeyFn = (queryKey?: Array<unknown>) => [
useMonitorServiceGetHealthKey,
...(queryKey ?? []),
];
export type PoolServiceGetPoolDefaultResponse = Awaited<
ReturnType<typeof PoolService.getPool>
>;
export type PoolServiceGetPoolQueryResult<
TData = PoolServiceGetPoolDefaultResponse,
TError = unknown,
> = UseQueryResult<TData, TError>;
export const usePoolServiceGetPoolKey = "PoolServiceGetPool";
export const UsePoolServiceGetPoolKeyFn = (
{
poolName,
}: {
poolName: string;
},
queryKey?: Array<unknown>,
) => [usePoolServiceGetPoolKey, ...(queryKey ?? [{ poolName }])];
export type ProviderServiceGetProvidersDefaultResponse = Awaited<
ReturnType<typeof ProviderService.getProviders>
>;
Expand Down
21 changes: 21 additions & 0 deletions airflow/ui/openapi-gen/queries/prefetch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
DagService,
DashboardService,
MonitorService,
PoolService,
ProviderService,
VariableService,
} from "../requests/services.gen";
Expand Down Expand Up @@ -336,6 +337,26 @@ export const prefetchUseMonitorServiceGetHealth = (queryClient: QueryClient) =>
queryKey: Common.UseMonitorServiceGetHealthKeyFn(),
queryFn: () => MonitorService.getHealth(),
});
/**
* Get Pool
* Get a pool.
* @param data The data for the request.
* @param data.poolName
* @returns PoolResponse Successful Response
* @throws ApiError
*/
export const prefetchUsePoolServiceGetPool = (
queryClient: QueryClient,
{
poolName,
}: {
poolName: string;
},
) =>
queryClient.prefetchQuery({
queryKey: Common.UsePoolServiceGetPoolKeyFn({ poolName }),
queryFn: () => PoolService.getPool({ poolName }),
});
/**
* Get Providers
* Get providers.
Expand Down
26 changes: 26 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,32 @@ export const useMonitorServiceGetHealth = <
queryFn: () => MonitorService.getHealth() as TData,
...options,
});
/**
* Get Pool
* Get a pool.
* @param data The data for the request.
* @param data.poolName
* @returns PoolResponse Successful Response
* @throws ApiError
*/
export const usePoolServiceGetPool = <
TData = Common.PoolServiceGetPoolDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
poolName,
}: {
poolName: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useQuery<TData, TError>({
queryKey: Common.UsePoolServiceGetPoolKeyFn({ poolName }, queryKey),
queryFn: () => PoolService.getPool({ poolName }) as TData,
...options,
});
/**
* Get Providers
* Get providers.
Expand Down
27 changes: 27 additions & 0 deletions airflow/ui/openapi-gen/queries/suspense.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
DagService,
DashboardService,
MonitorService,
PoolService,
ProviderService,
VariableService,
} from "../requests/services.gen";
Expand Down Expand Up @@ -426,6 +427,32 @@ export const useMonitorServiceGetHealthSuspense = <
queryFn: () => MonitorService.getHealth() as TData,
...options,
});
/**
* Get Pool
* Get a pool.
* @param data The data for the request.
* @param data.poolName
* @returns PoolResponse Successful Response
* @throws ApiError
*/
export const usePoolServiceGetPoolSuspense = <
TData = Common.PoolServiceGetPoolDefaultResponse,
TError = unknown,
TQueryKey extends Array<unknown> = unknown[],
>(
{
poolName,
}: {
poolName: string;
},
queryKey?: TQueryKey,
options?: Omit<UseQueryOptions<TData, TError>, "queryKey" | "queryFn">,
) =>
useSuspenseQuery<TData, TError>({
queryKey: Common.UsePoolServiceGetPoolKeyFn({ poolName }, queryKey),
queryFn: () => PoolService.getPool({ poolName }) as TData,
...options,
});
/**
* Get Providers
* Get providers.
Expand Down
Loading

0 comments on commit f70c9d0

Please sign in to comment.