Skip to content

Commit

Permalink
Merge pull request #2409 from chaoss/add-config-options
Browse files Browse the repository at this point in the history
Add config options for memory usage and frequency of refresh materialized views
  • Loading branch information
sgoggins authored May 17, 2023
2 parents 49645a2 + 1491172 commit 0a5b68f
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 5 deletions.
7 changes: 4 additions & 3 deletions augur/application/cli/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def start(disable_collection, development, port):
if not port:
port = config.get_value("Server", "port")

worker_vmem_cap = config.get_value("Celery", 'worker_process_vmem_cap')

gunicorn_command = f"gunicorn -c {gunicorn_location} -b {host}:{port} augur.api.server:app"
server = subprocess.Popen(gunicorn_command.split(" "))
Expand All @@ -91,7 +92,7 @@ def start(disable_collection, development, port):
logger.info("Deleting old task schedule")
os.remove("celerybeat-schedule.db")

processes = start_celery_worker_processes()
processes = start_celery_worker_processes(float(worker_vmem_cap))

time.sleep(5)

Expand Down Expand Up @@ -140,13 +141,13 @@ def start(disable_collection, development, port):
except RedisConnectionError:
pass

def start_celery_worker_processes():
def start_celery_worker_processes(vmem_cap_ratio):

#Calculate process scaling based on how much memory is available on the system in bytes.
#Each celery process takes ~500MB or 500 * 1024^2 bytes

#Cap memory usage to 30% of total virtual memory
available_memory_in_bytes = psutil.virtual_memory().total * .25
available_memory_in_bytes = psutil.virtual_memory().total * vmem_cap_ratio
available_memory_in_megabytes = available_memory_in_bytes / (1024 ** 2)
max_process_estimate = available_memory_in_megabytes // 500

Expand Down
3 changes: 2 additions & 1 deletion augur/application/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ def get_development_flag():
"log_level": "INFO",
},
"Celery": {
"concurrency": 12
"worker_process_vmem_cap": 0.25,
"refresh_materialized_views_interval_in_days": 7
},
"Redis": {
"cache_group": 0,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""Add extra celery options to the config if they do not exist
Revision ID: 19
Revises: 18
Create Date: 2023-05-15 12:03:57.171011
"""
from alembic import op
import sqlalchemy as sa
from augur.application.db.session import DatabaseSession
from augur.application.config import *
from sqlalchemy.sql import text
import logging

# revision identifiers, used by Alembic.
revision = '19'
down_revision = '18'
branch_labels = None
depends_on = None

logger = logging.getLogger(__name__)

def upgrade():

with DatabaseSession(logger) as session:
config = AugurConfig(logger,session)
config_dict = config.load_config()

#Update the missing fields of the celery section in the config
section = config_dict.get("Celery")

#Just copy the default if section doesn't exist.
if section:
if 'worker_process_vmem_cap' not in section.keys():
section['worker_process_vmem_cap'] = 0.25

if 'refresh_materialized_views_interval_in_days' not in section.keys():
section['refresh_materialized_views_interval_in_days'] = 7
else:
section = config.default_config["Celery"]

config.add_section_from_json("Celery", section)

#delete old setting
session.execute_sql(text(f"""
DELETE FROM augur_operations.config
WHERE section_name='Celery' AND setting_name='concurrency';
"""))



def downgrade():

conn = op.get_bind()

conn.execute(text(f"""
DELETE FROM augur_operations.config
WHERE section_name='Celery' AND (setting_name='worker_process_vmem_cap' OR setting_name='refresh_materialized_views_interval_in_days');
"""))

try:
conn.execute(text(f"""
INSERT INTO augur_operations.config (section_name,setting_name,value,type) VALUES ('Celery','concurrency',12,'int');
"""))
except:
pass
3 changes: 2 additions & 1 deletion augur/tasks/init/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,9 @@ def setup_periodic_tasks(sender, **kwargs):
logger.info(f"Scheduling non-repo-domain collection every {non_domain_collection_interval/60} minutes")
sender.add_periodic_task(non_domain_collection_interval, non_repo_domain_tasks.s())

mat_views_interval = int(config.get_value('Celery', 'refresh_materialized_views_interval_in_days'))
logger.info(f"Scheduling refresh materialized view every night at 1am CDT")
sender.add_periodic_task(datetime.timedelta(days=7), refresh_materialized_views.s())
sender.add_periodic_task(datetime.timedelta(days=mat_views_interval), refresh_materialized_views.s())

logger.info(f"Scheduling update of collection weights on midnight each day")
sender.add_periodic_task(crontab(hour=0, minute=0),augur_collection_update_weights.s())
Expand Down

0 comments on commit 0a5b68f

Please sign in to comment.