-
Notifications
You must be signed in to change notification settings - Fork 0
/
musicsync.py
199 lines (158 loc) · 6.45 KB
/
musicsync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
"""Synchronize a playlist from Music to a disk."""
import logging
import os
import subprocess as sp
import unicodedata
from typing import TypeVar
import click as c
from tqdm import tqdm
# Configure the root logger: WARNING threshold, bare message text only.
logging.basicConfig(level=logging.WARNING, format="%(message)s")
# Module-level logger used by every function in this file; its own level is
# set later in main() depending on the --verbose flag.
log = logging.getLogger(__name__)
# Generic type variable so wrap_tqdm can be annotated as returning its input type.
T = TypeVar("T")
def wrap_tqdm(iter: T, *args, **kwargs) -> T:
    """Wrap an iterable in a tqdm progress bar unless debug logging is active.

    The iterable is returned untouched when the module logger's own level is
    below INFO; otherwise it is wrapped with ``tqdm``, forwarding any extra
    positional and keyword arguments.
    """
    show_progress = log.level >= logging.INFO
    return tqdm(iter, *args, **kwargs) if show_progress else iter
def norm(s: str) -> str:
    """Return *s* in Unicode NFKC form, used to compare names despite FAT32 issues."""
    normalized = unicodedata.normalize("NFKC", s)
    return normalized
def get_playlist_files(playlist: str) -> list[str]:
    """Collect all files from a Music playlist with AppleScript.

    Args:
        playlist: Name of the playlist in the Music application.

    Returns:
        The sorted, non-empty lines printed by the script (one POSIX path per
        track). The list is empty when the script printed nothing, e.g.
        because it failed.
    """
    # Escape backslashes and double quotes so a playlist name containing them
    # cannot terminate the AppleScript string literal early.
    quoted = playlist.replace("\\", "\\\\").replace('"', '\\"')
    script = f"""
        tell application "Music"
            set thePlaylist to the playlist named "{quoted}"
            set output to ""
            repeat with theTrack in (get the location of every track in thePlaylist)
                set output to output & (posix path of theTrack) & "\n"
            end repeat
        end tell
    """
    log.info(f'Collecting songs from playlist "{playlist}"')
    job = sp.run(["osascript", "-e", script], stdout=sp.PIPE)
    if job.returncode != 0:
        # stderr is not captured, so osascript's own diagnostics are already
        # on the terminal; just make the failure explicit in our log.
        log.error(f" osascript exited with status {job.returncode}")
    files = sorted(line for line in job.stdout.decode("utf-8").splitlines() if line)
    plural = "" if len(files) == 1 else "s"
    log.info(f" Collected {len(files)} song{plural}")
    return files
def find_root_directory(files: list[str]) -> str:
    """Find the deepest directory that contains every path in *files*.

    The root is computed component-wise from each file's parent directory,
    so it is always a real directory boundary. A character-wise prefix could
    stop in the middle of a name (e.g. ``/Music/Artist `` for ``Artist A``
    and ``Artist B``) or be the file itself when there is only one song.

    Args:
        files: File paths, all absolute or all relative.

    Returns:
        The common parent directory, without a trailing separator.

    Raises:
        ValueError: If *files* is empty, or mixes absolute and relative paths.
    """
    if not files:
        raise ValueError("Cannot find a root directory for an empty list of files")
    root = os.path.commonpath([os.path.dirname(file) for file in files])
    log.info(f'Playlist root directory is "{root}"')
    return root
class directory:
    """Representation of a directory with files and subdirectories.

    Counts are computed on demand rather than cached: the tree is mutated
    while it is being built, and a cached total taken part-way through (for
    example by printing a node) would silently go stale.
    """

    def __init__(self) -> None:
        # Subdirectories of this directory, keyed by name.
        self.dirs: dict[str, directory] = {}
        # Names of the files located directly in this directory.
        self.files: list[str] = []

    def __str__(self) -> str:
        return f"directory({self.count_dirs()} dirs, {self.count_files()} files)"

    def __repr__(self) -> str:
        return str(self)

    def count_dirs(self) -> int:
        """Recursively count all subdirectories below this directory."""
        return len(self.dirs) + sum(d.count_dirs() for d in self.dirs.values())

    def count_files(self) -> int:
        """Recursively count all files in and below this directory."""
        return len(self.files) + sum(d.count_files() for d in self.dirs.values())
def build_playlist_tree(songs: list[str], root: str) -> directory:
    """Construct a directory tree of all files in the playlist.

    Args:
        songs: Paths of the playlist's files.
        root: Directory the paths are made relative to.

    Returns:
        A tree whose directory keys are normalized with ``norm`` and whose
        file names are stored as they appear in *songs*.
    """
    log.info("Building playlist song tree")
    tree = directory()
    for entry in songs:
        dirs, file = os.path.split(os.path.relpath(entry, root))
        node = tree
        for part in dirs.split(os.path.sep):
            # A song directly inside root has dirs == "", and "".split(sep)
            # yields [""]; that must not become a subdirectory named "".
            if not part:
                continue
            key = norm(part)  # Normed to find keys reliably
            if key not in node.dirs:
                node.dirs[key] = directory()
            node = node.dirs[key]
        node.files.append(file)
    log.info(f" Found {tree.count_dirs()} dirs and {tree.count_files()} files")
    return tree
