-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path run.py
308 lines (281 loc) · 11.7 KB
/
run.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
__author__="JeremyNelson, Mike Stabile"
import argparse
import datetime
import requests
import rdflib
import json
from elasticsearch import Elasticsearch
from elasticsearch import helpers
from sparql.general import*
from sparql.languages import workflow as languages
from sparql.subjects import workflow as subjects
from sparql import CONSTRUCT_GRAPH_PRE_URI
from sparql import CONSTRUCT_GRAPH_POST_URI
from sparql import CONSTRUCT_GRAPH_PRE_LANG
from sparql import CONSTRUCT_GRAPH_POST_LANG
from sparql import CONSTRUCT_GRAPH_END
# This is in lieu of a config file until such time as it makes sense to create one
def config():
    """Return the default settings dict (stand-in for a real config file).

    Keys cover the triplestore endpoint, pull/format options, the
    Elasticsearch URL, and the default bulk-action template.
    """
    return {
        'triplestore': "http://localhost:9999/bigdata/sparql",
        'pulltype': "all",
        'format': 'application/x-turtle',
        'filename': "default",
        'langpref': "all languages",
        'queryfile': "elasticsearchquery.rq",
        'es_url': 'http://localhost:9200',
        # Template for elasticsearch.helpers.bulk actions; '_id' names the
        # SPARQL result field used as the document id.
        'bulkactions': {
            '_op_type': 'create',
            '_index': 'bf',
            '_type': 'reference',
            '_id': ['field', 'resource'],
        },
    }
def execute_queries(queries, url):
    """POST each SPARQL update in *queries* to *url*, printing progress.

    Prints a numbered header (the first line of each query) per statement
    and reports any HTTP error response from the endpoint.
    """
    for number, statement in enumerate(queries, start=1):
        print("{}.".format(number), end="")
        # Echo the query's first line as a progress marker.
        print(statement[1:statement.find('\n')])
        response = requests.post(url, data={"update": statement})
        if response.status_code > 399:
            print("..ERROR with {} for {}\n{}".format(
                statement, url, response.text))
#This function will query the sparql endpoint and create those items in elasticsearch
def push_dataToElasticsearch(args):
    """Run the SPARQL query from *queryfile* and load the results into Elasticsearch.

    args keys (all optional; defaults come from config()):
        triplestore -- SPARQL endpoint URL
        es_url      -- Elasticsearch base URL
        queryfile   -- path of the file containing the SPARQL query
        mode        -- 'normal' bulk-loads via elasticsearch.helpers;
                       'debug' posts each item individually and prints errors
        bulkactions -- per-key overrides for the bulk action template
    """
    c = config()
    url = args.get('triplestore', c['triplestore'])    # SPARQL endpoint
    esUrl = args.get('es_url', c['es_url'])            # Elasticsearch URL
    qFile = args.get('queryfile', c['queryfile'])      # file holding the query
    mode = args.get('mode', 'normal')                  # 'normal' or 'debug'
    aargs = args.get('bulkactions', c['bulkactions'])
    # Merge caller-supplied bulk settings over the config defaults, key by key.
    actionSettings = {
        '_op_type': aargs.get('_op_type', c['bulkactions']['_op_type']),
        '_index': aargs.get('_index', c['bulkactions']['_index']),
        '_type': aargs.get('_type', c['bulkactions']['_type']),
        '_id': aargs.get('_id', c['bulkactions']['_id'])
    }
    startSparql = datetime.datetime.now()  # timer for the query portion
    print("Starting {} at {}".format(
        "SPARQL Query",
        startSparql.isoformat()))
    # Read the query string; `with` guarantees the file handle is closed.
    with open(qFile, encoding="utf-8") as qFile_fo:
        qStr = qFile_fo.read()
    # Run the query against the triplestore and store the result bindings.
    result = requests.post(
        url,
        data={"query": qStr,
              'format': 'json'})
    esItems = result.json().get('results').get('bindings')
    endSparql = datetime.datetime.now()
    print("\nFinished {} at {}, total time={} seconds".format(
        "SPARQL query",
        endSparql.isoformat(),
        (endSparql - startSparql).seconds))
    # Iterate over the bindings and build the bulk-upload action list.
    total = 0
    actionList = []
    for i in esItems:
        total = total + 1
        if actionSettings['_id'][0] == 'field':
            # Use the named SPARQL result field as the document id.
            itemId = i[actionSettings['_id'][1]]['value']
        if mode != 'debug':
            # Each binding's 'obj' value is the inner text of a JSON object.
            jsonItem = json.loads('{' + i['obj']['value'] + '}')
            actionList.append({
                '_op_type': actionSettings['_op_type'],
                '_index': actionSettings['_index'],
                '_type': actionSettings['_type'],
                '_id': itemId,
                '_source': jsonItem
            })
        else:
            # Debug mode: post each item individually and print any error
            # other than 409 (document already exists).
            print(total, ". ", i['resource']['value'])
            # BUG FIX: the original appended bytes to a str (TypeError).
            # Build the whole bulk payload as text, then encode once.
            lFile = ('{ "create" : { "_index" : "bf", "_type" : "reference", "_id" : "'
                     + i['resource']['value'] + '" } }\n')
            lFile += '{' + i['obj']['value'] + '}\n'
            result = requests.post(esUrl + "/_bulk",
                                   data=lFile.encode())
            rNote = json.loads(result.content.decode())
            if rNote.get("errors"):
                item = rNote.get("items")[0]
                if item.get("create").get("status") != 409:
                    print(json.dumps(item.get("create")))
    # Push the accumulated actions with the bulk helper.
    if mode != 'debug':
        print(total, " items to post to elasticsearch")
        print("Now pushing into ElasticSearch")
        es = Elasticsearch([esUrl])
        helpers.bulk(
            es,
            actionList,
            stats_only=True
        )
        endEs = datetime.datetime.now()
        print("\nFinished {} at {}, total time={} seconds".format(
            "Processing and Pushing to Es",
            endEs.isoformat(),
            (endEs - endSparql).seconds))
