#!/usr/local/bin/python
"""
Playlist Analyzer for Spotify
Matt Levin - 2018
This program contains several functions for analyzing playlists, designed for users' Top 100
playlists, using Spotipy (https://github.com/plamere/spotipy), the Python (2.7) wrapper for the Spotify API.
Please see README.md or matt-levin.com/PlaylistAnalyzer for instructions on using this script and
more details on the features listed below.
The -h or --help flag can be used to show proper usage for each feature.
Features:
- Train a classifier to predict which playlist a track belongs to based on its audio features using
either a Multilayer Perceptron (MLP) classifier (Analyzer.classify_mlp) or a Naive Bayes classifier
(Analyzer.classify_naive_bayes)
- Use a trained MLP classifier to create a playlist of the tracks from all the Top 100 playlists that
are deemed most likely to be from a given playlist, AKA songs the user would most likely enjoy based
on their audio features (Analyzer.predict_playlist)
- Create a playlist of the most danceable songs from each given source playlist (Analyzer.dance_party) or
the songs that would be best to study to (Analyzer.study_buddies) for a group dance party or study session
- Find the artists that have the most songs in a playlist, including if they're not the primary artist
for the track (Analyzer.top_artists)
- Extract the audio features and metadata for each track in a playlist and write into a CSV
(Writer.write_csv) or for all the user's playlists with 'Top 100' in the name (Writer.write_all_csvs)
- Perform Gaussian Mixture Model clustering (Analyzer.gmm_cluster) or Spectral
clustering (Analyzer.spectral_cluster) and plot the results (Analyzer.plot_clusters)
- Perform benchmarking on the MLP classifier or the GMM clustering in order to find the model that
works best for the data (Analyzer.benchmark_mlp or Analyzer.benchmark_gmm_cluster)
Code Outline:
Analyzer class
- Initializer
- Classification functions (MLP and Naive Bayes) and playlist generator using MLP
- Dance Party and Study Buddies playlist generator functions
- Clustering functions (GMM and Spectral) and cluster plotting function
- Top Artists function
- Helper functions
Writer class
- Write a CSV for a playlist's metadata and audio features, or for each playlist containing 'Top 100'
TestAnalyzer class
- Unit testing for all the different features
Main Method
- Authenticates with Spotify (requires environment variables to be set)
- Parses command line arguments to call appropriate function, or calls unittest.main() if no args given
------------------------------------------------------------------------------------------
Note: The following environment variables must be set in order to locally run the script:
SPOTIFY_USERNAME, SPOTIFY_CLIENT_ID, SPOTIFY_CLIENT_SECRET, SPOTIFY_REDIRECT_URI
------------------------------------------------------------------------------------------
~ I have no affiliation with Spotify ~
"""
import spotipy # Spotify Python wrapper (https://github.com/plamere/spotipy)
import spotipy.util as util
import sys
from os import environ as env # For environment variables (application credentials)
from csv import writer as csv_writer # For writing CSV file for each playlist
import glob
from datetime import datetime
import heapq # Heap data structure
import unittest
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt # For plotting
# Clustering
from sklearn.mixture import GaussianMixture
from sklearn.cluster import SpectralClustering
# Classification
from sklearn.neural_network import MLPClassifier
from sklearn.naive_bayes import GaussianNB
"""
The Analyzer class contains most of the cool functions in this project. Its constructor reads the CSV
files specified (defaults to all CSV files in the csv folder), and the class is then split into several
sections for the remaining functions:
- Classification: MLP and Naive Bayes classification, predicting a playlist using MLP, MLP benchmarking
- Clustering: GMM and Spectral clustering, and plotting the resulting clusters
- Dance Party and Study Buddies: Generate a dance or study playlist from the specified playlists
- Top Artists: Finds the highest occurring artists in a playlist
- Helper Functions
"""
class Analyzer:
"""
Initializer reads CSV files for the given files (defaults to 'all' to read all present csv files) and
creates a pandas dataframe (self.data) that contains the information for all, with an extra column 'source'
which says which file (playlist) the track came from.
PARAMS:
files [Optional] - Array of filenames, which files to use, defaults to 'all' which uses all the files
RETURNS:
None
"""
def __init__(self, files='all'):
if(files == 'all'): # Defaults to all csv files in the csv folder
files = glob.glob('csv/*.csv')
# Read each selected csv file into its own dataframe - Add a column 'source' with the filename
dfs = map(lambda f: pd.read_csv(f).assign(source=f.split('.')[0].split('/')[1]), files)
# Combine the dataframes into self.data
self.data = pd.concat(dfs, ignore_index=True)
# List of source playlists for easy access since it is needed a lot
self.sources = self.data.source.unique()
# Creates tempo_0_1 and loudness_0_1, 0-1 scaled columns for tempo and loudness
tempo = self.data['tempo']
self.data['tempo_0_1'] = (tempo - tempo.min()) / (tempo.max() - tempo.min())
loudness = self.data['loudness']
self.data['loudness_0_1'] = (loudness - loudness.min()) / (loudness.max() - loudness.min())
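The tempo and loudness columns above use min-max scaling to bring them into the same 0-1 range as Spotify's other audio features. A minimal, self-contained sketch of the same transform (the toy tempo values below are made up):

```python
import pandas as pd

def scale_0_1(series):
    """Min-max scale a pandas Series into the [0, 1] range."""
    return (series - series.min()) / (series.max() - series.min())

# Toy data standing in for the 'tempo' column
df = pd.DataFrame({'tempo': [60.0, 90.0, 120.0, 180.0]})
df['tempo_0_1'] = scale_0_1(df['tempo'])
print(df['tempo_0_1'].tolist())  # [0.0, 0.25, 0.5, 1.0]
```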
#===============================================================================
# Start of Classification Functions
"""
Trains a Multilayer Perceptron classifier to identify which source playlist each track came from
based on its audio features.
PARAMS:
layers [Optional] - The hidden layer sizes used in the MLP
seed [Optional] - The random_state to use for training (None uses a random random_state)
features [Optional] - Features to use in the classification
RETURNS:
None, sets self.mlp_classifier to the trained model
"""
def classify_mlp(self, layers=(150,200,100), seed=0,
features=['energy', 'liveness', 'speechiness',
'acousticness', 'instrumentalness', 'danceability',
'valence', 'loudness_0_1', 'tempo_0_1']):
# Make sure any included generated columns have been created before starting
# i.e. if GMM clusters are to be used in classification but have not been generated yet
self.check_features(features)
# Create X and Y matrices
X = self.data[features] # X is the selected features for each track
Y = self.data[['source']].values.ravel() # Y is the source playlist each track came from
# Create MLP classifier, train, count how many were classified correctly
mlp = MLPClassifier(hidden_layer_sizes=layers,
alpha=0.0001,
learning_rate_init=0.001,
max_iter=1000,
shuffle=True,
random_state=seed,
tol=1e-4)
mlp.fit(X,Y) # Fit to data
self.mlp_classifier = mlp # Save for later use
"""
Uses an MLPClassifier (neural network) to find the n songs which are most likely from the given playlist
and creates a Spotify playlist of them (shuffled order). When used with a Top 100 playlist, this creates a
playlist of songs the user would be most likely to enjoy (according to the MLP) consisting of songs
from their own Top 100 playlist, and any others used in training the MLP.
Theoretically, the pool of songs to choose from could extend beyond the combined Top 100 playlists,
which would give even better results, and using a user's Top 200 or more songs would help as well;
however, empirical testing shows this already works well at creating a playlist the user would enjoy.
Note: The generated playlist may contain fewer than n songs, since duplicates are removed at the end.
PARAMS:
playlist [Optional] - The CSV file name (no .csv) of the Top 100 playlist of the seed user
n [Optional] - The number of songs to put in the generated playlist
features [Optional] - The audio features to use in the neural network training and prediction
RETURNS:
None, creates a playlist in Spotify
"""
def predict_playlist(self, playlist='my_top_100', n=100,
features=['energy', 'liveness', 'speechiness',
'acousticness', 'instrumentalness', 'danceability',
'valence', 'loudness_0_1', 'tempo_0_1']):
self.check_features(features) # Make sure features are valid and generated if applicable
self.check_playlist(playlist) # Make sure playlist is a valid option (no '.csv' in filename)
if not hasattr(self, 'mlp_classifier'): # If we haven't found the mlp_classifier yet...
##Find and set self.mlp_classifier to the best performing model using benchmark_mlp
#self.benchmark_mlp(features=features)
# To save time, I just hardcoded the best performing layer sizes I've found so far: (150,200,100)
self.classify_mlp(layers=(150,200,100), seed=None, features=features)
# Note: Since the random_state is not set here, it picks a different playlist each time (good!)
# Get the prediction probabilities for X (self.data[features])
predict_confidence = self.mlp_classifier.predict_proba(self.data[features])
# Figure out which column relates to the selected playlist
playlist_index = self.mlp_classifier.classes_.tolist().index(playlist)
# Isolate that column only as a 1D array
scores = predict_confidence[:,playlist_index]
# Pick the most likely n songs using a heap
song_ids = self.data[['id']].values.ravel() # Song IDs as a 1D array
score_heap = [] # Heap used to order the Song IDs by their prediction confidence
for score, song_id in zip(scores, song_ids):
heapq.heappush(score_heap, (score, song_id))
selected_songs = heapq.nlargest(n, score_heap) # Take the n most likely
tracks = [track_id for _,track_id in selected_songs] # Isolate the IDs
tracks = list(set(tracks)) # Remove duplicates
np.random.shuffle(tracks) # Shuffle order
# Create the playlist in Spotify
p_name = playlist.split('_')[0].title() + ' Generated' # Playlist name based off CSV name
# (i.e. 'my_top_100' CSV becomes 'My Generated' Spotify playlist)
new_playlist = sp.user_playlist_create(username, name=p_name, public=False) # Create playlist
sp.user_playlist_add_tracks(username, new_playlist['id'], tracks) # Add the tracks to playlist
print('{} playlist created successfully! Check Spotify!'.format(p_name))
# Print how many of the selected tracks are from each source playlist
selected = self.data.loc[self.data['id'].isin(tracks)].groupby('source') \
.count().sort_values('id', ascending=False)['id']
print('Source Playlist\tCount')
print('\t'.join(selected.to_csv().split(',')))
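The heap-based selection in predict_playlist can be sketched in isolation; heapq.nlargest can also consume (score, id) pairs directly, without pushing onto a heap by hand (the scores and track IDs below are made up):

```python
import heapq

# Hypothetical prediction confidences and their track IDs
scores = [0.91, 0.12, 0.75, 0.33, 0.88]
song_ids = ['id_a', 'id_b', 'id_c', 'id_d', 'id_e']

# Take the 3 pairs with the highest score (tuples compare by score first)
top3 = heapq.nlargest(3, zip(scores, song_ids))
tracks = [track_id for _, track_id in top3]
print(tracks)  # ['id_a', 'id_e', 'id_c']
```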
"""
Tests out various hidden layer sizes and initial random_states in order to find an MLP classifier
with the highest training score.
PARAMS:
features [Optional] - Features to use in training
RETURNS:
None, sets self.mlp_classifier to the best performing model
"""
def benchmark_mlp(self, features=['energy', 'liveness', 'speechiness',
'acousticness', 'instrumentalness', 'danceability',
'valence', 'loudness_0_1', 'tempo_0_1']):
# Make sure any included generated columns have been created before starting
# i.e. if GMM clusters are to be used in classification but have not been generated yet
self.check_features(features)
# Create X and Y matrices
X = self.data[features] # X is the selected features for each track
Y = self.data[['source']].values.ravel() # Y is the source playlist each track came from
best_score = (0,(0,),0) # Best trial: (score, layers, seed)
layer_options = [(150,200,100),(30,200,50),(150,250,350,200,100),(15,),(10,),(20,),
(150,200,100,300,250,125)]
seeds = [0,101,2002,30003,400004,5000005,60000006,700000007,123456789,999999999]
layer_scores = [[] for _ in layer_options]
for i in range(len(layer_options)):
layers = layer_options[i]
print('\nLayers: {}'.format(layers))
for seed in seeds:
# Create MLP classifier, train, count how many were classified correctly
mlp = MLPClassifier(hidden_layer_sizes=layers,
alpha=0.0001,
learning_rate_init=0.001,
max_iter=1000,
shuffle=True,
random_state=seed,
tol=1e-4)
mlp.fit(X,Y)
predict_Y = mlp.predict(X)
score = np.sum(Y == predict_Y) # Score is number of correct predictions
print('{} classified correctly out of {} for seed {}.'.
format(score, len(Y), seed))
if score > best_score[0]:
best_score = (score, layers, seed)
layer_scores[i].append(score)
# Print results
print('\n')
for i in range(len(layer_options)):
print('Average score: {:.1f} with SD: {:.2f} and Min: {} for layers = {}'.
format(np.average(layer_scores[i]), np.std(layer_scores[i]),
np.min(layer_scores[i]), layer_options[i]))
print('\nThe best trial scored {} with layers {} and seed {}'.format(*best_score))
print('Seeds Used = {}\nFeatures Used = {}'.format(seeds, features))
# Set self.mlp_classifier to best performing model for later use
self.mlp_classifier = MLPClassifier(hidden_layer_sizes=best_score[1],
alpha=0.0001,
learning_rate_init=0.001,
max_iter=1000,
random_state=best_score[2],
shuffle=True,
tol=1e-4)
self.mlp_classifier.fit(X,Y)
"""
Performs classification using a Gaussian Naive Bayes classifier. Seems to be less accurate than
using the MLP classifier in the above functions.
PARAMS:
features [Optional] - The features to be used in X values in training
RETURNS:
None
"""
def classify_naive_bayes(self, features=['energy', 'liveness', 'speechiness',
'acousticness', 'instrumentalness', 'danceability',
'valence', 'loudness_0_1', 'tempo_0_1']):
# Make sure any included generated columns have been created before starting
self.check_features(features)
# Create X and Y matrices
X = self.data[features] # X is the selected features for each track
Y = self.data[['source']].values.ravel() # Y is the source playlist each track came from
# Create Naive Bayes, train, predict from X and see how many are correctly matched
nb = GaussianNB()
nb.fit(X,Y)
predict_Y = nb.predict(X)
print('{} classified correctly out of {}.'.format(np.sum(Y == predict_Y), len(Y)))
# Note: This performs worse than the MLPClassifier
# End of Classification Functions
#=================================================================================
# Start of Dance Party and Study Buddies Functions
"""
Creates a playlist consisting of the n most danceable songs from each playlist passed to the function.
First combines the danceability and energy features, then picks the top n from each playlist. Randomly
picks n songs from the top n + n/2 options, so different songs are picked each time even with the same
input playlists.
PARAMS:
playlists [Optional] - The array of playlist CSV filenames to use, defaults to 'all' to use all
n [Optional] - The number of tracks to use from each playlist, defaults to 5
RETURNS:
None (Playlist is created in Spotify with the name "Dance Party! [Today's Date]")
"""
def dance_party(self, playlists='all', n=5):
if(playlists == 'all'):
playlists = self.sources
# Add a column dance_value that is a combination of danceability and energy
self.data['dance_value'] = self.data['danceability'] + (self.data['energy'] / 2)
tracks = []
for playlist in playlists:
df = self.data.loc[self.data['source'] == playlist] # Select the songs from that playlist
df = df.sort_values('dance_value', ascending=False) # Sort by the dance_value
options = df.head(int(n + n/2))['id'].values.tolist() # Top int(n + n/2) tracks are options
np.random.shuffle(options) # Shuffle so it picks slightly different songs each time
tracks += options[:n] # Take n from the int(n+n/2) choices
tracks = list(set(tracks)) # Remove duplicates
np.random.shuffle(tracks) # Shuffle playlist order
# TODO: Temporal analysis to try to make tracks flow into each other better?
p_name = 'Dance Party! ' + datetime.now().strftime('%m/%d/%y') # Playlist name
new_playlist = sp.user_playlist_create(username, name=p_name, public=False) # Create playlist
sp.user_playlist_add_tracks(username, new_playlist['id'], tracks) # Add the tracks to playlist
print("'{}' playlist created successfully!".format(p_name))
"""
Creates a playlist consisting of the N songs from each playlist that would be best for a study session.
Uses several audio features: instrumentalness, acousticness, energy, speechiness
PARAMS:
playlists [Optional] - The array of playlist CSV filenames to use, defaults to 'all' to use all
n [Optional] - The number of tracks to use from each playlist, defaults to 5
RETURNS:
None (Playlist is created in Spotify with the name "Study Buddies - [Today's Date]")
"""
def study_buddies(self, playlists='all', n=5):
if(playlists == 'all'):
playlists = self.sources
# Add a column for study_value, combining 4 audio features that work pretty well for a quiet
# study playlist that is conducive to studying while listening to music. (Based on testing and
# my own personal opinion of what makes good study music)
self.data['study_value'] = (2 * self.data['instrumentalness']) + self.data['acousticness'] + \
(2 * (1 - self.data['energy'])) + (1 - self.data['speechiness'])
tracks = []
for playlist in playlists:
df = self.data.loc[self.data['source'] == playlist] # Select the songs from that playlist
df = df.sort_values('study_value', ascending=False) # Sort by the study_value
options = df.head(int(n + n/2))['id'].values.tolist() # Top int(n + n/2) tracks are options
np.random.shuffle(options) # Shuffle so it picks slightly different songs each time
tracks += options[:n] # Take n from the int(n+n/2) choices
tracks = list(set(tracks)) # Remove duplicates
np.random.shuffle(tracks) # Shuffle playlist order
# TODO: Temporal analysis to try to make tracks flow into each other better?
p_name = 'Study Buddies - ' + datetime.now().strftime('%m/%d/%y') # Playlist name
new_playlist = sp.user_playlist_create(username, name=p_name, public=False) # Create playlist
sp.user_playlist_add_tracks(username, new_playlist['id'], tracks) # Add the tracks to playlist
print("'{}' playlist created successfully!".format(p_name))
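Both dance_party and study_buddies follow the same pattern: compute a weighted score per track, sort descending, keep the top n + n/2 as candidates, then shuffle and take n. A stdlib-only sketch of that pattern using the study_value weighting (the feature values below are made up):

```python
import random

tracks = [
    {'id': 't1', 'instrumentalness': 0.9, 'acousticness': 0.8, 'energy': 0.2, 'speechiness': 0.1},
    {'id': 't2', 'instrumentalness': 0.1, 'acousticness': 0.2, 'energy': 0.9, 'speechiness': 0.4},
    {'id': 't3', 'instrumentalness': 0.7, 'acousticness': 0.6, 'energy': 0.3, 'speechiness': 0.1},
]

def study_value(t):
    # Same weighting as study_buddies: favor instrumental, acoustic, low-energy, low-speech tracks
    return (2 * t['instrumentalness'] + t['acousticness']
            + 2 * (1 - t['energy']) + (1 - t['speechiness']))

n = 2
candidates = sorted(tracks, key=study_value, reverse=True)[:int(n + n / 2)]
random.shuffle(candidates)  # vary which of the candidates get picked each run
chosen = [t['id'] for t in candidates[:n]]
print(chosen)
```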
# End of Dance Party and Study Buddies Functions
#=================================================================================
# Start of Clustering Functions
"""
Performs Gaussian mixture model clustering on the data using the given number of clusters.
PARAMS:
show_plot [Optional] - Boolean, whether or not to call plot_clusters after, defaults to True
n_clusters [Optional] - The number of clusters to use, defaults to 5
features [Optional] - The features to use in the clustering
RETURNS:
None, sets the 'GMM' column in self.data to the predicted cluster indices
"""
def gmm_cluster(self, show_plot=True, n_clusters=5,
features=['energy', 'liveness', 'tempo_0_1', 'speechiness',
'acousticness', 'instrumentalness', 'danceability',
'loudness_0_1', 'valence']):
self.check_features(features) # Make sure features are valid/generated already if applicable
print('Starting GMM clustering with {} clusters and features = {}'.format(n_clusters, features))
X = self.data[features]
gm = GaussianMixture(n_components=n_clusters, covariance_type='full', max_iter=300, n_init=5)
gm.fit(X)
Y = gm.predict(X)
self.data['GMM'] = Y
if show_plot:
self.plot_clusters(features=features, algorithm='GMM')
"""
Benchmarks GMM clustering by varying the number of clusters used to find which clustering
fits the data best.
PARAMS:
show_plot [Optional] - Show the graphs after completion, defaults to True
features [Optional] - The features to use in clustering
RETURNS:
None, but shows plots in a new window (one at a time) if show_plot is set to True
"""
def benchmark_gmm_cluster(self, show_plot=True,
features=['energy', 'liveness', 'tempo_0_1', 'speechiness',
'acousticness', 'instrumentalness', 'danceability',
'loudness_0_1', 'valence']):
self.check_features(features) # Make sure features are valid/generated already if applicable
print('Benchmarking GMM clustering with features = {}'.format(features))
X = self.data[features]
best_score = None
best_n = None
for n in range(2,15): # Find the optimal number of components
gm = GaussianMixture(n_components=n, covariance_type='full', max_iter=300, n_init=5)
gm.fit(X)
score = gm.bic(X)
if best_score == None or score < best_score:
best_score = score
best_n = n
best_gm = gm
print('Best Score: {} with N = {}'.format(best_score, best_n))
Y = best_gm.predict(X)
self.data['GMM'] = Y
if show_plot:
self.plot_clusters(features=features, algorithm='GMM')
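The benchmark above selects the component count by minimizing the Bayesian Information Criterion (BIC), which penalizes model complexity, so the lowest BIC wins. A minimal sketch of the same selection loop on synthetic data with two obvious clusters:

```python
import numpy as np
from sklearn.mixture import GaussianMixture

rng = np.random.RandomState(0)
# Two well-separated Gaussian blobs in 2D
X = np.vstack([rng.normal(0, 0.5, size=(100, 2)),
               rng.normal(10, 0.5, size=(100, 2))])

best_n, best_bic = None, None
for n in range(1, 5):
    gm = GaussianMixture(n_components=n, covariance_type='full',
                         n_init=3, random_state=0).fit(X)
    bic = gm.bic(X)
    if best_bic is None or bic < best_bic:
        best_n, best_bic = n, bic

print(best_n)  # 2, BIC recovers the true number of components
```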
"""
Performs spectral clustering on all the data, using n clusters.
PARAMS:
show_plot [Optional] - Boolean, whether or not to call plot_clusters after, defaults to True
n_clusters [Optional] - The number of clusters to use, defaults to 5
features [Optional] - The features to use in the clustering
RETURNS:
None, sets the 'spectral' column in self.data to the predicted clusters
"""
def spectral_cluster(self, show_plot=True, n_clusters=5,
features=['energy', 'liveness', 'speechiness',
'acousticness', 'instrumentalness', 'danceability',
'valence', 'loudness_0_1', 'tempo_0_1']):
self.check_features(features) # Make sure features are valid/generated already if applicable
print('Starting Spectral clustering with {} clusters and features = {}'.format(n_clusters, features))
X = self.data[features]
spectral = SpectralClustering(n_clusters=n_clusters, eigen_solver='arpack',
affinity='nearest_neighbors', n_init=5)
Y = spectral.fit_predict(X)
self.data['spectral'] = Y
if show_plot:
self.plot_clusters(features=features, algorithm='spectral')
"""
Plots the clusters (in different colors)
- First a bar chart showing each playlist's distribution of clusters
- Then, for each combination of features (2D), shows all playlists' data combined on a scatter plot
with each cluster in a different color.
PARAMS:
algorithm [Optional] - {'GMM' or 'spectral'} The clustering algorithm you want to plot
x [Optional] - The feature for the x axis, default to None which cycles through all features
y [Optional] - The feature for the y axis, default to None which cycles through all features
features [Optional] - The list of features to plot, each used as the x and y axis with all others
RETURNS:
None, displays plots in a new window (one at a time)
"""
def plot_clusters(self, algorithm='GMM', x=None, y=None,
features=['energy', 'liveness', 'tempo_0_1', 'speechiness',
'acousticness', 'instrumentalness', 'danceability',
'loudness_0_1', 'valence']):
# Make sure clustering has been run already and features valid
self.check_features(features + [algorithm])
n_clusters = self.data[algorithm].max() + 1 # Find the number of clusters used
is_2D = '' # For the title of the graph window to say 2D if only using 2 features
if len(features) == 2:
is_2D = '2D_'
# Bar chart - Cluster count for each file (each playlist)
fig, ax = plt.subplots()
counts = np.zeros(shape=(len(self.sources), n_clusters))
i = 0
for f,data in self.data.groupby('source'):
for cluster, group in data.groupby([algorithm]):
counts[i,cluster] = len(group)
i += 1
plt.title('Cluster Frequencies in Each Playlist')
fig.canvas.set_window_title('Cluster_Frequencies_in_Each_Playlist_{}{}_{}'.
format(is_2D, algorithm, n_clusters))
counts_df = pd.DataFrame(counts)
counts_df.plot.bar(legend=None, ax=ax, fig=fig)
# Label the x axis values with the source names (just the part before the first _)
plt.xticks(range(len(self.sources)), map(lambda s: s.split('_')[0].title(), self.sources))
plt.xlabel('Playlist')
plt.ylabel('Number of Tracks in Each Cluster')
plt.show()
# Scatterplot of clusters (each pair of features used)
for x_feature in features:
# If x or y is set, skip all pairs except the desired x and/or y values
if x != None and x_feature != x:
continue
for y_feature in features:
if y != None and y_feature != y:
continue
if x_feature != y_feature:
fig, ax = plt.subplots()
colors = iter(plt.cm.rainbow(np.linspace(0,1,n_clusters)))
for name, group in self.data.groupby([algorithm]):
ax = group.plot.scatter(ax=ax, x=x_feature, y=y_feature,
alpha=0.7, label=name, c=next(colors), legend=None)
plt.xlabel(x_feature.title())
plt.ylabel(y_feature.title())
plt.title(y_feature.title() + ' versus ' + x_feature.title())
fig.canvas.set_window_title('{}_vs_{}_{}{}_{}'.format(y_feature.title(),
x_feature.title(), is_2D, algorithm, n_clusters))
plt.show()
# End of Clustering Functions
#=================================================================================
"""
Finds the artists who occur the most frequently in the given playlist. Takes into account the fact that
some songs have multiple artists. Prints the top n occurring artists (or all if no n is given) and how
many tracks of theirs are in the playlist.
Note: This doesn't use any of the instance variables of the class (since the CSV's only have the primary
artist for each song) but it is included in this class since it is still analysis of a playlist.
PARAMS:
playlist - The Spotify playlist object (full) or Playlist ID or URI
n [Optional] - The number of artists to display, defaults to None to print all artists present
RETURNS:
None, just prints results to stdout
"""
def top_artists(self, playlist, n=None):
# In case the ID/URI is passed (as opposed to the playlist object)
if type(playlist) is str:
playlist = sp.user_playlist(username, playlist.split(':')[-1]) # The ID is the last segment of a URI
print('Finding the top artists in {}...\n'.format(playlist['name']))
artists_map = {} # The songs each artist has (takes into account that some songs have >1 artist)
# Indexed by artist name - {artist_name: [song_names]}
# For each track in the playlist
for track in playlist['tracks']['items']:
track_name = track['track']['name'].encode('utf-8')
# For each artist listed for the track
for artist in track['track']['artists']:
artist_name = artist['name'].encode('utf-8')
# Add the track to that artist's list of songs
if artist_name not in artists_map:
artists_map[artist_name] = [track_name]
else:
artists_map[artist_name].append(track_name)
# Create the heap to find the top occurring artists
heap = []
for (artist, songs) in artists_map.items():
#print("{} has {} songs: {}".format(artist, len(songs), songs))
heapq.heappush(heap, (len(songs), artist))
# If n was not set (defaults to None) then print all artists
if n is None:
n = len(artists_map.keys())
print('Top artists in playlist:')
else:
print('Top {} artists in playlist:'.format(n))
# Print them in order in a column format
for (num, artist) in heapq.nlargest(n, heap):
print('{1:<8} - {0:<30}'.format(artist[:30], # Only first 30 characters of the artist name
str(num) + (' song ' if num==1 else ' songs'))) # Plural if >1
# Diversity (can be >100 because songs can have multiple artists)
print('\n{} different artists appeared in this playlist.\n'.format(len(artists_map.keys())))
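The artist-counting logic in top_artists can also be expressed with collections.Counter, which handles multi-artist tracks the same way; a sketch with made-up track data shaped like the Spotify API response:

```python
from collections import Counter

# Minimal stand-in for playlist['tracks']['items']
items = [
    {'track': {'name': 'Song A', 'artists': [{'name': 'Artist 1'}]}},
    {'track': {'name': 'Song B', 'artists': [{'name': 'Artist 1'}, {'name': 'Artist 2'}]}},
    {'track': {'name': 'Song C', 'artists': [{'name': 'Artist 2'}]}},
    {'track': {'name': 'Song D', 'artists': [{'name': 'Artist 1'}]}},
]

# Count one occurrence per (track, artist) pair, including non-primary artists
counts = Counter(artist['name']
                 for item in items
                 for artist in item['track']['artists'])
print(counts.most_common(2))  # [('Artist 1', 3), ('Artist 2', 2)]
```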
"""
Helper function, make sure the feature is a valid option and has been generated already
if it needs to be (i.e. clusters)
PARAMS:
features - The features to check
RETURNS:
None, raises an error if there is a problem
"""
def check_features(self, features):
for f in list(features): # Iterate over a copy since the list may be modified below
if f not in self.data.keys():
# Generate the column if it's a cluster, or raise an error
features.remove(f)
if f == 'GMM':
self.gmm_cluster(show_plot=False, features=features)
elif f == 'spectral':
self.spectral_cluster(show_plot=False, features=features)
else:
raise NameError(('Error: Invalid feature ({})\nPossible ' + \
'features are: GMM, spectral, {}').format(f, self.data.keys()))
"""
Helper function, makes sure the playlist is a valid option
PARAMS:
playlist - The playlist CSV filename to check
RETURNS:
None, raises an error if there is a problem
"""
def check_playlist(self, playlist):
if playlist not in self.sources:
raise NameError(("Error: Playlist '{}' not found in list of playlists ({})\n" + \
"Try using '-w -p PLAYLIST_ID' to write a CSV first and make sure " + \
"not to include '.csv' in the name.").
format(playlist, self.sources))
#===============================================================================
# ------------------- End of Analyzer Class ------------------------------------
#===============================================================================
# ------------------- Start of Writer Class ------------------------------------
#===============================================================================
"""
The Writer class provides two functions: write_csv and write_all_csvs
The first takes a Spotify playlist object and writes a csv file containing the audio features
and metadata for each track in the playlist.
The second searches the user's (defined in SPOTIFY_USERNAME environment variable) playlists for
any that contain Top 100 (since I have acquired 12 different Top 100 playlists from different people)
and writes a csv file for each.
"""
class Writer:
"""
Writes a CSV file for the passed playlist that contains the metadata and audio features for each
track in the playlist.
The playlist 'My Top 100' gets a CSV file written for it named 'my_top_100.csv' in the csv directory.
PARAMS:
playlist - Spotify playlist object (full) to be analyzed or Playlist ID/URI
RETURNS:
None
"""
def write_csv(self, playlist):
if type(playlist) is str: # Was given an ID/URI rather than the Playlist object
    playlist = sp.user_playlist(username, playlist.split(':')[-1]) # Isolate the ID if a URI was given
outfile = 'csv/' + playlist['name'].replace(' ', '_').lower() + '.csv'
print("\nWriting playlist's tracks metadata and audio feature analysis to {}...".format(outfile))
metadata = {} # Metadata = {track_id: {'name': Track Name, 'artist': Primary Artist, 'album': Album}}
# Extract the metadata (ID, Name, Primary Artist, and Album) from each track
for item in playlist['tracks']['items']:
track = item['track']
track_id = track['id'].encode('utf-8')
song_name = track['name'].encode('utf-8')
primary_artist = track['artists'][0]['name'].encode('utf-8') # Just the primary/first artist
album = track['album']['name'].encode('utf-8')
metadata[track_id] = {'name': song_name, 'artist': primary_artist, 'album': album}
# Query Spotify API to get the audio feature analysis for the tracks
offset = 0 # Query in batches since the audio-features endpoint caps the number of IDs per request
all_tracks_features = []
track_ids = list(metadata.keys()) # list() so the slicing below also works on Python 3
while offset < len(track_ids): # This works for a playlist of any size, not just 100
    all_tracks_features += sp.audio_features(track_ids[offset:offset+50]) # Batches of 50 stay under the limit
    offset += 50
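The batching loop above can also be factored into a small generator; a minimal sketch (`batched` is a hypothetical helper, and the batch size of 50 simply mirrors the loop above):

```python
def batched(items, size=50):
    """Yield consecutive slices of at most `size` items from a list."""
    for start in range(0, len(items), size):
        yield items[start:start + size]
```

The while/offset bookkeeping then collapses to `for batch in batched(track_ids): all_tracks_features += sp.audio_features(batch)`.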
# Open the CSV (rewrites old data, creates if doesn't exist)
writer = csv_writer(open(outfile, 'w+'))
header_row = ['Track Name', 'Primary Artist', 'Album'] # The header row of the CSV, first the Metadata
for feature in all_tracks_features[0].keys(): # Add all the audio feature names to the header row
if feature not in ['track_href', 'analysis_url', 'uri', 'type']: # Ignore some columns
header_row.append(feature.encode('utf-8'))
writer.writerow(header_row) # Write header row
# Now go through the features for each track
for i in range(len(all_tracks_features)):
#print("Processing song: {}".format(i))
track_features = all_tracks_features[i] # The features for this track
if track_features is None:
    continue # Skip tracks Spotify has no audio features for (e.g. local files)
track_metadata = metadata[track_features['id']]
# Start the row as the metadata for the track
row = [track_metadata['name'], track_metadata['artist'], track_metadata['album']]
# Then add each audio feature
for (feature, feature_value) in track_features.items():
if feature not in ['track_href', 'analysis_url', 'uri', 'type']: # Ignore some columns
row.append(feature_value)
writer.writerow(row) # Write the row to the CSV
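The header/row assembly above, including the excluded columns, can be sketched in isolation. The feature dicts below are made-up stand-ins for Spotify's audio-features payload, and `features_to_csv` is a hypothetical helper, not part of this module:

```python
import csv
import io

IGNORED = {'track_href', 'analysis_url', 'uri', 'type'}  # Columns the CSV skips

def features_to_csv(rows, out):
    """Write one header row plus one row per track, skipping IGNORED keys."""
    writer = csv.writer(out)
    keys = [k for k in rows[0] if k not in IGNORED]
    writer.writerow(keys)  # Header row
    for row in rows:
        writer.writerow([row[k] for k in keys])

buf = io.StringIO()
features_to_csv([{'energy': 0.8, 'uri': 'spotify:track:x'},
                 {'energy': 0.3, 'uri': 'spotify:track:y'}], buf)
```

Writing to a StringIO rather than a real file is what makes the row logic testable without touching disk.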
"""
Writes a CSV for every playlist that has "Top 100" in the name for the user (defined in SPOTIFY_USERNAME
environment variable). I gathered a bunch of Top 100 playlists from friends and this way I can just
write all the CSV's in one function call to make sure they're all present and up to date.
PARAMS:
None
RETURNS:
None
"""
def write_all_csvs(self):
all_playlists = sp.current_user_playlists(limit=50) # Returns simplified playlist objects
selected_playlists = []
for playlist in all_playlists['items']:
if 'Top 100' in playlist['name']:
selected_playlists.append(playlist)
for p in selected_playlists:
p_uri = p['uri'].split(':')[-1] # Get the ID of the playlist
playlist = sp.user_playlist(username, p_uri) # Get the full playlist object
self.write_csv(playlist)
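The name filter above is also easy to pull out and test on its own; a minimal sketch (`select_playlists` is a hypothetical helper, not part of this module):

```python
def select_playlists(playlists, needle='Top 100'):
    """Return the simplified playlist objects whose name contains `needle`."""
    return [p for p in playlists if needle in p['name']]
```

Given the items from `sp.current_user_playlists()`, this returns only the playlists worth writing a CSV for.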
#===============================================================================
# ------------------------- End of Writer Class ---------------------------
#===============================================================================
# ------------------------- Start of Unit Testing --------------------------
#===============================================================================
"""
TestAnalyzer class uses unittest to test the various features of Writer and Analyzer
Note: All tests are skipped right now since they have been tested already.
Also, most of the tests do not have explicit assertion statements, but just
make sure the code is functioning as expected.
"""
class TestAnalyzer(unittest.TestCase):
@classmethod
def setUpClass(cls):
    # Runs once before ALL tests
print('\n.... starting unit testing of playlist_analyzer.py ...')
@unittest.skip('')
def test_top_artists(self):
print('\n............ testing top artists ..................')
# Call with the Playlist object and only display the top 10 artists
playlist_id = '38hwZEL0H1z6wUbq0UoHBS'
playlist = sp.user_playlist(username, playlist_id)
analyzer.top_artists(playlist, n=10)
# Call with the string instead, and with no n given (defaults to all artists)
analyzer.top_artists(playlist_id)
@unittest.skip('')
def test_predict_mlp(self):
print('\n........... testing predict MLP ...................')
analyzer.predict_playlist('rik_top_100')
@unittest.skip('')
def test_benchmark_mlp(self):
print('\n........... testing benchmark MLP .................')
analyzer.benchmark_mlp()
@unittest.skip('')
def test_dance_party(self):
print('\n............ testing dance party ..................')
source_playlists = ['p_top_100', 'a_top_100', 'ri_top_100']
analyzer.dance_party(source_playlists, n=5)
@unittest.skip('')
def test_study_buddies(self):
print('\n.......... testing study buddies ..................')
source_playlists = ['rik_top_100', 'j_top_100', 'am_top_100', 'm_top_100', 'c_top_100']
analyzer.study_buddies(source_playlists, n=6)
@unittest.skip('')
def test_invalid_playlist(self):
print('\n.......... testing invalid playlist ...............')
try:
# Should use csv filename for this function not ID
#analyzer.predict_playlist(playlist='c_top_100') # Correct usage
analyzer.predict_playlist(playlist='5kipTqpcNptT9sCcrV0RdW')
assert False, 'Invalid playlist format should have raised an error.'
except NameError:
print('Passed test.') # Expected this error
@unittest.skip('')
def test_invalid_feature(self):
print('\n.......... testing invalid feature ................')
try:
# Synergy is not a valid audio feature
analyzer.gmm_cluster(features=['energy','synergy'])
assert False, 'Invalid feature name should have raised an error.'
except NameError:
print('Passed test.') # Expected this error
@unittest.skip('')
def test_gmm_clustering(self):
print('\n.......... testing gmm clustering ..................')
# Just use 2 features
print('Testing using only valence and energy...')
analyzer.gmm_cluster(show_plot=True, features=['valence', 'energy'])
# Use all features
print('Testing using all features...')
analyzer.gmm_cluster(show_plot=True)
@unittest.skip('')
def test_plot_clusters(self):
print('\n.......... testing cluster plotting .................')
# First make sure some clusters have been generated
analyzer.gmm_cluster(n_clusters=3, show_plot=False)
analyzer.plot_clusters(algorithm='GMM', x='energy', y='valence') # Plot this x,y combo only
analyzer.plot_clusters(algorithm='GMM', y='valence') # Cycle through each x
analyzer.plot_clusters(algorithm='GMM', x='energy') # Cycle through each y
@unittest.skip('')
def test_benchmark_gmm(self):
print('\n...... testing benchmark GMM clustering ...........')
# Use all features - then show all plots
print('Benchmarking GMM clustering using all features...')
analyzer.benchmark_gmm_cluster(show_plot=False)
analyzer.plot_clusters(algorithm='GMM')
# Only use energy and valence - then show plots
print('Benchmarking GMM clustering using only 2 features...')
analyzer.benchmark_gmm_cluster(show_plot=True, features=['energy','valence'])
@unittest.skip('')
def test_spectral_clustering(self):
print('\n........... testing spectral clustering ...........')
analyzer.spectral_cluster()
analyzer.spectral_cluster(features=['acousticness','valence'])
@unittest.skip('')
def test_spectral_vs_gmm(self):
print('\n........... testing spectral vs GMM ...........')
print('Spectral...')
analyzer.spectral_cluster(features=['energy','valence'], n_clusters=5)
print('GMM...')
analyzer.gmm_cluster(features=['energy','valence'], n_clusters=5)
@unittest.skip('')
def test_gmm_two_features(self):
print('\n.......... testing gmm with 2 features .................')
# Test clustering using each pair of features
features = ['energy', 'liveness', 'speechiness',
'acousticness', 'instrumentalness', 'danceability',
'valence', 'loudness_0_1', 'tempo_0_1']
for f1 in features:
for f2 in features:
if f1 != f2:
# Find the best n_clusters for this pair and then show plots
analyzer.benchmark_gmm_cluster(features=[f1,f2], show_plot=True)
def tearDown(self):
print('\n')
@classmethod
def tearDownClass(cls):
    print('\n.... finished unit testing of playlist_analyzer.py ...')
    print('\n (s = Test Skipped)')
    print('\n\nUse the -h or --help flag from the command line to see how to use '+\
          '\neach feature if you did not mean to run the unit tests.')
#===============================================================================
# ------------------------- End of Unit Testing -------------------------
#===============================================================================
# ------------------------- Start of Main Method ------------------------
#===============================================================================
"""
Main Method - Connects to Spotify API, parses command line arguments and calls
the appropriate function. If no command line arguments are given, unittest.main() is called.
-h or --help can be provided to print a help message on how to use the different features.
"""
if __name__ == '__main__':
# Authenticate with Spotify - Access environment variables and request auth token
try:
username = env['SPOTIFY_USERNAME']
scope = 'playlist-read-collaborative playlist-read-private playlist-modify-private'
token = util.prompt_for_user_token(username, scope,
client_id=env['SPOTIFY_CLIENT_ID'],
client_secret=env['SPOTIFY_CLIENT_SECRET'],
redirect_uri=env['SPOTIFY_REDIRECT_URI'])
# Environment variable(s) not set - print the names of the variables needed to run script
except KeyError:
print('\nThe following environment variables must be set in order to run this script:\n' +\
'\t- SPOTIFY_USERNAME\n\t- SPOTIFY_CLIENT_ID\n\t- SPOTIFY_CLIENT_SECRET\n\t- '+\
'SPOTIFY_REDIRECT_URI\n'+\
'\nPlease see README.md or matt-levin.com/PlaylistAnalyzer for more details.\n')
sys.exit(0)
# Raises AssertionError if auth token not acquired successfully
assert token, 'Unable to acquire access token from Spotify. Please try again.'
# Successfully created an authentication token with Spotify
sp = spotipy.Spotify(auth=token) # Object to interact with the API
analyzer = Analyzer()
# Parse command line arguments if given, otherwise run unittest.main()
if len(sys.argv) == 1: # No arguments given...
print("Use '-h' or '--help' flag to see how to use each feature from Command Line.\n"+\
"Beginning unittesting since no arguments were given...")
unittest.main() # Run unit testing
else: # Command line arguments were given...
feature = sys.argv[1] # Which feature is being used
args = {} # The remaining arguments stored as a dict
for i in range(2, len(sys.argv), 2): # Parse any optional arguments into args dict
try:
args[sys.argv[i]] = sys.argv[i+1]
except IndexError: # Odd number of arguments (e.g. a '-n' flag with no value after it)
print('Invalid arguments, use -h or --help flag to see proper usage')
sys.exit(0)
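The flag/value pairing done by the loop above can be expressed as a pure function, which makes the IndexError case explicit. A minimal sketch (`parse_flag_pairs` is hypothetical; argparse would be the idiomatic replacement, but this mirrors the hand-rolled loop over sys.argv[2:]):

```python
def parse_flag_pairs(argv):
    """Pair up a flat list like ['-p', 'my_top_100', '-n', '5'] into a dict.
    Raises ValueError if a flag has no value after it."""
    args = {}
    for i in range(0, len(argv), 2):
        if i + 1 >= len(argv):
            raise ValueError('flag {!r} has no value'.format(argv[i]))
        args[argv[i]] = argv[i + 1]
    return args
```

For example, `parse_flag_pairs(['-p', 'my_top_100', '-n', '5'])` returns `{'-p': 'my_top_100', '-n': '5'}`, while a dangling `['-n']` raises instead of silently exiting.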
# Generate Predicted Playlist with MLP
if feature == '-g':
playlist = 'my_top_100'
if '-p' in args:
playlist = args['-p'].split('.')[0] # Strip .csv if given
n = 100
if '-n' in args:
n = int(args['-n'])
analyzer.predict_playlist(playlist, n)
# Top Artists
elif feature == '-a':
n = None # Default to None (showing all artists)
if '-n' in args:
n = int(args['-n'])
playlist_id = '38hwZEL0H1z6wUbq0UoHBS' # Default playlist to analyze
if '-p' in args:
playlist_id = args['-p'].split(':')[-1] # Isolate ID if URI was given
playlist = sp.user_playlist(username, playlist_id)