Skip to content

Commit

Permalink
Simplify storage interface
Browse files Browse the repository at this point in the history
Get rid of File model and use Episode instead
  • Loading branch information
mxpv committed Jan 25, 2020
1 parent 415bd0e commit d7a6a62
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 141 deletions.
44 changes: 20 additions & 24 deletions cmd/podsync/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,58 +99,54 @@ func (u *Updater) updateFeed(ctx context.Context, feedConfig *config.Feed) error

func (u *Updater) downloadEpisodes(ctx context.Context, feedConfig *config.Feed, targetDir string) error {
var (
feedID = feedConfig.ID
updateList []*model.Episode
feedID = feedConfig.ID
downloadList []*model.Episode
)

// Build the list of files to download
if err := u.db.WalkFiles(ctx, feedID, func(file *model.File) error {
if file.Status != model.EpisodeNew && file.Status != model.EpisodeError {
if err := u.db.WalkEpisodes(ctx, feedID, func(episode *model.Episode) error {
if episode.Status != model.EpisodeNew && episode.Status != model.EpisodeError {
// File already downloaded
return nil
}

episode, err := u.db.GetEpisode(ctx, feedID, file.EpisodeID)
if err != nil {
return errors.Wrapf(err, "failed to query episode %q from database", file.EpisodeID)
}

updateList = append(updateList, episode)
downloadList = append(downloadList, episode)
return nil
}); err != nil {
return errors.Wrapf(err, "failed to build update list")
}

var (
updateListLen = len(updateList)
downloadCount = len(downloadList)
downloaded = 0
)

if updateListLen > 0 {
log.Infof("update list size: %d", updateListLen)
if downloadCount > 0 {
log.Infof("download count: %d", downloadCount)
} else {
log.Info("no episodes to download")
return nil
}

// Download pending episodes
for idx, episode := range updateList {

for idx, episode := range downloadList {
logger := log.WithFields(log.Fields{
"index": idx,
"episode_id": episode.ID,
})

// Check whether episode exists on disk

episodePath := filepath.Join(targetDir, u.episodeName(feedConfig, episode))
stat, err := os.Stat(episodePath)
if err == nil {
logger.Infof("episode %q already exists on disk (%s)", episode.ID, episodePath)

// File already exists, update file status and disk size
if err := u.db.UpdateFile(feedID, episode.ID, func(file *model.File) error {
file.Size = stat.Size()
file.Status = model.EpisodeDownloaded
if err := u.db.UpdateEpisode(feedID, episode.ID, func(episode *model.Episode) error {
episode.Size = stat.Size()
episode.Status = model.EpisodeDownloaded
return nil
}); err != nil {
logger.WithError(err).Error("failed to update file info")
Expand Down Expand Up @@ -179,8 +175,8 @@ func (u *Updater) downloadEpisodes(ctx context.Context, feedConfig *config.Feed,
break
}

if err := u.db.UpdateFile(feedID, episode.ID, func(file *model.File) error {
file.Status = model.EpisodeError
if err := u.db.UpdateEpisode(feedID, episode.ID, func(episode *model.Episode) error {
episode.Status = model.EpisodeError
return nil
}); err != nil {
return err
Expand All @@ -193,17 +189,17 @@ func (u *Updater) downloadEpisodes(ctx context.Context, feedConfig *config.Feed,

logger.Infof("successfully downloaded file %q", episode.ID)

if err := u.db.UpdateFile(feedID, episode.ID, func(file *model.File) error {
if err := u.db.UpdateEpisode(feedID, episode.ID, func(episode *model.Episode) error {
// Record file size of newly downloaded file
size, err := u.fileSize(episodePath)
if err != nil {
logger.WithError(err).Error("failed to get episode file size")
} else {
logger.Debugf("file size: %d bytes", file.Size)
file.Size = size
logger.Debugf("file size: %d bytes", episode.Size)
episode.Size = size
}

file.Status = model.EpisodeDownloaded
episode.Status = model.EpisodeDownloaded
return nil
}); err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions pkg/feed/vimeo.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ func (v *VimeoBuilder) queryVideos(getVideos getVideosFunc, feed *model.Feed) er
PubDate: video.CreatedTime,
Thumbnail: image,
VideoURL: videoURL,
Status: model.EpisodeNew,
})

added++
Expand Down
1 change: 1 addition & 0 deletions pkg/feed/youtube.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ func (yt *YouTubeBuilder) queryVideoDescriptions(ctx context.Context, playlist m
VideoURL: videoURL,
PubDate: pubDate,
Order: order,
Status: model.EpisodeNew,
})
}

Expand Down
28 changes: 11 additions & 17 deletions pkg/model/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,16 @@ const (

type Episode struct {
// ID of episode
ID string `json:"id"`
Title string `json:"title"`
Description string `json:"description"`
Thumbnail string `json:"thumbnail"`
Duration int64 `json:"duration"`
VideoURL string `json:"video_url"`
PubDate time.Time `json:"pub_date"`
Size int64 `json:"size"`
Order string `json:"order"`
ID string `json:"id"`
Title string `json:"title"`
Description string `json:"description"`
Thumbnail string `json:"thumbnail"`
Duration int64 `json:"duration"`
VideoURL string `json:"video_url"`
PubDate time.Time `json:"pub_date"`
Size int64 `json:"size"`
Order string `json:"order"`
Status EpisodeStatus `json:"status"` // Disk status
}

type Feed struct {
Expand All @@ -54,7 +55,7 @@ type Feed struct {
PubDate time.Time `json:"pub_date"`
Author string `json:"author"`
ItemURL string `json:"item_url"` // Platform specific URL
Episodes []*Episode `json:"-"` // Array of episodes, serialized as gziped EpisodesData in DynamoDB
Episodes []*Episode `json:"-"` // Array of episodes
UpdatedAt time.Time `json:"updated_at"`
}

Expand All @@ -66,10 +67,3 @@ const (
EpisodeError = EpisodeStatus("error") // Could not download, will retry
EpisodeCleaned = EpisodeStatus("cleaned") // Downloaded and later removed from disk due to update strategy
)

type File struct {
EpisodeID string `json:"episode_id"`
FeedID string `json:"feed_id"`
Size int64 `json:"size"` // Size on disk after encoding
Status EpisodeStatus `json:"status"`
}
79 changes: 23 additions & 56 deletions pkg/storage/badger.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@ const (
feedPath = "feed/%s"
episodePrefix = "episode/%s/"
episodePath = "episode/%s/%s" // FeedID + EpisodeID
filePrefix = "file/%s/"
filePath = "file/%s/%s" // FeedID + EpisodeID
)

type Badger struct {
Expand Down Expand Up @@ -100,22 +98,6 @@ func (b *Badger) AddFeed(_ context.Context, feedID string, feed *model.Feed) err
}
}

// Update download file statuses
for _, episode := range feed.Episodes {
fileKey := b.getKey(filePath, feedID, episode.ID)
file := &model.File{
EpisodeID: episode.ID,
FeedID: feedID,
Size: episode.Size, // Use estimated file size
Status: model.EpisodeNew,
}

err := b.setObj(txn, fileKey, file, false)
if err != nil && err != ErrAlreadyExists {
return errors.Wrapf(err, "failed to set %q status for %q", model.EpisodeNew, episode.ID)
}
}

return nil
})
}
Expand Down Expand Up @@ -190,16 +172,6 @@ func (b *Badger) DeleteFeed(_ context.Context, feedID string) error {
return errors.Wrapf(err, "failed to iterate episodes for feed %q", feedID)
}

// Files
opts = badger.DefaultIteratorOptions
opts.Prefix = b.getKey(filePrefix, feedID)
opts.PrefetchValues = false
if err := b.iterator(txn, opts, func(item *badger.Item) error {
return txn.Delete(item.KeyCopy(nil))
}); err != nil {
return errors.Wrapf(err, "failed to iterate files for feed %q", feedID)
}

return nil
})
}
Expand All @@ -218,47 +190,42 @@ func (b *Badger) GetEpisode(_ context.Context, feedID string, episodeID string)
return &episode, err
}

// WalkFiles iterates over every file record stored for the given feed and
// invokes cb for each one. Iteration stops early if cb returns an error.
func (b *Badger) WalkFiles(_ context.Context, feedID string, cb func(file *model.File) error) error {
	iterOpts := badger.DefaultIteratorOptions
	iterOpts.Prefix = b.getKey(filePrefix, feedID)
	iterOpts.PrefetchValues = true

	return b.db.View(func(txn *badger.Txn) error {
		return b.iterator(txn, iterOpts, func(item *badger.Item) error {
			var file model.File
			if err := b.unmarshalObj(item, &file); err != nil {
				return err
			}
			return cb(&file)
		})
	})
}

func (b *Badger) UpdateFile(feedID string, episodeID string, cb func(file *model.File) error) error {
func (b *Badger) UpdateEpisode(feedID string, episodeID string, cb func(episode *model.Episode) error) error {
var (
key = b.getKey(filePath, feedID, episodeID)
file = &model.File{}
key = b.getKey(episodePath, feedID, episodeID)
episode model.Episode
)

return b.db.Update(func(txn *badger.Txn) error {
if err := b.getObj(txn, key, file); err != nil {
if err := b.getObj(txn, key, &episode); err != nil {
return err
}

if err := cb(file); err != nil {
if err := cb(&episode); err != nil {
return err
}

if file.FeedID != feedID {
return errors.New("can't change feed ID")
}

if file.EpisodeID != episodeID {
if episode.ID != episodeID {
return errors.New("can't change episode ID")
}

return b.setObj(txn, key, file, true)
return b.setObj(txn, key, &episode, true)
})
}

// WalkEpisodes iterates over all episodes of the given feed and invokes cb
// for each one. Iteration stops at the first error returned by cb or by the
// underlying iterator.
func (b *Badger) WalkEpisodes(_ context.Context, feedID string, cb func(episode *model.Episode) error) error {
	return b.db.View(func(txn *badger.Txn) error {
		opts := badger.DefaultIteratorOptions
		opts.Prefix = b.getKey(episodePrefix, feedID)
		opts.PrefetchValues = true
		return b.iterator(txn, opts, func(item *badger.Item) error {
			// Decode the stored value into an Episode. The previous local was
			// misleadingly named "feed" even though it holds an episode.
			episode := &model.Episode{}
			if err := b.unmarshalObj(item, episode); err != nil {
				return err
			}

			return cb(episode)
		})
	})
}

Expand Down
56 changes: 18 additions & 38 deletions pkg/storage/badger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,23 +122,15 @@ func TestBadger_DeleteFeed(t *testing.T) {
assert.NoError(t, err)

called := 0

err = db.WalkFeeds(testCtx, func(feed *model.Feed) error {
called++
return nil
})
assert.NoError(t, err)

err = db.WalkFiles(testCtx, feed.ID, func(file *model.File) error {
called++
return nil
})
assert.NoError(t, err)

assert.Equal(t, 0, called)
}

func TestBadger_WalkFiles(t *testing.T) {
func TestBadger_UpdateEpisode(t *testing.T) {
dir, err := ioutil.TempDir("", "podsync-badger-")
assert.NoError(t, err)
defer os.RemoveAll(dir)
Expand All @@ -151,23 +143,24 @@ func TestBadger_WalkFiles(t *testing.T) {
err = db.AddFeed(testCtx, feed.ID, feed)
assert.NoError(t, err)

called := 0

err = db.WalkFiles(testCtx, feed.ID, func(file *model.File) error {
assert.Equal(t, feed.ID, file.FeedID)
assert.Equal(t, feed.Episodes[called].ID, file.EpisodeID)
assert.Equal(t, feed.Episodes[called].Size, file.Size)
assert.Equal(t, model.EpisodeNew, file.Status)

called++
err = db.UpdateEpisode(feed.ID, feed.Episodes[0].ID, func(file *model.Episode) error {
file.Size = 333
file.Status = model.EpisodeDownloaded
return nil
})
assert.NoError(t, err)

episode, err := db.GetEpisode(testCtx, feed.ID, feed.Episodes[0].ID)
assert.NoError(t, err)

assert.Equal(t, feed.Episodes[0].ID, episode.ID)
assert.EqualValues(t, 333, episode.Size)
assert.Equal(t, model.EpisodeDownloaded, episode.Status)

assert.NoError(t, err)
assert.Equal(t, 2, called)
}

func TestBadger_UpdateFile(t *testing.T) {
func TestBadger_WalkEpisodes(t *testing.T) {
dir, err := ioutil.TempDir("", "podsync-badger-")
assert.NoError(t, err)
defer os.RemoveAll(dir)
Expand All @@ -180,28 +173,15 @@ func TestBadger_UpdateFile(t *testing.T) {
err = db.AddFeed(testCtx, feed.ID, feed)
assert.NoError(t, err)

err = db.UpdateFile(feed.ID, feed.Episodes[0].ID, func(file *model.File) error {
file.Size = 333
file.Status = model.EpisodeDownloaded
return nil
})
assert.NoError(t, err)

first := true

err = db.WalkFiles(testCtx, feed.ID, func(file *model.File) error {
if first {
assert.Equal(t, feed.ID, file.FeedID)
assert.Equal(t, feed.Episodes[0].ID, file.EpisodeID)
assert.EqualValues(t, 333, file.Size)
assert.Equal(t, model.EpisodeDownloaded, file.Status)
first = false
}

called := 0
err = db.WalkEpisodes(testCtx, feed.ID, func(actual *model.Episode) error {
assert.EqualValues(t, feed.Episodes[called], actual)
called++
return nil
})

assert.NoError(t, err)
assert.Equal(t, called, 2)
}

func getFeed() *model.Feed {
Expand Down
Loading

0 comments on commit d7a6a62

Please sign in to comment.