Skip to content

Commit

Permalink
python code for chap 9.3 and learn mock test
Browse files Browse the repository at this point in the history
  • Loading branch information
ssupecial committed Oct 13, 2024
1 parent 46887f5 commit e166f0a
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 1 deletion.
42 changes: 42 additions & 0 deletions airflow/chap9/dags/unit_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import Any
from collections import defaultdict, Counter
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
from airflow_movielens.hooks import MovielensHook

class MovielensPopularityOperator(BaseOperator):
    """Rank movies by their average rating within a date window.

    Ratings are fetched through ``MovielensHook.get_ratings``, summed per
    ``movieId``, filtered on a minimum number of ratings and returned in
    descending order of ``(average rating, number of ratings)``.

    Args:
        conn_id: Connection id handed to ``MovielensHook``.
        start_date: Forwarded to ``get_ratings`` as ``start_date``.
        end_date: Forwarded to ``get_ratings`` as ``end_date``.
        min_ratings: Minimum number of ratings a movie needs to be ranked.
        top_n: Maximum number of movies returned by ``execute``.
        **kwargs: Passed through to ``BaseOperator.__init__``.
    """

    def __init__(self, conn_id, start_date, end_date, min_ratings=4, top_n=5, **kwargs):
        super().__init__(**kwargs)
        self._conn_id = conn_id
        self._start_date = start_date
        self._end_date = end_date
        self._min_ratings = min_ratings
        self._top_n = top_n

    def execute(self, context: Context) -> Any:
        """Fetch the ratings and return the ``top_n`` best-rated movies.

        Args:
            context: Task context supplied by the caller; not read here.

        Returns:
            A list of at most ``top_n`` ``(movie_id, (average, count))``
            tuples, sorted from highest to lowest ``(average, count)``.
        """
        with MovielensHook(self._conn_id) as hook:
            ratings = hook.get_ratings(
                start_date=self._start_date,
                end_date=self._end_date,
            )

            # Consume the ratings while the hook is still open, in case
            # get_ratings yields them lazily.
            rating_sums = defaultdict(Counter)
            for rating in ratings:
                rating_sums[rating["movieId"]].update(
                    count=1,
                    rating=rating["rating"],
                )

        # The counters are keyed on "count" and "rating". Reading any other
        # key from a Counter silently yields 0, which previously caused a
        # ZeroDivisionError here.
        averages = {
            movie_id: (
                rating_counter["rating"] / rating_counter["count"],
                rating_counter["count"],
            )
            for movie_id, rating_counter in rating_sums.items()
            if rating_counter["count"] >= self._min_ratings
        }

        # Tuples compare element-wise: average first, count as tie-breaker.
        return sorted(
            averages.items(),
            key=lambda x: x[1],
            reverse=True,
        )[:self._top_n]
2 changes: 1 addition & 1 deletion airflow/chap9/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ x-airflow-common:
AIRFLOW_CONN_MY_POSTGRES: postgresql://airflow:airflow@wiki_results:5432/airflow
AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: "true"
AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
_PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-postgres apache-airflow-providers-docker pytest
_PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-postgres apache-airflow-providers-docker pytest pytest-mock
volumes:
&airflow-volumes
- ${AIRFLOW_PROJ_DIR:-.}/dags/:/opt/airflow/dags
Expand Down
14 changes: 14 additions & 0 deletions airflow/chap9/tests/dags/test_bashoperator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from airflow.operators.bash import BashOperator

def test_example():
    """Run a BashOperator directly and check the value execute() returns."""
    echo_task = BashOperator(
        task_id="test_bash",
        bash_command="echo 'hello!'",
        do_xcom_push=True,
    )

    # The operator is executed by hand with an empty dict as its context.
    assert echo_task.execute(context={}) == 'hello!'

# Lets the test be run by executing this file directly as a script.
if __name__ == "__main__":
    test_example()
54 changes: 54 additions & 0 deletions airflow/chap9/tests/dags/test_customoperator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
from typing import Any
from collections import defaultdict, Counter
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
from airflow_movielens.hooks import MovielensHook

