Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add engine unit test #25

Merged
merged 5 commits into from
Jun 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ translations/*.qm
youtube-analyzer.build
youtube-analyzer.dist
Output
.coverage
3 changes: 3 additions & 0 deletions coverage.bat
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
.venv\Scripts\coverage.exe run -m unittest discover
.venv\Scripts\coverage.exe html -i
htmlcov\index.html
127 changes: 73 additions & 54 deletions engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
QNetworkReply
)
from youtubesearchpython import (
VideosSearch,
VideosSearch,
Channel,
Video
)
Expand All @@ -28,7 +28,7 @@
)


def view_count_to_int(count_str):
def view_count_to_int(count_str: str):
if not count_str:
return 0
parts = count_str.split()
Expand All @@ -38,7 +38,7 @@ def view_count_to_int(count_str):
return int(processed_count) if processed_count.isdigit() else 0


def subcriber_count_to_int(count_str):
def subcriber_count_to_int(count_str: str):
if not count_str:
return 0
parts = count_str.split()
Expand All @@ -60,16 +60,16 @@ def subcriber_count_to_int(count_str):


def timedelta_to_str(td: timedelta):
if td is None:
return ""
mm, ss = divmod(td.seconds, 60)
hh, mm = divmod(mm, 60)
s = "%02d:%02d:%02d" % (hh, mm, ss)
if td.days:
s = ("%d:" % td.days) + s
if td.microseconds:
s = s + ".%06d" % td.microseconds
return s
if td is None:
return ""
mm, ss = divmod(td.seconds, 60)
hh, mm = divmod(mm, 60)
s = "%02d:%02d:%02d" % (hh, mm, ss)
if td.days:
s = ("%d:" % td.days) + s
if td.microseconds:
s = s + ".%06d" % td.microseconds
return s


class ImageDownloader(QObject):
Expand Down Expand Up @@ -126,27 +126,28 @@ def __init__(self, model: ResultTableModel, request_limit: int):
def search(self, request_text: str):
try:
self.error = ""
videos_search = VideosSearch(request_text, limit = self._request_limit)
videos_search = self._create_video_searcher(request_text)
result = []
has_next_page = True
counter = 0
while has_next_page:
result_array = videos_search.result()["result"]
for video in result_array:
views = view_count_to_int(video["viewCount"]["text"])
video_info = Video.getInfo(video["id"])
channel_info = Channel.get(video["channel"]["id"])
video_info = self._get_video_info(video["id"])
channel_info = self._get_channel_info(video["channel"]["id"])
channel_views = view_count_to_int(channel_info["views"])
channel_subscribers = subcriber_count_to_int(channel_info["subscribers"]["simpleText"])
preview_link = video["thumbnails"][0]["url"] if len(video["thumbnails"]) > 0 else ""
channel_logo_link = channel_info["thumbnails"][0]["url"] if len(channel_info["thumbnails"]) > 0 else ""
video_duration_td = YoutubeGrepEngine.duration_to_timedelta(video["duration"])
video_duration = timedelta_to_str(video_duration_td) if video_duration_td is not None else video["duration"]
video_duration_d = YoutubeGrepEngine.duration_to_timedelta(video["duration"])
video_duration = timedelta_to_str(video_duration_d) if video_duration_d is not None else video["duration"]
result.append(
make_result_row(video["title"], video["publishedTime"], video_duration,
views, video["link"], channel_info["title"], channel_info["url"], channel_subscribers,
channel_views, channel_info["joinedDate"], preview_link, channel_logo_link, video_info["keywords"],
video_duration_td))
make_result_row(
video["title"], video["publishedTime"], video_duration,
views, video["link"], channel_info["title"], channel_info["url"],
channel_subscribers, channel_views, channel_info["joinedDate"], preview_link, channel_logo_link,
video_info["keywords"], video_duration_d))
counter = counter + 1
if counter == self._request_limit:
break
Expand All @@ -156,10 +157,11 @@ def search(self, request_text: str):
self._model.setData(result)
self._model.set_sort_cast(ResultFields.VideoPublishedTime, YoutubeGrepEngine.published_time_sort_cast)
return True
except Exception as _:
self.error = traceback.format_exc()
except Exception as exc:
print(exc)
self.error = traceback.format_exc()
return False

@staticmethod
def duration_to_timedelta(duration: str):
if not duration:
Expand Down Expand Up @@ -204,27 +206,25 @@ def published_time_sort_cast(published_time: str):
return None
return float(pb_timedelta.replace(":", ""))

def _create_video_searcher(self, request_text: str):
return VideosSearch(request_text, limit=self._request_limit)

def _get_video_info(self, video_id: str):
return Video.getInfo(video_id)

def _get_channel_info(self, channel_id: str):
return Channel.get(channel_id)


class YoutubeApiEngine(AbstractYoutubeEngine):
def __init__(self, model: ResultTableModel, request_limit: int, api_key: str):
super().__init__(model, request_limit)
self._api_key = api_key

def search(self, request_text: str):
api_service_name = "youtube"
api_version = "v3"

try:
youtube = googleapiclient.discovery.build(api_service_name, api_version, developerKey = self._api_key)

request = youtube.search().list(
part = "snippet",
maxResults = self._request_limit,
q = request_text,
type="video"
)

search_response = request.execute()
youtube = self._create_youtube_client()
search_response = self._search_videos(youtube, request_text)

video_ids = ""
channel_ids = ""
Expand All @@ -234,20 +234,11 @@ def search(self, request_text: str):
if channel_id in channel_ids:
continue
channel_ids = channel_ids + "," + channel_id
video_ids = video_ids[1:] # Remove first comma
channel_ids = channel_ids[1:] # Remove first comma

video_request = youtube.videos().list(
part = "contentDetails,statistics,snippet",
id = video_ids
)
video_response = video_request.execute()

channel_request = youtube.channels().list(
part="snippet,statistics",
id=channel_ids
)
channel_response = channel_request.execute()
video_ids = video_ids[1:] # Remove first comma
channel_ids = channel_ids[1:] # Remove first comma

video_response = self._get_video_details(youtube, video_ids)
channel_response = self._get_channel_details(youtube, channel_ids)
channels = {}
for channel_item in channel_response["items"]:
channels[channel_item["id"]] = channel_item
Expand All @@ -274,12 +265,12 @@ def search(self, request_text: str):
video_preview_link = search_snippet["thumbnails"]["high"]["url"]
channel_snippet = channel_item["snippet"]
channel_logo_link = channel_snippet["thumbnails"]["default"]["url"]
channel_logo_link = channel_logo_link.replace("https", "http") # https not working. I don't know why (2024.03.10)
channel_logo_link = channel_logo_link.replace("https", "http") # https is not working. I don't know why
video_snippet = video_item["snippet"]
tags = video_snippet["tags"] if "tags" in video_snippet else None
count = count + 1
result.append(
make_result_row(video_title, video_published_time, video_duration, views,
make_result_row(video_title, video_published_time, video_duration, views,
video_link, channel_title, channel_url, channel_subscribers,
channel_views, channel_joined_date, video_preview_link, channel_logo_link, tags,
video_duration_td))
Expand All @@ -289,3 +280,31 @@ def search(self, request_text: str):
except Exception as e:
self.error = str(e)
return False

def _create_youtube_client(self):
api_service_name = "youtube"
api_version = "v3"
return googleapiclient.discovery.build(api_service_name, api_version, developerKey=self._api_key)

def _search_videos(self, youtube, request_text: str):
request = youtube.search().list(
part="snippet",
maxResults=self._request_limit,
q=request_text,
type="video"
)
return request.execute()

def _get_video_details(self, youtube, video_ids):
video_request = youtube.videos().list(
part="contentDetails,statistics,snippet",
id=video_ids
)
return video_request.execute()

def _get_channel_details(self, youtube, channel_ids):
channel_request = youtube.channels().list(
part="snippet,statistics",
id=channel_ids
)
return channel_request.execute()
Loading
Loading