Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix video memory leak #374

Merged
merged 5 commits into from
Aug 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions data_juicer/ops/base_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def wrapper(samples, *args, **kwargs):
logger.error(
f'An error occurred in mapper operation when processing '
f'samples {samples}, {type(e)}: {e}')
traceback.print_exc()
ret = {key: [] for key in samples.keys()}
ret[Fields.stats] = []
ret[Fields.source_file] = []
Expand Down Expand Up @@ -97,6 +98,7 @@ def wrapper(sample, *args, **kwargs):
logger.error(
f'An error occurred in mapper operation when processing '
f'sample {sample}, {type(e)}: {e}')
traceback.print_exc()
ret = {key: [] for key in sample.keys()}
ret[Fields.stats] = []
ret[Fields.source_file] = []
Expand Down
6 changes: 5 additions & 1 deletion data_juicer/ops/deduplicator/ray_video_deduplicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

from jsonargparse.typing import PositiveInt

from data_juicer.utils.mm_utils import load_data_with_context, load_video
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video)

from ..base_op import OPERATORS
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -52,4 +53,7 @@ def calculate_hash(self, sample, context=False):
if packet.stream.type == 'video':
md5_hash.update(bytes(packet))

for key in videos:
close_video(videos[key])

return md5_hash.hexdigest()
6 changes: 5 additions & 1 deletion data_juicer/ops/deduplicator/video_deduplicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from typing import Dict, Set, Tuple

from data_juicer.utils.constant import HashKeys
from data_juicer.utils.mm_utils import load_data_with_context, load_video
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video)

from ..base_op import OPERATORS, Deduplicator
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -61,6 +62,9 @@ def compute_hash(self, sample, context=False):
if packet.stream.type == 'video':
md5_hash.update(bytes(packet))

for key in videos:
close_video(videos[key])

sample[HashKeys.videohash] = md5_hash.hexdigest()
return sample

Expand Down
4 changes: 2 additions & 2 deletions data_juicer/ops/filter/video_aesthetics_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import (extract_key_frames,
from data_juicer.utils.mm_utils import (close_video, extract_key_frames,
extract_video_frames_uniformly,
load_data_with_context, load_video)

Expand Down Expand Up @@ -181,7 +181,7 @@ def compute_stats(self, sample, rank=None, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])

return sample

Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/filter/video_aspect_ratio_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
import numpy as np

from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import load_data_with_context, load_video
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video)

from ..base_op import OPERATORS, Filter
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -67,7 +68,7 @@ def compute_stats(self, sample, context=False):
video_aspect_ratios[
key] = stream.codec_context.width / stream.codec_context.height
if not context:
video.close()
close_video(video)

sample[Fields.stats][StatsKeys.video_aspect_ratios] = [
video_aspect_ratios[key] for key in loaded_video_keys
Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/filter/video_duration_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from jsonargparse.typing import NonNegativeInt

from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import load_data_with_context, load_video
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video)

from ..base_op import OPERATORS, Filter
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -68,7 +69,7 @@ def compute_stats(self, sample, context=False):
video_durations[video_key] = round(stream.duration *
stream.time_base)
if not context:
video.close()
close_video(video)

