#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Downloads, analyzes, and reports all YouTube videos associated with a user's Google account.
"""
import argparse
import json
import pickle
import re
import subprocess as sp
import sys
from collections import namedtuple
from dataclasses import dataclass
from getpass import getuser
from pathlib import Path
from webbrowser import open_new_tab

import numpy as np
import pandas as pd
from bs4 import BeautifulSoup
from emoji import emoji_list
from flask import Flask, render_template
from loguru import logger
from pandas._libs.tslibs.timestamps import Timestamp
from tqdm import tqdm
from wordcloud import WordCloud

from grapher import Grapher, flatten_without_nones

app = Flask(__name__)


@app.route("/", methods=["GET", "POST"])
def index():
    # `analysis` is the module-level result created in __main__ (see the TODO there).
    return render_template("index.html", analysis=analysis)


def launch_web(analysis):
    app.debug = True
    app.secret_key = "this is not real"
    some_data = (analysis.raw / "00001.info.json").is_file()
    if some_data:
        url = "http://127.0.0.1:5000"
        open_new_tab(url)
        app.run()


def make_fake_series(title="N/A", webpage_url="N/A", **kwargs):
    params = ["title", "webpage_url"] + list(kwargs.keys())
    Mock = namedtuple("MockSeries", params)
    return Mock(title, webpage_url, **kwargs)
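
# Illustrative only: the namedtuple above stands in for a pandas Series when a
# stat has no real video behind it (see funniest_description), exposing just the
# attributes the report templates read, e.g.:
#   mock = make_fake_series("No funny videos", average_rating="N/A")
#   mock.title -> "No funny videos"; mock.average_rating -> "N/A"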


@dataclass
class Watch:
    """One instance of watching a Video

    I want to separate out the concept of watching a video from the video itself.
    These two things were originally conflated in the code.
    The only piece of video metadata stored here is the url, which we'll use as a primary key.
    """

    url: str
    date: Timestamp


class TakeoutParser:
    """This class is responsible for parsing Takeout.

    Specifically, it reads and extracts info from watch-history.html.
    This data then gets passed on to Analysis.

    Parameters
    ----------
    takeout : str
        Path to an unzipped Takeout folder downloaded from https://takeout.google.com/
    """

    def __init__(self, takeout: str):
        self.takeout = Path(takeout).expanduser()
        self.watches, self.ad_count = self.get_views_and_ads(self.get_soup())

    def get_soup(self):
        watch_history = self.takeout / "YouTube and YouTube Music/history/watch-history.html"
        if not watch_history.is_file():
            raise ValueError(f"'{watch_history}' is not a file. Did you download your YouTube data?")
        logger.info("Extracting video urls from Takeout.")
        sys.stdout.flush()
        try:
            text = watch_history.read_text()
        except UnicodeDecodeError:
            text = watch_history.read_text(encoding="utf-8")
        soup = BeautifulSoup(text, "lxml")
        return soup

    def get_views_and_ads(self, soup):
        """Extract ad counts and video urls from html soup"""
        mdl_grid = next(soup.body.children)
        # I'm keeping ad_count but it really only started in 2022
        ad_count = 0
        watches = []
        for outer_cell in mdl_grid.children:
            inner_cell = next(outer_cell.children)
            inner_children = list(inner_cell.children)
            div_with_vid_url = inner_children[1]
            div_with_ads_info = inner_children[3]
            if "From Google Ads" not in str(div_with_ads_info):
                try:
                    url = div_with_vid_url.a["href"]
                except TypeError:  # No <a> tag in this entry, so .a is None; skip it
                    continue
                raw_date = list(div_with_vid_url.stripped_strings)[-1].replace("\u202f", "")
                pd_date = pd.to_datetime(re.sub(r"\s+[A-Z]{3,}$", "", raw_date), errors="coerce")
                watches.append(Watch(url, pd_date))
            else:
                ad_count += 1
        return watches, ad_count
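
    # Rough shape of the markup the loop above walks, inferred from the parsing
    # code itself rather than from any Takeout spec (Google may change it), with
    # whitespace/text nodes omitted from the child indices shown:
    #   <div class="mdl-grid">            <- next(soup.body.children)
    #     <div>                           <- outer_cell
    #       <div>                         <- inner_cell
    #         inner_children[1]: <div><a href="VIDEO_URL">title</a> ... DATE TZ</div>
    #         inner_children[3]: <div>details, possibly "From Google Ads"</div>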

    def get_unique_vid_urls(self):
        # Dedupe on url rather than on Watch (a default dataclass isn't hashable,
        # and the same video may have been watched on several dates).
        return list(dict.fromkeys(watch.url for watch in self.watches))
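
    # dict.fromkeys is a compact order-preserving dedup (dict keys keep insertion
    # order, guaranteed since Python 3.7), so each url's first occurrence wins:
    #   list(dict.fromkeys(["a", "b", "a"]))  ->  ["a", "b"]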


class Analysis:
    """Main class responsible for downloading and analyzing data.

    Parameters
    ----------
    takeout_parser : TakeoutParser
        Structured data from user's Takeout path
    out_base : str (default='data')
        The path to the directory where both raw and computed results should be stored.
    name : Optional[str]
        Subdir of out_base where this particular analysis should be stored (e.g. 'jessime')

    Attributes
    ----------
    raw : Path
        Path to 'raw' directory in self.path directory
    ran : Path
        Path to 'ran' directory in self.path directory
    df : DataFrame
        Pandas DataFrame used to store compiled results
    tags : [[str]]
        A list of tags for each downloaded video
    grapher : Grapher
        Creates the interactive graphs portion of the analysis
    seconds : int
        The sum of video durations
    formatted_time : str
        Seconds converted to W/D/H/M/S format
    most_viewed : Series
        Video with the most total views
    least_viewed : DataFrame
        Collection of at most 10 videos with single digit views
    best_per_decile : DataFrame
        10 videos, one per view_count decile, where each video has the highest likes_pct in that decile
    worst_per_decile : DataFrame
        Same as best_per_decile, but lowest likes_pct
    emojis : Series
        Video with the most unique emojis in the description
    oldest_videos : DataFrame
        First 10 videos watched on user's account.
    oldest_upload : Series
        Video with the oldest upload date to YouTube.
    top_uploaders : Series
        The most watched channel names with corresponding video counts
    funny_counts : int
        The max number of times a video's description says the word 'funny'
    funny : Series
        The 'funniest' video as determined by funny_counts
    """

    def __init__(self, takeout_parser, out_base="data", name=None):
        self.takeout_parser = takeout_parser
        if name is None:
            name = getuser()
        self.name = name
        self.path = Path(out_base) / self.name
        self.raw = self.path / "raw"
        self.ran = self.path / "ran"
        self.df = None
        self.tags = None
        self.grapher = None
        self.ad_count = None
        self.seconds = None
        self.formatted_time = None
        self.most_viewed = None
        self.least_viewed = None
        self.best_per_decile = None
        self.worst_per_decile = None
        self.emojis = None
        self.oldest_videos = None
        self.oldest_upload = None
        self.most_comments = None
        self.highest_comment_ratio = None
        self.top_uploaders = None
        self.funny = None
        self.funny_counts = None
        self.primary_lang = None
        self.primary_lang_count = None
        self.other_langs_count = None
        self.best_per_lang = None
        self.monthly_watches = None

    def setup_dirs(self):
        self.raw.mkdir(parents=True, exist_ok=True)
        self.ran.mkdir(parents=True, exist_ok=True)

    def download_data(self):
        """Uses yt-dlp to download an individual metadata json file for each video."""
        unique_vid_urls = self.takeout_parser.get_unique_vid_urls()
        url_path = self.path / "urls.txt"
        url_path.write_text("\n".join(unique_vid_urls))
        logger.info(f"Urls extracted. Downloading data for {len(unique_vid_urls)} videos now.")
        output = self.raw / "%(autonumber)s"
        # --skip-download: metadata only; -i: ignore per-video errors; -a: read urls from a file
        cmd = f'yt-dlp -o "{output}" --skip-download --write-info-json -i -a {url_path}'
        p = sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.STDOUT, shell=True)
        line = True
        while line:
            line = p.stdout.readline().decode("utf-8").strip()
            logger.info(line)
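
    # With -o "%(autonumber)s" plus --skip-download/--write-info-json, yt-dlp
    # emits one metadata file per url and no media, numbered in urls.txt order:
    #   data/<name>/raw/00001.info.json, 00002.info.json, ...
    # analyze() below uses the presence of 00001.info.json as its "any data?" check.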

    def df_from_files(self):
        """Constructs a DataFrame from the downloaded json files.

        All json keys whose values are not lists are compiled into the dataframe.
        (check_df then saves the dataframe as `df.pkl` and the per-video tags as
        `tags.pkl` in the self.ran directory.)
        """
        logger.info("Creating dataframe...")
        raw_paths = sorted(self.raw.glob("*.json"))
        video_metas = []
        keys_and_defaults = {
            "like_count": pd.NA,
            "comment_count": pd.NA,
            "duration": pd.NA,
            "view_count": pd.NA,
            "upload_date": pd.NaT,
            "description": "",
            "height": pd.NA,
            "title": "",
            "webpage_url": "",
            "uploader": "",
            "language": "",
        }
        tags = []
        for raw_path in tqdm(raw_paths):
            with open(raw_path) as f:
                meta = json.load(f)
            tags.append(meta.get("tags", []))
            meta_to_keep = {k: meta.get(k, d) for k, d in keys_and_defaults.items()}
            video_metas.append(meta_to_keep)
        self.df = pd.DataFrame(video_metas)
        self.df["upload_date"] = pd.to_datetime(self.df["upload_date"], format="%Y%m%d")
        self.tags = tags

    def make_wordcloud(self):
        """Generate the wordcloud file and save it to static/images/."""
        wordcloud_path = Path(f"static/images/{self.name}_wordcloud.png")
        if wordcloud_path.is_file():
            logger.info(f"Wordcloud found at: {wordcloud_path}")
        else:
            logger.info("Creating wordcloud")
            wordcloud = WordCloud(width=1920, height=1080, relative_scaling=0.5)
            flat_tags = flatten_without_nones(self.tags)
            wordcloud.generate(" ".join(flat_tags))
            wordcloud_path.parent.mkdir(parents=True, exist_ok=True)  # static/images/ may not exist yet
            wordcloud.to_file(wordcloud_path)

    def check_df(self):
        """Load the dataframe and tags from pickles if they exist; otherwise build and save them."""
        df_file = self.ran / "df.pkl"
        if df_file.is_file():
            self.df = pd.read_pickle(df_file)
            with open(self.ran / "tags.pkl", "rb") as f:
                self.tags = pickle.load(f)
        else:
            self.df_from_files()
            self.df.to_pickle(self.ran / "df.pkl")
            with open(self.ran / "tags.pkl", "wb") as f:
                pickle.dump(self.tags, f)

    def total_time(self):
        """The amount of time spent watching videos."""
        self.seconds = self.df["duration"].sum()
        seconds = self.seconds
        intervals = (
            ("years", 31449600),  # 60 * 60 * 24 * 7 * 52
            ("weeks", 604800),  # 60 * 60 * 24 * 7
            ("days", 86400),  # 60 * 60 * 24
            ("hours", 3600),  # 60 * 60
            ("minutes", 60),
            ("seconds", 1),
        )
        result = []
        for name, count in intervals:
            value = seconds // count
            if value:
                seconds -= value * count
                if value == 1:
                    name = name.rstrip("s")
                result.append("{} {}".format(int(value), name))
        self.formatted_time = ", ".join(result)
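
    # Worked example of the greedy breakdown above:
    #   694861 s = 1*604800 + 1*86400 + 1*3600 + 1*60 + 1
    #   -> "1 week, 1 day, 1 hour, 1 minute, 1 second"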

    def best_and_worst_videos(self):
        """Finds well liked and highly viewed videos

        Note that YouTube has removed the dislike count,
        so we have to get a bit creative about what we're analyzing.
        """
        self.most_viewed = self.df.loc[self.df["view_count"].idxmax()]
        low_views = self.df[self.df["view_count"] < 10]
        self.least_viewed = low_views.sample(min(len(low_views), 10), random_state=0)
        self.df["likes_pct"] = ((self.df["like_count"] / self.df["view_count"]) * 100).fillna(0).round(4)
        self.df["deciles"] = pd.qcut(self.df["view_count"].fillna(0), 10, labels=False)
        grouped = self.df.groupby(by="deciles")
        self.best_per_decile = self.df.iloc[grouped["likes_pct"].idxmax()].reset_index()
        self.worst_per_decile = self.df.iloc[grouped["likes_pct"].idxmin()].reset_index()
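
    # pd.qcut buckets videos into 10 roughly equal-sized groups by view_count
    # rank, so a well-liked niche video isn't drowned out by viral ones. Sketch:
    #   pd.qcut(pd.Series([1, 5, 20, 100, 10**6]), 2, labels=False)
    #   -> [0, 0, 0, 1, 1]  (bottom half vs. top half by rank)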

    def most_emojis_description(self):
        def _emoji_variety(desc):
            return len({x["emoji"] for x in emoji_list(desc)})

        counts = self.df["description"].apply(_emoji_variety)
        self.emojis = self.df.iloc[counts.idxmax()]

    def funniest_description(self):
        """Counts number of times 'funny' is in each description. Saves top result."""
        funny_counts = []
        descriptions = []
        index = []
        for i, d in enumerate(self.df["description"]):
            try:
                funny_counts.append(d.lower().count("funny"))
                descriptions.append(d)
                index.append(i)
            except AttributeError:  # description isn't a str (e.g. NaN), so skip it
                pass
        funny_counts = np.array(funny_counts)
        funny_counts_idx = funny_counts.argmax()
        self.funny_counts = funny_counts[funny_counts_idx]
        if self.funny_counts > 0:
            self.funny = self.df.iloc[index[funny_counts_idx]]
        else:
            title = "Wait, 0? You're too cool to watch funny videos on youtube?"
            self.funny = make_fake_series(title, average_rating="N/A")

    def chatty(self):
        """Finds videos with lots of comments"""
        self.most_comments = self.df.loc[self.df["comment_count"].idxmax()]
        self.df["comment_to_view"] = self.df["comment_count"] / self.df["view_count"]
        chatty = self.df[self.df["comment_count"] > 100]
        if chatty.empty:  # No videos have more than 100 comments
            chatty = self.df[self.df["comment_count"] > 10]
        self.highest_comment_ratio = chatty.loc[chatty["comment_to_view"].idxmax()]

    def three_randoms(self):
        """Finds comment stats, most popular channels, and the funniest video."""
        self.chatty()
        self.top_uploaders = self.df["uploader"].value_counts().head(n=15)
        self.funniest_description()

    def by_language(self):
        """Finds videos in other languages."""
        # "" is the default for videos with no reported language; it may be absent entirely
        raw_counts = self.df["language"].value_counts().drop("", errors="ignore")
        self.primary_lang = raw_counts.idxmax()
        self.primary_lang_count = raw_counts.max()
        self.other_langs_count = raw_counts.drop(self.primary_lang)
        other_langs_df = self.df[self.df["language"].isin(self.other_langs_count.index)]
        by_lang = other_langs_df.groupby("language")
        liked_idxs = by_lang["likes_pct"].idxmax()
        self.best_per_lang = other_langs_df.loc[liked_idxs]

    def watches_by_month(self):
        """Calculate how many watches occurred each month."""
        df = pd.DataFrame([w.date for w in self.takeout_parser.watches], columns=["date"])
        df["year_month"] = df["date"].dt.to_period("M")
        self.monthly_watches = df.groupby("year_month").size().reset_index(name="counts")
        self.monthly_watches["year_month"] = self.monthly_watches["year_month"].astype(str)
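
    # dt.to_period("M") truncates each timestamp to its calendar month, so the
    # groupby counts watches per month, e.g.:
    #   pd.Timestamp("2021-06-15 13:45").to_period("M")  ->  Period('2021-06', 'M')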

    def compute(self):
        logger.info("Computing...")
        self.total_time()
        self.best_and_worst_videos()
        self.most_emojis_description()
        self.oldest_videos = self.df[["title", "webpage_url"]].tail(n=10)
        self.oldest_upload = self.df.loc[self.df["upload_date"].idxmin()]
        self.three_randoms()
        self.by_language()
        self.watches_by_month()

    def graph(self):
        self.grapher = Grapher(self.df, self.monthly_watches, self.tags)
        self.grapher.average_rating()
        self.grapher.duration()
        self.grapher.views()
        self.grapher.make_tags_plot()
        self.grapher.make_monthly_watches_plot()

    def start_analysis(self):
        self.check_df()
        self.make_wordcloud()
        self.compute()
        self.graph()

    def analyze(self):
        """Main function for downloading and analyzing data."""
        self.setup_dirs()
        some_data = (self.raw / "00001.info.json").is_file()
        if not some_data:
            self.download_data()
            some_data = (self.raw / "00001.info.json").is_file()
        if some_data:
            self.start_analysis()
        else:
            logger.info("No data was downloaded.")


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-t",
        "--takeout",
        required=True,
        help="Path to an unzipped Takeout folder downloaded from https://takeout.google.com/",
    )
    parser.add_argument("-o", "--out", default="data", help="Path to empty directory for data storage.")
    parser.add_argument("-n", "--name", default=getuser(), help="Name of analyses (e.g. jessime)")
    return parser.parse_args()


def run(args):
    """Entrypoint to the program"""
    logger.info("Welcome!")
    takeout_parser = TakeoutParser(args.takeout)
    analysis = Analysis(takeout_parser, args.out, args.name)
    analysis.analyze()
    return analysis


if __name__ == "__main__":
    analysis = run(parse_args())  # TODO make this not a global
    launch_web(analysis)