Skip to content
This repository has been archived by the owner on Aug 4, 2023. It is now read-only.

Commit

Permalink
Add support for other media types to popularity calculations (#112)
Browse files Browse the repository at this point in the history
* Add support for other media types to popularity calculations

Signed-off-by: Olga Bulat <obulat@gmail.com>

* Fix linting errors

Signed-off-by: Olga Bulat <obulat@gmail.com>

* Fix linting errors

Signed-off-by: Olga Bulat <obulat@gmail.com>

* Fix linting errors

Signed-off-by: Olga Bulat <obulat@gmail.com>

* Add audio popularity view sql

Signed-off-by: Olga Bulat <obulat@gmail.com>

* Add audio popularity view sql to Dockerfile

Signed-off-by: Olga Bulat <obulat@gmail.com>

* Finish renaming functions

Signed-off-by: Olga Bulat <obulat@gmail.com>

* Add two more audio popularity dags

Signed-off-by: Olga Bulat <obulat@gmail.com>
  • Loading branch information
obulat committed Jul 7, 2021
1 parent 073c121 commit 6261176
Show file tree
Hide file tree
Showing 18 changed files with 595 additions and 140 deletions.
100 changes: 100 additions & 0 deletions src/cc_catalog_airflow/dags/recreate_audio_popularity_calculation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""
This file defines an Apache Airflow DAG that completely wipes out the
PostgreSQL relations and functions involved in calculating our
standardized audio popularity metric. It then recreates relations and
functions to make the calculation, and performs an initial calculation.
The results are available in the materialized audio view.
This should only be run when new SQL code is deployed for the calculation.
"""
from datetime import datetime, timedelta
import logging
import os

from airflow import DAG

from util.popularity import operators
from util.operator_util import get_log_operator


logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s: %(message)s',
    level=logging.INFO
)

logger = logging.getLogger(__name__)

DAG_ID = 'recreate_audio_popularity_calculation'
# Airflow connection ID for the catalog database; defaults to the testing
# connection when the environment variable is unset.
DB_CONN_ID = os.getenv('OPENLEDGER_CONN_ID', 'postgres_openledger_testing')
CONCURRENCY = 1
# No schedule: per the module docstring, this DAG is only run (manually)
# when new SQL code for the calculation is deployed.
SCHEDULE_CRON = None

DAG_DEFAULT_ARGS = {
    'owner': 'data-eng-admin',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 15),
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(seconds=3600),
}


def create_dag(
    dag_id=DAG_ID,
    args=DAG_DEFAULT_ARGS,
    concurrency=CONCURRENCY,
    max_active_runs=CONCURRENCY,
    schedule_cron=SCHEDULE_CRON,
    postgres_conn_id=DB_CONN_ID,
):
    """
    Build the DAG that tears down and rebuilds every PostgreSQL relation
    and function used for the audio popularity calculation, then runs an
    initial calculation to populate the materialized view.
    """
    dag = DAG(
        dag_id=dag_id,
        default_args=args,
        concurrency=concurrency,
        max_active_runs=max_active_runs,
        schedule_interval=schedule_cron,
        catchup=False
    )
    with dag:
        log_start = get_log_operator(dag, DAG_ID, 'Starting')
        teardown_relations = operators.drop_media_popularity_relations(
            dag, postgres_conn_id, 'audio',
        )
        teardown_functions = operators.drop_media_popularity_functions(
            dag, postgres_conn_id, 'audio',
        )
        metrics_table = operators.create_media_popularity_metrics(
            dag, postgres_conn_id, 'audio',
        )
        metrics_update = operators.update_media_popularity_metrics(
            dag, postgres_conn_id, 'audio',
        )
        percentile_function = operators.create_media_popularity_percentile(
            dag, postgres_conn_id, 'audio',
        )
        constants_table = operators.create_media_popularity_constants(
            dag, postgres_conn_id, 'audio',
        )
        popularity_function = operators.create_media_standardized_popularity(
            dag, postgres_conn_id, 'audio',
        )
        audio_view = operators.create_db_view(
            dag, postgres_conn_id, 'audio',
        )
        log_end = get_log_operator(dag, DAG_ID, 'Finished')

        # Dependency graph, spelled out pairwise: both teardown tasks run
        # before the metrics table exists; the metrics update and the
        # percentile function must both finish before the constants table.
        log_start >> teardown_relations >> metrics_table
        log_start >> teardown_functions >> metrics_table
        metrics_table >> metrics_update >> constants_table
        metrics_table >> percentile_function >> constants_table
        constants_table >> popularity_function >> audio_view >> log_end

    return dag


globals()[DAG_ID] = create_dag()
Original file line number Diff line number Diff line change
Expand Up @@ -57,28 +57,28 @@ def create_dag(
)
with dag:
start_task = get_log_operator(dag, DAG_ID, 'Starting')
drop_relations = operators.drop_image_popularity_relations(
drop_relations = operators.drop_media_popularity_relations(
dag, postgres_conn_id,
)
drop_functions = operators.drop_image_popularity_functions(
drop_functions = operators.drop_media_popularity_functions(
dag, postgres_conn_id,
)
create_metrics = operators.create_image_popularity_metrics(
create_metrics = operators.create_media_popularity_metrics(
dag, postgres_conn_id
)
update_metrics = operators.update_image_popularity_metrics(
update_metrics = operators.update_media_popularity_metrics(
dag, postgres_conn_id
)
create_percentile = operators.create_image_popularity_percentile(
create_percentile = operators.create_media_popularity_percentile(
dag, postgres_conn_id
)
create_constants = operators.create_image_popularity_constants(
create_constants = operators.create_media_popularity_constants(
dag, postgres_conn_id
)
create_popularity = operators.create_image_standardized_popularity(
create_popularity = operators.create_media_standardized_popularity(
dag, postgres_conn_id
)
create_image_view = operators.create_image_view(
create_image_view = operators.create_db_view(
dag, postgres_conn_id
)
end_task = get_log_operator(dag, DAG_ID, 'Finished')
Expand Down
81 changes: 81 additions & 0 deletions src/cc_catalog_airflow/dags/refresh_all_audio_popularity_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
"""
This file defines an Apache Airflow DAG that completely recalculates all
audio popularity data, including the percentile values, and adds any
new popularity metrics.
This should be run at least once every 6 months, or whenever a new
popularity metric is added.
"""
from datetime import datetime, timedelta
import logging
import os

from airflow import DAG

from util.popularity import operators
from util.operator_util import get_log_operator


logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s: %(message)s',
    level=logging.INFO
)

logger = logging.getLogger(__name__)

DAG_ID = 'refresh_all_audio_popularity_data'
# Airflow connection ID for the catalog database; defaults to the testing
# connection when the environment variable is unset.
DB_CONN_ID = os.getenv('OPENLEDGER_CONN_ID', 'postgres_openledger_testing')
CONCURRENCY = 1
# Full recalculation runs on the first of every month.
SCHEDULE_CRON = '@monthly'

DAG_DEFAULT_ARGS = {
    'owner': 'data-eng-admin',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 15),
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(seconds=3600),
}


def create_dag(
    dag_id=DAG_ID,
    args=DAG_DEFAULT_ARGS,
    concurrency=CONCURRENCY,
    max_active_runs=CONCURRENCY,
    schedule_cron=SCHEDULE_CRON,
    postgres_conn_id=DB_CONN_ID,
):
    """
    Create the DAG that fully refreshes all audio popularity data.

    The DAG updates the popularity metrics table, recalculates the
    popularity constants (including percentile values), and then
    refreshes the materialized audio view.
    """
    dag = DAG(
        dag_id=dag_id,
        default_args=args,
        concurrency=concurrency,
        max_active_runs=max_active_runs,
        schedule_interval=schedule_cron,
        catchup=False
    )
    with dag:
        start_task = get_log_operator(dag, DAG_ID, 'Starting')
        update_metrics = operators.update_media_popularity_metrics(
            dag, postgres_conn_id, media_type='audio',
        )
        update_constants = operators.update_media_popularity_constants(
            dag, postgres_conn_id, media_type='audio',
        )
        # Renamed from `update_image_view`: this DAG refreshes the *audio*
        # view, not the image view. The task itself is unchanged.
        update_audio_view = operators.update_db_view(
            dag, postgres_conn_id, media_type='audio',
        )
        end_task = get_log_operator(dag, DAG_ID, 'Finished')

        (
            start_task
            >> update_metrics
            >> update_constants
            >> update_audio_view
            >> end_task
        )

    return dag


globals()[DAG_ID] = create_dag()
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,13 @@ def create_dag(
)
with dag:
start_task = get_log_operator(dag, DAG_ID, 'Starting')
update_metrics = operators.update_image_popularity_metrics(
update_metrics = operators.update_media_popularity_metrics(
dag, postgres_conn_id
)
update_constants = operators.update_image_popularity_constants(
update_constants = operators.update_media_popularity_constants(
dag, postgres_conn_id
)
update_image_view = operators.update_image_view(
update_image_view = operators.update_db_view(
dag, postgres_conn_id
)
end_task = get_log_operator(dag, DAG_ID, 'Finished')
Expand Down
72 changes: 72 additions & 0 deletions src/cc_catalog_airflow/dags/refresh_audio_view_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""
This file defines an Apache Airflow DAG that refreshes the data in the
materialized audio view, but not the underlying tables. This means the
only effect of this DAG is to add or update data (including popularity
data) for audio records which have been ingested since the last time the
view was refreshed.
This should be run once per day.
"""
from datetime import datetime, timedelta
import logging
import os

from airflow import DAG

from util.popularity import operators
from util.operator_util import get_log_operator


logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s: %(message)s',
    level=logging.INFO
)