#This function will generate a list of resource URIs as strings based on a filter for triples
def get_referenceURIs(filterTriple, url):
    """Return the URIs (as strings) of all subjects matching *filterTriple*."""
    query = (PREFIX + "SELECT ?sReturn WHERE { " + filterTriple
             + " . BIND (STR(?s) AS ?sReturn) }")
    response = requests.post(
        url,
        data={"query": query,
              'format': 'json'})
    bindings = response.json().get('results').get('bindings')
    return [binding['sReturn']['value'] for binding in bindings]
#This function will generate the fedora resources
#*** graph subjects must already be encoded to the fedoraURIs
def generate_fedora_refs(tripleStoreURL, refType):
    """PUT each reference graph of *refType* into Fedora as text/turtle.

    refType -- 'language' pulls every bf:Language resource from the
               triplestore; 'test' uses a single hard-coded URI.

    Raises:
        ValueError -- for any other refType (the original code crashed
        with an unbound-local NameError instead).
    """
    if refType == "language":
        uriList = get_referenceURIs("?s a bf:Language", tripleStoreURL)
    elif refType == 'test':
        uriList = ['http://localhost:8080/fedora/rest/ref/fre']
    else:
        raise ValueError("unknown refType: {}".format(refType))
    print("There are ", len(uriList), " references to process.")
    for i, uri in enumerate(uriList, start=1):
        print(i, ". ", uri)
        # BUG FIX: pull_graph defaults pulltype to 'all', which requires a
        # 'sparqlselect' key and raised KeyError; request a single resource.
        graph = pull_graph({'triplestore': tripleStoreURL,
                            'pulltype': 'resource',
                            'returntype': 'return',
                            'resourceuri': uri})
        requests.put(uri,
                     data=graph,
                     headers={"Content-Type": "text/turtle"})
