# make_video.py
import copy
import os
import sys
from typing import Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from openai.types.audio import TranscriptionVerbose, TranscriptionWord
from tqdm import tqdm

from align import (
    AlignedWord,
    convert_file_times_to_absolute_times,
    deserialize_transcription_from_file,
    word_similarity,
)
from config_parser import parse_opera_config
from video_gen.config.video_config import VideoConfig
from video_gen.frame.generator import create_frames
from video_gen.video.creator import create_parallel_text_video

if len(sys.argv) != 2:
    print("Usage: python make_video.py <config.md>")
    sys.exit(1)

config = parse_opera_config(sys.argv[1])

# Ensure all case variations of character names are included
CHARACTER_NAMES = [
    *config.character_names,
    *[name.lower() for name in config.character_names],
    *[name.upper() for name in config.character_names],
]

def pair_libretto_lines_simple(source_text, target_text):
    """Pair corresponding lines from source and target texts."""
    lines_source = [line for line in source_text.split("\n\n") if line.strip()]
    lines_target = [line for line in target_text.split("\n\n") if line.strip()]
    if len(lines_source) != len(lines_target):
        raise ValueError(
            f"Number of lines doesn't match: {len(lines_source)} source lines vs {len(lines_target)} target lines"
        )
    return list(zip(lines_source, lines_target))
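# Illustrative example (hypothetical text, not from the actual libretto files): pairing
#   "Erster Akt\nSzene 1\n\nZweite Strophe"  with  "Act One\nScene 1\n\nSecond verse"
# yields [("Erster Akt\nSzene 1", "Act One\nScene 1"), ("Zweite Strophe", "Second verse")],
# i.e. blocks separated by blank lines are matched up one-to-one.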
with open(f"libretti/{config.file_prefix}_{config.language}.txt", "r", encoding="utf-8") as f:
libretto_de = f.read()
# Get translation file path from config
if not config.translation_file:
raise ValueError("No translation file specified in config")
translation_path = f"libretti/{config.translation_file}"
if not os.path.exists(translation_path):
raise FileNotFoundError(f"Translation file not found: {translation_path}")
with open(translation_path, "r", encoding="utf-8") as f:
libretto_en = f.read()
pairs = pair_libretto_lines_simple(libretto_de, libretto_en)

def is_safe_split_point(lines, index):
    """Check if splitting at this line index would break any parentheses pairs."""
    text_before = '\n'.join(lines[:index])
    open_count = text_before.count('(') - text_before.count(')')
    return open_count == 0


def find_safe_split_point(lines):
    """Find the closest safe split point to the middle."""
    mid = len(lines) // 2
    # Try points progressively further from the middle
    for offset in range(len(lines)):
        # Try point after middle
        if mid + offset < len(lines):
            if is_safe_split_point(lines, mid + offset):
                return mid + offset
        # Try point before middle
        if mid - offset >= 1:  # Ensure we don't split at 0
            if is_safe_split_point(lines, mid - offset):
                return mid - offset
    # If no safe point found, return middle as fallback
    return mid
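# Illustrative example (hypothetical stage directions): for
#   ["(He walks", "to the door)", "He goes.", "She waits."]
# the midpoint is index 2, which is safe because the "(" opened on line 0 is closed
# by line 1, so find_safe_split_point returns 2; splitting at index 1 would leave an
# unclosed parenthesis and is rejected by is_safe_split_point.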

def split_long_pairs(pairs, max_length=15):
    """Repeatedly split any source/target pair longer than max_length lines at a parenthesis-safe point."""
    need_another_pass = True
    while need_another_pass:
        need_another_pass = False
        i = 0
        while i < len(pairs):
            de, en = pairs[i]
            de_lines = de.split("\n")
            en_lines = en.split("\n")
            if len(de_lines) > max_length:
                need_another_pass = True
                print(f"Splitting pair {i}")
                # Find safe split point based on German text
                split_point = find_safe_split_point(de_lines)
                # Split both German and English at this point
                pairs[i] = (
                    "\n".join(de_lines[:split_point]),
                    "\n".join(en_lines[:split_point])
                )
                pairs.insert(i + 1, (
                    "\n".join(de_lines[split_point:]),
                    "\n".join(en_lines[split_point:])
                ))
            i += 1
    return pairs
# Apply the splitting
pairs = split_long_pairs(pairs)
# print the number of pairs
print(f"Final number of pairs: {len(pairs)}")
# print the first pair
print("First pair:", pairs[0])
# print the last pair
print("Last pair:", pairs[-1])

def align_transcription_with_libretto(
    transcription: List[TranscriptionWord],
    libretto: List[str],
    ground_truth_timestamps: Dict[int, float] = None,  # Maps libretto index to timestamp
    ground_truth_duration: float = 1.0,
    min_similarity: float = 0.5
) -> List[AlignedWord]:
    """
    Align transcription with libretto using dynamic programming.

    Returns a list of aligned words with timing information where available.

    Args:
        transcription: List of TranscriptionWord objects
        libretto: List of ground truth words
        ground_truth_timestamps: Dictionary mapping libretto indices to known timestamps
        ground_truth_duration: Assumed duration (seconds) of a word anchored to a ground truth timestamp
        min_similarity: Minimum similarity score to consider words as matching
    """
    ground_truth_timestamps = ground_truth_timestamps or {}

    # Initialize scoring matrix
    m, n = len(transcription), len(libretto)
    score_matrix = [[0.0] * (n + 1) for _ in range(m + 1)]
    backtrack = [[None] * (n + 1) for _ in range(m + 1)]
    gap_penalty = -0.5
    timestamp_bonus = 2.0  # Bonus score for matching known timestamps

    # Fill scoring matrix
    for i in range(m + 1):
        score_matrix[i][0] = i * gap_penalty
    for j in range(n + 1):
        score_matrix[0][j] = j * gap_penalty

    for i in range(1, m + 1):
        for j in range(1, n + 1):
            similarity = word_similarity(transcription[i-1].word, libretto[j-1])
            # Add bonus if this alignment matches a known timestamp
            if j-1 in ground_truth_timestamps:
                known_time = ground_truth_timestamps[j-1]
                trans_time = transcription[i-1].start
                # If transcription time is close to known time, add bonus
                if abs(known_time - trans_time) < 1.0:  # Within 1 second
                    similarity += timestamp_bonus

            match_score = score_matrix[i-1][j-1] + similarity
            delete_score = score_matrix[i-1][j] + gap_penalty
            insert_score = score_matrix[i][j-1] + gap_penalty

            best_score = max(match_score, delete_score, insert_score)
            score_matrix[i][j] = best_score

            if best_score == match_score:
                backtrack[i][j] = 'match'
            elif best_score == delete_score:
                backtrack[i][j] = 'delete'
            else:
                backtrack[i][j] = 'insert'

    # Backtrack to build alignment
    aligned_words: List[AlignedWord] = []
    i, j = m, n
    while i > 0 or j > 0:
        if i > 0 and j > 0 and backtrack[i][j] == 'match':
            similarity = word_similarity(transcription[i-1].word, libretto[j-1])
            # If we have a ground truth timestamp for this word
            if j-1 in ground_truth_timestamps:
                start_time = ground_truth_timestamps[j-1]
                aligned_words.append(AlignedWord(
                    word=libretto[j-1],
                    start=start_time,
                    end=start_time + ground_truth_duration
                ))
            elif similarity >= min_similarity:
                # Regular good match - use transcription timing
                aligned_words.append(AlignedWord(
                    word=libretto[j-1],
                    start=transcription[i-1].start,
                    end=transcription[i-1].end
                ))
            else:
                # Poor match - include word without timing
                aligned_words.append(AlignedWord(
                    word=libretto[j-1],
                    start=None,
                    end=None
                ))
            i -= 1
            j -= 1
        elif i > 0 and (j == 0 or backtrack[i][j] == 'delete'):
            i -= 1
        else:
            # For inserted words, if we have a ground truth timestamp, use it
            start_time = ground_truth_timestamps.get(j-1)
            if start_time is not None:
                aligned_words.append(AlignedWord(
                    word=libretto[j-1],
                    start=start_time,
                    end=start_time + ground_truth_duration
                ))
            else:
                aligned_words.append(AlignedWord(
                    word=libretto[j-1],
                    start=None,
                    end=None
                ))
            j -= 1

    return list(reversed(aligned_words))
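# Rough sketch of the expected output shape (values are made up for illustration):
#   align_transcription_with_libretto(all_words, ["Dies", "Bildnis", "ist"], {0: 12.0})
# would return something like
#   [AlignedWord("Dies", 12.0, 13.0), AlignedWord("Bildnis", 13.4, 14.1), AlignedWord("ist", None, None)]
# i.e. ground-truth anchors use the supplied timestamp plus ground_truth_duration,
# good matches use the transcription's own timings, and poor matches are kept untimed.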

def enforce_monotonicity(aligned_words: List[AlignedWord]) -> List[AlignedWord]:
    """
    Enforce temporal monotonicity in aligned words by adjusting timestamps.
    """
    result = copy.deepcopy(aligned_words)
    last_end_time = float('-inf')
    for word in result:
        if word.start is not None:
            if word.start < last_end_time:
                # Adjust timing to maintain monotonicity
                duration = word.end - word.start
                word.start = last_end_time
                word.end = last_end_time + duration
            last_end_time = word.end
    return result

def interpolate_word_timings(
    aligned_words: List[AlignedWord],
    max_interpolation_window: float = 8.0
) -> List[AlignedWord]:
    """
    Interpolate timing for words between known timestamps within a maximum window.

    Args:
        aligned_words: List of AlignedWord objects
        max_interpolation_window: Maximum time window in seconds for interpolation

    Returns:
        New list of AlignedWord objects with interpolated timings
    """
    result = copy.deepcopy(aligned_words)

    # First pass: identify sequences of words to interpolate
    sequences = []
    current_sequence = []
    last_known_end = None

    for i, word in enumerate(result):
        if word.start is not None and word.end is not None:
            # Found a word with known timing
            if current_sequence and last_known_end is not None:
                # Check if this word is within the interpolation window
                if word.start - last_known_end <= max_interpolation_window:
                    # Add the current word as the end anchor of the sequence
                    current_sequence.append(i)
                    sequences.append(current_sequence)
                    current_sequence = []
            last_known_end = word.end
            current_sequence = [i]  # Start new sequence with this word
        elif current_sequence:
            # Add word without timing to current sequence
            current_sequence.append(i)

    # Second pass: perform interpolation for each valid sequence
    for sequence in sequences:
        if len(sequence) < 2:
            continue

        start_idx = sequence[0]
        end_idx = sequence[-1]
        start_word = result[start_idx]
        end_word = result[end_idx]

        # Skip if either anchor point doesn't have timing
        if (start_word.start is None or start_word.end is None or
                end_word.start is None or end_word.end is None):
            continue

        # Calculate time distribution
        total_words = len(sequence)
        if total_words <= 1:
            continue

        # For the first word in sequence, keep its original end time
        # For the last word in sequence, keep its original start time
        total_time = end_word.start - start_word.end
        words_to_interpolate = total_words - 1  # excluding first word
        if words_to_interpolate <= 0:
            continue

        # Calculate time per word
        time_per_word = total_time / words_to_interpolate

        # Set timings for words in between
        current_time = start_word.end
        for i in range(1, len(sequence)):
            idx = sequence[i]
            word = result[idx]
            if i == len(sequence) - 1:
                # Last word in sequence - keep its original timing
                word.start = end_word.start
                word.end = end_word.end
            else:
                # Interpolated word
                word.start = current_time
                word.end = current_time + time_per_word
                current_time += time_per_word

    return result
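# Worked example (made-up numbers): suppose three consecutive words are
#   AlignedWord("a", 10.0, 11.0), AlignedWord("b", None, None), AlignedWord("c", 13.0, 13.5)
# and the 13.0 - 11.0 = 2.0 s gap is within the window. The sequence has 3 words,
# so time_per_word = 2.0 / 2 = 1.0 and "b" is assigned start=11.0, end=12.0,
# while "a" and "c" keep their original timings.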

def parse_timestamp_and_phrase(
    timestamp_str: str,
    phrase: str,
    libretto: List[str]
) -> Dict[int, float]:
    """
    Convert a human-readable timestamp and phrase into a ground truth timestamp dict.

    Args:
        timestamp_str: Timestamp in format "H:M:S", "M:S" or "S"
        phrase: Text phrase to locate in libretto
        libretto: List of ground truth words

    Returns:
        Dictionary mapping libretto index to timestamp in seconds

    Raises:
        ValueError: If phrase not found or found multiple times, or invalid timestamp
    """
    # Parse timestamp to seconds
    def parse_timestamp(ts: str) -> float:
        parts = ts.split(':')
        if len(parts) == 3:  # H:M:S
            h, m, s = map(float, parts)
            return h * 3600 + m * 60 + s
        elif len(parts) == 2:  # M:S
            m, s = map(float, parts)
            return m * 60 + s
        elif len(parts) == 1:  # S
            return float(parts[0])
        else:
            raise ValueError(f"Invalid timestamp format: {ts}")

    # Convert phrase to list of words and clean
    phrase_words = phrase.lower().split()

    # Find all occurrences of the phrase in libretto
    matches = []
    for i in range(len(libretto) - len(phrase_words) + 1):
        window = libretto[i:i + len(phrase_words)]
        if [w.lower() for w in window] == phrase_words:
            matches.append(i)

    # Verify unique match
    if len(matches) == 0:
        raise ValueError(f"Phrase '{phrase}' not found in libretto")
    if len(matches) > 1:
        raise ValueError(
            f"Phrase '{phrase}' found multiple times in libretto at indices {matches}"
        )

    # Convert timestamp to seconds
    start_time = parse_timestamp(timestamp_str)

    # Return dict mapping the starting index to the timestamp
    return {matches[0]: start_time}
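# Usage sketch (phrase and time are hypothetical): if "holde Kunst" occurs exactly
# once in the libretto word list, starting at index 120, then
#   parse_timestamp_and_phrase("1:02:03", "holde Kunst", libretto)
# returns {120: 3723.0}  (1 h * 3600 + 2 min * 60 + 3 s).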

# Load transcriptions
transcriptions: List[TranscriptionVerbose] = []
for i in range(config.start_idx, config.end_idx):
    i_string = str(i).zfill(2)
    transcription = deserialize_transcription_from_file(f'transcribed/{config.file_prefix}_transcribed/{i_string}.json')
    transcriptions.append(transcription)

# Clear the transcriptions for overture tracks (no sung text to align)
for idx in config.overture_indices:
    zero_idx = 0 if idx == 0 else idx - 1
    transcriptions[zero_idx].words = []
    transcriptions[zero_idx].text = ""
    transcriptions[zero_idx].segments = []

transcriptions = convert_file_times_to_absolute_times(transcriptions)
all_words: List[TranscriptionWord] = [word for transcription in transcriptions for word in transcription.words]

# Load libretto
with open(f'libretti/{config.file_prefix}_{config.language}.txt', 'r', encoding="utf-8") as f:
    libretto = f.read()
libretto = libretto.split()
# Manual (timestamp, phrase) anchor markers; currently empty
markers = []

ground_truth = {}
for timestamp, phrase in markers:
    ground_truth.update(parse_timestamp_and_phrase(timestamp, phrase, libretto))

# Align texts
aligned_words = align_transcription_with_libretto(
    transcription=all_words,
    libretto=libretto,
    ground_truth_timestamps=ground_truth,
    ground_truth_duration=5,
    min_similarity=0.3
)

# Enforce monotonicity (optional; currently disabled)
# aligned_words = enforce_monotonicity(aligned_words)

# Report the fraction of AlignedWords that have both a start and an end time
percentage_aligned = len([word for word in aligned_words if word.start is not None and word.end is not None]) / len(aligned_words)
print(f"Percentage of aligned words: {percentage_aligned}")

def detect_low_alignment(smoothed: np.ndarray, overall_avg: float, threshold: float = 0.2, window: int = 500) -> List[tuple]:
    """Find index ranges where the smoothed alignment rate stays below (overall_avg - threshold) for at least `window` samples."""
    low_periods = []
    current_start = None
    for i, val in enumerate(smoothed):
        if val < (overall_avg - threshold):
            if current_start is None:
                current_start = i
        elif current_start is not None:
            if i - current_start >= window:
                low_periods.append((current_start, i))
            current_start = None
    if current_start is not None and len(smoothed) - current_start >= window:
        low_periods.append((current_start, len(smoothed)))
    return low_periods
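# Illustrative call (made-up values): with overall_avg = 0.8, threshold = 0.2 and window = 3,
#   detect_low_alignment(np.array([0.9, 0.4, 0.4, 0.4, 0.9]), 0.8, 0.2, 3)
# returns [(1, 4)], because indices 1-3 stay below 0.6 for 3 consecutive samples.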

# Write aligned words to CSV
def write_aligned_words_to_csv(aligned_words: List[AlignedWord], filename: str):
    # Convert list of AlignedWord to list of dictionaries
    data = [{'word': w.word, 'start': w.start, 'end': w.end} for w in aligned_words]
    # Create DataFrame and write to CSV
    df = pd.DataFrame(data)
    df.to_csv(filename, index=False)


def read_edited_aligned_words_from_csv(filename: str) -> List[AlignedWord]:
    # Read CSV into DataFrame
    df = pd.read_csv(filename)
    # Convert DataFrame to list of AlignedWord
    aligned_words = [AlignedWord(row['word'], row['start'], row['end']) for _, row in df.iterrows()]
    # Replace NaNs with None
    for word in aligned_words:
        if pd.isna(word.start):
            word.start = None
        if pd.isna(word.end):
            word.end = None
    return aligned_words
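# The CSV round-trip below is the manual-correction hook: the written file can be
# opened in a spreadsheet, timings fixed by hand, and the edited file is then read
# back in before interpolation. A row with empty start/end cells comes back as None.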
write_aligned_words_to_csv(aligned_words, f'aligned_words_{config.file_prefix}.csv')
# read aligned words from csv
aligned_words = read_edited_aligned_words_from_csv(f'aligned_words_{config.file_prefix}.csv')
aligned_words = interpolate_word_timings(aligned_words, max_interpolation_window=20)
# give the percentage of AlignedWords that have a start and end time
percentage_aligned = len([word for word in aligned_words if word.start is not None and word.end is not None]) / len(aligned_words)
print(f"Percentage of aligned words: {percentage_aligned}")
from dataclasses import dataclass

import imageio
from moviepy.editor import (
    AudioFileClip, TextClip, CompositeVideoClip,
    ColorClip, concatenate_audioclips, VideoClip
)

@dataclass
class FrameData:
    time_to_line_idx: Dict[float, Optional[int]]
    line_pair_clips: Dict[int, np.ndarray]
    audio_clips: List[AudioFileClip]
    total_duration: float
    frame_order: List[int]

def create_title_clip(config: VideoConfig, title: str) -> np.ndarray:
    """Creates a title frame for the video."""
    background = ColorClip(size=(config.video_width, config.video_height), color=(0, 0, 0))
    title_text = TextClip(
        title,
        font=f"{config.font_name}-Bold",
        fontsize=config.font_size + 20,
        color=config.secondary_color,
        size=(config.video_width // 2 - 80, None),
        method='caption',
        align='center'
    )
    composed = CompositeVideoClip([
        background,
        title_text.set_position((40, config.video_height // 2 - title_text.h // 2)),
        title_text.set_position((config.video_width // 2 + 40, config.video_height // 2 - title_text.h // 2))
    ])
    frame = composed.get_frame(0)
    background.close()
    title_text.close()
    composed.close()
    return frame

# Create video configuration
video_config = VideoConfig(
    font_name="Baskerville",
    text_2_color=config.secondary_color,
    font_size=config.font_size // config.res_divisor,
    video_width=config.video_width // config.res_divisor,
    video_height=config.video_height // config.res_divisor,
    fps=4,
    text_timeout=8.0
)

# Generate frames and data
frame_data = create_frames(
    aligned_words=aligned_words,
    line_pairs=pairs,
    character_names=CHARACTER_NAMES,
    audio_files=[f"audio/{config.file_prefix}/{str(i).zfill(2)}.m4a" for i in range(config.start_idx, config.end_idx)],
    title=config.title,
    config=video_config
)

def enforce_monotonicity(frame_data: FrameData) -> FrameData:
    """
    Enforces monotonicity in frame display order by replacing backwards-going frames
    with the most recently displayed valid frame.

    (Note: this shadows the word-level enforce_monotonicity defined earlier.)
    """
    # Create frame position lookup for O(1) order comparison
    frame_positions = {frame_idx: pos for pos, frame_idx in enumerate(frame_data.frame_order)}

    last_valid_idx = None
    last_valid_position = -1

    # Create new time_to_line_idx mapping
    monotonic_mapping = {}
    for time in sorted(frame_data.time_to_line_idx.keys()):
        current_idx = frame_data.time_to_line_idx[time]
        if current_idx is None:
            monotonic_mapping[time] = None
            continue

        current_position = frame_positions.get(current_idx, -1)

        # If this is our first frame or it maintains/advances the order
        if last_valid_idx is None or current_position >= last_valid_position:
            monotonic_mapping[time] = current_idx
            last_valid_idx = current_idx
            last_valid_position = current_position
        else:
            # Replace with last valid frame if it would go backwards
            monotonic_mapping[time] = last_valid_idx

    # Create new FrameData with monotonic mapping
    return FrameData(
        time_to_line_idx=monotonic_mapping,
        line_pair_clips=frame_data.line_pair_clips,
        audio_clips=frame_data.audio_clips,
        total_duration=frame_data.total_duration,
        frame_order=frame_data.frame_order
    )
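# Illustrative effect (made-up mapping): with frame_order = [3, 7, 9], a mapping like
#   {0.0: 3, 0.25: 9, 0.5: 7, 0.75: 9}
# becomes {0.0: 3, 0.25: 9, 0.5: 9, 0.75: 9}, since frame 7 would appear after the
# later-ordered frame 9 and is replaced by the last valid frame.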
frame_data = enforce_monotonicity(frame_data)

def interpolate_frames(times_to_idxs: Dict[float, Optional[int]]) -> Dict[float, Optional[int]]:
    """
    Given a dictionary mapping timestamps to frame indices, if the same index appears
    twice with only None values in between, fill in the timestamps in between to also
    contain that frame index.

    Example:
        Input:  {0.0: None, 1.0: 5, 2.0: None, 3.0: None, 4.0: 5, 5.0: None}
        Output: {0.0: None, 1.0: 5, 2.0: 5, 3.0: 5, 4.0: 5, 5.0: None}
    """
    # Convert to sorted list of (time, idx) pairs
    sorted_times = sorted(times_to_idxs.items())
    result = dict(sorted_times)

    # Find sequences that should be interpolated
    last_idx = None
    start_time = None
    for time, idx in tqdm(sorted_times, total=len(sorted_times)):
        if idx is not None:
            if last_idx is not None and idx == last_idx:
                # Found matching indices - fill in all None values between start_time and current time
                for t, current_idx in sorted_times:
                    if start_time < t < time and current_idx is None:
                        result[t] = idx
            # Reset tracking
            start_time = time
            last_idx = idx
    return result
frame_data.time_to_line_idx = interpolate_frames(frame_data.time_to_line_idx)
# Create the final video when ready
create_parallel_text_video(
    frame_data=frame_data,
    output_filename=f'output/{config.file_prefix}-{config.res_divisor}.mp4',
    config=video_config
)
import librosa
from datetime import timedelta


def generate_audio_timestamps(audio_files):
    """
    Generate timestamps for a list of audio files, showing the start time of each
    file (scene) relative to the concatenated audio.

    Args:
        audio_files (list): List of paths to audio files

    Returns:
        str: Formatted string with timestamps and scene numbers
    """
    def format_timestamp(seconds):
        """Convert seconds to HH:MM:SS format."""
        return str(timedelta(seconds=int(seconds))).zfill(8)

    current_time = 0
    result = []

    # Process each audio file
    for i, file_path in enumerate(audio_files, 1):
        # Get duration of audio file
        duration = librosa.get_duration(path=file_path)

        # Calculate start and end times
        start_time = current_time
        end_time = current_time + duration

        # Format the timestamp line, trimming leading zeros (e.g. "00:01:05" -> "1:05")
        timestamp_line = f"{format_timestamp(start_time)} - Scene {i}"
        while timestamp_line[0] == "0" or timestamp_line[0] == ":":
            timestamp_line = timestamp_line[1:]
        result.append(timestamp_line)

        # Update current time for next file
        current_time = end_time

    return "\n".join(result)
# Print per-scene timestamps for the same audio files used to build the video above
print(generate_audio_timestamps([f"audio/{config.file_prefix}/{str(i).zfill(2)}.m4a" for i in range(config.start_idx, config.end_idx)]))