diff --git a/backend/director/agents/prompt_clip.py b/backend/director/agents/prompt_clip.py
index c1c32cc..95f8278 100644
--- a/backend/director/agents/prompt_clip.py
+++ b/backend/director/agents/prompt_clip.py
@@ -23,6 +23,11 @@
             "type": "string",
             "description": "Prompt to generate clip",
         },
+        "content_type": {
+            "type": "string",
+            "enum": ["spoken_content", "visual_content", "multimodal"],
+            "description": "Type of content to base the clip on; default is spoken_content. spoken_content: based on the video transcript; visual_content: based on visual scene descriptions of the video; multimodal: based on both the transcript and scene descriptions.",
+        },
         "video_id": {
             "type": "string",
             "description": "Video Id to generate clip",
@@ -32,7 +37,7 @@
             "description": "Collection Id to of the video",
         },
     },
-    "required": ["prompt", "video_id", "collection_id"],
+    "required": ["prompt", "content_type", "video_id", "collection_id"],
 }
@@ -54,12 +59,62 @@ def _chunk_docs(self, docs, chunk_size):
         for i in range(0, len(docs), chunk_size):
             yield docs[i : i + chunk_size]  # Yield the current chunk
 
+    def _filter_transcript(self, transcript, start, end):
+        result = []
+        for entry in transcript:
+            if float(entry["end"]) > start and float(entry["start"]) < end:
+                result.append(entry)
+        return result
+
+    def _get_multimodal_docs(self, transcript, scenes, club_on="scene"):
+        # TODO: Implement club on transcript
+        docs = []
+        if club_on == "scene":
+            for scene in scenes:
+                spoken_result = self._filter_transcript(
+                    transcript, float(scene["start"]), float(scene["end"])
+                )
+                spoken_text = " ".join(
+                    entry["text"] for entry in spoken_result if entry["text"] != "-"
+                )
+                data = {
+                    "visual": scene["description"],
+                    "spoken": spoken_text,
+                    "start": scene["start"],
+                    "end": scene["end"],
+                }
+                docs.append(data)
+        return docs
+
+    def _prompt_runner(self, prompts):
+        """Run the prompts in parallel."""
+        matches = []
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            future_to_index = {
+                executor.submit(
+                    self.llm.chat_completions,
+                    [ContextMessage(content=prompt, role=RoleTypes.user).to_llm_msg()],
+                    response_format={"type": "json_object"},
+                ): i
+                for i, prompt in enumerate(prompts)
+            }
+            for future in concurrent.futures.as_completed(future_to_index):
+                try:
+                    llm_response = future.result()
+                    if not llm_response.status:
+                        logger.error(f"LLM failed with {llm_response.content}")
+                        continue
+                    output = json.loads(llm_response.content)
+                    matches.extend(output["sentences"])
+                except Exception as e:
+                    logger.exception(f"Error in getting matches: {e}")
+                    continue
+        return matches
+
     def _text_prompter(self, transcript_text, prompt):
         chunk_size = 10000
         # sentence tokenizer
         chunks = self._chunk_docs(transcript_text, chunk_size=chunk_size)
-
-        matches = []
         prompts = []
         i = 0
         for chunk in chunks:
@@ -94,61 +149,163 @@ def _text_prompter(self, transcript_text, prompt):
             prompts.append(chunk_prompt)
             i += 1
 
-        with concurrent.futures.ThreadPoolExecutor() as executor:
-            future_to_index = {
-                executor.submit(
-                    self.llm.chat_completions,
-                    [
-                        ContextMessage(
-                            content=prompt, role=RoleTypes.user
-                        ).to_llm_msg()
-                    ],
-                    response_format={"type": "json_object"},
-                ): i
-                for i, prompt in enumerate(prompts)
-            }
-            for future in concurrent.futures.as_completed(future_to_index):
-                i = future_to_index[future]
-                try:
-                    llm_response = future.result()
-                    if not llm_response.status:
-                        logger.error(f"LLM failed with {llm_response.content}")
-                        continue
-                    output = json.loads(llm_response.content)
-                    matches.extend(output["sentences"])
-                except Exception as e:
-                    logger.exception(f"Error in getting matches: {e}")
-                    continue
-        return matches
+        return self._prompt_runner(prompts)
+
+    def _scene_prompter(self, scene_index, prompt):
+        chunk_size = 10000
+        chunks = self._chunk_docs(scene_index, chunk_size=chunk_size)
+
+        prompts = []
+        i = 0
+        for chunk in chunks:
+            descriptions = [scene["description"] for scene in chunk]
+            chunk_prompt = """
+        You are a video editor who uses AI. Given a user prompt and AI-generated scene descriptions of a video, analyze the descriptions to identify segments relevant to the user prompt for creating clips.
+
+        - **Instructions**:
+          - Evaluate the scene descriptions for relevance to the specified user prompt.
+          - Choose descriptions with the highest relevance and most comprehensive content.
+          - Optimize for engaging viewing experiences, considering visual appeal and narrative coherence.
+
+        - User Prompts: Interpret prompts like 'find exciting moments' or 'identify key plot points' by matching keywords or themes in the scene descriptions to the intent of the prompt.
+        """
+
+            chunk_prompt += f"""
+        Descriptions: {json.dumps(descriptions)}
+        User Prompt: {prompt}
+        """
+
+            chunk_prompt += """
+        **Output Format**: Return a JSON object with the key `sentences`, a list of the selected sentence strings. Ensure the final output
+        strictly adheres to the JSON format specified without including additional text or explanations. \
+        If there is no match, return an empty list without additional text. Use the following structure for your response:
+        {"sentences": []}
+        """
+            prompts.append(chunk_prompt)
+            i += 1
+
+        return self._prompt_runner(prompts)
+
+    def _multimodal_prompter(self, transcript, scene_index, prompt):
+        docs = self._get_multimodal_docs(transcript, scene_index)
+        chunk_size = 80
+        chunks = self._chunk_docs(docs, chunk_size=chunk_size)
+
+        prompts = []
+        i = 0
+        for chunk in chunks:
+            chunk_prompt = f"""
+        You are given the visual and spoken information of a video, segment by segment, along with a transcript of what is being spoken and its timestamps.
+        Your task is to evaluate the data for relevance to the specified user prompt.
+        Correlate visual and spoken content to find the relevant video segment.
+
+        Multimodal Data:
+        video: {chunk}
+        User Prompt: {prompt}
+        """
+            chunk_prompt += """
+        **Output Format**: Return a JSON object with the key `sentences`, a list of strings.
+        Each sentence is taken from the visual section of the input.
+        Ensure the final output strictly adheres to the JSON format specified without including additional text or explanations.
+        If there is no match, return an empty list without additional text.
+        Use the following structure for your response:
+        {"sentences": []}
+        """
+            prompts.append(chunk_prompt)
+            i += 1
+
+        return self._prompt_runner(prompts)
+
+    def _get_scenes(self, video_id):
+        self.output_message.actions.append("Retrieving video scenes..")
+        self.output_message.push_update()
+        scene_index_id = None
+        scene_list = self.videodb_tool.list_scene_index(video_id)
+        if scene_list:
+            scene_index_id = scene_list[0]["scene_index_id"]
+            return scene_index_id, self.videodb_tool.get_scene_index(
+                video_id=video_id, scene_id=scene_index_id
+            )
+        else:
+            self.output_message.actions.append("Indexing video scenes..")
+            self.output_message.push_update()
+            scene_index_id = self.videodb_tool.index_scene(
+                video_id=video_id,
+                extraction_config={"threshold": 20, "frame_count": 3},
+                prompt="Summarize the essence of the scene in one or two concise sentences without focusing on individual images.",
+            )
+            return scene_index_id, self.videodb_tool.get_scene_index(
+                video_id=video_id, scene_id=scene_index_id
+            )
+
+    def _get_transcript(self, video_id):
+        self.output_message.actions.append("Retrieving video transcript..")
+        self.output_message.push_update()
+        try:
+            return self.videodb_tool.get_transcript(
+                video_id
+            ), self.videodb_tool.get_transcript(video_id, text=False)
+        except Exception:
+            self.output_message.actions.append(
+                "Transcript unavailable. Indexing spoken content."
+            )
+            self.output_message.push_update()
+            self.videodb_tool.index_spoken_words(video_id)
+            return self.videodb_tool.get_transcript(
+                video_id
+            ), self.videodb_tool.get_transcript(video_id, text=False)
+
     def run(
-        self, prompt: str, video_id: str, collection_id: str, *args, **kwargs
+        self,
+        prompt: str,
+        content_type: str,
+        video_id: str,
+        collection_id: str,
+        *args,
+        **kwargs,
     ) -> AgentResponse:
         try:
-            videodb_tool = VideoDBTool(collection_id=collection_id)
-            self.output_message.actions.append("Retrieving video transcript..")
-            self.output_message.push_update()
-            try:
-                transcript_text = videodb_tool.get_transcript(video_id)
-            except Exception:
-                self.output_message.actions.append(
-                    "Transcript unavailable. Indexing spoken content."
-                )
-                self.output_message.push_update()
-                videodb_tool.index_spoken_words(video_id)
-                transcript_text = videodb_tool.get_transcript(video_id)
+            self.videodb_tool = VideoDBTool(collection_id=collection_id)
+            result = []
+            if content_type == "spoken_content":
+                transcript_text, _ = self._get_transcript(video_id=video_id)
+                result = self._text_prompter(transcript_text, prompt)
+
+            elif content_type == "visual_content":
+                scene_index_id, scenes = self._get_scenes(video_id=video_id)
+                result = self._scene_prompter(scenes, prompt)
+
+            else:
+                _, transcript = self._get_transcript(video_id=video_id)
+                scene_index_id, scenes = self._get_scenes(video_id=video_id)
+                result = self._multimodal_prompter(transcript, scenes, prompt)
 
             self.output_message.actions.append("Identifying key moments..")
             self.output_message.push_update()
-            result = self._text_prompter(transcript_text, prompt)
             result_timestamps = []
             with concurrent.futures.ThreadPoolExecutor() as executor:
-                future_to_index = {
-                    executor.submit(
-                        videodb_tool.keyword_search, description, video_id
-                    ): description
-                    for description in result
-                }
+                if content_type == "spoken_content":
+                    future_to_index = {
+                        executor.submit(
+                            self.videodb_tool.keyword_search,
+                            query=description,
+                            video_id=video_id,
+                        ): description
+                        for description in result
+                    }
+                else:
+                    future_to_index = {
+                        executor.submit(
+                            self.videodb_tool.keyword_search,
+                            query=description,
+                            index_type="scene",
+                            video_id=video_id,
+                            scene_index_id=scene_index_id,
+                        ): description
+                        for description in result
+                    }
+
                 for future in concurrent.futures.as_completed(future_to_index):
                     description = future_to_index[future]
                     try:
@@ -175,7 +332,7 @@ def run(
             timeline = []
             for timestamp in result_timestamps:
                 timeline.append((timestamp[0], timestamp[1]))
-            stream_url = videodb_tool.generate_video_stream(
+            stream_url = self.videodb_tool.generate_video_stream(
                 video_id=video_id, timeline=timeline
             )
             video_content.status_message = "Clip generated successfully."
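For reference, the multimodal path above clubs spoken lines onto each scene by time overlap: a transcript entry is kept for a scene when entry["end"] > scene["start"] and entry["start"] < scene["end"], and "-" placeholders (silence) are dropped. A minimal standalone sketch of that pairing, with made-up timestamps and text for illustration only:

# Standalone sketch of the clubbing done by _filter_transcript / _get_multimodal_docs.
# The sample transcript and scenes below are illustrative, not taken from the codebase.
transcript = [
    {"start": 0.0, "end": 4.0, "text": "Welcome to the demo."},
    {"start": 4.0, "end": 9.0, "text": "-"},  # silence marker, dropped below
    {"start": 9.0, "end": 14.0, "text": "Here is the main feature."},
]
scenes = [
    {"start": 0.0, "end": 10.0, "description": "Presenter on stage in front of a slide."},
    {"start": 10.0, "end": 20.0, "description": "Close-up of the product screen."},
]

docs = []
for scene in scenes:
    # Keep transcript entries that overlap the scene's [start, end) window.
    overlapping = [
        e
        for e in transcript
        if float(e["end"]) > float(scene["start"]) and float(e["start"]) < float(scene["end"])
    ]
    docs.append(
        {
            "visual": scene["description"],
            "spoken": " ".join(e["text"] for e in overlapping if e["text"] != "-"),
            "start": scene["start"],
            "end": scene["end"],
        }
    )

# docs[0]["spoken"] -> "Welcome to the demo. Here is the main feature."
# docs[1]["spoken"] -> "Here is the main feature."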
diff --git a/backend/director/tools/videodb_tool.py b/backend/director/tools/videodb_tool.py
index b2a68da..564a08c 100644
--- a/backend/director/tools/videodb_tool.py
+++ b/backend/director/tools/videodb_tool.py
@@ -1,7 +1,7 @@
 import os
 
 import videodb
-from videodb import SearchType, SubtitleStyle
+from videodb import SearchType, SubtitleStyle, IndexType, SceneExtractionType
 from videodb.timeline import Timeline
 from videodb.asset import VideoAsset, ImageAsset
@@ -124,9 +124,27 @@ def index_spoken_words(self, video_id: str):
         index = video.index_spoken_words()
         return index
 
-    def index_scene(self, video_id: str):
+    def index_scene(
+        self,
+        video_id: str,
+        extraction_type=SceneExtractionType.shot_based,
+        extraction_config={},
+        prompt=None,
+    ):
         video = self.collection.get_video(video_id)
-        return video.index_scenes()
+        return video.index_scenes(
+            extraction_type=extraction_type,
+            extraction_config=extraction_config,
+            prompt=prompt,
+        )
+
+    def list_scene_index(self, video_id: str):
+        video = self.collection.get_video(video_id)
+        return video.list_scene_index()
+
+    def get_scene_index(self, video_id: str, scene_id: str):
+        video = self.collection.get_video(video_id)
+        return video.get_scene_index(scene_id)
 
     def download(self, stream_link: str, name: str = None):
         download_response = self.conn.download(stream_link, name)
@@ -140,10 +158,14 @@ def semantic_search(self, query, video_id=None):
             search_resuls = self.collection.search(query=query)
         return search_resuls
 
-    def keyword_search(self, query, video_id=None):
+    def keyword_search(
+        self, query, index_type=IndexType.spoken_word, video_id=None, **kwargs
+    ):
         """Search for a keyword in a video."""
         video = self.collection.get_video(video_id)
-        return video.search(query=query, search_type=SearchType.keyword)
+        return video.search(
+            query=query, search_type=SearchType.keyword, index_type=index_type, **kwargs
+        )
 
     def generate_video_stream(self, video_id: str, timeline):
         """Generate a video stream from a timeline. timeline is a list of tuples. ex [(0, 10), (20, 30)]"""
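Taken together, the new VideoDBTool helpers mirror what _get_scenes and the scene-search branch of run() do. A minimal usage sketch, assuming the backend directory is on the import path, a VideoDB connection is configured via the environment, and the collection/video IDs below are placeholders:

from director.tools.videodb_tool import VideoDBTool

tool = VideoDBTool(collection_id="my-collection-id")  # placeholder id

# Reuse an existing scene index if one exists, otherwise build a shot-based one.
scene_list = tool.list_scene_index("my-video-id")  # placeholder id
if scene_list:
    scene_index_id = scene_list[0]["scene_index_id"]
else:
    scene_index_id = tool.index_scene(
        video_id="my-video-id",
        extraction_config={"threshold": 20, "frame_count": 3},
        prompt="Summarize the essence of the scene in one or two concise sentences.",
    )
scenes = tool.get_scene_index(video_id="my-video-id", scene_id=scene_index_id)

# Scene-indexed keyword search, as used for visual_content / multimodal clips;
# scene_index_id is forwarded to video.search() via **kwargs.
results = tool.keyword_search(
    query="product demo",
    index_type="scene",
    video_id="my-video-id",
    scene_index_id=scene_index_id,
)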