main.py
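"""Orchestration entry point: read website URLs and keywords from a CSV, scrape
each site, fetch Serper search results for every keyword, run the per-website
processing tasks, and write one results CSV per domain."""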
import asyncio
import logging

from rich.console import Console
from rich.progress import (
    Progress,
    SpinnerColumn,
    TextColumn,
    BarColumn,
    TimeElapsedColumn,
    TimeRemainingColumn,
)
import pandas as pd

from config import (
    INPUT_CSV_PATH,
    OUTPUT_DIRECTORY,
    MAX_CONCURRENT_TASKS,
    get_default_settings,
    load_website_credentials,
    format_url,
)
from utils import (
    get_main_domain,
    distribute_articles,
    get_custom_filename,
    generate_random_string,
)
from scraper import scrape_elements
from serper_api import fetch_serper_results_async
from process_websites import process_website_task

# Set up logging
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)

console = Console()
async def main():
    # Load configuration and data
    defaults = await get_default_settings()
    websites = load_website_credentials()

    try:
        df = pd.read_csv(INPUT_CSV_PATH)
    except Exception as e:
        logging.error(f"Failed to read CSV file: {e}")
        return
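    # The input CSV is expected to provide a "Website URL" column and one or
    # more "Keyword #N" columns. Illustrative row (hypothetical values):
    #   Website URL,Keyword #1,Keyword #2
    #   https://example.com,best running shoes,trail running tips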
    formatted_urls = [await format_url(url) for url in df["Website URL"]]

    # Shared state for the run, plus a semaphore bounding how many scraping /
    # API tasks run at once.
    data = []
    serper_cache = {}
    results = []
    semaphore = asyncio.Semaphore(MAX_CONCURRENT_TASKS)
    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
        TimeElapsedColumn(),
        TimeRemainingColumn(),
        console=console,
    ) as progress:
        overall_task = progress.add_task(
            "Overall Progress", total=len(formatted_urls) * 4, visible=True
        )
        # Scrape elements concurrently
        scraping_tasks = [scrape_elements(url, semaphore) for url in formatted_urls]
        scrape_results = await asyncio.gather(*scraping_tasks)
        for result in scrape_results:
            url, title, language, description, combined_text = result
            data.append([url, language, title, description, combined_text])
            progress.update(overall_task, advance=1, description="Scraping")
        # Process Serper results concurrently
        keyword_columns = [col for col in df.columns if col.startswith("Keyword #")]
        # Flatten all keyword cells and drop empty ones; pd.notna() filters out
        # both None and the NaN values pandas uses for missing CSV cells.
        keywords = {kw for kw in df[keyword_columns].values.flatten() if pd.notna(kw)}
        serper_tasks = [
            fetch_serper_results_async(
                keyword, serper_cache, defaults["input_country"], semaphore
            )
            for keyword in keywords
        ]
        await asyncio.gather(*serper_tasks)
        progress.update(
            overall_task, advance=len(keywords), description="Fetching Serper Results"
        )
        # Process website tasks
        website_indices = list(websites.keys())
        current_website_index = 0
        website_tasks = [
            process_website_task(
                index,
                data[index][0],
                data[index],
                websites,
                website_indices,
                current_website_index,
                serper_cache,
                semaphore,
                progress,
                overall_task,
                defaults,
                df,
                results,
            )
            for index in range(len(data))
        ]
        await asyncio.gather(*website_tasks)
    # Save results grouped by domain
    results_by_domain = {}
    for result in results:
        website_url = result[0]
        main_domain = get_main_domain(website_url)
        results_by_domain.setdefault(main_domain, []).append(result)

    for domain, domain_results in results_by_domain.items():
        results_df = pd.DataFrame(
            domain_results,
            columns=[
                "Client Website URL",
                "My Post Permalink",
                "Keyword",
                "Article Category",
                "Post ID",
                "Meta Keywords",
                "Meta Description",
                "Image Alt Tag",
                "Article Title",
            ],
        )
        output_file = f"{OUTPUT_DIRECTORY}/{domain}.csv"
        results_df.to_csv(output_file, index=False)
        logging.info(f"Results saved to {output_file}")
if __name__ == "__main__":
    asyncio.run(main())
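# Usage sketch (assuming INPUT_CSV_PATH and OUTPUT_DIRECTORY in config.py point
# at an existing CSV file and a writable directory):
#   python main.py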