diff --git a/src/prefect/workers/base.py b/src/prefect/workers/base.py index 7220c2e83d23..097352621119 100644 --- a/src/prefect/workers/base.py +++ b/src/prefect/workers/base.py @@ -53,6 +53,7 @@ Pending, exception_to_failed_state, ) +from prefect.types import KeyValueLabels from prefect.utilities.dispatch import get_registry_for_type, register_base_type from prefect.utilities.engine import propose_state from prefect.utilities.services import critical_service_loop @@ -1222,13 +1223,20 @@ async def _give_worker_labels_to_flow_run(self, flow_run_id: UUID): Give this worker's identifying labels to the specified flow run. """ if self._cloud_client: - await self._cloud_client.update_flow_run_labels( - flow_run_id, - { - "prefect.worker.name": self.name, - "prefect.worker.type": self.type, - }, - ) + labels: KeyValueLabels = { + "prefect.worker.name": self.name, + "prefect.worker.type": self.type, + } + + if self._work_pool: + labels.update( + { + "prefect.work-pool.name": self._work_pool.name, + "prefect.work-pool.id": str(self._work_pool.id), + } + ) + + await self._cloud_client.update_flow_run_labels(flow_run_id, labels) async def __aenter__(self): self._logger.debug("Entering worker context...") diff --git a/tests/workers/test_base_worker.py b/tests/workers/test_base_worker.py index dc870a4a07b6..257d61088954 100644 --- a/tests/workers/test_base_worker.py +++ b/tests/workers/test_base_worker.py @@ -2109,7 +2109,12 @@ def create_run_with_deployment(state): CloudClientMock.update_flow_run_labels.assert_awaited_once_with( flow_run.id, - {"prefect.worker.name": worker.name, "prefect.worker.type": worker.type}, + { + "prefect.worker.name": worker.name, + "prefect.worker.type": worker.type, + "prefect.work-pool.name": work_pool.name, + "prefect.work-pool.id": str(work_pool.id), + }, )