forked from h2oai/h2ogpt
-
Notifications
You must be signed in to change notification settings - Fork 2
/
read_wiki_full.py
343 lines (280 loc) · 12.3 KB
/
read_wiki_full.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
"""Load Data from a MediaWiki dump xml."""
import ast
import glob
import pickle
import uuid
from typing import List, Optional
import os
import bz2
import csv
import numpy as np
import pandas as pd
import pytest
from matplotlib import pyplot as plt
from langchain.docstore.document import Document
from langchain.document_loaders import MWDumpLoader
# path where downloaded wiki files exist, to be processed
root_path = "/data/jon/h2o-llm"
def unescape(x):
try:
x = ast.literal_eval(x)
except:
try:
x = x.encode('ascii', 'ignore').decode('unicode_escape')
except:
pass
return x
def get_views():
# views = pd.read_csv('wiki_page_views_more_1000month.csv')
views = pd.read_csv('wiki_page_views_more_5000month.csv')
views.index = views['title']
views = views['views']
views = views.to_dict()
views = {str(unescape(str(k))): v for k, v in views.items()}
views2 = {k.replace('_', ' '): v for k, v in views.items()}
# views has _ but pages has " "
views.update(views2)
return views
class MWDumpDirectLoader(MWDumpLoader):
def __init__(self, data: str, encoding: Optional[str] = "utf8",
title_words_limit=None, use_views=True, verbose=True):
"""Initialize with file path."""
self.data = data
self.encoding = encoding
self.title_words_limit = title_words_limit
self.verbose = verbose
if use_views:
# self.views = get_views()
# faster to use global shared values
self.views = global_views
else:
self.views = None
def load(self) -> List[Document]:
"""Load from file path."""
import mwparserfromhell
import mwxml
dump = mwxml.Dump.from_page_xml(self.data)
docs = []
for page in dump.pages:
if self.views is not None and page.title not in self.views:
if self.verbose:
print("Skipped %s low views" % page.title, flush=True)
continue
for revision in page:
if self.title_words_limit is not None:
num_words = len(' '.join(page.title.split('_')).split(' '))
if num_words > self.title_words_limit:
if self.verbose:
print("Skipped %s" % page.title, flush=True)
continue
if self.verbose:
if self.views is not None:
print("Kept %s views: %s" % (page.title, self.views[page.title]), flush=True)
else:
print("Kept %s" % page.title, flush=True)
code = mwparserfromhell.parse(revision.text)
text = code.strip_code(
normalize=True, collapse=True, keep_template_params=False
)
title_url = str(page.title).replace(' ', '_')
metadata = dict(title=page.title,
source="https://en.wikipedia.org/wiki/" + title_url,
id=page.id,
redirect=page.redirect,
views=self.views[page.title] if self.views is not None else -1,
)
metadata = {k: v for k, v in metadata.items() if v is not None}
docs.append(Document(page_content=text, metadata=metadata))
return docs
def search_index(search_term, index_filename):
byte_flag = False
data_length = start_byte = 0
index_file = open(index_filename, 'r')
csv_reader = csv.reader(index_file, delimiter=':')
for line in csv_reader:
if not byte_flag and search_term == line[2]:
start_byte = int(line[0])
byte_flag = True
elif byte_flag and int(line[0]) != start_byte:
data_length = int(line[0]) - start_byte
break
index_file.close()
return start_byte, data_length
def get_start_bytes(index_filename):
index_file = open(index_filename, 'r')
csv_reader = csv.reader(index_file, delimiter=':')
start_bytes = set()
for line in csv_reader:
start_bytes.add(int(line[0]))
index_file.close()
return sorted(start_bytes)
def get_wiki_filenames():
# requires
# wget http://ftp.acc.umu.se/mirror/wikimedia.org/dumps/enwiki/20230401/enwiki-20230401-pages-articles-multistream-index.txt.bz2
base_path = os.path.join(root_path, 'enwiki-20230401-pages-articles-multistream')
index_file = 'enwiki-20230401-pages-articles-multistream-index.txt'
index_filename = os.path.join(base_path, index_file)
wiki_filename = os.path.join(base_path, 'enwiki-20230401-pages-articles-multistream.xml.bz2')
return index_filename, wiki_filename
def get_documents_by_search_term(search_term):
index_filename, wiki_filename = get_wiki_filenames()
start_byte, data_length = search_index(search_term, index_filename)
with open(wiki_filename, 'rb') as wiki_file:
wiki_file.seek(start_byte)
data = bz2.BZ2Decompressor().decompress(wiki_file.read(data_length))
loader = MWDumpDirectLoader(data.decode())
documents = loader.load()
return documents
def get_one_chunk(wiki_filename, start_byte, end_byte, return_file=True,
title_words_limit=None,
use_views=True):
data_length = end_byte - start_byte
with open(wiki_filename, 'rb') as wiki_file:
wiki_file.seek(start_byte)
data = bz2.BZ2Decompressor().decompress(wiki_file.read(data_length))
loader = MWDumpDirectLoader(data.decode(), title_words_limit=title_words_limit,
use_views=use_views)
documents1 = loader.load()
if return_file:
base_tmp = "temp_wiki"
if not os.path.isdir(base_tmp):
os.makedirs(base_tmp, exist_ok=True)
filename = os.path.join(base_tmp, str(uuid.uuid4()) + ".tmp.pickle")
with open(filename, 'wb') as f:
pickle.dump(documents1, f)
return filename
return documents1
from joblib import Parallel, delayed
global_views = get_views()
def get_all_documents(small_test=2, n_jobs=None, use_views=True):
print("DO get all wiki docs: %s" % small_test, flush=True)
index_filename, wiki_filename = get_wiki_filenames()
start_bytes = get_start_bytes(index_filename)
end_bytes = start_bytes[1:]
start_bytes = start_bytes[:-1]
if small_test:
start_bytes = start_bytes[:small_test]
end_bytes = end_bytes[:small_test]
if n_jobs is None:
n_jobs = 5
else:
if n_jobs is None:
n_jobs = os.cpu_count() // 4
# default loky backend leads to name space conflict problems
return_file = True # large return from joblib hangs
documents = Parallel(n_jobs=n_jobs, verbose=10, backend='multiprocessing')(
delayed(get_one_chunk)(wiki_filename, start_byte, end_byte,
return_file=return_file, use_views=use_views) for start_byte, end_byte in
zip(start_bytes, end_bytes))
if return_file:
# then documents really are files
files = documents.copy()
documents = []
for fil in files:
with open(fil, 'rb') as f:
documents.extend(pickle.load(f))
os.remove(fil)
else:
from functools import reduce
from operator import concat
documents = reduce(concat, documents)
assert isinstance(documents, list)
print("DONE get all wiki docs", flush=True)
return documents
def test_by_search_term():
search_term = 'Apollo'
assert len(get_documents_by_search_term(search_term)) == 100
search_term = 'Abstract (law)'
assert len(get_documents_by_search_term(search_term)) == 100
search_term = 'Artificial languages'
assert len(get_documents_by_search_term(search_term)) == 100
def test_start_bytes():
index_filename, wiki_filename = get_wiki_filenames()
assert len(get_start_bytes(index_filename)) == 227850
def test_get_all_documents():
small_test = 20 # 227850
n_jobs = os.cpu_count() // 4
assert len(get_all_documents(small_test=small_test, n_jobs=n_jobs, use_views=False)) == small_test * 100
assert len(get_all_documents(small_test=small_test, n_jobs=n_jobs, use_views=True)) == 429
def get_one_pageviews(fil):
df1 = pd.read_csv(fil, sep=' ', header=None, names=['region', 'title', 'views', 'foo'], quoting=csv.QUOTE_NONE)
df1.index = df1['title']
df1 = df1[df1['region'] == 'en']
df1 = df1.drop('region', axis=1)
df1 = df1.drop('foo', axis=1)
df1 = df1.drop('title', axis=1) # already index
base_tmp = "temp_wiki_pageviews"
if not os.path.isdir(base_tmp):
os.makedirs(base_tmp, exist_ok=True)
filename = os.path.join(base_tmp, str(uuid.uuid4()) + ".tmp.csv")
df1.to_csv(filename, index=True)
return filename
def test_agg_pageviews(gen_files=False):
if gen_files:
path = os.path.join(root_path, 'wiki_pageviews/dumps.wikimedia.org/other/pageviews/2023/2023-04')
files = glob.glob(os.path.join(path, 'pageviews*.gz'))
# files = files[:2] # test
n_jobs = os.cpu_count() // 2
csv_files = Parallel(n_jobs=n_jobs, verbose=10, backend='multiprocessing')(
delayed(get_one_pageviews)(fil) for fil in files)
else:
# to continue without redoing above
csv_files = glob.glob(os.path.join(root_path, 'temp_wiki_pageviews/*.csv'))
df_list = []
for csv_file in csv_files:
print(csv_file)
df1 = pd.read_csv(csv_file)
df_list.append(df1)
df = pd.concat(df_list, axis=0)
df = df.groupby('title')['views'].sum().reset_index()
df.to_csv("wiki_page_views.csv", index=True)
def test_reduce_pageview():
filename = "wiki_page_views.csv"
df = pd.read_csv(filename)
df = df[df['views'] < 1e7]
#
plt.hist(df['views'], bins=100, log=True)
views_avg = np.mean(df['views'])
views_median = np.median(df['views'])
plt.title("Views avg: %s median: %s" % (views_avg, views_median))
plt.savefig(filename.replace('.csv', '.png'))
plt.close()
#
views_limit = 5000
df = df[df['views'] > views_limit]
filename = "wiki_page_views_more_5000month.csv"
df.to_csv(filename, index=True)
#
plt.hist(df['views'], bins=100, log=True)
views_avg = np.mean(df['views'])
views_median = np.median(df['views'])
plt.title("Views avg: %s median: %s" % (views_avg, views_median))
plt.savefig(filename.replace('.csv', '.png'))
plt.close()
@pytest.mark.skip("Only if doing full processing again, some manual steps")
def test_do_wiki_full_all():
# Install other requirements for wiki specific conversion:
# pip install -r reqs_optional/requirements_optional_wikiprocessing.txt
# Use "Transmission" in Ubuntu to get wiki dump using torrent:
# See: https://meta.wikimedia.org/wiki/Data_dump_torrents
# E.g. magnet:?xt=urn:btih:b2c74af2b1531d0b63f1166d2011116f44a8fed0&dn=enwiki-20230401-pages-articles-multistream.xml.bz2&tr=udp%3A%2F%2Ftracker.opentrackr.org%3A1337
# Get index
os.system("wget http://ftp.acc.umu.se/mirror/wikimedia.org/dumps/enwiki/20230401/enwiki-20230401-pages-articles-multistream-index.txt.bz2")
# Test that can use LangChain to get docs from subset of wiki as sampled out of full wiki directly using bzip multistream
test_get_all_documents()
# Check can search wiki multistream
test_by_search_term()
# Test can get all start bytes in index
test_start_bytes()
# Get page views, e.g. for entire month of April 2023
os.system("wget -b -m -k -o wget.log -e robots=off https://dumps.wikimedia.org/other/pageviews/2023/2023-04/")
# Aggregate page views from many files into single file
test_agg_pageviews(gen_files=True)
# Reduce page views to some limit, so processing of full wiki is not too large
test_reduce_pageview()
# Start generate.py with requesting wiki_full in prep. This will use page views as referenced in get_views.
# Note get_views as global() function done once is required to avoid very slow processing
# WARNING: Requires alot of memory to handle, used up to 300GB system RAM at peak
"""
python generate.py --langchain_mode='wiki_full' --visible_langchain_modes="['wiki_full', 'UserData', 'MyData', 'github h2oGPT', 'DriverlessAI docs']" &> lc_out.log
"""