-
Notifications
You must be signed in to change notification settings - Fork 0
/
compress_songs.py
109 lines (91 loc) · 3.41 KB
/
compress_songs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import concurrent.futures
import os
import re
import subprocess
import sys
def get_jobs(directory):
jobs = {"albums": [], "audio_files": [], "delete": []}
for root, _, files in os.walk(directory):
for f in files:
path = os.path.join(root, f)
if f.lower() in ["album.jpeg", "album.jpg", "album.png"]:
jobs["albums"].append(path)
continue
elif f in ["ch.dat", "notes.eof", "ps.dat"]:
jobs["delete"].append(path)
continue
_, ext = os.path.splitext(path)
if ext in [".mp3", ".ogg", ".wav"]:
jobs["audio_files"].append(path)
elif ext in [".sfk", ".sfl"]:
jobs["delete"].append(path)
return jobs
def audio_to_opus(ffmpeg, file):
root, _ = os.path.splitext(file)
output = root + ".opus"
result = subprocess.run(
[ffmpeg, "-y", "-i", file, "-b:a", "80k", output], capture_output=True
)
if result.returncode == 0:
os.remove(file)
def image_type_and_size(ffmpeg, file):
result = subprocess.run([ffmpeg, "-hide_banner", "-i", file], capture_output=True)
for line in result.stderr.split(b"\n"):
if line.startswith(b" Stream #0:0: Video: png"):
sub_parts = line.split(b"), ")
if len(sub_parts) < 2:
return None
res_str = image_type_and_size.regex.match(sub_parts[1])
if res_str is None or res_str.start() != 0:
return None
x, _, y = res_str.group().partition(b"x")
return ("png", int(x), int(y))
elif line.startswith(b" Stream #0:0: Video: mjpeg ("):
sub_parts = line.split(b"), ")
if len(sub_parts) < 3:
return None
res_str = image_type_and_size.regex.match(sub_parts[2])
if res_str is None or res_str.start() != 0:
return None
x, _, y = res_str.group().partition(b"x")
return ("jpeg", int(x), int(y))
return None
image_type_and_size.regex = re.compile(rb"\d+x\d+")
def resize_and_convert_image(ffmpeg, file):
data = image_type_and_size(ffmpeg, file)
if data is None:
return
type, x, y = data
if type == "jpeg" and max(x, y) <= 500:
head, tail = os.path.split(file)
if tail != "album.jpg":
os.rename(file, os.path.join(head, "album.jpg"))
return
max_size = max(x, y)
x = 500 * x // max_size
y = 500 * y // max_size
root, _ = os.path.splitext(file)
output = root + "-tmp-copy.jpg"
result = subprocess.run(
[ffmpeg, "-y", "-i", file, "-vf", f"scale={x}:{y}", output.replace("%", "%%")],
capture_output=True,
)
if result.returncode == 0:
head, _ = os.path.split(file)
os.remove(file)
os.rename(output, os.path.join(head, "album.jpg"))
if __name__ == "__main__":
jobs = get_jobs(sys.argv[1])
ffmpeg = "ffmpeg.exe"
if os.name != "nt":
ffmpeg = "ffmpeg"
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = []
for f in jobs["audio_files"]:
future = executor.submit(audio_to_opus, ffmpeg, f)
futures.append(future)
for f in jobs["albums"]:
future = executor.submit(resize_and_convert_image, ffmpeg, f)
futures.append(future)
for f in jobs["delete"]:
os.remove(f)