Skip to content

Commit

Permalink
Add support for markers in description
Browse files Browse the repository at this point in the history
  • Loading branch information
tjwp committed Sep 4, 2023
1 parent a49e81d commit 746140b
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 31 deletions.
3 changes: 3 additions & 0 deletions pr_agent/settings/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ automatic_review=true
extra_instructions = ""

[pr_description] # /describe #
publish_labels=true
publish_description_as_comment=false
add_original_user_description=false
keep_original_user_title=false
use_description_markers=false
include_generated_by_header=true
extra_instructions = ""

[pr_questions] # /ask #
Expand Down
116 changes: 85 additions & 31 deletions pr_agent/tools/pr_description.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import copy
import json
import re
import logging
from typing import List, Tuple

Expand Down Expand Up @@ -28,6 +29,7 @@ def __init__(self, pr_url: str, args: list = None):
self.main_pr_language = get_main_pr_language(
self.git_provider.get_languages(), self.git_provider.get_files()
)
self.pr_id = f"{self.git_provider.repo}/{self.git_provider.pr_num}"

# Initialize the AI handler
self.ai_handler = AiHandler()
Expand Down Expand Up @@ -61,22 +63,34 @@ async def run(self):
"""
Generates a PR description using an AI model and publishes it to the PR.
"""
logging.info('Generating a PR description...')
logging.info(f"Generating a PR description {self.pr_id}")
if get_settings().config.publish_output:
self.git_provider.publish_comment("Preparing pr description...", is_temporary=True)

await retry_with_fallback_models(self._prepare_prediction)

logging.info('Preparing answer...')
pr_title, pr_body, pr_types, markdown_text = self._prepare_pr_answer()


logging.info(f"Preparing answer {self.pr_id}")
if self.prediction:
self._prepare_data()
else:
return None

if get_settings().pr_description.publish_labels:
pr_types = self._prepare_types()

if get_settings().pr_description.use_description_markers:
logging.info(f"Using description marker replacements {self.pr_id}")
pr_title, pr_body = self._prepare_marker_replacements()
else:
pr_title, pr_body, markdown_text = self._prepare_pr_answer()

if get_settings().config.publish_output:
logging.info('Pushing answer...')
logging.info(f"Pushing answer {self.pr_id}")
if get_settings().pr_description.publish_description_as_comment:
self.git_provider.publish_comment(markdown_text)
else:
self.git_provider.publish_description(pr_title, pr_body)
if self.git_provider.is_supported("get_labels"):
if get_settings().pr_description.publish_labels and self.git_provider.is_supported("get_labels"):
current_labels = self.git_provider.get_labels()
if current_labels is None:
current_labels = []
Expand All @@ -99,9 +113,12 @@ async def _prepare_prediction(self, model: str) -> None:
Any exceptions raised by the 'get_pr_diff' and '_get_prediction' functions.
"""
logging.info('Getting PR diff...')
if get_settings().pr_description.use_description_markers and 'pr_agent:' not in self.user_description:
return None

logging.info(f"Getting PR diff {self.pr_id}")
self.patches_diff = get_pr_diff(self.git_provider, self.token_handler, model)
logging.info('Getting AI prediction...')
logging.info(f"Getting AI prediction {self.pr_id}")
self.prediction = await self._get_prediction(model)

async def _get_prediction(self, model: str) -> str:
Expand Down Expand Up @@ -134,34 +151,71 @@ async def _get_prediction(self, model: str) -> str:

return response

def _prepare_pr_answer(self) -> Tuple[str, str, List[str], str]:
"""
Prepare the PR description based on the AI prediction data.

