Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix workers #262

Merged
merged 2 commits into from
Sep 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ prod-dotenv-file: guard-VAULT_ADDR guard-VAULT_TOKEN vault-installed
@ echo XML_PROCESSOR_PATH=${XML_PROCESSOR_PATH} >> .env
@ echo AIRFLOW_INFRA_FOLDER=~/airflow-infra/prod >> .env
@ echo AIRFLOW_WORKER_HOSTNAME=${HOSTNAME} >> .env
@ echo AIRFLOW_CELERY_WORKER_CONCURRENCY=32 >> .env
@ vault kv get -format="json" ted-prod/airflow | jq -r ".data.data | keys[] as \$$k | \"\(\$$k)=\(.[\$$k])\"" >> .env
@ vault kv get -format="json" ted-prod/mongo-db | jq -r ".data.data | keys[] as \$$k | \"\(\$$k)=\(.[\$$k])\"" >> .env
@ vault kv get -format="json" ted-prod/metabase | jq -r ".data.data | keys[] as \$$k | \"\(\$$k)=\(.[\$$k])\"" >> .env
Expand Down
4 changes: 2 additions & 2 deletions dags/fetch_notices_per_day_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@


@dag(default_args=DEFAULT_DAG_ARGUMENTS,
max_active_runs=64,
max_active_tasks=64,
max_active_runs=128,
max_active_tasks=128,
schedule_interval=None,
tags=['worker', 'fetch_notices_per_day'])
def fetch_notices_per_day_worker():
Expand Down
2 changes: 1 addition & 1 deletion infra/airflow-cluster/docker-compose-worker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ x-airflow-common:
AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG: 256
AIRFLOW__CORE__NON_POOLED_TASK_SLOT_COUNT: 256
AIRFLOW__SCHEDULER__PARSING_PROCESSES: 24
AIRFLOW__CELERY__WORKER_CONCURRENCY: 64
AIRFLOW__CELERY__WORKER_CONCURRENCY: ${AIRFLOW_CELERY_WORKER_CONCURRENCY}
AIRFLOW__CORE__SQL_ALCHEMY_POOL_SIZE: 512
AIRFLOW__CORE__SQL_ALCHEMY_MAX_OVERFLOW: 1024
AIRFLOW__CORE__SQL_ALCHEMY_CONN: "postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_USER}@postgres.${SUBDOMAIN}${DOMAIN}/airflow"
Expand Down
2 changes: 1 addition & 1 deletion infra/airflow-cluster/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ x-airflow-common:
AIRFLOW__SCHEDULER__PARSING_PROCESSES: 8
AIRFLOW__SCHEDULER__SCHEDULER_HEARTBEAT_SEC: 1
AIRFLOW__SCHEDULER__MAX_DAGRUNS_PER_LOOP_TO_SCHEDULE: 128
AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
AIRFLOW__CELERY__WORKER_CONCURRENCY: 16
AIRFLOW__CORE__SQL_ALCHEMY_POOL_SIZE: 512
AIRFLOW__CORE__SQL_ALCHEMY_MAX_OVERFLOW: 1024
AIRFLOW__CORE__SQL_ALCHEMY_CONN: "postgresql+psycopg2://${POSTGRES_USER}:${POSTGRES_USER}@postgres/airflow"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,14 @@ def normalise_form_number(cls, value: str) -> str:
:param value:
:return:
"""
form_number_parts = re.split(r"(?=\d)", value, 1)
if len(form_number_parts) == 2:
text_part: str = form_number_parts[0] if form_number_parts[0] else "F"
number_part: str = form_number_parts[1]
if text_part.isalpha() and number_part.isdecimal():
number_part = "0" + number_part if number_part and len(number_part) < 2 else number_part
return text_part + number_part
if value:
form_number_parts = re.split(r"(?=\d)", value, 1)
if len(form_number_parts) == 2:
text_part: str = form_number_parts[0] if form_number_parts[0] else "F"
number_part: str = form_number_parts[1]
if text_part.isalpha() and number_part.isdecimal():
number_part = "0" + number_part if number_part and len(number_part) < 2 else number_part
return text_part + number_part
return value

@classmethod
Expand Down