# get video durations
sample[Fields.stats][StatsKeys.video_duration] = [
Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/filter/video_frames_text_similarity_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import (SpecialTokens, extract_key_frames,
from data_juicer.utils.mm_utils import (SpecialTokens, close_video,
extract_key_frames,
extract_video_frames_uniformly,
load_data_with_context, load_video,
remove_special_tokens)
Expand Down Expand Up @@ -195,7 +196,7 @@ def compute_stats(self, sample, rank=None, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])

return sample

Expand Down
4 changes: 2 additions & 2 deletions data_juicer/ops/filter/video_nsfw_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import (extract_key_frames,
from data_juicer.utils.mm_utils import (close_video, extract_key_frames,
extract_video_frames_uniformly,
load_data_with_context, load_video)
from data_juicer.utils.model_utils import get_model, prepare_model
Expand Down Expand Up @@ -156,7 +156,7 @@ def compute_stats(self, sample, rank=None, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])

return sample

Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/filter/video_ocr_area_ratio_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from data_juicer import cuda_device_count
from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import (extract_video_frames_uniformly,
from data_juicer.utils.mm_utils import (close_video,
extract_video_frames_uniformly,
load_data_with_context, load_video)

from ..base_op import OPERATORS, UNFORKABLE, Filter
Expand Down Expand Up @@ -171,7 +172,7 @@ def compute_stats(self, sample, rank=None, context=False):
video_ocr_area_ratios[video_key] = np.mean(frame_ocr_area_ratios)

if not context:
container.close()
close_video(container)

# get video durations
sample[Fields.stats][StatsKeys.video_ocr_area_ratio] = [
Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/filter/video_resolution_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from jsonargparse.typing import PositiveInt

from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import load_data_with_context, load_video
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video)

from ..base_op import OPERATORS, Filter
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -91,7 +92,7 @@ def compute_stats(self, sample, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])

return sample

Expand Down
4 changes: 2 additions & 2 deletions data_juicer/ops/filter/video_watermark_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields, StatsKeys
from data_juicer.utils.mm_utils import (extract_key_frames,
from data_juicer.utils.mm_utils import (close_video, extract_key_frames,
extract_video_frames_uniformly,
load_data_with_context, load_video)
from data_juicer.utils.model_utils import get_model, prepare_model
Expand Down Expand Up @@ -157,7 +157,7 @@ def compute_stats(self, sample, rank=None, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])

return sample

Expand Down
6 changes: 4 additions & 2 deletions data_juicer/ops/mapper/video_captioning_from_frames_mapper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# yapf: disable
import copy
import random

Expand All @@ -8,7 +9,8 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import HashKeys
from data_juicer.utils.mm_utils import (SpecialTokens, extract_key_frames,
from data_juicer.utils.mm_utils import (SpecialTokens, close_video,
extract_key_frames,
extract_video_frames_uniformly,
insert_texts_after_placeholders,
load_data_with_context, load_video,
Expand Down Expand Up @@ -285,7 +287,7 @@ def _process_single_sample(self, ori_sample, rank=None, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])
return generated_samples

def _reduce_captions(self, chunk, generated_text_candidates_single_chunk):
Expand Down
6 changes: 4 additions & 2 deletions data_juicer/ops/mapper/video_captioning_from_video_mapper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# yapf: disable
import copy
import random

Expand All @@ -8,7 +9,8 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import HashKeys
from data_juicer.utils.mm_utils import (SpecialTokens, extract_key_frames,
from data_juicer.utils.mm_utils import (SpecialTokens, close_video,
extract_key_frames,
extract_video_frames_uniformly,
insert_texts_after_placeholders,
load_data_with_context, load_video,
Expand Down Expand Up @@ -292,7 +294,7 @@ def _process_single_sample(self, ori_sample, rank=None, context=False):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])
return generated_samples

def _reduce_captions(self, chunk, generated_text_candidates_single_chunk):
Expand Down
7 changes: 4 additions & 3 deletions data_juicer/ops/mapper/video_face_blur_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import transfer_filename
from data_juicer.utils.mm_utils import (load_data_with_context, load_video,
pil_to_opencv, process_each_frame)
from data_juicer.utils.mm_utils import (close_video, load_data_with_context,
load_video, pil_to_opencv,
process_each_frame)

from ..base_op import OPERATORS, Mapper
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -93,7 +94,7 @@ def process(self, sample, context=False):
processed_video_keys[video_key] = output_video_key

if not context:
video.close()
close_video(video)

# when the file is modified, its source file needs to be updated.
for i, value in enumerate(loaded_video_keys):
Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/mapper/video_remove_watermark_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import transfer_filename
from data_juicer.utils.logger_utils import HiddenPrints
from data_juicer.utils.mm_utils import (extract_video_frames_uniformly,
from data_juicer.utils.mm_utils import (close_video,
extract_video_frames_uniformly,
load_data_with_context, load_video,
parse_string_to_roi,
process_each_frame)
Expand Down Expand Up @@ -233,7 +234,7 @@ def process_frame_func(frame):

if not context:
for vid_key in videos:
videos[vid_key].close()
close_video(videos[vid_key])

# when the file is modified, its source file needs to be updated.
for i, value in enumerate(sample[self.video_key]):
Expand Down
4 changes: 2 additions & 2 deletions data_juicer/ops/mapper/video_resize_aspect_ratio_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import transfer_filename
from data_juicer.utils.logger_utils import HiddenPrints
from data_juicer.utils.mm_utils import load_video
from data_juicer.utils.mm_utils import close_video, load_video

from ..base_op import OPERATORS, Mapper

Expand Down Expand Up @@ -117,7 +117,7 @@ def process(self, sample):
original_width = video.codec_context.width
original_height = video.codec_context.height
original_aspect_ratio = Fraction(original_width, original_height)
container.close()
close_video(container)

if (original_aspect_ratio >= self.min_ratio
and original_aspect_ratio <= self.max_ratio):
Expand Down
4 changes: 2 additions & 2 deletions data_juicer/ops/mapper/video_resize_resolution_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import transfer_filename
from data_juicer.utils.logger_utils import HiddenPrints
from data_juicer.utils.mm_utils import load_video
from data_juicer.utils.mm_utils import close_video, load_video

from ..base_op import OPERATORS, Mapper
from ..op_fusion import LOADED_VIDEOS
Expand Down Expand Up @@ -102,7 +102,7 @@ def process(self, sample, context=False):
width = video.codec_context.width
height = video.codec_context.height
origin_ratio = width / height
container.close()
close_video(container)

if width >= self.min_width and width <= self.max_width and \
height >= self.min_height and height <= self.max_height:
Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/mapper/video_split_by_duration_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import (add_suffix_to_filename,
transfer_filename)
from data_juicer.utils.mm_utils import (SpecialTokens, cut_video_by_seconds,
from data_juicer.utils.mm_utils import (SpecialTokens, close_video,
cut_video_by_seconds,
get_video_duration, load_video)

from ..base_op import OPERATORS, Mapper
Expand Down Expand Up @@ -123,7 +124,7 @@ def _process_single_sample(self, sample):
video = videos[video_key]
new_video_keys = self.split_videos_by_duration(
video_key, video)
video.close()
close_video(video)
split_video_keys.extend(new_video_keys)
place_holders.append(SpecialTokens.video *
len(new_video_keys))
Expand Down
5 changes: 3 additions & 2 deletions data_juicer/ops/mapper/video_split_by_key_frame_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from data_juicer.utils.constant import Fields
from data_juicer.utils.file_utils import (add_suffix_to_filename,
transfer_filename)
from data_juicer.utils.mm_utils import (SpecialTokens, cut_video_by_seconds,
from data_juicer.utils.mm_utils import (SpecialTokens, close_video,
cut_video_by_seconds,
get_key_frame_seconds, load_video)

from ..base_op import OPERATORS, Mapper
Expand Down Expand Up @@ -105,7 +106,7 @@ def _process_single_sample(self, sample):
video_count]:
video = videos[video_key]
new_video_keys = self.get_split_key_frame(video_key, video)
video.close()
close_video(video)
split_video_keys.extend(new_video_keys)
place_holders.append(SpecialTokens.video *
len(new_video_keys))
Expand Down
7 changes: 6 additions & 1 deletion data_juicer/ops/mapper/video_tagging_from_frames_mapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from data_juicer.utils.availability_utils import AvailabilityChecking
from data_juicer.utils.constant import Fields
from data_juicer.utils.mm_utils import (extract_key_frames,
from data_juicer.utils.mm_utils import (close_video, extract_key_frames,
extract_video_frames_uniformly,
load_data_with_context, load_video)
from data_juicer.utils.model_utils import get_model, prepare_model
Expand Down Expand Up @@ -110,5 +110,10 @@ def process(self, sample, rank=None, context=False):
word_count = Counter(words)
sorted_word_list = [item for item, _ in word_count.most_common()]
video_tags.append(sorted_word_list)

if not context:
for vid_key in videos:
close_video(videos[vid_key])

sample[Fields.video_frame_tags] = video_tags
return sample
1 change: 1 addition & 0 deletions data_juicer/ops/op_fusion.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ def compute_stats(self, sample, rank=None):
for context_key in sample[Fields.context]:
if isinstance(sample[Fields.context][context_key],
av.container.InputContainer):
sample[Fields.context][context_key].streams.video[0].close()
sample[Fields.context][context_key].close()
_ = sample.pop(Fields.context)
return sample
Expand Down
Loading
Loading