54 add vision and multimodal search capability to prompt clip agent #55

Merged
257 changes: 207 additions & 50 deletions backend/director/agents/prompt_clip.py
@@ -23,6 +23,11 @@
"type": "string",
"description": "Prompt to generate clip",
},
"content_type": {
"type": "string",
"enum": ["spoken_content", "visual_content", "multimodal"],
"description": "Type of content based on which clip is to be generated, default is spoken_content, spoken_content: based on transcript of the video, visual_content: based on visual description of the video, multimodal: based on both transcript and visual description of the video",
},
"video_id": {
"type": "string",
"description": "Video Id to generate clip",
@@ -32,7 +37,7 @@
"description": "Collection Id to of the video",
},
},
"required": ["prompt", "video_id", "collection_id"],
"required": ["prompt", "content_type", "video_id", "collection_id"],
}
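
To make the schema change concrete, here is a hedged example of an argument set that satisfies the updated parameters; the prompt text and ids are placeholders, not values from this PR:

```python
# Hypothetical arguments for the updated agent; "content_type" is now required
# alongside "prompt", "video_id", and "collection_id".
args = {
    "prompt": "find the moments where the new feature is demonstrated",
    "content_type": "multimodal",  # one of: spoken_content, visual_content, multimodal
    "video_id": "m-123",           # placeholder video id
    "collection_id": "c-456",      # placeholder collection id
}
```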


@@ -54,12 +59,62 @@ def _chunk_docs(self, docs, chunk_size):
for i in range(0, len(docs), chunk_size):
yield docs[i : i + chunk_size] # Yield the current chunk

def _filter_transcript(self, transcript, start, end):
result = []
for entry in transcript:
if float(entry["end"]) > start and float(entry["start"]) < end:
result.append(entry)
return result

def _get_multimodal_docs(self, transcript, scenes, club_on="scene"):
# TODO: Implement club on transcript
docs = []
if club_on == "scene":
for scene in scenes:
spoken_result = self._filter_transcript(
transcript, float(scene["start"]), float(scene["end"])
)
spoken_text = " ".join(
entry["text"] for entry in spoken_result if entry["text"] != "-"
)
data = {
"visual": scene["description"],
"spoken": spoken_text,
"start": scene["start"],
"end": scene["end"],
}
docs.append(data)
return docs

def _prompt_runner(self, prompts):
"""Run the prompts in parallel."""
matches = []
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_index = {
executor.submit(
self.llm.chat_completions,
[ContextMessage(content=prompt, role=RoleTypes.user).to_llm_msg()],
response_format={"type": "json_object"},
): i
for i, prompt in enumerate(prompts)
}
for future in concurrent.futures.as_completed(future_to_index):
try:
llm_response = future.result()
if not llm_response.status:
logger.error(f"LLM failed with {llm_response.content}")
continue
output = json.loads(llm_response.content)
matches.extend(output["sentences"])
except Exception as e:
logger.exception(f"Error in getting matches: {e}")
continue
return matches

def _text_prompter(self, transcript_text, prompt):
chunk_size = 10000
# sentence tokenizer
chunks = self._chunk_docs(transcript_text, chunk_size=chunk_size)

matches = []
prompts = []
i = 0
for chunk in chunks:
@@ -94,61 +149,163 @@ def _text_prompter(self, transcript_text, prompt):
prompts.append(chunk_prompt)
i += 1

with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_index = {
executor.submit(
self.llm.chat_completions,
[
ContextMessage(
content=prompt, role=RoleTypes.user
).to_llm_msg()
],
response_format={"type": "json_object"},
): i
for i, prompt in enumerate(prompts)
}
for future in concurrent.futures.as_completed(future_to_index):
i = future_to_index[future]
try:
llm_response = future.result()
if not llm_response.status:
logger.error(f"LLM failed with {llm_response.content}")
continue
output = json.loads(llm_response.content)
matches.extend(output["sentences"])
except Exception as e:
logger.exception(f"Error in getting matches: {e}")
continue
return matches
return self._prompt_runner(prompts)

def _scene_prompter(self, scene_index, prompt):
chunk_size = 10000
chunks = self._chunk_docs(scene_index, chunk_size=chunk_size)

prompts = []
i = 0
for chunk in chunks:
descriptions = [scene["description"] for scene in chunk]
chunk_prompt = """
You are a video editor who uses AI. Given a user prompt and AI-generated scene descriptions of a video, analyze the descriptions to identify segments relevant to the user prompt for creating clips.

- **Instructions**:
- Evaluate the scene descriptions for relevance to the specified user prompt.
- Choose the descriptions with the highest relevance and most comprehensive content.
- Optimize for engaging viewing experiences, considering visual appeal and narrative coherence.

- User Prompts: Interpret prompts like 'find exciting moments' or 'identify key plot points' by matching keywords or themes in the scene descriptions to the intent of the prompt.
"""

chunk_prompt += f"""
Descriptions: {json.dumps(descriptions)}
User Prompt: {prompt}
"""

chunk_prompt += """
**Output Format**: Return a JSON object with a single key `sentences` containing a list of matching sentence strings. Ensure the final output
strictly adheres to the JSON format specified, without including additional text or explanations. \
If there is no match, return an empty list without additional text. Use the following structure for your response:
{"sentences": []}
"""
prompts.append(chunk_prompt)
i += 1

return self._prompt_runner(prompts)

def _multimodal_prompter(self, transcript, scene_index, prompt):
docs = self._get_multimodal_docs(transcript, scene_index)
chunk_size = 80
chunks = self._chunk_docs(docs, chunk_size=chunk_size)

prompts = []
i = 0
for chunk in chunks:
chunk_prompt = f"""
You are given the visual and spoken information for each scene of the video, along with a transcript of what is being spoken and its timestamps.
Your task is to evaluate the data for relevance to the specified user prompt.
Correlate the visual and spoken content to find the relevant video segments.

Multimodal Data:
video: {chunk}
User Prompt: {prompt}


"""
chunk_prompt += """
**Output Format**: Return a JSON object with a single key `sentences` containing a list of strings.
Each sentence must be taken from the visual section of the input.
Ensure the final output strictly adheres to the JSON format specified without including additional text or explanations.
If there is no match return empty list without additional text. Use the following structure for your response:
{"sentences": []}
"""
prompts.append(chunk_prompt)
i += 1

return self._prompt_runner(prompts)

def _get_scenes(self, video_id):
self.output_message.actions.append("Retrieving video scenes..")
self.output_message.push_update()
scene_index_id = None
scene_list = self.videodb_tool.list_scene_index(video_id)
if scene_list:
scene_index_id = scene_list[0]["scene_index_id"]
return scene_index_id, self.videodb_tool.get_scene_index(
video_id=video_id, scene_id=scene_index_id
)
else:
self.output_message.actions.append("Indexing video scenes..")
self.output_message.push_update()
scene_index_id = self.videodb_tool.index_scene(
video_id=video_id,
extraction_config={"threshold": 20, "frame_count": 3},
prompt="Summarize the essence of the scene in one or two concise sentences without focusing on individual images.",
)
return scene_index_id, self.videodb_tool.get_scene_index(
video_id=video_id, scene_id=scene_index_id
)

def _get_transcript(self, video_id):
self.output_message.actions.append("Retrieving video transcript..")
self.output_message.push_update()
try:
return self.videodb_tool.get_transcript(
video_id
), self.videodb_tool.get_transcript(video_id, text=False)
except Exception:
self.output_message.actions.append(
"Transcript unavailable. Indexing spoken content."
)
self.output_message.push_update()
self.videodb_tool.index_spoken_words(video_id)
return self.videodb_tool.get_transcript(
video_id
), self.videodb_tool.get_transcript(video_id, text=False)

def run(
self, prompt: str, video_id: str, collection_id: str, *args, **kwargs
self,
prompt: str,
content_type: str,
video_id: str,
collection_id: str,
*args,
**kwargs,
) -> AgentResponse:
try:
videodb_tool = VideoDBTool(collection_id=collection_id)
self.output_message.actions.append("Retrieving video transcript..")
self.output_message.push_update()
try:
transcript_text = videodb_tool.get_transcript(video_id)
except Exception:
self.output_message.actions.append(
"Transcript unavailable. Indexing spoken content."
)
self.output_message.push_update()
videodb_tool.index_spoken_words(video_id)
transcript_text = videodb_tool.get_transcript(video_id)
self.videodb_tool = VideoDBTool(collection_id=collection_id)
result = []
if content_type == "spoken_content":
transcript_text, _ = self._get_transcript(video_id=video_id)
result = self._text_prompter(transcript_text, prompt)

elif content_type == "visual_content":
scene_index_id, scenes = self._get_scenes(video_id=video_id)
result = self._scene_prompter(scenes, prompt)

else:
_, transcript = self._get_transcript(video_id=video_id)
scene_index_id, scenes = self._get_scenes(video_id=video_id)
result = self._multimodal_prompter(transcript, scenes, prompt)

self.output_message.actions.append("Identifying key moments..")
self.output_message.push_update()
result = self._text_prompter(transcript_text, prompt)
result_timestamps = []
with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_index = {
executor.submit(
videodb_tool.keyword_search, description, video_id
): description
for description in result
}
if content_type == "spoken_content":
future_to_index = {
executor.submit(
self.videodb_tool.keyword_search,
query=description,
video_id=video_id,
): description
for description in result
}
else:
future_to_index = {
executor.submit(
self.videodb_tool.keyword_search,
query=description,
index_type="scene",
video_id=video_id,
scene_index_id=scene_index_id,
): description
for description in result
}

for future in concurrent.futures.as_completed(future_to_index):
description = future_to_index[future]
try:
@@ -175,7 +332,7 @@ def run(
timeline = []
for timestamp in result_timestamps:
timeline.append((timestamp[0], timestamp[1]))
stream_url = videodb_tool.generate_video_stream(
stream_url = self.videodb_tool.generate_video_stream(
video_id=video_id, timeline=timeline
)
video_content.status_message = "Clip generated successfully."
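For reviewers, a minimal usage sketch of the updated `run` signature. The agent class name, session object, and ids are assumptions for illustration; only the parameter names and the content_type dispatch come from this diff:

```python
# Hypothetical invocation of the updated agent (class name and session wiring assumed).
agent = PromptClipAgent(session=session)

# content_type selects the pipeline:
#   "spoken_content" -> transcript + _text_prompter + spoken-word keyword search
#   "visual_content" -> scene index + _scene_prompter + scene keyword search
#   "multimodal"     -> transcript + scenes + _multimodal_prompter + scene keyword search
response = agent.run(
    prompt="find the moments where the speaker demos the product",
    content_type="visual_content",
    video_id="m-123",        # placeholder id
    collection_id="c-456",   # placeholder id
)
```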
32 changes: 27 additions & 5 deletions backend/director/tools/videodb_tool.py
@@ -1,7 +1,7 @@
import os
import videodb

from videodb import SearchType, SubtitleStyle
from videodb import SearchType, SubtitleStyle, IndexType, SceneExtractionType
from videodb.timeline import Timeline
from videodb.asset import VideoAsset, ImageAsset

@@ -124,9 +124,27 @@ def index_spoken_words(self, video_id: str):
index = video.index_spoken_words()
return index

def index_scene(self, video_id: str):
def index_scene(
self,
video_id: str,
extraction_type=SceneExtractionType.shot_based,
extraction_config={},
prompt=None,
):
video = self.collection.get_video(video_id)
return video.index_scenes()
return video.index_scenes(
extraction_type=extraction_type,
extraction_config=extraction_config,
prompt=prompt,
)

def list_scene_index(self, video_id: str):
video = self.collection.get_video(video_id)
return video.list_scene_index()

def get_scene_index(self, video_id: str, scene_id: str):
video = self.collection.get_video(video_id)
return video.get_scene_index(scene_id)

def download(self, stream_link: str, name: str = None):
download_response = self.conn.download(stream_link, name)
@@ -140,10 +158,14 @@ def semantic_search(self, query, video_id=None):
search_resuls = self.collection.search(query=query)
return search_resuls

def keyword_search(self, query, video_id=None):
def keyword_search(
self, query, index_type=IndexType.spoken_word, video_id=None, **kwargs
):
"""Search for a keyword in a video."""
video = self.collection.get_video(video_id)
return video.search(query=query, search_type=SearchType.keyword)
return video.search(
query=query, search_type=SearchType.keyword, index_type=index_type, **kwargs
)

def generate_video_stream(self, video_id: str, timeline):
"""Generate a video stream from a timeline. timeline is a list of tuples. ex [(0, 10), (20, 30)]"""
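A hedged sketch of how the extended VideoDBTool methods fit together, mirroring the agent code above; the ids are placeholders, and only the method names and arguments shown in this diff are relied on:

```python
# Hypothetical standalone usage of the updated VideoDBTool methods.
tool = VideoDBTool(collection_id="c-456")

# Reuse an existing scene index if one exists, otherwise create one,
# as the agent's _get_scenes helper does.
scene_list = tool.list_scene_index("m-123")
if scene_list:
    scene_index_id = scene_list[0]["scene_index_id"]
else:
    scene_index_id = tool.index_scene(
        video_id="m-123",
        extraction_config={"threshold": 20, "frame_count": 3},
        prompt="Summarize the essence of the scene in one or two concise sentences.",
    )
scenes = tool.get_scene_index(video_id="m-123", scene_id=scene_index_id)

# keyword_search now accepts an index_type; the default remains spoken words.
spoken_hits = tool.keyword_search(query="budget", video_id="m-123")
scene_hits = tool.keyword_search(
    query="a person writing on a whiteboard",
    index_type="scene",
    video_id="m-123",
    scene_index_id=scene_index_id,
)
```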