Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: make diskmanager package respect locked clips during cleanup #455

Merged
merged 2 commits into from
Feb 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions internal/analysis/realtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func RealtimeAnalysis(settings *conf.Settings, notificationChan chan handlers.No

// start cleanup of clips
if conf.Setting().Realtime.Audio.Export.Retention.Policy != "none" {
startClipCleanupMonitor(&wg, quitChan)
startClipCleanupMonitor(&wg, quitChan, dataStore)
}

// start weather polling
Expand Down Expand Up @@ -201,9 +201,9 @@ func startAudioCapture(wg *sync.WaitGroup, settings *conf.Settings, quitChan, re
}

// startClipCleanupMonitor initializes and starts the clip cleanup monitoring routine in a new goroutine.
func startClipCleanupMonitor(wg *sync.WaitGroup, quitChan chan struct{}) {
func startClipCleanupMonitor(wg *sync.WaitGroup, quitChan chan struct{}, dataStore datastore.Interface) {
wg.Add(1)
go clipCleanupMonitor(wg, quitChan)
go clipCleanupMonitor(wg, quitChan, dataStore)
}

// startWeatherPolling initializes and starts the weather polling routine in a new goroutine.
Expand Down Expand Up @@ -260,7 +260,7 @@ func closeDataStore(store datastore.Interface) {
}

// ClipCleanupMonitor monitors the database and deletes clips that meet the retention policy.
func clipCleanupMonitor(wg *sync.WaitGroup, quitChan chan struct{}) {
func clipCleanupMonitor(wg *sync.WaitGroup, quitChan chan struct{}, dataStore datastore.Interface) {
defer wg.Done() // Ensure that the WaitGroup is marked as done after the function exits

// Create a ticker that triggers every five minutes to perform cleanup
Expand All @@ -278,14 +278,14 @@ func clipCleanupMonitor(wg *sync.WaitGroup, quitChan chan struct{}) {
case <-ticker.C:
// age based cleanup method
if conf.Setting().Realtime.Audio.Export.Retention.Policy == "age" {
if err := diskmanager.AgeBasedCleanup(quitChan); err != nil {
if err := diskmanager.AgeBasedCleanup(quitChan, dataStore); err != nil {
log.Println("Error cleaning up clips: ", err)
}
}

// priority based cleanup method
if conf.Setting().Realtime.Audio.Export.Retention.Policy == "usage" {
if err := diskmanager.UsageBasedCleanup(quitChan); err != nil {
if err := diskmanager.UsageBasedCleanup(quitChan, dataStore); err != nil {
log.Println("Error cleaning up clips: ", err)
}
}
Expand Down
19 changes: 19 additions & 0 deletions internal/datastore/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Interface interface {
GetImageCache(scientificName string) (*ImageCache, error)
SaveImageCache(cache *ImageCache) error
GetAllImageCaches() ([]ImageCache, error)
GetLockedNotesClipPaths() ([]string, error)
}

// DataStore implements StoreInterface using a GORM database.
Expand Down Expand Up @@ -874,3 +875,21 @@ func (ds *DataStore) GetAllImageCaches() ([]ImageCache, error) {
}
return caches, nil
}

// GetLockedNotesClipPaths retrieves a list of clip paths from all locked notes
func (ds *DataStore) GetLockedNotesClipPaths() ([]string, error) {
var clipPaths []string

// Query to get clip paths from notes that have an associated lock
err := ds.DB.Model(&Note{}).
Joins("JOIN note_locks ON notes.id = note_locks.note_id").
Where("notes.clip_name != ''"). // Only include notes that have a clip path
Pluck("notes.clip_name", &clipPaths).
Error

if err != nil {
return nil, fmt.Errorf("error getting locked notes clip paths: %w", err)
}

return clipPaths, nil
}
45 changes: 43 additions & 2 deletions internal/diskmanager/file_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,22 @@ import (
"time"
)

// allowedFileTypes is the list of file extensions that are allowed to be deleted
var allowedFileTypes = []string{".wav", ".flac", ".aac", ".opus", ".mp3"}

// FileInfo holds information about a file
type FileInfo struct {
Path string
Species string
Confidence int
Timestamp time.Time
Size int64
Locked bool
}

// Interface represents the minimal database interface needed for diskmanager
type Interface interface {
GetLockedNotesClipPaths() ([]string, error)
}

// LoadPolicy loads the cleanup policies from a CSV file
Expand Down Expand Up @@ -57,10 +66,20 @@ func LoadPolicy(policyFile string) (*Policy, error) {
}

// GetAudioFiles returns a list of audio files in the directory and its subdirectories
func GetAudioFiles(baseDir string, allowedExts []string, debug bool) ([]FileInfo, error) {
func GetAudioFiles(baseDir string, allowedExts []string, db Interface, debug bool) ([]FileInfo, error) {
var files []FileInfo

err := filepath.Walk(baseDir, func(path string, info os.FileInfo, err error) error {
// Get list of protected clips from database
lockedClips, err := getLockedClips(db)
if err != nil {
return nil, fmt.Errorf("failed to get protected clips: %w", err)
}

if debug {
log.Printf("Found %d protected clips", len(lockedClips))
}

err = filepath.Walk(baseDir, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
}
Expand All @@ -71,6 +90,8 @@ func GetAudioFiles(baseDir string, allowedExts []string, debug bool) ([]FileInfo
if err != nil {
return err
}
// Check if the file is protected
fileInfo.Locked = isLockedClip(fileInfo.Path, lockedClips)
files = append(files, fileInfo)
}
}
Expand Down Expand Up @@ -150,3 +171,23 @@ func WriteSortedFilesToFile(files []FileInfo, filePath string) error {
log.Printf("Sorted files have been written to %s", filePath)
return nil
}

// getLockedClips retrieves the list of locked clip paths from the database
func getLockedClips(db Interface) ([]string, error) {
if db == nil {
return nil, fmt.Errorf("database interface is nil")
}
return db.GetLockedNotesClipPaths()
}

// isLockedClip checks if a file path is in the list of locked clips
func isLockedClip(path string, lockedClips []string) bool {
filename := filepath.Base(path)
for _, lockedPath := range lockedClips {
if filepath.Base(lockedPath) == filename {
log.Printf("Locked clip found: %s", path)
return true
}
}
return false
}
15 changes: 11 additions & 4 deletions internal/diskmanager/policy_age.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

// AgeBasedCleanup removes clips from the filesystem based on their age and the number of clips per species.
func AgeBasedCleanup(quit <-chan struct{}) error {
func AgeBasedCleanup(quit <-chan struct{}, db Interface) error {
settings := conf.Setting()

debug := settings.Realtime.Audio.Export.Retention.Debug
Expand All @@ -26,13 +26,12 @@ func AgeBasedCleanup(quit <-chan struct{}) error {
return err
}

allowedExts := []string{".wav"}

if debug {
log.Printf("Starting age-based cleanup process. Base directory: %s, Retention period: %s", baseDir, retentionPeriod)
}

files, err := GetAudioFiles(baseDir, allowedExts, debug)
// Get the list of audio files, limited to allowed file types defined in file_utils.go
files, err := GetAudioFiles(baseDir, allowedFileTypes, db, debug)
if err != nil {
return err
}
Expand All @@ -58,6 +57,14 @@ func AgeBasedCleanup(quit <-chan struct{}) error {
log.Printf("Cleanup interrupted by quit signal\n")
return nil
default:
// Skip locked files from deletion
if file.Locked {
if debug {
log.Printf("Skipping locked file: %s", file.Path)
}
continue
}

if file.Timestamp.Before(expirationTime) {
subDir := filepath.Dir(file.Path)

Expand Down
17 changes: 11 additions & 6 deletions internal/diskmanager/policy_usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Policy struct {
}

// UsageBasedCleanup cleans up old audio files based on the configuration and monitors for quit signals
func UsageBasedCleanup(quitChan chan struct{}) error {
func UsageBasedCleanup(quitChan chan struct{}, db Interface) error {
settings := conf.Setting()

debug := settings.Realtime.Audio.Export.Retention.Debug
Expand All @@ -31,9 +31,6 @@ func UsageBasedCleanup(quitChan chan struct{}) error {
return err
}

// Only remove files with extensions in this list
allowedExts := []string{".wav"}

if debug {
log.Printf("Starting cleanup process. Base directory: %s, Threshold: %.1f%%", baseDir, threshold)
}
Expand All @@ -49,8 +46,8 @@ func UsageBasedCleanup(quitChan chan struct{}) error {
log.Printf("Disk usage %.1f%% is above the %.1f%% threshold. Cleanup needed.", diskUsage, threshold)
}

// Get the list of audio files
files, err := GetAudioFiles(baseDir, allowedExts, debug)
// Get the list of audio files, limited to allowed file types defined in file_utils.go
files, err := GetAudioFiles(baseDir, allowedFileTypes, db, debug)
if err != nil {
return err
}
Expand Down Expand Up @@ -86,6 +83,14 @@ func performCleanup(files []FileInfo, baseDir string, threshold float64, minClip
log.Println("Received quit signal, ending cleanup run.")
return nil
default:
// Skip locked files
if file.Locked {
if debug {
log.Printf("Skipping locked file: %s", file.Path)
}
continue
}

// Get the subdirectory name
subDir := filepath.Dir(file.Path)
month := file.Timestamp.Format("2006-01")
Expand Down
1 change: 1 addition & 0 deletions internal/imageprovider/imageprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func (m *mockStore) LockNote(noteID string) error { re
func (m *mockStore) UnlockNote(noteID string) error { return nil }
func (m *mockStore) GetNoteLock(noteID string) (*datastore.NoteLock, error) { return nil, nil }
func (m *mockStore) IsNoteLocked(noteID string) (bool, error) { return false, nil }
func (m *mockStore) GetLockedNotesClipPaths() ([]string, error) { return nil, nil }

// mockFailingStore is a mock implementation that simulates database failures
type mockFailingStore struct {
Expand Down
Loading