-
Notifications
You must be signed in to change notification settings - Fork 0
/
coeus.py
100 lines (77 loc) · 2.85 KB
/
coeus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
# -*- coding: utf-8 -*-
# Copyright 2013, Luís Bastião Silva, Universidade de Aveiro
# Authors: Luis A. Bastiao Silva <luis.kop@gmail.com>
# Package metadata. Each name is assigned exactly once; the original repeated
# the ``__version__`` assignment at the end of this list, which invited the
# two copies drifting apart on the next version bump.
__title__ = 'coeus'
__version__ = '0.1-dev'
__author__ = 'Luís A. Bastião Silva'
__license__ = 'CC-BY-NC'
__copyright__ = 'Copyright 2013, Luís Bastião Silva, Universidade de Aveiro'
__url__ = 'http://bioinformatics.ua.pt/coeus'
__maintainer__ = 'Luís A. Bastião Silva'
__email__ = 'bastiao@ua.pt'
import sys
import time
import json
import requests
from SPARQLWrapper import SPARQLWrapper, JSON
class COEUS(object):
    """Thin client for the HTTP endpoints of a COEUS instance.

    Every request URL is built by plain string concatenation on ``host``;
    arguments are inserted as given, without URL-escaping.

    Attributes:
        host: Base URL that relative endpoint paths are appended to.
        key: API key inserted into the write/update/delete/read URLs.
    """

    def __init__(self, api_key, host="http://bioinformatics.ua.pt/coeus/"):
        """Remember the API key and the base URL.

        Args:
            api_key: Key used by the methods that modify data. An empty key
                is accepted here; those methods raise when they are called.
            host: Base URL. Paths such as ``api/triple/...`` are appended
                directly, so it is expected to end with a slash.
        """
        self.host = host
        self.key = api_key

    def _require_key(self):
        """Raise ``Exception`` when no API key has been configured."""
        # ``not self.key`` also rejects None, which would otherwise fail
        # later with an unrelated TypeError while building the URL.
        if not self.key:
            raise Exception('[COEUS] undefined API key')

    def _keyed_url(self, action):
        """Return the ``api/<key>/<action>/`` URL prefix for this instance."""
        return self.host + 'api/' + self.key + '/' + action + '/'

    def _checked_get(self, url, failure):
        """GET ``url``, decode the JSON body and verify its status field.

        The code treats any ``status`` other than 100 as a failure.

        Returns:
            True when the decoded response carries status 100.

        Raises:
            Exception: With message ``failure`` for any other status.
        """
        content = requests.get(url)
        result = json.loads(content.text)
        if result['status'] != 100:
            raise Exception(failure)
        return True

    def triple(self, sub, pred, obj):
        """Fetch the bindings matching a subject/predicate/object pattern.

        The requested URL is echoed to standard output after the request
        has been made.

        Returns:
            The ``['results']['bindings']`` entry of the decoded JSON reply.
        """
        url = self.host + 'api/triple/' + sub + '/' + pred + '/' + obj + '/js'
        content = requests.get(url)
        # Single-argument call form prints the same text on Python 2 and 3.
        print(url)
        return json.loads(content.text)['results']['bindings']

    def query(self, query):
        """Run a SPARQL query against ``<host>sparql``.

        Returns:
            The query result converted from the JSON return format.
        """
        sparql = SPARQLWrapper(self.host + "sparql")
        sparql.setQuery(query)
        sparql.setReturnFormat(JSON)
        results = sparql.query()
        return results.convert()

    def write(self, sub, pred, obj):
        """Store one triple through the keyed ``write`` endpoint.

        Returns:
            True on success.

        Raises:
            Exception: If no API key is set, or the reply status is not 100.
        """
        self._require_key()
        url = self._keyed_url('write') + sub + '/' + pred + '/' + obj
        return self._checked_get(url, '[COEUS] unable to store triple ')

    def update(self, sub, pred, old_obj, new_obj):
        """Replace the object of a triple through the ``update`` endpoint.

        The old and new objects are sent as one comma-separated URL segment.

        Returns:
            True on success.

        Raises:
            Exception: If no API key is set, or the reply status is not 100.
        """
        self._require_key()
        url = (self._keyed_url('update') + sub + '/' + pred + '/'
               + old_obj + ',' + new_obj)
        return self._checked_get(url, '[COEUS] unable to update triple: ')

    def delete(self, sub, pred, obj):
        """Remove one triple through the keyed ``delete`` endpoint.

        Returns:
            True on success.

        Raises:
            Exception: If no API key is set, or the reply status is not 100.
        """
        self._require_key()
        url = self._keyed_url('delete') + sub + '/' + pred + '/' + obj
        return self._checked_get(url, '[COEUS] unable to delete triple: ')

    def search(self, query):
        """Call the ``textsearch`` endpoint and return the decoded JSON reply."""
        content = requests.get(self.host + 'textsearch/query=' + query)
        return json.loads(content.text)

    def read(self, filename, language):
        """POST the contents of a local file to the keyed ``read`` endpoint.

        Args:
            filename: Path of the file whose text is uploaded.
            language: Value sent in the ``lang`` form field.

        Returns:
            A human-readable status string. Unlike the other modifying
            methods, a non-100 status is reported in the returned string
            rather than raised.

        Raises:
            Exception: If no API key is set.
        """
        self._require_key()
        # The context manager closes the file even if reading fails.
        with open(filename, 'r') as source:
            # Sent as form fields; the dict is passed straight to requests
            # instead of being serialised to JSON and parsed back first.
            post_data = {'data': source.read(), 'lang': language}
        content = requests.post(self._keyed_url('read'), post_data)
        result = json.loads(content.text)
        if result['status'] != 100:
            return ('[COEUS] unable to read "' + filename + '" - '
                    + result['message'])
        return '[COEUS] read of "' + filename + '" OK'