From 38f5c3cffc28a1c7501fd8666f95c854df469f68 Mon Sep 17 00:00:00 2001 From: Maksym Pavlenko Date: Sat, 7 Mar 2020 16:55:13 -0800 Subject: [PATCH] Use fs storage interface to access episodes --- cmd/podsync/main.go | 8 ++- cmd/podsync/updater.go | 128 ++++++++++++----------------------- cmd/podsync/updater_test.go | 50 -------------- pkg/fs/local.go | 100 ++++++++++++++++++++++++++++ pkg/fs/local_test.go | 129 ++++++++++++++++++++++++++++++++++++ pkg/fs/storage.go | 20 ++++++ 6 files changed, 297 insertions(+), 138 deletions(-) delete mode 100644 cmd/podsync/updater_test.go create mode 100644 pkg/fs/local.go create mode 100644 pkg/fs/local_test.go create mode 100644 pkg/fs/storage.go diff --git a/cmd/podsync/main.go b/cmd/podsync/main.go index 42fe3898..90791b7b 100644 --- a/cmd/podsync/main.go +++ b/cmd/podsync/main.go @@ -16,6 +16,7 @@ import ( "github.com/mxpv/podsync/pkg/config" "github.com/mxpv/podsync/pkg/db" + "github.com/mxpv/podsync/pkg/fs" "github.com/mxpv/podsync/pkg/ytdl" ) @@ -94,9 +95,14 @@ func main() { log.WithError(err).Fatal("failed to open database") } + storage, err := fs.NewLocal(cfg.Server.DataDir, cfg.Server.Hostname) + if err != nil { + log.WithError(err).Fatal("failed to open storage") + } + // Run updater thread log.Debug("creating updater") - updater, err := NewUpdater(cfg, downloader, database) + updater, err := NewUpdater(cfg, downloader, database, storage) if err != nil { log.WithError(err).Fatal("failed to create updater") } diff --git a/cmd/podsync/updater.go b/cmd/podsync/updater.go index 108bed5f..c6cdad20 100644 --- a/cmd/podsync/updater.go +++ b/cmd/podsync/updater.go @@ -1,15 +1,13 @@ package main import ( + "bytes" "context" "fmt" "io" - "io/ioutil" "os" - "path/filepath" "regexp" "strconv" - "strings" "time" itunes "github.com/mxpv/podcast" @@ -19,6 +17,7 @@ import ( "github.com/mxpv/podsync/pkg/config" "github.com/mxpv/podsync/pkg/db" "github.com/mxpv/podsync/pkg/feed" + "github.com/mxpv/podsync/pkg/fs" "github.com/mxpv/podsync/pkg/link" "github.com/mxpv/podsync/pkg/model" "github.com/mxpv/podsync/pkg/ytdl" @@ -32,13 +31,15 @@ type Updater struct { config *config.Config downloader Downloader db db.Storage + fs fs.Storage } -func NewUpdater(config *config.Config, downloader Downloader, db db.Storage) (*Updater, error) { +func NewUpdater(config *config.Config, downloader Downloader, db db.Storage, fs fs.Storage) (*Updater, error) { return &Updater{ config: config, downloader: downloader, db: db, + fs: fs, }, nil } @@ -50,18 +51,11 @@ func (u *Updater) Update(ctx context.Context, feedConfig *config.Feed) error { }).Infof("-> updating %s", feedConfig.URL) started := time.Now() - // Make sure feed directory exists - feedPath := filepath.Join(u.config.Server.DataDir, feedConfig.ID) - log.Debugf("creating directory for feed %q", feedPath) - if err := os.MkdirAll(feedPath, 0755); err != nil { - return errors.Wrapf(err, "failed to create directory for feed %q", feedConfig.ID) - } - if err := u.updateFeed(ctx, feedConfig); err != nil { return err } - if err := u.downloadEpisodes(ctx, feedConfig, feedPath); err != nil { + if err := u.downloadEpisodes(ctx, feedConfig); err != nil { return err } @@ -100,7 +94,7 @@ func (u *Updater) updateFeed(ctx context.Context, feedConfig *config.Feed) error return nil } -func (u *Updater) downloadEpisodes(ctx context.Context, feedConfig *config.Feed, targetDir string) error { +func (u *Updater) downloadEpisodes(ctx context.Context, feedConfig *config.Feed) error { var ( feedID = feedConfig.ID downloadList []*model.Episode @@ -146,21 +140,19 @@ func (u *Updater) downloadEpisodes(ctx context.Context, feedConfig *config.Feed, // Download pending episodes for idx, episode := range downloadList { - logger := log.WithFields(log.Fields{ - "index": idx, - "episode_id": episode.ID, - }) - - // Check whether episode exists on disk + var ( + logger = log.WithFields(log.Fields{"index": idx, "episode_id": episode.ID}) + episodeName = u.episodeName(feedConfig, episode) + ) - episodePath := filepath.Join(targetDir, u.episodeName(feedConfig, episode)) - stat, err := os.Stat(episodePath) + // Check whether episode already exists + size, err := u.fs.Size(ctx, feedID, episodeName) if err == nil { - logger.Infof("episode %q already exists on disk (%s)", episode.ID, episodePath) + logger.Infof("episode %q already exists on disk", episode.ID) // File already exists, update file status and disk size if err := u.db.UpdateEpisode(feedID, episode.ID, func(episode *model.Episode) error { - episode.Size = stat.Size() + episode.Size = size episode.Status = model.EpisodeDownloaded return nil }); err != nil { @@ -200,16 +192,14 @@ func (u *Updater) downloadEpisodes(ctx context.Context, feedConfig *config.Feed, continue } - logger.Debugf("copying file to: %s", episodePath) - fileSize, err := copyFile(tempFile, episodePath) - + logger.Debug("copying file") + fileSize, err := u.fs.Create(ctx, feedID, episodeName, tempFile) tempFile.Close() - if err != nil { logger.WithError(err).Error("failed to copy file") return err } - logger.Debugf("copied %d bytes", episode.Size) + logger.Debugf("copied %d bytes", fileSize) // Update file status in database @@ -229,22 +219,6 @@ func (u *Updater) downloadEpisodes(ctx context.Context, feedConfig *config.Feed, return nil } -func copyFile(source io.Reader, destinationPath string) (int64, error) { - dest, err := os.Create(destinationPath) - if err != nil { - return 0, errors.Wrap(err, "failed to create destination file") - } - - defer dest.Close() - - written, err := io.Copy(dest, source) - if err != nil { - return 0, errors.Wrap(err, "failed to copy data") - } - - return written, nil -} - func (u *Updater) buildXML(ctx context.Context, feedConfig *config.Feed) error { feed, err := u.db.GetFeed(ctx, feedConfig.ID) if err != nil { @@ -253,23 +227,24 @@ func (u *Updater) buildXML(ctx context.Context, feedConfig *config.Feed) error { // Build iTunes XML feed with data received from builder log.Debug("building iTunes podcast feed") - podcast, err := u.buildPodcast(feed, feedConfig) + podcast, err := u.buildPodcast(ctx, feed, feedConfig) if err != nil { return err } - // Save XML to disk - xmlName := fmt.Sprintf("%s.xml", feedConfig.ID) - xmlPath := filepath.Join(u.config.Server.DataDir, xmlName) - log.Debugf("saving feed XML file to %s", xmlPath) - if err := ioutil.WriteFile(xmlPath, []byte(podcast.String()), 0600); err != nil { - return errors.Wrapf(err, "failed to write XML feed to disk") + var ( + reader = bytes.NewReader([]byte(podcast.String())) + xmlName = fmt.Sprintf("%s.xml", feedConfig.ID) + ) + + if _, err := u.fs.Create(ctx, "", xmlName, reader); err != nil { + return errors.Wrap(err, "failed to upload new XML feed") } return nil } -func (u *Updater) buildPodcast(feed *model.Feed, cfg *config.Feed) (*itunes.Podcast, error) { +func (u *Updater) buildPodcast(ctx context.Context, feed *model.Feed, cfg *config.Feed) (*itunes.Podcast, error) { const ( podsyncGenerator = "Podsync generator (support us at https://github.com/mxpv/podsync)" defaultCategory = "TV & Film" @@ -320,7 +295,19 @@ func (u *Updater) buildPodcast(feed *model.Feed, cfg *config.Feed) (*itunes.Podc item.AddSummary(episode.Description) item.AddImage(episode.Thumbnail) item.AddDuration(episode.Duration) - item.AddEnclosure(u.makeEnclosure(feed, episode, cfg)) + + enclosureType := itunes.MP4 + if feed.Format == model.FormatAudio { + enclosureType = itunes.MP4 + } + + episodeName := u.episodeName(cfg, episode) + downloadURL, err := u.fs.URL(ctx, cfg.ID, episodeName) + if err != nil { + return nil, errors.Wrapf(err, "failed to obtain download URL for: %s", episodeName) + } + + item.AddEnclosure(downloadURL, enclosureType, episode.Size) // p.AddItem requires description to be not empty, use workaround if item.Description == "" { @@ -333,8 +320,7 @@ func (u *Updater) buildPodcast(feed *model.Feed, cfg *config.Feed) (*itunes.Podc item.IExplicit = "no" } - _, err := p.AddItem(item) - if err != nil { + if _, err := p.AddItem(item); err != nil { return nil, errors.Wrapf(err, "failed to add item to podcast (id %q)", episode.ID) } } @@ -342,38 +328,6 @@ func (u *Updater) buildPodcast(feed *model.Feed, cfg *config.Feed) (*itunes.Podc return &p, nil } -func (u *Updater) makeEnclosure( - feed *model.Feed, - episode *model.Episode, - cfg *config.Feed, -) (string, itunes.EnclosureType, int64) { - ext := "mp4" - contentType := itunes.MP4 - if feed.Format == model.FormatAudio { - ext = "mp3" - contentType = itunes.MP3 - } - - url := fmt.Sprintf( - "%s/%s/%s.%s", - u.hostname(), - cfg.ID, - episode.ID, - ext, - ) - - return url, contentType, episode.Size -} - -func (u *Updater) hostname() string { - hostname := strings.TrimSuffix(u.config.Server.Hostname, "/") - if !strings.HasPrefix(hostname, "http") { - hostname = fmt.Sprintf("http://%s", hostname) - } - - return hostname -} - func (u *Updater) episodeName(feedConfig *config.Feed, episode *model.Episode) string { ext := "mp4" if feedConfig.Format == model.FormatAudio { diff --git a/cmd/podsync/updater_test.go b/cmd/podsync/updater_test.go deleted file mode 100644 index 556af65e..00000000 --- a/cmd/podsync/updater_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package main - -import ( - "bytes" - "io/ioutil" - "os" - "path/filepath" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/mxpv/podsync/pkg/config" -) - -func TestUpdater_hostname(t *testing.T) { - u := Updater{ - config: &config.Config{ - Server: config.Server{ - Hostname: "localhost", - Port: 7979, - }, - }, - } - - assert.Equal(t, "http://localhost", u.hostname()) - - // Trim end slash - u.config.Server.Hostname = "https://localhost:8080/" - assert.Equal(t, "https://localhost:8080", u.hostname()) -} - -func TestCopyFile(t *testing.T) { - reader := bytes.NewReader([]byte{1, 2, 4}) - - tmpDir, err := ioutil.TempDir("", "podsync-test-") - require.NoError(t, err) - - defer os.RemoveAll(tmpDir) - - file := filepath.Join(tmpDir, "1") - - size, err := copyFile(reader, file) - assert.NoError(t, err) - assert.EqualValues(t, 3, size) - - stat, err := os.Stat(file) - assert.NoError(t, err) - assert.EqualValues(t, 3, stat.Size()) -} diff --git a/pkg/fs/local.go b/pkg/fs/local.go new file mode 100644 index 00000000..65885515 --- /dev/null +++ b/pkg/fs/local.go @@ -0,0 +1,100 @@ +package fs + +import ( + "context" + "fmt" + "io" + "os" + "path/filepath" + "strings" + + "github.com/pkg/errors" + log "github.com/sirupsen/logrus" +) + +type Local struct { + hostname string + rootDir string +} + +func NewLocal(rootDir string, hostname string) (*Local, error) { + if hostname == "" { + return nil, errors.New("hostname can't be empty") + } + + hostname = strings.TrimSuffix(hostname, "/") + if !strings.HasPrefix(hostname, "http") { + hostname = fmt.Sprintf("http://%s", hostname) + } + + return &Local{rootDir: rootDir, hostname: hostname}, nil +} + +func (l *Local) Create(ctx context.Context, ns string, fileName string, reader io.Reader) (int64, error) { + var ( + logger = log.WithField("feed_id", ns).WithField("episode_id", fileName) + feedDir = filepath.Join(l.rootDir, ns) + ) + + if err := os.MkdirAll(feedDir, 0755); err != nil { + return 0, errors.Wrapf(err, "failed to create a directory for the feed: %s", feedDir) + } + + logger.Debugf("creating directory: %s", feedDir) + if err := os.MkdirAll(feedDir, 0755); err != nil { + return 0, errors.Wrapf(err, "failed to create feed dir: %s", feedDir) + } + + var ( + episodePath = filepath.Join(l.rootDir, ns, fileName) + ) + + logger.Debugf("copying to: %s", episodePath) + written, err := l.copyFile(reader, episodePath) + if err != nil { + return 0, errors.Wrap(err, "failed to copy file") + } + + logger.Debugf("copied %d bytes", written) + return written, nil +} + +func (l *Local) Delete(ctx context.Context, ns string, fileName string) error { + path := filepath.Join(l.rootDir, ns, fileName) + return os.Remove(path) +} + +func (l *Local) Size(ctx context.Context, ns string, fileName string) (int64, error) { + path := filepath.Join(l.rootDir, ns, fileName) + + stat, err := os.Stat(path) + if err == nil { + return stat.Size(), nil + } + + return 0, err +} + +func (l *Local) URL(ctx context.Context, ns string, fileName string) (string, error) { + if _, err := l.Size(ctx, ns, fileName); err != nil { + return "", errors.Wrap(err, "failed to check whether file exists") + } + + return fmt.Sprintf("%s/%s/%s", l.hostname, ns, fileName), nil +} + +func (l *Local) copyFile(source io.Reader, destinationPath string) (int64, error) { + dest, err := os.Create(destinationPath) + if err != nil { + return 0, errors.Wrap(err, "failed to create destination file") + } + + defer dest.Close() + + written, err := io.Copy(dest, source) + if err != nil { + return 0, errors.Wrap(err, "failed to copy data") + } + + return written, nil +} diff --git a/pkg/fs/local_test.go b/pkg/fs/local_test.go new file mode 100644 index 00000000..33a4135a --- /dev/null +++ b/pkg/fs/local_test.go @@ -0,0 +1,129 @@ +package fs + +import ( + "bytes" + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +var ( + testCtx = context.Background() +) + +func TestNewLocal(t *testing.T) { + local, err := NewLocal("", "localhost") + assert.NoError(t, err) + assert.Equal(t, "http://localhost", local.hostname) + + local, err = NewLocal("", "https://localhost:8080/") + assert.NoError(t, err) + assert.Equal(t, "https://localhost:8080", local.hostname) +} + +func TestLocal_Create(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "podsync-local-stor-") + require.NoError(t, err) + + defer os.RemoveAll(tmpDir) + + stor, err := NewLocal(tmpDir, "localhost") + assert.NoError(t, err) + + written, err := stor.Create(testCtx, "1", "test", bytes.NewBuffer([]byte{1, 5, 7, 8, 3})) + assert.NoError(t, err) + assert.EqualValues(t, 5, written) + + stat, err := os.Stat(filepath.Join(tmpDir, "1", "test")) + assert.NoError(t, err) + assert.EqualValues(t, 5, stat.Size()) +} + +func TestLocal_Size(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "podsync-local-stor-") + require.NoError(t, err) + + defer os.RemoveAll(tmpDir) + + stor, err := NewLocal(tmpDir, "localhost") + assert.NoError(t, err) + + _, err = stor.Create(testCtx, "1", "test", bytes.NewBuffer([]byte{1, 5, 7, 8, 3})) + assert.NoError(t, err) + + sz, err := stor.Size(testCtx, "1", "test") + assert.NoError(t, err) + assert.EqualValues(t, 5, sz) +} + +func TestLocal_NoSize(t *testing.T) { + stor, err := NewLocal("", "localhost") + assert.NoError(t, err) + + _, err = stor.Size(testCtx, "1", "test") + assert.True(t, os.IsNotExist(err)) +} + +func TestLocal_Delete(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "podsync-local-stor-") + require.NoError(t, err) + + defer os.RemoveAll(tmpDir) + + stor, err := NewLocal(tmpDir, "localhost") + assert.NoError(t, err) + + _, err = stor.Create(testCtx, "1", "test", bytes.NewBuffer([]byte{1, 5, 7, 8, 3})) + assert.NoError(t, err) + + err = stor.Delete(testCtx, "1", "test") + assert.NoError(t, err) + + _, err = stor.Size(testCtx, "1", "test") + assert.True(t, os.IsNotExist(err)) + + _, err = os.Stat(filepath.Join(tmpDir, "1", "test")) + assert.True(t, os.IsNotExist(err)) +} + +func TestLocal_URL(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "podsync-local-stor-") + require.NoError(t, err) + + defer os.RemoveAll(tmpDir) + + stor, err := NewLocal(tmpDir, "localhost") + assert.NoError(t, err) + + _, err = stor.Create(testCtx, "1", "test", bytes.NewBuffer([]byte{1, 5, 7, 8, 3})) + assert.NoError(t, err) + + url, err := stor.URL(testCtx, "1", "test") + assert.NoError(t, err) + assert.EqualValues(t, "http://localhost/1/test", url) +} + +func TestLocal_copyFile(t *testing.T) { + reader := bytes.NewReader([]byte{1, 2, 4}) + + tmpDir, err := ioutil.TempDir("", "podsync-test-") + require.NoError(t, err) + + defer os.RemoveAll(tmpDir) + + file := filepath.Join(tmpDir, "1") + + l := &Local{} + size, err := l.copyFile(reader, file) + assert.NoError(t, err) + assert.EqualValues(t, 3, size) + + stat, err := os.Stat(file) + assert.NoError(t, err) + assert.EqualValues(t, 3, stat.Size()) +} diff --git a/pkg/fs/storage.go b/pkg/fs/storage.go new file mode 100644 index 00000000..a157bcf2 --- /dev/null +++ b/pkg/fs/storage.go @@ -0,0 +1,20 @@ +package fs + +import ( + "context" + "io" +) + +type Storage interface { + // Create will create a new file from reader + Create(ctx context.Context, ns string, fileName string, reader io.Reader) (int64, error) + + // Delete deletes the file + Delete(ctx context.Context, ns string, fileName string) error + + // Size returns the size of a file in bytes + Size(ctx context.Context, ns string, fileName string) (int64, error) + + // URL will generate a download link for a file + URL(ctx context.Context, ns string, fileName string) (string, error) +}