-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path2-script.py
101 lines (73 loc) · 2.66 KB
/
2-script.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
# Inspired by - https://github.com/mrocklin/arxiv-matplotlib
# Released with BSD-3-Clause license - https://opensource.org/license/bsd-3-clause
from google.cloud import storage
import coiled
# set key credentials file path
# import os
# os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = 'C:\\Users\\\USERNAME_GOES_HERE\\AppData\\Roaming\\gcloud\\application_default_credentials.json'
# Helper: enumerate object names stored under a prefix in a GCS bucket.
def list_cs_files(bucket_name, path, limit=1000):
    """Return up to ``limit`` object names under ``path`` in ``bucket_name``.

    Parameters
    ----------
    bucket_name : str
        Name of the Google Cloud Storage bucket.
    path : str
        Object-name prefix to filter on.
    limit : int, optional
        Maximum number of names to return (default 1000).
    """
    client = storage.Client()
    target_bucket = client.bucket(bucket_name)
    print(f"Bucket is: {target_bucket}")
    found = target_bucket.list_blobs(prefix=path, max_results=limit)
    return [blob.name for blob in found]
# Bucket layout: gs://<bucket_name>/<file_path_inside_bucket>,
# here gs://arxiv-dataset/arxiv/...
file_list = list_cs_files('arxiv-dataset', 'arxiv', limit=1000)
# Keep only the PDFs; other object types are not scanned.
pdf_files = [name for name in file_list if name.endswith(".pdf")]
print(pdf_files[:10])
print(f"Length of PDF List: {len(pdf_files)}")
def extract(file_set):
    """ Extract and process one directory of arXiv data

    Downloads each PDF in ``file_set`` from the ``arxiv-dataset`` GCS bucket
    and checks its raw bytes for the (case-insensitive) string "matplotlib".
    Files that fail to download are logged and skipped.

    Parameters
    ----------
    file_set : iterable of str
        Object names (paths) of PDFs inside the ``arxiv-dataset`` bucket.

    Returns
    -------
    list of (filename, contains_matplotlib) tuples where
    filename: str
    contains_matplotlib: boolean
    """
    out = []
    # Create a connection per file_set (one client per worker task; GCS
    # clients are not shared across dask workers).
    storage_client = storage.Client()
    bucket = storage_client.bucket('arxiv-dataset')
    for filename in file_set:
        # Read the content of a PDF file from Google Cloud Storage.
        blob = bucket.blob(filename)
        try:
            # Download the PDF content as bytes
            pdf_bytes = blob.download_as_bytes()
        except Exception as e:
            print(f"Error reading PDF: {str(e)}")
            # BUG FIX: the original fell through and appended using a stale
            # (previous iteration's) or undefined `pdf_bytes` after a failed
            # download. Skip this file instead of recording a bogus result.
            continue
        out.append((
            filename,
            b"matplotlib" in pdf_bytes.lower()
        ))
    return out
# Local smoke test on a small sample before spinning up the cluster.
out = extract(pdf_files[:10])
print(out)
############
# Partition the PDF list into fixed-size chunks, one chunk per dask task.
chunk_size = 10
chunked_list = [
    pdf_files[start:start + chunk_size]
    for start in range(0, len(pdf_files), chunk_size)
]
print(chunked_list[0])
print(f"Number of chunks: {len(chunked_list)}")
############
# Provision a coiled cluster for the distributed scan.
cluster = coiled.Cluster(
    n_workers=5,
    name="arxiv",
    package_sync=True,
    backend_options={"region": "us-east1"},  # faster and cheaper
)
# Attaching a client starts the cluster workers. Billing starts here.
#client = cluster.get_client()
from dask.distributed import Client, wait
client = Client(cluster)
############
# Fan one extract() task out per chunk and block until everything settles.
futures = client.map(extract, chunked_list)
wait(futures)
# We had one error in one file. Keep only the tasks that finished cleanly
# and ignore the rest.
finished = [f for f in futures if f.status == "finished"]
results = client.gather(finished)
print(results)
print("Closing cluster")
cluster.close()