class MovielensPopularityOperator(BaseOperator):
    """Operator that returns the best-rated movies for a date range.

    Ratings come from ``MovielensHook.get_ratings``; they are totalled per
    ``movieId``, movies with fewer than ``min_ratings`` ratings are dropped,
    and the remainder is ranked by ``(average rating, number of ratings)``.

    Args:
        conn_id: Connection id handed to ``MovielensHook``.
        start_date: Forwarded to ``get_ratings`` as ``start_date``.
        end_date: Forwarded to ``get_ratings`` as ``end_date``.
        min_ratings: Minimum number of ratings a movie needs to be ranked.
        top_n: Maximum number of movies returned by ``execute``.
        **kwargs: Passed through to ``BaseOperator.__init__``.
    """

    def __init__(self, conn_id, start_date, end_date, min_ratings=4, top_n=5, **kwargs):
        super().__init__(**kwargs)
        self._conn_id = conn_id
        self._start_date, self._end_date = start_date, end_date
        self._min_ratings, self._top_n = min_ratings, top_n

    def execute(self, context: Context) -> Any:
        """Return up to ``top_n`` ``(movie_id, (average, count))`` tuples.

        ``context`` is accepted for the operator interface but not read.
        """
        # NOTE(review): the hook is used without a `with` block here;
        # confirm whether it needs explicit cleanup.
        movielens = MovielensHook(self._conn_id)
        fetched = movielens.get_ratings(
            start_date=self._start_date,
            end_date=self._end_date,
        )

        # One Counter per movie, accumulating "count" and "rating".
        totals = defaultdict(Counter)
        for entry in fetched:
            totals[entry["movieId"]].update(
                {"count": 1, "rating": entry["rating"]}
            )

        scored = {}
        for movie_id, tally in totals.items():
            if tally["count"] >= self._min_ratings:
                scored[movie_id] = (
                    tally["rating"] / tally["count"],
                    tally["count"],
                )

        # Highest (average, count) first; list.sort is stable like sorted().
        ranking = list(scored.items())
        ranking.sort(key=lambda pair: pair[1], reverse=True)
        return ranking[:self._top_n]

def test_movielenspopularityoperator():
    """Execute the operator for a fixed date range and expect five results.

    Nothing is patched in this test, so the operator runs against whatever
    ``MovielensHook`` resolves for the ``movielens`` connection id.
    """
    operator = MovielensPopularityOperator(
        task_id="test_movielenspopularityoperator",
        conn_id='movielens',
        start_date='2019-01-01',
        end_date='2019-01-03',
        top_n=5,
    )

    top_movies = operator.execute(context={})

    assert len(top_movies) == 5
70 changes: 70 additions & 0 deletions airflow/chap9/tests/dags/test_mock.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
from pytest_mock import mocker
from airflow.models import Connection
from typing import Any
from collections import defaultdict, Counter
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
from airflow_movielens.hooks import MovielensHook

class MovielensPopularityOperator(BaseOperator):
    """Compute the most popular movies over a date range.

    The operator pulls ratings via ``MovielensHook.get_ratings``, aggregates
    them per ``movieId``, keeps movies with at least ``min_ratings`` ratings
    and orders them by ``(average rating, number of ratings)``, best first.

    Args:
        conn_id: Connection id handed to ``MovielensHook``.
        start_date: Forwarded to ``get_ratings`` as ``start_date``.
        end_date: Forwarded to ``get_ratings`` as ``end_date``.
        min_ratings: Minimum number of ratings a movie needs to be ranked.
        top_n: Maximum number of movies returned by ``execute``.
        **kwargs: Passed through to ``BaseOperator.__init__``.
    """

    def __init__(self, conn_id, start_date, end_date, min_ratings=4, top_n=5, **kwargs):
        super().__init__(**kwargs)
        self._conn_id = conn_id
        self._start_date, self._end_date = start_date, end_date
        self._min_ratings, self._top_n = min_ratings, top_n

    def execute(self, context: Context) -> Any:
        """Return a list of ``(movie_id, (average, count))``, at most ``top_n`` long.

        ``context`` is part of the operator interface and is not used here.
        """
        # NOTE(review): no `with` block around the hook in this copy;
        # confirm whether the hook needs to be closed explicitly.
        api = MovielensHook(self._conn_id)
        window_ratings = api.get_ratings(
            start_date=self._start_date,
            end_date=self._end_date,
        )

        # Per-movie running totals under the keys "count" and "rating".
        per_movie = defaultdict(Counter)
        for item in window_ratings:
            per_movie[item["movieId"]].update(
                {"count": 1, "rating": item["rating"]}
            )

        eligible = {}
        for movie_id, sums in per_movie.items():
            if sums["count"] >= self._min_ratings:
                eligible[movie_id] = (
                    sums["rating"] / sums["count"],
                    sums["count"],
                )

        # Sort on the (average, count) tuple, descending.
        ordered = list(eligible.items())
        ordered.sort(key=lambda pair: pair[1], reverse=True)
        return ordered[:self._top_n]

def test_movielenspopularity(mocker):
    """Run the operator with ``MovielensHook.get_connection`` patched.

    The patched method returns a hand-built ``Connection``. The test then
    checks that five results come back and that the patched method was
    called exactly once, with the operator's ``conn_id``.
    """
    fake_connection = Connection(
        conn_id="test",
        login="airflow",
        password="airflow",
        schema="http",
    )
    mock_get = mocker.patch.object(
        MovielensHook,
        "get_connection",
        return_value=fake_connection,
    )

    operator = MovielensPopularityOperator(
        task_id="test_id",
        conn_id="testconn",
        start_date="2015-01-01",
        end_date="2015-01-03",
        top_n=5,
    )
    top_movies = operator.execute(context=None)

    assert len(top_movies) == 5
    # Covers both the single-call check and the argument check.
    mock_get.assert_called_once_with("testconn")

0 comments on commit e166f0a

Please sign in to comment.