finder.py
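"""Check a list of URLs for link rot.

The URLs are read from a SQLite database via the local ``sldextract`` helper, each
one is requested, and the results are written to two CSV files (paths configurable
with the -c / -b options):

    checked_urls.csv : url, status_code, last_seen (ms UNIX timestamp)
    broken_urls.csv  : url, status_code, gone_by   (ms UNIX timestamp)

Links already recorded as broken are skipped; links that were fine on the previous
run but are broken now are counted as "Newly Broken Links" in the summary.
"""
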
import argparse
import sqlite3 as sql
import sys
import time
import urllib.parse

import pandas as pd
import requests
from bs4 import BeautifulSoup
from tqdm import tqdm

import sldextract as s   # local helper: reads the URL list out of the SQLite DB
import normalizer as n   # local helper (not used directly in this file yet)
# State shared across a run
broken_links = set()       # every URL currently known to be broken (previous runs + this one)
checked_links = set()      # URLs checked during this run
old_working_links = set()  # URLs that were OK on the previous run
old_broken_links = set()   # URLs that were OK before but turned out broken this run
# checked_file = None
# broken_file = None
checkedURLs = None         # DataFrame persisted to CHURL_PATH
brokenURLs = None          # DataFrame persisted to BURL_PATH
headers_checked_file = ""
headers_broken_file = ""
main_url = ""


def summary():
    """Print end-of-run statistics: checked/broken counts and the link-rot percentage."""
    print()
    print("Ended at {}\n".format(int(time.time() * 1000.0)))
    print("Summary")
    print("--------------------------------------------------")
    print("Links Checked: \t", len(checked_links))
    print("Broken Links: \t", len(broken_links))
    print("Newly Broken Links: \t", len(old_broken_links))
    print("Link Rot: \t{0:.2f}%".format(
        0 if not len(checked_links) else len(broken_links) / len(checked_links) * 100))
    print("--------------------------------------------------")


def read_url(url):
    """Request `url` once and record the result in the checked/broken DataFrames.

    URLs already known to be broken are skipped. Always returns None; results are
    accumulated in module-level state.
    """
    global brokenURLs, checkedURLs, broken_links, checked_links
    # TODO: Replace `last_seen` with `is_new` UNIX timestamp
    checked_links.add(url)
    if url in broken_links:
        # Link is already known to be broken, skip it.
        return
    # check normalizer.py mailto: condition
    if url is not None:
        try:
            # enc_url = urllib.parse.quote(url)
            # enc_url = url.replace(" ", "%20")
            # url_request = requests.get(enc_url)
            url_request = requests.get(url, stream=True)
        except Exception:
            print("Could not read url...")
            return None
        # if url != main_url:
        #     url_domain = s.extract(url)["url_domain"]
        # else:
        #     url_domain = main_url_domain
        is_ok = True
        if url_request.status_code >= 400:
            broken_links.add(url)
            is_ok = False
            brokenURLs = pd.concat(
                [brokenURLs, pd.DataFrame([{'url': url,
                                            'status_code': url_request.status_code,
                                            'gone_by': int(time.time() * 1000.0)}])],
                ignore_index=True)
            if url in old_working_links:
                # This link was fine on the previous run but is broken now.
                old_working_links.discard(url)
                old_broken_links.add(url)
            # print("* Broken url: ", url)
            return None
        checkedURLs = pd.concat(
            [checkedURLs, pd.DataFrame([{'url': url,
                                         'status_code': url_request.status_code,
                                         'last_seen': int(time.time() * 1000)}])],
            ignore_index=True)


def initialize():
    """Load previous results (if any) from the CSV files and set up the DataFrames."""
    print("TODO (LATER): OUTPUT TO SQLite DB as well!")  # see the write_sqlite() sketch below
    # Old file-handle based CSV writing, kept for reference:
    # checked_file = open(CHURL_PATH, "w")
    # checked_file.write("url,status_code,is_ok\n")
    # try:
    #     broken_file = open(BURL_PATH, "a+")
    # except:
    #     # if broken_file does not exist
    #     broken_file = open(BURL_PATH, "w")
    #     broken_file.write("url,status_code,last_seen\n")
    global checkedURLs, brokenURLs, checked_links, broken_links, old_working_links
    try:
        # Remember which URLs were fine on the previous run so that newly broken
        # links can be reported separately; this run starts from a fresh DataFrame.
        previous = pd.read_csv(CHURL_PATH, index_col=0)
        old_working_links = set(previous['url'].to_list())
        checkedURLs = pd.DataFrame({'url': [], 'status_code': [], 'last_seen': []})
    except (FileNotFoundError, pd.errors.EmptyDataError):
        checkedURLs = pd.DataFrame({'url': [], 'status_code': [], 'last_seen': []})
        old_working_links = set()
    try:
        brokenURLs = pd.read_csv(BURL_PATH, index_col=0)
        broken_links = set(brokenURLs['url'].to_list())
    except (FileNotFoundError, pd.errors.EmptyDataError):
        brokenURLs = pd.DataFrame({'url': [], 'status_code': [], 'gone_by': []})
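

# A minimal sketch for the "output to SQLite as well" TODO above; it is not wired
# into the main flow yet. The database path and table names below are assumptions,
# not something the existing tool defines; it simply mirrors the two DataFrames
# into a SQLite file with pandas' DataFrame.to_sql().
def write_sqlite(db_path="linkrot_results.db"):
    """Mirror checkedURLs/brokenURLs into a SQLite database (illustrative sketch)."""
    conn = sql.connect(db_path)
    try:
        checkedURLs.to_sql("checked_urls", conn, if_exists="replace", index=False)
        brokenURLs.to_sql("broken_urls", conn, if_exists="replace", index=False)
    finally:
        conn.close()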


if __name__ == '__main__':
    # Database file, check limit and last-execution timestamp all come from the CLI now;
    # the old sys.argv handling is kept below for reference.
    # DBFile = sys.argv[1]
    # LIMIT = -1
    # if len(sys.argv) > 2:
    #     LIMIT = sys.argv[2]
    parser = argparse.ArgumentParser(description="Check for dead links on MediaFire.")
    parser.add_argument("dbfile", type=str,
                        help="Path to database file downloaded from https://urls.ajay.app/.")
    parser.add_argument("-b", "--brokenurls", type=str,
                        help="Path to csv of broken urls.")
    parser.add_argument("-c", "--checkedurls", type=str,
                        help="Path to csv of previously checked urls.")
    parser.add_argument("-L", "--limit", type=int,
                        help="Limit of max links to check, meant for debugging.")
    # parser.add_argument("-br", "--brokenURLs", type=int, help="TBA")
    parser.add_argument("-l", "--lastexecution", type=float,
                        help="UNIX timestamp of the last execution; used for labeling "
                             "when a broken link was last seen.")
    args = parser.parse_args()
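
    # Example invocation (file names are illustrative):
    #   python finder.py mediafire_urls.db -b broken_urls.csv -c checked_urls.csv -L 100 -l 1612126565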
    # Set the database file, check limit, output paths and "last seen" timestamp.
    DBFile = args.dbfile
    LIMIT = args.limit or -1
    BYGONE = str(args.lastexecution) if args.lastexecution is not None else "n/a"  # currently informational only
    BURL_PATH = args.brokenurls or "broken_urls.csv"
    CHURL_PATH = args.checkedurls or "checked_urls.csv"
    initialize()

    print("Reading: ", DBFile)
    print("--------------------------------------------------")
    N = s.read_SQLite_DB(DBFile, "url", "urls", LIMIT)
    print("Started at {}\n".format(int(time.time() * 1000.0)))
    # BYGONE 1612126565
    for i in tqdm(range(N), desc="Checking URLs in {}".format(DBFile)):
        targetURL = s.getList()[i]
        read_url(targetURL)
    summary()

    # checkedURLs.reset_index(drop=False)
    # brokenURLs.reset_index(drop=False)
    print(checkedURLs)
    checkedURLs.to_csv(CHURL_PATH)
    brokenURLs.to_csv(BURL_PATH)
    # checked_file.close()
    # broken_file.close()