Returns:
- title: a string containing the PR title.
- pr_body: a string containing the PR body in a markdown format.
- pr_types: a list of strings containing the PR types.
- markdown_text: a string containing the AI prediction data in a markdown format. used for publishing a comment
"""
def _prepare_data(self):
# Load the AI prediction data into a dictionary
data = load_yaml(self.prediction.strip())
self.data = load_yaml(self.prediction.strip())

if get_settings().pr_description.add_original_user_description and self.user_description:
data["User Description"] = self.user_description
self.data["User Description"] = self.user_description


# Initialization
def _prepare_types(self) -> List[str]:
pr_types = []

# If the 'PR Type' key is present in the dictionary, split its value by comma and assign it to 'pr_types'
if 'PR Type' in data:
if type(data['PR Type']) == list:
pr_types = data['PR Type']
elif type(data['PR Type']) == str:
pr_types = data['PR Type'].split(',')
if 'PR Type' in self.data:
if type(self.data['PR Type']) == list:
pr_types = self.data['PR Type']
elif type(self.data['PR Type']) == str:
pr_types = self.data['PR Type'].split(',')

return pr_types

def _prepare_marker_replacements(self) -> Tuple[str, str]:
title = self.vars["title"]
body = self.user_description
if get_settings().pr_description.include_generated_by_header:
ai_header = f"### 🤖 Generated by PR Agent at {self.git_provider.last_commit_id.sha}\n\n"
else:
ai_header = ""

ai_summary = self.data.get('PR Description')
if ai_summary and not re.search(r'<!--\s*pr_agent:summary\s*-->', body):
summary = f"{ai_header}{ai_summary}"
body = body.replace('pr_agent:summary', summary)

if not re.search(r'<!--\s*pr_agent:walkthrough\s*-->', body):
ai_walkthrough = self.data.get('PR Main Files Walkthrough')
if ai_walkthrough:
walkthrough = str(ai_header)
for file in ai_walkthrough:
filename = file['filename'].replace("'", "`")
description = file['changes in file'].replace("'", "`")
walkthrough += f'- `{filename}`: {description}\n'

body = body.replace('pr_agent:walkthrough', walkthrough)

return title, body

def _prepare_pr_answer(self) -> Tuple[str, str, str]:
"""
Prepare the PR description based on the AI prediction data.
Returns:
- title: a string containing the PR title.
- pr_body: a string containing the PR description body in a markdown format.
- markdown_text: a string containing the AI prediction data in a markdown format. used for publishing a comment
"""

# Iterate over the dictionary items and append the key and value to 'markdown_text' in a markdown format
markdown_text = ""
for key, value in self.data.items():
markdown_text += f"## {key}\n\n"
markdown_text += f"{value}\n\n"

# Remove the 'PR Title' key from the dictionary
ai_title = data.pop('PR Title')
ai_title = self.data.pop('PR Title', self.vars["title"])
if get_settings().pr_description.keep_original_user_title:
# Assign the original PR title to the 'title' variable
title = self.vars["title"]
Expand All @@ -172,7 +226,7 @@ def _prepare_pr_answer(self) -> Tuple[str, str, List[str], str]:
# Iterate over the remaining dictionary items and append the key and value to 'pr_body' in a markdown format,
# except for the items containing the word 'walkthrough'
pr_body = ""
for idx, (key, value) in enumerate(data.items()):
for idx, (key, value) in enumerate(self.data.items()):
pr_body += f"## {key}:\n"
if 'walkthrough' in key.lower():
# for filename, description in value.items():
Expand All @@ -185,12 +239,12 @@ def _prepare_pr_answer(self) -> Tuple[str, str, List[str], str]:
if type(value) == list:
value = ', '.join(v for v in value)
pr_body += f"{value}\n"
if idx < len(data) - 1:
if idx < len(self.data) - 1:
pr_body += "\n___\n"

markdown_text = f"## Title\n\n{title}\n\n___\n{pr_body}"

if get_settings().config.verbosity_level >= 2:
logging.info(f"title:\n{title}\n{pr_body}")

return title, pr_body, pr_types, markdown_text
return title, pr_body, markdown_text

0 comments on commit 746140b

Please sign in to comment.