Skip to content

Commit

Permalink
Finished unit tests. Modified _astro_ingest_complete to use kube_utils.
Browse files Browse the repository at this point in the history
  • Loading branch information
tjacovich committed Nov 20, 2024
1 parent be8fdb3 commit 3344ab8
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 33 deletions.
14 changes: 7 additions & 7 deletions myadsp/kube_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,20 @@ def exec_commands(api_instance, name, namespace, identifier, logger):
namespace,
command=exec_command,
stderr=True, stdin=False,
stdout=True, tty=False)
stdout=True, tty=False)
test=json.loads(''.join(resp.split('\n')[:-1]))
if test['response']['numFound'] >= 0:
if test['response']['numFound'] > 0:
logger.info("pod: {} has record: {}".format(name, identifier) )
logger.info("Response: {} from pod: {}".format(json.dumps(test), identifier))
if test['response']['numFound'] > 1:
logger.error("pod: {} returned more than one record for identifier:{}".format(name, identifier) )
return 1
return 1.
else:
logger.info("pod: {} does not have record: {}".format(name, identifier) )
return 0
return 0.
except:
logger.info("Failed to get response from solr pod")
return 0
return 0.


def check_solr_update_status(ads_config, identifier):
Expand All @@ -56,8 +56,8 @@ def check_solr_update_status(ads_config, identifier):
core_v1 = core_v1_api.CoreV1Api()

pod_list = core_v1.list_namespaced_pod(namespace)
num_updated = 0
num_total = 0
num_updated = 0.
num_total = 0.
for pod in pod_list.items:
if "solr-searcher" in pod.metadata.name:
num_updated +=exec_commands(core_v1, pod.metadata.name, namespace, identifier, logger)
Expand Down
67 changes: 59 additions & 8 deletions myadsp/tests/test_kube_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ def __init__(self, *args, **kwargs):
self.name = "solr-searcher-us-east-1d-0"

return MockPods(*args, **kwargs)

class stream():
def __init__(self, *args, **kwargs):
self.stream= None

def read_namespaced_pod(*args, **kwargs):
class MockPodApI():
def __init__(self, *args, **kwargs):
Expand All @@ -41,19 +44,67 @@ def get_status(self, *args, **kwargs):
raise ApiException(status=400, reason='test failure')
return MockPodApI(*args, **kwargs).status

def stream(*args, **kwargs):
class MockStream():
def __init__(self, *args, **kwargs):
self.resp = self.connect_get_namespaced_pod_exec()
def connect_get_namespaced_pod_exec(*args, **kwargs):
return '{\n"responseHeader":{\n"status":0,\n"QTime":425,\n"params":{\n"q":"identifier:2411.08880",\n"fl":"identifier",\n"wt":"json"}},\n"response":{"numFound":1,"start":0,"docs":[\n{\n"identifier":["2024arXiv241108880K",\n"arXiv:2411.08880"]}]\n}}\nformatted string% "'
return MockStream()

def connect_get_namespaced_pod_exec(*args, **kwargs):
return '{\n"responseHeader":{\n"status":0,\n"QTime":425,\n"params":{\n"q":"identifier:2411.08880",\n"fl":"identifier",\n"wt":"json"}},\n"response":{"numFound":1,"start":0,"docs":[\n{\n"identifier":["2024arXiv241108880K",\n"arXiv:2411.08880"]}]\n}}\nformatted string% "\n'

return '{\n"responseHeader":{\n"status":0,\n"QTime":425,\n"params":{\n"q":"identifier:2411.08880",\n"fl":"identifier",\n"wt":"json"}},\n"response":{"numFound":1,"start":0,"docs":[\n{\n"identifier":["2024arXiv241108880K",\n"arXiv:2411.08880"]}]\n}}\nformatted string% "'

class MockStream():
def __init__(self, *args, **kwargs):
self.resp = self.connect_get_namespaced_pod_exec(*args, **kwargs)
def connect_get_namespaced_pod_exec(*args, **kwargs):
success = kwargs.get("found", True)
multivalue = kwargs.get("multivalue", False)
response = kwargs.get("response", True)
if success and not multivalue:
return '{\n"responseHeader":{\n"status":0,\n"QTime":425,\n"params":{\n"q":"identifier:2411.08880",\n"fl":"identifier",\n"wt":"json"}},\n"response":{"numFound":1,"start":0,"docs":[\n{\n"identifier":["2024arXiv241108880K",\n"arXiv:2411.08880"]}]\n}}\nformatted string% "'
elif multivalue:
return '{\n"responseHeader":{\n"status":0,\n"QTime":425,\n"params":{\n"q":"identifier:2411.08880",\n"fl":"identifier",\n"wt":"json"}},\n"response":{"numFound":2,"start":0,"docs":[\n{\n"identifier":["2024arXiv241108880K",\n"arXiv:2411.08880"]},\n{\n"identifier":["2024arXiv241108880K",\n"arXiv:2411.08880"]}]\n}}\nformatted string% "'
elif response:
return '{\n"responseHeader":{\n"status":0,\n"QTime":425,\n"params":{\n"q":"identifier:2411.08880",\n"fl":"identifier",\n"wt":"json"}},\n"response":{"numFound":0,"start":0,"docs":[\n{\n"identifier":[]}]\n}}\nformatted string% "'
else:
return '<html>404 not found </html>'

