-
Notifications
You must be signed in to change notification settings - Fork 0
/
cli.go
351 lines (325 loc) · 9.97 KB
/
cli.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
package main
import (
"encoding/json"
"flag"
"fmt"
"github.com/Abramovic/soundclouder/config"
"github.com/Abramovic/soundclouder/crawler"
"github.com/Abramovic/soundclouder/helpers"
"github.com/garyburd/redigo/redis"
"os"
"os/signal"
"runtime"
"strconv"
"strings"
"sync"
"syscall"
)
var (
max_workers int = 200
playlist_ids chan int
track_ids chan int
maxPlaylist = flag.Int("playlist", 40000000, "max playlist id") // We can't automatically grab the max playlist id
useEmpty = flag.Bool("empty", true, "use empty database")
configFile = flag.String("config", "", "path to config file")
restartTodo = flag.Bool("restart", false, "restart incomplete crawls due to a crash")
)
/*
NOTES:
We can probably merge the two channels (playlists and tracks) into one batch crawl channel
type BatchCrawl struct {
BatchId int
Type string // track or playlist
}
*/
type Crawler struct {
*crawler.Crawler
}
func main() {
flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU())
var err error
file, err := os.Open(*configFile)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
decoder := json.NewDecoder(file)
config := config.Configuration{}
err = decoder.Decode(&config)
if err != nil {
fmt.Println(err)
os.Exit(1)
}
if config.Host == "" || config.ClientId == "" {
fmt.Println("Missing Configs", config)
os.Exit(1)
}
if config.MaxWorkers > 0 {
max_workers = config.MaxWorkers
}
c := crawler.New(config)
defer c.Close()
crawler := Crawler{c}
// We are able to get the highest track id on our own.
max_id, err := crawler.GetHighTrackId()
if err != nil {
fmt.Println(err)
os.Exit(1)
}
// Handle signals to stop crawling.
var canCrawl bool = true
stopCrawler := make(chan os.Signal, 1)
signal.Notify(stopCrawler, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-stopCrawler
fmt.Println("Exit signal recieved. Will finish up current crawls and exit the program.")
canCrawl = false
close(track_ids)
close(playlist_ids)
}()
track_ids = make(chan int, max_workers)
var trackMonitor sync.WaitGroup
trackMonitor.Add(max_workers)
for i := 0; i < max_workers; i++ {
go crawler.ProcessTracks(&trackMonitor)
}
r := c.RedisClient.Get()
defer r.Close()
// If we want to restart crawls due to a server crash...
if *restartTodo == true && *useEmpty == false && canCrawl {
playlists, _ := redis.Ints(r.Do("SMEMBERS", "crawlPlaylistsTodo"))
for _, i := range playlists {
r.Do("SADD", "crawlPlaylists", i)
}
tracks, _ := redis.Ints(r.Do("SMEMBERS", "crawlPlaylistsTodo"))
for _, i := range tracks {
r.Do("SADD", "crawlTracks", i)
}
}
// Starts a new crawl from scratch...
if *useEmpty == true && canCrawl {
r.Do("DEL", "crawlTracks")
batch_max := int(max_id/1000) + 1
for i := batch_max; i > 0; i-- {
nulls := []interface{}{fmt.Sprintf("trackMeta:%d", i)}
track_id := i * 1000
for k := 999; k >= 0; k-- {
nulls = append(nulls, fmt.Sprintf("%d", (track_id+k)), "null")
}
r.Do("HMSET", nulls...)
r.Do("SADD", "crawlTracks", i)
}
}
var hasMoreTracks bool = true
for hasMoreTracks && canCrawl {
// Add all of the tracks that are scheduled to be crawled into a channel
i, err := redis.Int(r.Do("SPOP", "crawlTracks"))
if err != nil {
hasMoreTracks = false
} else {
r.Do("SADD", "crawlTracksTodo", i)
track_ids <- i
}
}
close(track_ids)
// Wait for all of the tracks to be crawled before going onto the playlists
trackMonitor.Wait()
// Manually run garbage collection to free up any memory that is no longer used
runtime.GC()
playlist_ids = make(chan int, max_workers)
var playlistMonitor sync.WaitGroup
playlistMonitor.Add(max_workers)
for i := 0; i < max_workers; i++ {
go crawler.ProcessPlaylists(&playlistMonitor)
}
if *useEmpty == true && canCrawl {
r.Do("DEL", "crawlPlaylists")
playlist_max := int(*maxPlaylist/1000) + 1
for i := playlist_max; i > 0; i-- {
nulls := []interface{}{fmt.Sprintf("playlistTracks:%d", i)}
playlist_id := i * 1000
for k := 999; k >= 0; k-- {
nulls = append(nulls, fmt.Sprintf("%d", (playlist_id+k)), "null")
}
r.Do("HMSET", nulls...)
r.Do("SADD", "crawlPlaylists", i)
}
}
var hasMorePlaylists bool = true
for hasMorePlaylists && canCrawl {
i, err := redis.Int(r.Do("SPOP", "crawlPlaylists"))
if err != nil {
hasMorePlaylists = false
} else {
r.Do("SADD", "crawlPlaylistsTodo", i)
playlist_ids <- i
}
}
close(playlist_ids)
playlistMonitor.Wait()
}
func (c *Crawler) ProcessPlaylists(wg *sync.WaitGroup) error {
r := c.RedisClient.Get()
defer r.Close()
P:
for {
select {
case batch_id, open := <-playlist_ids:
if !open {
break P
}
// Get all of the IDs in this batch of 1,000 playlists
ids, err := redis.Strings(r.Do("HKEYS", fmt.Sprintf("playlistTracks:%d", batch_id)))
if err != nil {
fmt.Println(err)
continue
}
for _, id := range ids {
// CLEANUP Go's string to int is string to int64 and then we are turning the int64 into an int
pid, err := strconv.ParseInt(id, 0, 64)
if err != nil {
continue
}
playlist_id := int(pid)
key, hkey := crawler.RedisKey("playlistTracks", playlist_id)
exists, _ := redis.Bool(r.Do("HEXISTS", key, hkey))
if exists == false {
// we can't crawl this record because it was deleted before in a past crawl.
continue
}
playlist, err := c.GetPlaylist(playlist_id)
if err != nil {
// We can't crawl this record. Make sure we delete it from our database.
r.Do("HDEL", key, hkey)
continue
}
track_ids := []string{}
for _, track := range playlist.Tracks {
// AppendSlice keeps a unique slice in case the playlist has the same track multiple times
track_ids = helpers.AppendSlice(track_ids, fmt.Sprintf("%d", track.Id))
}
if len(track_ids) == 0 {
// This playlist doesn't have any tracks associated with it
r.Do("HDEL", key, hkey)
continue
}
// If this is the first time that we have seen this playlist (empty string or "null") then we
// want to increment the counter for each track in the playlist
s, _ := redis.String(r.Do("HGET", key, hkey))
if s == "null" || s == "" {
for _, track := range playlist.Tracks {
// Increment the counter for the tracks, not the playlist
k, h := crawler.RedisKey("trackCountPlaylist", track.Id)
r.Do("HINCRBY", k, h, 1)
}
}
r.Do("HSET", key, hkey, strings.Join(track_ids, ","))
}
r.Do("SREM", "crawlPlaylistsTodo", batch_id)
}
}
wg.Done()
return nil
}
func (c *Crawler) ProcessTracks(wg *sync.WaitGroup) error {
r := c.RedisClient.Get()
defer r.Close()
T:
for {
select {
case batch_id, open := <-track_ids:
if !open {
break T
}
// Grab the batch of tracks to be crawled (up to 1,000)
ids, err := redis.Strings(r.Do("HKEYS", fmt.Sprintf("trackMeta:%d", batch_id)))
if err != nil {
fmt.Println(err)
continue
}
for _, id := range ids {
tid, err := strconv.ParseInt(id, 0, 64)
if err != nil {
continue
}
track_id := int(tid)
key, hkey := crawler.RedisKey("trackMeta", track_id)
exists, _ := redis.Bool(r.Do("HEXISTS", key, hkey))
if exists == false {
// we can't crawl this record because it was deleted before in a past crawl.
continue
}
track, err := c.GetTrack(track_id)
if err != nil {
// We can't crawl this record. Make sure we delete it from our database.
r.Do("HDEL", key, hkey)
continue
}
j, err := json.Marshal(track)
if err != nil {
r.Do("HDEL", key, hkey)
continue
}
r.Do("HSET", key, hkey, string(j))
if track.User.Id > 0 {
// Store the user meta data if available
userKey, userHkey := crawler.RedisKey("userMeta", track.User.Id)
s, _ := redis.String(r.Do("HGET", userKey, userHkey))
if s == "null" || s == "" {
// Only update the user data if this is the first time that we have seen them
j, err := json.Marshal(track.User)
if err != nil {
continue
}
r.Do("HSET", userKey, userHkey, string(j))
}
}
// Get all of the comments on this track
comments := c.GetTrackComments(track.Id)
// Transform all of the User Ids from ints into strings
track_commenters := []string{}
for _, comment := range comments {
// AppendSlice will only append to the slice if the user id does not already exist
track_commenters = helpers.AppendSlice(track_commenters, fmt.Sprintf("%d", comment.UserId))
}
if len(track_commenters) > 0 {
comKey, comHkey := crawler.RedisKey("trackCommenters", track_id)
// If this is the first time that we have seen this track (empty string or "null") then we
// want to increment the counter of the number of user commenters that we have seen
s, _ := redis.String(r.Do("HGET", comKey, comHkey))
if s == "null" || s == "" {
for range track_commenters {
k, h := crawler.RedisKey("trackCountCommenters", track_id)
r.Do("HINCRBY", k, h, 1)
}
}
r.Do("HSET", comKey, comHkey, strings.Join(track_commenters, ","))
}
// Get all of the users who have favorited this track
favoriters := c.GetTrackFavoriters(track.Id)
// Transform the User IDs from a slice of ints to a slice of strings
track_favoriters := []string{}
for _, favorite := range favoriters {
track_favoriters = append(track_favoriters, fmt.Sprintf("%d", favorite.Id))
}
if len(track_favoriters) > 0 {
favKey, favHkey := crawler.RedisKey("trackFavoriters", track_id)
// If this is the first time that we have seen this track (empty string or "null") then we
// want to increment the counter of the number of user favorites that we have seen
s, _ := redis.String(r.Do("HGET", favKey, favHkey))
if s == "null" || s == "" {
for range track_favoriters {
k, h := crawler.RedisKey("trackCountFavoriters", track_id)
r.Do("HINCRBY", k, h, 1)
}
}
r.Do("HSET", favKey, favHkey, strings.Join(track_favoriters, ","))
}
}
r.Do("SREM", "crawlTracksTodo", batch_id)
}
}
wg.Done()
return nil
}