logger = logging.getLogger(__name__)

DAG_ID = 'refresh_audio_view_data'
# Airflow connection ID for the catalog database; defaults to the testing
# connection when the environment variable is unset.
DB_CONN_ID = os.getenv('OPENLEDGER_CONN_ID', 'postgres_openledger_testing')
CONCURRENCY = 1
# We don't run on the first of the month, since the
# `refresh_all_audio_popularity_data` DAG should run on that day.
SCHEDULE_CRON = '0 0 2-31 * *'

DAG_DEFAULT_ARGS = {
    'owner': 'data-eng-admin',
    'depends_on_past': False,
    'start_date': datetime(2020, 6, 15),
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(seconds=3600),
}


def create_dag(
    dag_id=DAG_ID,
    args=DAG_DEFAULT_ARGS,
    concurrency=CONCURRENCY,
    max_active_runs=CONCURRENCY,
    schedule_cron=SCHEDULE_CRON,
    postgres_conn_id=DB_CONN_ID,
):
    """
    Build the DAG that refreshes the materialized audio view so recently
    ingested records (and their popularity data) become visible in it.
    """
    audio_view_dag = DAG(
        dag_id=dag_id,
        default_args=args,
        concurrency=concurrency,
        max_active_runs=max_active_runs,
        schedule_interval=schedule_cron,
        catchup=False
    )
    with audio_view_dag:
        log_start = get_log_operator(audio_view_dag, DAG_ID, 'Starting')
        refresh_view = operators.update_db_view(
            audio_view_dag, postgres_conn_id, media_type='audio'
        )
        log_end = get_log_operator(audio_view_dag, DAG_ID, 'Finished')

        log_start >> refresh_view >> log_end

    return audio_view_dag


globals()[DAG_ID] = create_dag()
2 changes: 1 addition & 1 deletion src/cc_catalog_airflow/dags/refresh_image_view_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ def create_dag(
)
with dag:
start_task = get_log_operator(dag, DAG_ID, 'Starting')
update_image_view = operators.update_image_view(
update_image_view = operators.update_db_view(
dag, postgres_conn_id
)
end_task = get_log_operator(dag, DAG_ID, 'Finished')
Expand Down
Loading

0 comments on commit 6261176

Please sign in to comment.