Skip to content

Commit

Permalink
SOLR-FEEDER - add business address in bor solr update (bcgov#1528)
Browse files Browse the repository at this point in the history
* SOLR-FEEDER - add business address in bor solr update

Signed-off-by: Kial Jinnah <kialj876@gmail.com>

* lint fixes

Signed-off-by: Kial Jinnah <kialj876@gmail.com>

---------

Signed-off-by: Kial Jinnah <kialj876@gmail.com>
  • Loading branch information
kialj876 authored May 2, 2024
1 parent 6b08e6a commit 50498a2
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 94 deletions.
192 changes: 99 additions & 93 deletions solr-feeder/solr_feeder/services/colin.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,29 +51,60 @@ def _parse_party(party: dict, legal_type: str):
return party


def get_business_info(legal_type: str, identifier: str, token: str) -> tuple[dict, dict]:
"""Return the basic business info of the business."""
def _get_colin_api_resp(path: str, token: str, accepted_codes: list[HTTPStatus]) -> tuple[dict, dict]:
"""Return colin-api response json for the given path."""
try:
headers = {'Authorization': 'Bearer ' + token}

res = requests.get(f'{current_app.config["COLIN_API_URL"]}/businesses/{legal_type}/{identifier}',
headers=headers,
timeout=current_app.config['COLIN_API_TIMEOUT'])
resp = requests.get(f'{current_app.config["COLIN_API_URL"]}/{path}',
headers=headers,
timeout=current_app.config['COLIN_API_TIMEOUT'])

if resp.status_code not in accepted_codes:
logging.debug('COLIN service unexpected response code %s %s %s', resp.status_code, path, resp.json())
return None, {'message': resp.json(), 'status_code': resp.status_code}

return resp.json(), None

except (exceptions.ConnectionError, exceptions.Timeout) as err:
logging.debug('COLIN connection failure %s', err)
return None, {'message': 'COLIN connection failure.', 'status_code': HTTPStatus.GATEWAY_TIMEOUT}
except Exception as err:
logging.debug('COLIN service error %s', err.with_traceback(None))
return None, {'message': 'COLIN service error.', 'status_code': HTTPStatus.INTERNAL_SERVER_ERROR}


if res.status_code != HTTPStatus.OK:
return None, {'message': res.json(), 'status_code': res.status_code}
def get_business_info(legal_type: str, identifier: str, token: str) -> tuple[dict, dict]:
"""Return the business info for the identifier and legal type."""
try:
business_json, error = _get_colin_api_resp(f'businesses/{legal_type}/{identifier}', token, [HTTPStatus.OK])
if error:
return None, error

bus_addresses = None
legal_types_with_ro = ['LP', 'BC', 'C', 'CC', 'CCC', 'CUL',
'QA', 'QB', 'QC', 'QD', 'QE', 'ULC',
'UQA', 'UQB', 'UQC', 'UQD', 'UQE']
if legal_type in legal_types_with_ro:
bus_addresses, error = _get_colin_api_resp(f'businesses/{legal_type}/{identifier}/office',
token,
[HTTPStatus.OK])
if error:
# log error for ops and continue (address info should not block the update)
logging.error('Error getting address data while updating %s.', identifier)

res_json = res.json()
business = {
'email': business_json['business']['email'],
'goodStanding': business_json['business'].get('goodStanding', None),
# add BC prefix to payload if its a BC limited company
'email': res_json['business']['email'],
'goodStanding': res_json['business'].get('goodStanding', None),
'identifier': f'BC{identifier}' if legal_type == 'BC' else identifier,
'legalName': res_json['business']['legalName'],
'legalName': business_json['business']['legalName'],
'legalType': legal_type,
'state': 'HISTORICAL' if res_json['business']['corpStateClass'] == 'HIS' else 'ACTIVE',
'taxId': res_json['business']['businessNumber']
'state': 'HISTORICAL' if business_json['business']['corpStateClass'] == 'HIS' else 'ACTIVE',
'taxId': business_json['business']['businessNumber']
}
if bus_addresses and (address := bus_addresses.get('registeredOffice', {}).get('deliveryAddress')):
business['addresses'] = [{**address, 'addressType': 'DELIVERY'}]

return {'business': business}, None

Expand All @@ -87,89 +118,64 @@ def get_business_info(legal_type: str, identifier: str, token: str) -> tuple[dic

def get_owners(legal_type: str, identifier: str, token: str) -> tuple[list[dict], dict]:
"""Return the firm owners of the business."""
try:
owners = []
if legal_type not in FIRM_LEGAL_TYPES:
return [], None
owners = []
if legal_type not in FIRM_LEGAL_TYPES:
return [], None

parties_url = f'{current_app.config["COLIN_API_URL"]}/businesses/{legal_type}/{identifier}/parties'
headers = {'Authorization': 'Bearer ' + token}
# get owners
fio_res = requests.get(f'{parties_url}?partyType=Firm Individual Owner',
headers=headers,
timeout=current_app.config['COLIN_API_TIMEOUT'])
fbo_res = requests.get(f'{parties_url}?partyType=Firm Business Owner',
headers=headers,
timeout=current_app.config['COLIN_API_TIMEOUT'])
if fio_res.status_code not in [HTTPStatus.OK, HTTPStatus.NOT_FOUND]:
return None, {'message': fio_res.json(), 'status_code': fio_res.status_code}
if fbo_res.status_code not in [HTTPStatus.OK, HTTPStatus.NOT_FOUND]:
return None, {'message': fbo_res.json(), 'status_code': fbo_res.status_code}

fio_json = fio_res.json()
fbo_json = fbo_res.json()
colin_owners = fio_json.get('directors', []) + fbo_json.get('directors', [])

for owner in colin_owners:
parsed_owner = _parse_party(owner, legal_type)
# only need to include officer and roles for this flow
owners.append({'officer': parsed_owner['officer'], 'roles': parsed_owner['roles']})

return owners, None
parties_path = f'businesses/{legal_type}/{identifier}/parties'
# get owners
fio_json, fio_error = _get_colin_api_resp(f'{parties_path}?partyType=Firm Individual Owner',
token,
[HTTPStatus.OK, HTTPStatus.NOT_FOUND])
fbo_json, fbo_error = _get_colin_api_resp(f'{parties_path}?partyType=Firm Business Owner',
token,
[HTTPStatus.OK, HTTPStatus.NOT_FOUND])
if fio_error or fbo_error:
return None, fio_error if fio_error else fbo_error

except (exceptions.ConnectionError, exceptions.Timeout) as err:
logging.error('COLIN connection failure %s', err)
return None, {'message': 'COLIN connection failure.', 'status_code': HTTPStatus.GATEWAY_TIMEOUT}
except Exception as err:
logging.error('COLIN service error %s', err.with_traceback(None))
return None, {'message': 'COLIN service error.', 'status_code': HTTPStatus.INTERNAL_SERVER_ERROR}
colin_owners = fio_json.get('directors', []) + fbo_json.get('directors', [])

for owner in colin_owners:
parsed_owner = _parse_party(owner, legal_type)
# only need to include officer and roles for this flow
owners.append({'officer': parsed_owner['officer'], 'roles': parsed_owner['roles']})

return owners, None


def get_parties(legal_type: str, identifier: str, token: str) -> tuple[list[dict], dict]:
"""Return the parties of the business."""
try:
parties = []
parties_url = f'{current_app.config["COLIN_API_URL"]}/businesses/{legal_type}/{identifier}/parties/all'
headers = {'Authorization': 'Bearer ' + token}
# get all parties
all_parties = requests.get(f'{parties_url}',
headers=headers,
timeout=current_app.config['COLIN_API_TIMEOUT'])
if all_parties.status_code not in [HTTPStatus.OK, HTTPStatus.NOT_FOUND]:
return None, {'message': all_parties.json(), 'status_code': all_parties.status_code}

for party in (all_parties.json()).get('parties', []):
# David R said to ignore these (they are also ignored in the importer job):
# - Data for these is garbage and won't be brought over when they are modernized.
# - party_typ_cds: ('PAS','PDI','PSA','RAD','RAF','RAO','RAS','TAP','TAA','TSP')
ignored_types = ['Partner ATT SK', 'Partner DIRECTOR', 'Partner Sign Auth',
'Partner Reinstatement Applicant - Director of foreign entity',
'Partner Reinstatement Applicant - Foreign Entity Reinstated',
'Partner Reinstatement Applicant - Office of foreign entity',
'Partner Reinstatement Applicant - Shareholder of foreign entity',
'TILMA Alternate Attorney', 'TILMA Primary Attorney', 'TILMA Submitting Party']
roles = []
for role in party['roles']:
# only add valid roles
if role['roleType'] not in ignored_types:
roles.append(role)
if roles:
# has valid roles so add party to update
party['roles'] = roles
if parsed_party := _parse_party(party, legal_type):
# check to make sure it contains a person name (otherwise we will ignore this record)
# - NOTE: some records contain no organization name AND no party name
has_person_name = parsed_party['officer'].get('firstName') \
or parsed_party['officer'].get('middleInitial') \
or parsed_party['officer'].get('lastName')
if has_person_name:
parties.append(parsed_party)

return parties, None

except (exceptions.ConnectionError, exceptions.Timeout) as err:
logging.error('COLIN connection failure %s', err)
return None, {'message': 'COLIN connection failure.', 'status_code': HTTPStatus.GATEWAY_TIMEOUT}
except Exception as err:
logging.error('COLIN service error %s', err.with_traceback(None))
return None, {'message': 'COLIN service error.', 'status_code': HTTPStatus.INTERNAL_SERVER_ERROR}
parties_path = f'businesses/{legal_type}/{identifier}/parties/all'
all_parties_json, error = _get_colin_api_resp(parties_path, token, [HTTPStatus.OK, HTTPStatus.NOT_FOUND])
if error:
return None, error

parties = []
for party in all_parties_json.get('parties', []):
# David R said to ignore these (they are also ignored in the importer job):
# - Data for these is garbage and won't be brought over when they are modernized.
# - party_typ_cds: ('PAS','PDI','PSA','RAD','RAF','RAO','RAS','TAP','TAA','TSP')
ignored_types = ['Partner ATT SK', 'Partner DIRECTOR', 'Partner Sign Auth',
'Partner Reinstatement Applicant - Director of foreign entity',
'Partner Reinstatement Applicant - Foreign Entity Reinstated',
'Partner Reinstatement Applicant - Office of foreign entity',
'Partner Reinstatement Applicant - Shareholder of foreign entity',
'TILMA Alternate Attorney', 'TILMA Primary Attorney', 'TILMA Submitting Party']
roles = []
for role in party['roles']:
# only add valid roles
if role['roleType'] not in ignored_types:
roles.append(role)
if roles:
# has valid roles so add party to update
party['roles'] = roles
if parsed_party := _parse_party(party, legal_type):
# check to make sure it contains a person name (otherwise we will ignore this record)
# - NOTE: some records contain no organization name AND no party name
has_person_name = parsed_party['officer'].get('firstName') \
or parsed_party['officer'].get('middleInitial') \
or parsed_party['officer'].get('lastName')
if has_person_name:
parties.append(parsed_party)

return parties, None
2 changes: 1 addition & 1 deletion solr-feeder/solr_feeder/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
Development release segment: .devN
"""

__version__ = '2.0.8' # pylint: disable=invalid-name
__version__ = '2.0.9' # pylint: disable=invalid-name

0 comments on commit 50498a2

Please sign in to comment.