Skip to content

Commit

Permalink
Merge pull request #33 from OphidiaBigData/feature-project
Browse files Browse the repository at this point in the history
Add option to specify 'project' value for interacting with the job scheduler
  • Loading branch information
eldoo authored Jul 20, 2021
2 parents 46fcfa9 + 959fd64 commit bc5ba44
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 17 deletions.
16 changes: 13 additions & 3 deletions PyOphidia/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def get_linenumber():


class Client():
"""Client(username='', password='', server='', port='11732', token='', read_env=False, api_mode=True) -> obj
"""Client(username='', password='', server='', port='11732', token='', read_env=False, api_mode=True, project=None) -> obj
Attributes:
username: Ophidia username
Expand All @@ -57,6 +57,7 @@ class Client():
last_return_value: Last return value associated to response
last_error: Last error value associated to response
last_exec_time: Last execution time associated to response
project: Project ID to be used for the resource manager (if required)
Methods:
submit(query, display=False) -> self : Submit a query like 'operator=myoperator;param1=value1;' or 'myoperator param1=value1;' to the
Expand All @@ -75,8 +76,8 @@ class Client():
pretty_print(response, response_i) -> self : Prints the last_response JSON string attribute as a formatted response
"""

def __init__(self, username='', password='', server='', port='11732', token='', read_env=False, api_mode=True):
"""Client(username='', password='', server='', port='11732', token='', read_env=False, api_mode=True) -> obj
def __init__(self, username='', password='', server='', port='11732', token='', read_env=False, api_mode=True, project=None):
"""Client(username='', password='', server='', port='11732', token='', read_env=False, api_mode=True, project=None) -> obj
:param api_mode: If True, use the class as an API and catch also framework-level errors
:type api_mode: bool
:param username: Ophidia username
Expand All @@ -91,6 +92,8 @@ def __init__(self, username='', password='', server='', port='11732', token='',
:type token: str
:param read_env: If True read the client variables from the environment
:type read_env: bool
:param project: String with project ID to be used for job scheduling
:type project: str
:returns: None
:rtype: None
:raises: RuntimeError
Expand Down Expand Up @@ -125,6 +128,8 @@ def __init__(self, username='', password='', server='', port='11732', token='',
else:
access_token = os.environ.get('OPH_TOKEN')

self.project = project

self.session = ''
self.cwd = '/'
self.cdd = '/'
Expand Down Expand Up @@ -188,6 +193,7 @@ def __del__(self):
del self.last_jobid
del self.last_return_value
del self.last_error
del self.project

def submit(self, query, display=False):
"""submit(query,display=False) -> self : Submit a query like 'operator=myoperator;param1=value1;' or 'myoperator param1=value1;' to the Ophidia server
Expand Down Expand Up @@ -227,6 +233,8 @@ def submit(self, query, display=False):
query += 'exec_mode=' + self.exec_mode + ';'
if self.ncores and 'ncores' not in query:
query += 'ncores=' + str(self.ncores) + ';'
if self.project and 'project' not in query:
query += 'project=' + str(self.project) + ';'
self.last_request = query
try:
self.last_response, self.last_jobid, newsession, self.last_return_value, self.last_error = _ophsubmit.submit(self.username, self.password, self.server, self.port, query)
Expand Down Expand Up @@ -750,6 +758,8 @@ def wsubmit(self, workflow, *params):
request['exec_mode'] = self.exec_mode
if self.ncores and 'ncores' not in request:
request['ncores'] = str(self.ncores)
if self.project and 'project' not in request:
request['project'] = str(self.project)
self.last_request = json.dumps(request)
try:
err, err_msg = self.wisvalid(self.last_request)
Expand Down
10 changes: 6 additions & 4 deletions PyOphidia/cube.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class Cube():
-> dict or None : wrapper of the operator OPH_UNPUBLISH
Class Methods:
setclient(username='', password='', server, port='11732', token='', read_env=False)
setclient(username='', password='', server, port='11732', token='', read_env=False, project=None)
-> None : Instantiate the Client, common for all Cube objects, for submitting requests
b2drop(action='put', auth_path='-', src_path=None, dst_path='-', cdd=None, exec_mode='sync', display=False)
-> dict or None : wrapper of the operator OPH_B2DROP
Expand Down Expand Up @@ -234,8 +234,8 @@ class Cube():
client = None

