-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathcrawl.py
174 lines (152 loc) · 5.1 KB
/
crawl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
Download all publicly available and linked PDF documents from one domain.
"""
import ssl
import certifi
import hashlib
import json
import os
import shutil
import sys
from io import BytesIO
from typing import Dict, List, Optional, Set
from urllib.parse import urljoin, urlparse
import requests
from bs4 import BeautifulSoup # pip install bs4
def md5(f):
    """Return the hexadecimal MD5 digest of the bytes-like object ``f``.

    The payload is wrapped in an in-memory stream and fed to the hash in
    4096-byte pieces.
    """
    digest = hashlib.md5()
    buffer = BytesIO(f)
    while True:
        block = buffer.read(4096)
        if not block:
            # An empty read means the stream is exhausted.
            break
        digest.update(block)
    return digest.hexdigest()
def standardize_url(url):
    """Return ``url`` with its fragment and query string removed.

    Links that differ only in their ``#anchor`` or ``?query`` part are reduced
    to the same string, so the crawler can recognise them as one page.

    Args:
        url: The URL string to normalise.

    Returns:
        Everything in ``url`` before the first ``#`` or ``?``, whichever
        comes first. A URL containing neither is returned unchanged.
    """
    # Drop the fragment first: a "?" located after "#" is part of the
    # fragment, so cutting at "?" alone would leave the "#..." part behind.
    without_fragment = url.split("#", 1)[0]
    return without_fragment.split("?", 1)[0]
class Spider:
    """Crawler that follows HTML links and saves every PDF it downloads.

    URLs are taken from the end of ``pending_urls`` (last in, first out).
    HTML responses are scanned for further links, PDF responses are written
    to the ``pdf/`` directory under a name derived from the MD5 of their
    content. After every processed URL the crawl state is written to
    ``SNAPSHOT_FILE`` so that a later run can resume via :meth:`load`.
    """

    SNAPSHOT_FILE = "spider-snapshot.json"

    # Seconds to wait on a server before giving up on the request; without a
    # timeout a single unresponsive host would stall the crawl indefinitely.
    REQUEST_TIMEOUT = 30

    # Registered domains (last two host labels) that are never crawled.
    SKIP_DOMAINS = frozenset(
        {
            "youtube.com",
            "twitter.com",
            "google.com",
            "xing.com",
            "linkedin.com",
            "apple.com",
            "wordpress.com",
            "mediawiki.com",
            "facebook.com",
            "flickr.com",
            "creativecommons.org",
            "matomo.org",
            "github.com",  # here might actually be quite a lot of PDFs
        }
    )

    def __init__(self, urls: List[str], required_prefix: str = ""):
        """Set up a crawl.

        Args:
            urls: Seed URLs to start from.
            required_prefix: Only URLs starting with this string are queued
                when links are discovered; the empty default accepts all.
        """
        # Copy the seeds: the queue is mutated while crawling and must not
        # alter the list object owned by the caller.
        self.pending_urls: List[str] = list(urls)
        self.visited_urls: Set[str] = set()
        self.required_prefix = required_prefix
        # Maps the local file path of a stored PDF to the URL it came from.
        self.stored_at: Dict[str, str] = {}
        # Lower-case file extensions that are never worth downloading.
        self.skip_suffixes = [
            ".jpg",
            ".png",
            ".gif",
            ".css",
            ".csv",
            ".xls",
            ".log",
            ".ps",
            ".xmp",
        ]

    def get_url(self, target) -> Optional[str]:
        """Return the source URL recorded for the stored file ``target``.

        Returns ``None`` when no file was recorded under that path.
        """
        return self.stored_at.get(target)

    def crawl_loop(self):
        """Process pending URLs until none are left.

        Each URL is marked as visited, crawled, printed, and followed by a
        snapshot of the crawl state.
        """
        while self.pending_urls:
            url = self.pending_urls.pop()
            self.visited_urls.add(url)
            self.crawl_page(url)
            print(url)
            self.snapshot()

    def crawl_page(self, url: str):
        """Download ``url`` and dispatch on the content type of the response.

        HTML is scanned for links, PDFs are stored, any other content type is
        printed. Request failures are reported and skipped; responses with a
        status other than 200 are ignored.
        """
        try:
            response = requests.get(url, timeout=self.REQUEST_TIMEOUT)
        except Exception as exc:
            # Deliberately broad: one unreachable or malformed URL must not
            # abort the whole crawl.
            print(f"FAILED with {url} due to {exc}")
            return
        if response.status_code != 200:
            return
        content_type = response.headers.get("content-type")
        if content_type is None:
            print(f"!! No content type: {url}")
            return
        if "text/html" in content_type:
            self.get_links(url, response.text)
        elif "application/pdf" in content_type:
            self.store_pdf(response.content, url)
        else:
            print((content_type, url))

    def is_parsing_target(self, url: str) -> bool:
        """Decide whether ``url`` should be added to the crawl queue.

        A URL qualifies when it has not been visited or queued yet, starts
        with ``required_prefix``, does not end in one of ``skip_suffixes``
        (compared case-insensitively) and does not belong to one of
        ``SKIP_DOMAINS``. URLs that cannot be parsed are rejected.
        """
        if url in self.visited_urls or url in self.pending_urls:
            return False
        if not url.startswith(self.required_prefix):
            return False
        _, ext = os.path.splitext(url)
        if ext.lower() in self.skip_suffixes:
            return False
        try:
            # ``hostname`` is lower-cased and excludes port and credentials,
            # so "WWW.YouTube.com:443" is still recognised as youtube.com.
            host = urlparse(url).hostname or ""
        except ValueError:
            # Malformed URL (e.g. unbalanced IPv6 brackets): not crawlable.
            return False
        domain = ".".join(host.split(".")[-2:])
        return domain not in self.SKIP_DOMAINS

    def get_links(self, current_url, html_page):
        """Queue every eligible link found in ``html_page``.

        Args:
            current_url: URL the page was fetched from; relative links are
                resolved against it.
            html_page: The HTML text of the page.
        """
        soup = BeautifulSoup(html_page, "lxml")
        for link in soup.find_all("a"):
            url = link.get("href")
            if not url:
                continue
            if not url.startswith("http"):
                try:
                    url = urljoin(current_url, url)
                except ValueError:
                    # A malformed href must not abort the whole crawl.
                    continue
            url = standardize_url(url)
            print(f"...{url}")
            if self.is_parsing_target(url):
                self.pending_urls.append(url)

    def store_pdf(self, content, url) -> None:
        """Write the PDF ``content`` downloaded from ``url`` to ``pdf/``.

        The file name is the MD5 of the content, so identical documents
        reached through different URLs are stored only once; the first URL
        seen for a file is the one remembered in ``stored_at``. The process
        exits when less than 3 GB are free on the file system holding "/".
        """
        _total, _used, free = shutil.disk_usage("/")
        free_gb = free / (2**30)
        if free_gb < 3:  # leave at least 3 GB
            print("out of disk space")
            sys.exit()
        content_md5sum = md5(content)
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs("pdf", exist_ok=True)
        target = f"pdf/{content_md5sum}.pdf"
        if target not in self.stored_at:
            self.stored_at[target] = url
        if os.path.exists(target):
            print(f"{target} already exists: Skip {url}")
            return None
        with open(target, "wb") as f:
            f.write(content)
        return None

    def snapshot(self):
        """Persist the crawl state to ``SNAPSHOT_FILE`` as JSON."""
        state = {
            "stored_at": self.stored_at,
            "visited_urls": list(self.visited_urls),
            "pending_urls": self.pending_urls,
        }
        # Write to a temporary file and rename it into place, so that an
        # interruption mid-write cannot leave a truncated snapshot behind.
        tmp_path = f"{self.SNAPSHOT_FILE}.tmp"
        with open(tmp_path, "w") as f:
            json.dump(state, f, indent=2)
        os.replace(tmp_path, self.SNAPSHOT_FILE)

    def load(self):
        """Restore the crawl state from ``SNAPSHOT_FILE`` if that file exists.

        Without a snapshot file the current state is left untouched.
        """
        if not os.path.exists(self.SNAPSHOT_FILE):
            return
        with open(self.SNAPSHOT_FILE) as f:
            snapshot = json.load(f)
        self.pending_urls = snapshot["pending_urls"]
        self.visited_urls = set(snapshot["visited_urls"])
        # Tolerate snapshots that carry no "stored_at" entry.
        self.stored_at = snapshot.get("stored_at", {})
if __name__ == "__main__":
    # Crawl the govdocs1 corpus, staying on the corpora.tika hosts, and
    # resume from an earlier snapshot when one is present.
    seed_urls = ["https://corpora.tika.apache.org/base/docs/govdocs1/"]
    spider = Spider(seed_urls, required_prefix="https://corpora.tika")
    spider.load()
    spider.crawl_loop()