diff --git a/.gitignore b/.gitignore
index 5fdf186..b810d51 100644
--- a/.gitignore
+++ b/.gitignore
@@ -35,6 +35,8 @@ bin/
 
 # MacOS
 .DS_Store
+#Temporary Database
+sources/raw_files
 
 # Conda
 .conda
diff --git a/src/scripts/landscape_extactor.py b/src/scripts/landscape_extactor.py
new file mode 100644
index 0000000..6d7563c
--- /dev/null
+++ b/src/scripts/landscape_extactor.py
@@ -0,0 +1,143 @@
+"""Download the files listed in a landscape YAML file and archive them per category."""
+import os
+import shutil
+import threading
+
+import requests
+import yaml
+from tqdm import tqdm
+
+
+# Sentinel meaning "no token configured"; requests are then sent unauthenticated.
+_NO_TOKEN = "Replace your token"
+# GitHub token used to increase the GitHub API hourly rate to 5000.
+# Prefer the GITHUB_TOKEN environment variable so the secret is never committed.
+TOKEN = os.environ.get("GITHUB_TOKEN") or _NO_TOKEN
+HEADERS = {'Authorization': f'Bearer {TOKEN}'}
+# Seconds to wait for a server, so that a stalled download cannot block a thread forever.
+REQUEST_TIMEOUT = 30
+
+
+def _sanitize(part):
+    """Return ``part`` as text that is safe to embed in a single file name."""
+    text = str(part)
+    for separator in (os.sep, os.altsep):
+        # os.altsep is None on platforms with a single path separator
+        if separator:
+            text = text.replace(separator, "-")
+    return text
+
+
+def downloader(url, output_directory, tags_dict, semaphore):
+    """
+    Download a single file from ``url`` and store it in ``output_directory``.
+
+    It is used by downloader_multi_thread() in each thread. Failures are printed and never raised,
+    so one broken URL does not abort the remaining downloads.
+
+    Args:
+        url (str): A single url string.
+        output_directory (str): The path where the downloaded file will be stored.
+        tags_dict (dict): A dictionary containing the tags for the file: Category, Subcategory, Project_name.
+        semaphore (threading.Semaphore): A semaphore object used to limit the number of concurrent downloads.
+    """
+    with semaphore:
+        try:
+            # Only authenticate when a real token has been configured
+            headers = None if TOKEN == _NO_TOKEN else HEADERS
+            response = requests.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
+            response.raise_for_status()  # Raise an exception for HTTP errors
+            # Prefix the file name from the URL with its tags, separated by "_". Path separators inside
+            # a tag are replaced, otherwise the name would point into a directory that does not exist.
+            tags = [_sanitize(tags_dict[key]) for key in ('Category', 'Subcategory', 'Project_name')]
+            filename = "_".join(tags + [os.path.basename(url)])
+
+            # Write downloaded content to file
+            with open(os.path.join(output_directory, filename), 'wb') as f:
+                f.write(response.content)
+
+        except Exception as e:
+            # Deliberately broad: this is the top of a worker thread and downloads are best effort
+            print(f"Failed to download file from {url}: {e}")
+
+
+def downloader_multi_thread(download_urls, output_directory, tags_dict, max_threads=16):
+    """
+    Download every URL in ``download_urls`` into ``output_directory`` using multiple threads.
+
+    Each downloaded file is tagged with the corresponding Category, Subcategory and Project_name in its
+    file name. URLs listed under the "yml" and "yaml" extensions are not downloaded.
+
+    Args:
+        download_urls (dict): A dictionary which contains a list of URLs for each file extension.
+        output_directory (str): The path where the downloaded files will be stored.
+        tags_dict (dict): A dictionary containing the tags for each file: Category, Subcategory, Project_name.
+        max_threads (int, optional): The maximum number of concurrent downloads (default: 16).
+    """
+    if not download_urls:
+        return
+    # Snapshot the tags so the worker threads never observe later changes made by the caller
+    tags_snapshot = dict(tags_dict)
+    semaphore = threading.Semaphore(max_threads)
+    threads = []
+    for file_format, urls_list in download_urls.items():
+        # exclude yml and yaml files from downloading
+        if file_format in ("yml", "yaml"):
+            continue
+        for url in urls_list or []:
+            thread = threading.Thread(target=downloader, args=(url, output_directory, tags_snapshot, semaphore))
+            threads.append(thread)
+            thread.start()
+    for thread in threads:
+        thread.join()
+
+
+def download_files_from_yaml(yaml_file="sources/landscape_augmented.yml", output_directory="sources/raw_files",
+                             skip_categories=None, archive_directory="sources"):
+    """
+    Download the files from the URLs provided in yaml_file and create one zip archive per category.
+
+    Args:
+        yaml_file (str, optional): The path to the URLs yaml file (default: sources/landscape_augmented.yml).
+        output_directory (str, optional): The path where the downloaded files are stored until they are
+            archived (default: sources/raw_files). It is deleted and recreated after each category.
+        skip_categories (iterable of str, optional): Names of categories to skip, e.g. because they were
+            already downloaded (default: None, nothing is skipped).
+        archive_directory (str, optional): The directory where the zip archives are written (default: sources).
+    """
+    # Load URLs from YAML file
+    with open(yaml_file, 'r', encoding='utf-8') as f:
+        data = yaml.safe_load(f)
+
+    # A lone string is treated as one category name rather than as a collection of characters
+    if isinstance(skip_categories, str):
+        skip_categories = [skip_categories]
+    skipped = set(skip_categories or ())
+    # Create output directory if it doesn't exist
+    os.makedirs(output_directory, exist_ok=True)
+    # Initialize a dictionary to save tags corresponding to each file
+    tags_dict = {'Category': "", 'Subcategory': "", 'Project_name': ""}
+    # Process the loaded data
+    for category in data['landscape']:
+        if category['name'] in skipped:
+            continue
+        tags_dict['Category'] = category['name']
+        print(f"Category: {tags_dict['Category']}")
+        for subcategory in category.get('subcategories') or []:
+            tags_dict['Subcategory'] = subcategory['name']
+            print(f"Subcategory: {tags_dict['Subcategory']}")
+            for item in tqdm(subcategory.get('items') or []):
+                tags_dict['Project_name'] = item['name']
+                print(f"Item: {tags_dict['Project_name']}")
+                downloader_multi_thread(item.get('download_urls'), output_directory, tags_dict)
+        # Adding all the files corresponding to a category to a zip file
+        shutil.make_archive(os.path.join(archive_directory, tags_dict['Category']), 'zip', output_directory)
+        # Removing remaining raw files after archiving
+        shutil.rmtree(output_directory)
+        # Create directory for next category
+        os.makedirs(output_directory, exist_ok=True)
+
+
+# Example usage:
+if __name__ == "__main__":
+    download_files_from_yaml()
diff --git a/test/unit/landscape_extractor_test.py b/test/unit/landscape_extractor_test.py
new file mode 100644
index 0000000..854986e
--- /dev/null
+++ b/test/unit/landscape_extractor_test.py
@@ -0,0 +1,56 @@
+import os
+import shutil
+import unittest
+import zipfile
+
+import requests
+
+from src.scripts.landscape_extactor import download_files_from_yaml
+
+
+class Testdownload_files_from_yaml(unittest.TestCase):
+    """
+    Integration test for download_files_from_yaml(); it needs network access.
+
+    For this test to work you must provide your GitHub token, either through the GITHUB_TOKEN
+    environment variable or in the landscape_extactor.py file.
+    """
+
+    def test_with_valid_input(self):
+        output_directory = "sources/raw_files_test"
+        os.makedirs(output_directory, exist_ok=True)
+        expected_zipFile = "sources/Test_Provisioning.zip"
+        response = requests.get(
+            "https://huggingface.co/datasets/anosh-rezaei/test_landscape_extactor_yml/resolve/main/test_landscape_augumented.yml?download=true",
+            timeout=30,
+        )
+        response.raise_for_status()
+        # Write downloaded content to file
+        with open("sources/test_landscape_augumented.yml", 'wb') as f:
+            f.write(response.content)
+
+        download_files_from_yaml(yaml_file="sources/test_landscape_augumented.yml", output_directory=output_directory)
+
+        # Create the extract output_directory if it doesn't exist
+        os.makedirs(output_directory, exist_ok=True)
+        # Open the zip file
+        with zipfile.ZipFile(expected_zipFile, 'r') as zip_file:
+            # Extract all the contents to the specified output_directory
+            zip_file.extractall(output_directory)
+        # Assert the files exist
+        for name in ("bug_report.md", "feature_request.md"):
+            file_path = f"sources/raw_files_test/Test_Provisioning_Automation & Configuration_Airship_{name}"
+            self.assertTrue(os.path.exists(file_path), f"File '{file_path}' was not downloaded.")
+
+    def tearDown(self):
+        # Clean up: remove the output_directory and its contents, even if it contains sub-directories
+        output_directory = "sources/raw_files_test/"
+        if os.path.isdir(output_directory):
+            shutil.rmtree(output_directory)
+        for leftover in ("sources/Test_Provisioning.zip", "sources/test_landscape_augumented.yml"):
+            if os.path.exists(leftover):
+                os.remove(leftover)
+
+
+if __name__ == '__main__':
+    unittest.main()