@classmethod
def setclient(cls, username='', password='', server='', port='11732', token='', read_env=False, api_mode=True):
"""setclient(username='', password='', server='', port='11732', token='', read_env=False, api_mode=True) -> None : Instantiate the Client, common for all Cube objects, for submitting requests
def setclient(cls, username='', password='', server='', port='11732', token='', read_env=False, api_mode=True, project=None):
"""setclient(username='', password='', server='', port='11732', token='', read_env=False, api_mode=True, project=None) -> None : Instantiate the Client, common for all Cube objects, for submitting requests
:param username: Ophidia user
:type username: str
Expand All @@ -251,12 +251,14 @@ def setclient(cls, username='', password='', server='', port='11732', token='',
:type read_env: bool
:param api_mode: If True, use the class as an API and catch also framework-level errors
:type api_mode: bool
:param project: String with project ID to be used for job scheduling
:type project: str
:returns: None
:rtype: None
"""

try:
cls.client = _client.Client(username, password, server, port, token, read_env, api_mode)
cls.client = _client.Client(username, password, server, port, token, read_env, api_mode, project)
except Exception as e:
print(get_linenumber(), "Something went wrong in setting the client:", e)
finally:
Expand Down
30 changes: 20 additions & 10 deletions PyOphidia/ophsubmit.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,13 @@ def get_linenumber():
WRAPPING_WORKFLOW3_1 = "\","
WRAPPING_WORKFLOW4 = "\n \"callback_url\":\""
WRAPPING_WORKFLOW4_1 = "\","
WRAPPING_WORKFLOW5 = "\n \"tasks\": [\n {\n \"name\":\"Task 0\",\n \"operator\":\""
WRAPPING_WORKFLOW5_1 = "\",\n \"arguments\": ["
WRAPPING_WORKFLOW6 = "\"%s\""
WRAPPING_WORKFLOW7 = ",\"%s\""
WRAPPING_WORKFLOW8 = "]\n }\n ]\n}"
WRAPPING_WORKFLOW5 = "\n \"project\":\""
WRAPPING_WORKFLOW5_1 = "\","
WRAPPING_WORKFLOW6 = "\n \"tasks\": [\n {\n \"name\":\"Task 0\",\n \"operator\":\""
WRAPPING_WORKFLOW6_1 = "\",\n \"arguments\": ["
WRAPPING_WORKFLOW7 = "\"%s\""
WRAPPING_WORKFLOW8 = ",\"%s\""
WRAPPING_WORKFLOW9 = "]\n }\n ]\n}"


def submit(username, password, server, port, query):
Expand Down Expand Up @@ -130,19 +132,27 @@ def submit(username, password, server, port, query):
if element_list[0] == 'callback_url':
request += WRAPPING_WORKFLOW4 + element_list[1] + WRAPPING_WORKFLOW4_1
break
request += WRAPPING_WORKFLOW5 + operator + WRAPPING_WORKFLOW5_1
# project
for element in query_list:
if element:
element_list = element.split('=', 1)
if element_list[0] == 'project':
request += WRAPPING_WORKFLOW5 + element_list[1] + WRAPPING_WORKFLOW5_1
break
request += WRAPPING_WORKFLOW6 + operator + WRAPPING_WORKFLOW6_1
# all remaining arguments
step = 0
for element in query_list:
if element:
element_list = element.split('=', 1)
if element_list[0] != 'operator' and element_list[0] != 'sessionid' and element_list[0] != 'exec_mode' and element_list[0] != 'callback_url':
if element_list[0] != 'operator' and element_list[0] != 'sessionid' and element_list[0] != 'exec_mode' and element_list[0] != 'callback_url' and element_list[0] != 'project':
step += 1
if step == 1:
request += WRAPPING_WORKFLOW6.replace('%s', element)
else:
request += WRAPPING_WORKFLOW7.replace('%s', element)
request += WRAPPING_WORKFLOW8
else:
request += WRAPPING_WORKFLOW8.replace('%s', element)
request += WRAPPING_WORKFLOW9

try:
# Escape &, <, > and \n chars for http
request = request.replace("&", "&amp;")
Expand Down

0 comments on commit bc5ba44

Please sign in to comment.