class TestkubernetesServices(unittest.TestCase):
@patch('kubernetes.client.api.core_v1_api.CoreV1Api', side_effect=MockKubernetes, load_instance=True )
@patch('kubernetes.client.api.core_v1_api.CoreV1Api', side_effect=MockKubernetes, load_instance=True)
@patch('kubernetes.config.load_kube_config', return_value=Mock())
def test_successful_read_namepaced_pod(self, core_v1, mock_config):
def test_successful_read_namepaced_pod_found(self, core_v1, mock_config):
ads_config={"KUBE_ENV":"Success"}
identifier='Success'
kube_utils.check_solr_update_status(ads_config, identifier)

@patch('kubernetes.client.api.core_v1_api.CoreV1Api', side_effect=MockKubernetes, load_instance=True )
with unittest.mock.patch.object(kube_utils, 'stream', return_value=MockStream().resp):
kube_utils.check_solr_update_status(ads_config, identifier)

@patch('kubernetes.client.api.core_v1_api.CoreV1Api', side_effect=MockKubernetes, load_instance=True)
@patch('kubernetes.config.load_kube_config', return_value=Mock())
def test_successful_read_namepaced_pod_found(self, core_v1, mock_config):
ads_config={"KUBE_ENV":"Success"}
identifier='Success'
with unittest.mock.patch.object(kube_utils, 'stream', return_value=MockStream(multivalue=True).resp):
kube_utils.check_solr_update_status(ads_config, identifier)

@patch('kubernetes.client.api.core_v1_api.CoreV1Api', side_effect=MockKubernetes, load_instance=True)
@patch('kubernetes.config.load_kube_config', return_value=Mock())
def test_successful_read_namepaced_pod_not_found(self, core_v1, mock_config):
ads_config={"KUBE_ENV":"Success"}
identifier='Success'
with unittest.mock.patch.object(kube_utils, 'stream', return_value=MockStream(found=False).resp):
kube_utils.check_solr_update_status(ads_config, identifier)

@patch('kubernetes.client.api.core_v1_api.CoreV1Api', side_effect=MockKubernetes, load_instance=True)
@patch('kubernetes.config.load_kube_config', return_value=Mock())
def test_successful_read_namepaced_pod_unsuccessful_get_namespaced_pod_exec(self, core_v1, mock_config):
ads_config={"KUBE_ENV":"Success"}
identifier='Success'
with unittest.mock.patch.object(kube_utils, 'stream', return_value=MockStream(found=False, response=False).resp):
kube_utils.check_solr_update_status(ads_config, identifier)

@patch('kubernetes.client.api.core_v1_api.CoreV1Api', side_effect=MockKubernetes, load_instance=True)
@patch('kubernetes.config.load_kube_config', return_value=Mock())
def test_unsuccessful_read_namepaced_pod(self, core_v1, mock_config):
ads_config={"KUBE_ENV":"Failure"}
Expand Down
22 changes: 4 additions & 18 deletions run.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,18 +200,9 @@ def _astro_ingest_complete(date=None, sleep_delay=60, sleep_timeout=7200, admin_
num_sampled = 0
for s in sample:
num_sampled += 1
r = app.client.get('{0}?q=identifier:{1}&fl=bibcode,identifier,entry_date'.format(config.get('API_SOLR_QUERY_ENDPOINT'), s),
headers={'Authorization': 'Bearer ' + config.get('API_TOKEN')})
# if there's a solr error, sleep then move to the next bibcode
if r.status_code != 200:
time.sleep(sleep_delay)
total_delay += sleep_delay
logger.error('Error retrieving bibcode {0} from Solr ({1} {2}), sleeping {3}s, for a total delay of {4}s'.
format(s, r.status_code, r.text, sleep_delay, total_delay))
continue
r = kube_utils.check_solr_update_status(config, s)

numfound = r.json()['response']['numFound']
if numfound == 0:
if not r:
# nothing found - if all bibcodes in the sample were tried, sleep then start the while loop again
if num_sampled == config.get('ASTRO_SAMPLE_SIZE'):
time.sleep(sleep_delay)
Expand All @@ -224,13 +215,8 @@ def _astro_ingest_complete(date=None, sleep_delay=60, sleep_timeout=7200, admin_
'Astronomy ingest not complete (test astro bibcode: {0}). Trying the next in the sample.'
.format(s))
continue
elif numfound > 1:
# returning this as true for now, since technically something was found
logger.error('Too many records returned for bibcode {0}'.format(s))

logger.info('Numfound: {0} for test bibcode {1}. Response: {2}. URL: {3}'.format(numfound, s,
json.dumps(r.json()),
r.url))

logger.info('Test bibcode {} Exists in all searchers.'.format(s))
return s

logger.warning('Astronomy ingest did not complete within the {0}s timeout limit. Exiting.'.format(sleep_timeout))
Expand Down

0 comments on commit 3344ab8

Please sign in to comment.