#This function will pull a graph from the triplestore based on a resource URI or a group of resource URIs
def pull_graph(args):
    """Construct and fetch a graph from the triplestore; save it or return it.

    args keys (defaults from config() unless noted):
        triplestore -- SPARQL endpoint URL
        pulltype    -- 'resource' (single graph, needs 'resourceuri') or
                       'all' (needs 'sparqlselect' binding ?s1 to the URIs)
        format      -- Accept header for the response
        filename    -- output filename; 'default' derives it from the
                       response's Content-disposition header
        langpref    -- 'all languages' or an ISO 639-1 code to restrict literals
        returntype  -- 'file' (default) writes to disk; 'return' returns the
                       raw response content to the caller

    Raises:
        ValueError -- for an unknown pulltype (the original code crashed
        with an unbound-local NameError instead).
    """
    c = config()
    url = args.get('triplestore', c['triplestore'])
    pulltype = args.get('pulltype', c['pulltype'])
    header_format = args.get('format', c['format'])
    fName = args.get('filename', c['filename'])
    langpref = args.get('langpref', c['langpref'])
    returnType = args.get('returntype', 'file')
    if pulltype == "resource":
        # Bind the single resource URI into the construct query.
        qstr = "BIND(<" + args['resourceuri'] + "> AS ?s1)"
    elif pulltype == "all":
        # Caller supplies a select whose ?s1 resolves to all desired
        # resources, e.g. "?s1 a bf:Language" pulls all language graphs.
        qstr = args['sparqlselect']
    else:
        raise ValueError("unknown pulltype: {}".format(pulltype))
    qstr = PREFIX + CONSTRUCT_GRAPH_PRE_URI + qstr + CONSTRUCT_GRAPH_POST_URI
    # langpref restricts the returned graph to one language's literals.
    if langpref == 'all languages':
        qstr = qstr + CONSTRUCT_GRAPH_END
    else:
        qstr = qstr + CONSTRUCT_GRAPH_PRE_LANG + langpref + CONSTRUCT_GRAPH_POST_LANG + CONSTRUCT_GRAPH_END
    # Send the construct query to the triplestore.
    result = requests.post(
        url,
        data={"query": qstr},
        headers={"Accept": header_format}
    )
    # BUG FIX: report errors before the output handling -- the original
    # checked the status code after `return result.content`, so callers
    # using returntype='return' never saw the error output.
    if result.status_code > 399:
        print(qstr)
        print("Error{}\n{}".format(result.status_code, result.text))
    # Write the results to a file, or hand the content back to the caller.
    if returnType == 'file':
        if fName == 'default':
            # Derive the filename from the Content-disposition header
            # (everything after the '=').
            disposition = result.headers.get('Content-disposition')
            fName = disposition[disposition.find("=") + 1:]
        result.encoding = 'utf-8'
        with open(fName, "wb") as file_obj:
            file_obj.write(result.content)
        print("File saved as:{}".format(fName))
    elif returnType == 'return':
        return result.content
def main(args):
    """Dispatch the requested workflow(s) and report total elapsed time.

    args -- dict of parsed command-line options; args['workflow'] selects
            which branch runs (matched by prefix).
    """
    start = datetime.datetime.now()
    print("Starting {} Workflow at {}".format(
        args['workflow'],
        start.isoformat()))
    if args['workflow'].startswith("languages"):
        execute_queries(languages, args['triplestore'])
    if args['workflow'].startswith("subjects"):
        execute_queries(subjects, args['triplestore'])
    if args['workflow'].startswith("test"):
        test_queries(languages, args['triplestore'])
    if args['workflow'].startswith("elastic"):
        push_dataToElasticsearch(args)
    if args['workflow'].startswith("graph"):
        pull_graph(args)
    if args['workflow'].startswith("fedora"):
        generate_fedora_refs(args['triplestore'], args['fedoraaction'])
    end = datetime.datetime.now()
    # BUG FIX: the original passed the timestamp where the workflow name
    # belongs (arguments were swapped relative to the format string).
    print("\nFinished {} Workflow at {}, total time={} min".format(
        args['workflow'],
        end.isoformat(),
        (end - start).seconds / 60.0))
if __name__ == '__main__':
    # Build the CLI from the config defaults and hand the parsed options
    # to main() as a plain dict.
    c = config()
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'workflow',
        choices=['languages', 'subjects', 'fedora', 'graph', 'elastic'],
        help="Run SPARQL workflow, choices: languages, subjects, fedora, graph, elastic")
    parser.add_argument(
        '--triplestore',
        default=c['triplestore'],
        help="Triplestore URL")
    parser.add_argument(
        '--pulltype',
        default=c['pulltype'],
        help="Write triple statement or resourceURI",
        choices=["all", "resource"])
    parser.add_argument(
        '--resourceuri',
        help="Used when pulltype=resource, enter the resourceURI as a string w/o <>")
    parser.add_argument(
        '--sparqlselect',
        help="""Used when pulltype=all, enter the sparql triple statement to
        select the desired resources. Var ?s1 must resolve to the resourceURI list""")
    parser.add_argument(
        '--format',
        default=c['format'],
        help="Sets content-type for request in the header")
    parser.add_argument(
        '--filename',
        default=c['filename'],
        help="Specifies filename for output file, default from sparql header")
    parser.add_argument(
        '--langpref',
        default=c['langpref'],
        help="Enter the iso 639-1 two letter language code to return a graph with only that language")
    parser.add_argument(
        '--fedoraaction',
        default="test",
        help="Enter the actionpath")
    parser.add_argument(
        '--queryfile',
        default=c['queryfile'],
        help="File Containing the query to run")
    parser.add_argument(
        '--mode',
        default="normal",
        help="enter 'normal' or 'debug'")
    parser.add_argument(
        '--es_url',
        default=c['es_url'],
        # BUG FIX: the help text was copy-pasted from --mode.
        help="Elasticsearch URL")
    parser.add_argument(
        '--bulkactions',
        default=c['bulkactions'],
        # BUG FIX: removed the stray trailing quote from the help text.
        help="change bulk upload settings")
    args = vars(parser.parse_args())
    main(args)