def cleanup_volume(volume: str, tree: directory) -> None:
    """Delete files and directories not appearing in the playlist.

    Walks *volume* top-down. A directory whose path (compared after
    normalization) is not present in *tree* is removed recursively; inside
    directories that are present, files whose normalized name is not listed
    in the matching tree node are removed.

    Args:
        volume: Root of the destination volume to clean.
        tree: Playlist tree describing what must be kept.
    """
    walker = wrap_tqdm(os.walk(volume), desc=" Processing", leave=False)
    for pwd, subdirs, files in walker:
        relpath = os.path.relpath(pwd, volume)

        # Delete high-level directories if they don't appear
        node = tree
        removed = False
        if relpath != ".":
            for part in relpath.split(os.path.sep):
                # Look children up by normalized name so that the membership
                # test and the descent always agree.
                children = {norm(name): sub for name, sub in node.dirs.items()}
                child = children.get(norm(part))
                if child is None:
                    log.debug(f' Removing directory "{pwd}"')
                    sp.run(["rm", "-r", pwd])
                    removed = True
                    break
                node = child

        # No files to delete (below) if the directory was removed
        if removed:
            # Prune in place so os.walk does not try to descend into
            # subdirectories that no longer exist.
            subdirs.clear()
            continue

        # Delete files in these directories if the directory exists but they don't.
        # The set is built once per directory instead of once per file.
        keep = {norm(name) for name in node.files}
        for file in files:
            if norm(file) in keep:
                continue
            path = os.path.join(pwd, file)
            log.debug(f' Removing file "{path}"')
            sp.run(["rm", path])
def copy_structure(
    tree: directory, ptr: str, root: str, base: str | None = None
) -> None:
    """Recursively synchronize the playlist tree into a destination.

    Args:
        tree: Playlist (sub)tree to copy.
        ptr: Destination directory corresponding to *tree*.
        root: Source directory corresponding to *tree*.
        base: Top-level destination, used only to label progress bars;
            defaults to *ptr* on the outermost call.
    """
    base = ptr if base is None else base

    for subdir, subtree in tree.dirs.items():
        new_ptr = norm(os.path.join(ptr, subdir))
        # NOTE(review): the source path is normalized as well; if the on-disk
        # source name is not NFKC-stable this path may not exist - confirm.
        new_root = norm(os.path.join(root, subdir))
        if not os.path.isdir(new_ptr):
            log.debug(f' Creating directory "{new_ptr}"')
            sp.run(["mkdir", "-p", new_ptr])
        copy_structure(subtree, new_ptr, new_root, base)

    ptr_n = norm(ptr)
    tqdm_desc = f" {os.path.relpath(ptr, base)}"
    ditto = ["ditto", "--nocache", "--noextattr", "--noqtn", "--norsrc"]
    for file in wrap_tqdm(tree.files, desc=tqdm_desc, leave=False):
        src = os.path.join(root, file)
        dst = norm(os.path.join(ptr_n, file))  # Copy to a normalized path

        # Only copy if the destination file is not there or is wrong size.
        # This gets around timestamp issues with FAT32 (same as w/rsync)
        src_size = os.stat(src).st_size
        if os.path.exists(dst) and os.stat(dst).st_size == src_size:
            continue
        log.debug(f' dittoing file "{src}"')
        # Copy to the exact path that was just checked, so the file is
        # written under its normalized name and is found again next run.
        sp.run(ditto + [src, dst])
@c.command()
@c.help_option("-h", "--help")
@c.option(
    "--verbose",
    "-v",
    is_flag=True,
    help="Enable debug logging.",
)
@c.option(
    "--playlist",
    "-p",
    metavar="NAME",
    type=str,
    default="Selected for Car",
    help="Apple Music playlist to sync.",
)
@c.argument(
    "VOLUME",
    type=c.Path(exists=True, file_okay=False, writable=True),
)
def main(playlist: str, volume: str, verbose: bool) -> None:
    """Synchronize an Apple Music playlist onto VOLUME.

    Files and directories on VOLUME that are not part of the playlist are
    deleted, then missing or size-mismatched songs are copied over.
    """
    log.setLevel(logging.DEBUG if verbose else logging.INFO)

    songlist = get_playlist_files(playlist)
    if not songlist:
        # Without any song there is no root directory to derive; stop with a
        # clean error before anything on the volume is touched.
        raise c.ClickException(f'No songs found in playlist "{playlist}"')

    root = find_root_directory(songlist)
    songtree = build_playlist_tree(songlist, root)

    log.info(f'Removing extra files from "{volume}"')
    cleanup_volume(volume, songtree)

    log.info(f'Copying new files from "{playlist}" to "{volume}" ')
    copy_structure(songtree, volume, root)


if __name__ == "__main__":
    main()