Refactoring for reducing number of queries (#188)
renato2099 authored Mar 29, 2023
1 parent 4847d53 commit 1727cb1
Showing 5 changed files with 253 additions and 110 deletions.
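The gist of the refactoring: the old ingest path issued one SELECT against metadata_types for every global-attribute line in an eTUFF header, while the new path collects all attribute names first and resolves them with a single IN query. A minimal sketch of the pattern, assuming a psycopg2 cursor named cur (not the project's exact code; see the diff below for that):

    # Before: one round-trip per attribute name
    for name in attribute_names:
        cur.execute(
            "SELECT attribute_id FROM metadata_types WHERE attribute_name = %s",
            (name,),
        )
        rows = cur.fetchall()

    # After: one round-trip for the whole header
    quoted = ", ".join("'{}'".format(name) for name in attribute_names)
    cur.execute(
        "SELECT attribute_id, attribute_name FROM metadata_types "
        "WHERE attribute_name IN ({})".format(quoted)
    )
    rows = cur.fetchall()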
90 changes: 90 additions & 0 deletions tagbase_server/tagbase_server/test/test_ingest.py
@@ -0,0 +1,90 @@
# coding: utf-8

import unittest
from unittest import mock

import psycopg2
import tagbase_server.utils.processing_utils as pu


class TestIngest(unittest.TestCase):
PG_VERSION = "postgres:9.5"
SAMPLE_METADATA_LINES = [
"// global attributes:",
"// etag device attributes:",
':instrument_name = "159903_2012_117464"',
':instrument_type = "s"',
':manufacturer = "Wildlife"',
':model = "SPOT"',
':owner_contact = "a@a.org"',
':person_owner = "foo bar"',
':ptt = "117464"',
]

fake_submission_id = 1
fake_submission_filename = "test_file"

@mock.patch("psycopg2.connect")
def test_processing_file_metadata_with_existing_attributes(self, mock_connect):
metadata_attribs_in_db = [[1, "instrument_name"], [2, "model"]]
# result of psycopg2.connect(**connection_stuff)
mock_con = mock_connect.return_value
# result of con.cursor(cursor_factory=DictCursor)
mock_cur = mock_con.cursor.return_value
# return this when calling cur.fetchall()
mock_cur.fetchall.return_value = metadata_attribs_in_db

conn = psycopg2.connect(
dbname="test",
user="test",
host="localhost",
port="32780",
password="test",
)
cur = conn.cursor()

metadata = []
processed_lines = pu.process_global_attributes(
TestIngest.SAMPLE_METADATA_LINES,
cur,
TestIngest.fake_submission_id,
metadata,
TestIngest.fake_submission_filename,
)
        assert len(TestIngest.SAMPLE_METADATA_LINES) == processed_lines + 1
        assert len(metadata_attribs_in_db) == len(metadata)
        # attribute values keep the surrounding quotes from the eTUFF line
        assert metadata[0][2] == '"159903_2012_117464"'
        assert metadata[1][2] == '"SPOT"'

@mock.patch("psycopg2.connect")
def test_processing_file_metadata_without_attributes(self, mock_connect):
metadata_attribs_in_db = []
# result of psycopg2.connect(**connection_stuff)
mock_con = mock_connect.return_value
# result of con.cursor(cursor_factory=DictCursor)
mock_cur = mock_con.cursor.return_value
# return this when calling cur.fetchall()
mock_cur.fetchall.return_value = metadata_attribs_in_db

conn = psycopg2.connect(
dbname="test",
user="test",
host="localhost",
port="32780",
password="test",
)
cur = conn.cursor()

metadata = []
processed_lines = pu.process_global_attributes(
TestIngest.SAMPLE_METADATA_LINES,
cur,
TestIngest.fake_submission_id,
metadata,
TestIngest.fake_submission_filename,
)
        assert len(TestIngest.SAMPLE_METADATA_LINES) == processed_lines + 1


if __name__ == "__main__":
unittest.main()
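Assuming pytest and the package's dependencies are installed, the new tests can be run from the repository root with:

    python -m pytest tagbase_server/tagbase_server/test/test_ingest.py

(python -m unittest with the module path works as well, since the file keeps the unittest.main() entry point.)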
1 change: 1 addition & 0 deletions tagbase_server/tagbase_server/utils/__init__.py
@@ -0,0 +1 @@
from .processing_utils import *
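The wildcard import re-exports the processing_utils helpers at the package level, so callers can, for example, write:

    from tagbase_server.utils import process_global_attributes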
249 changes: 139 additions & 110 deletions tagbase_server/tagbase_server/utils/processing_utils.py
@@ -1,4 +1,3 @@
-import os
 import logging
 from datetime import datetime as dt
 from io import StringIO
@@ -7,46 +6,82 @@
 import pandas as pd
 import psycopg2.extras
 import pytz
-from slack_sdk import WebClient
-from slack_sdk.errors import SlackApiError
 from tzlocal import get_localzone
 
 from tagbase_server.utils.db_utils import connect
+from tagbase_server.utils.slack_utils import post_msg
 
 logger = logging.getLogger(__name__)
-slack_token = os.environ.get("SLACK_BOT_TOKEN", "")
-slack_channel = os.environ.get("SLACK_BOT_CHANNEL", "tagbase-server")
-client = WebClient(token=slack_token)
 
 
-def process_global_attributes(
-    line, cur, submission_id, metadata, submission_filename, line_counter
-):
-    logger.debug("Processing global attribute: %s", line)
-    tokens = line.strip()[1:].split(" = ")
-    logger.debug("Processing token: %s", tokens)
-    cur.execute(
-        "SELECT attribute_id FROM metadata_types WHERE attribute_name = %s",
-        (tokens[0],),
-    )
-    rows = cur.fetchall()
-    if len(rows) == 0:
-        msg = (
-            f"*{submission_filename}* _line:{line_counter}_ - "
-            f"Unable to locate attribute_name *{tokens[0]}* in _metadata_types_ table."
-        )
-
-        logger.warning(msg)
-        try:
-            client.chat_postMessage(
-                channel=slack_channel, text="<!channel> :warning: " + msg
-            )
-        except SlackApiError as e:
-            logger.error(e)
-    else:
-        str_submission_id = str(submission_id)
-        str_row = str(rows[0][0])
-        metadata.append((str_submission_id, str_row, tokens[1]))
+def process_all_lines_for_global_attributes(
+    global_attributes_lines,
+    cur,
+    submission_id,
+    metadata,
+    submission_filename,
+    line_counter,
+):
+    attrbs_map = {}
+    for line in global_attributes_lines:
+        line = line.strip()
+        logger.debug("Processing global attribute: %s", line)
+        tokens = line[1:].split(" = ")
+        # attribute_name = tokens[0], attribute_value = tokens[1]
+        if len(tokens) > 1:
+            attrbs_map[tokens[0]] = tokens[1]
+        else:
+            logger.warning("Metadata line %s NOT in expected format!", line)
+
+    attrbs_names = ", ".join(
+        ["'{}'".format(attrib_name) for attrib_name in attrbs_map.keys()]
+    )
+    attrbs_ids_query = (
+        "SELECT attribute_id, attribute_name FROM metadata_types "
+        "WHERE attribute_name IN ({})".format(attrbs_names)
+    )
+    logger.debug("Query=%s", attrbs_ids_query)
+    cur.execute(attrbs_ids_query)
+    rows = cur.fetchall()
+
+    str_submission_id = str(submission_id)
+    for row in rows:
+        attribute_id = row[0]
+        attribute_name = row[1]
+        attribute_value = attrbs_map[attribute_name]
+        metadata.append((str_submission_id, str(attribute_id), attribute_value))
+        attrbs_map.pop(attribute_name)
+
+    if len(attrbs_map.keys()) > 0:
+        not_found_attributes = ", ".join(attrbs_map.keys())
+        msg = (
+            f"*{submission_filename}* _line:{line_counter}_ - "
+            f"Unable to locate attribute_names *{not_found_attributes}* in _metadata_types_ table."
+        )
+        post_msg(msg)
+
+
+def process_global_attributes(lines, cur, submission_id, metadata, submission_filename):
+    processed_lines = 0
+    global_attributes = []
+    for line in lines:
+        processed_lines += 1
+        if line.startswith("//"):
+            continue
+        elif line.strip().startswith(":"):
+            global_attributes.append(line)
+        else:
+            break
+
+    process_all_lines_for_global_attributes(
+        global_attributes,
+        cur,
+        submission_id,
+        metadata,
+        submission_filename,
+        processed_lines,
+    )
+    return processed_lines - 1 if processed_lines > 0 else 0
 
 
 def process_etuff_file(file, version=None, notes=None):
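A note on the contract between the two hunks: process_global_attributes increments processed_lines for every line it examines and breaks on the first line that is neither a "//" comment nor a ":" attribute, then returns processed_lines - 1 — the zero-based index of that first data line. The caller in the hunk below resumes with range(metadata_lines, lines_length), so the line that triggered the break is re-read as the first observation row rather than skipped.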
@@ -82,97 +117,91 @@ def process_etuff_file(file, version=None, notes=None):

     metadata = []
     proc_obs = []
-
     s_time = time.perf_counter()
     with open(file, "rb") as data:
         lines = [line.decode("utf-8", "ignore") for line in data.readlines()]
-        variable_lookup = {}
-        line_counter = 0
-        for line in lines:
-            line_counter += 1
-            if line.startswith("//"):
-                continue
-            elif line.strip().startswith(":"):
-                process_global_attributes(
-                    line,
-                    cur,
-                    submission_id,
-                    metadata,
-                    submission_filename,
-                    line_counter,
-                )
-            else:
-                # Parse proc_observations
-                tokens = line.split(",")
-                tokens = [token.replace('"', "") for token in tokens]
-                if tokens:
-                    variable_name = tokens[3]
-                    if variable_name in variable_lookup:
-                        variable_id = variable_lookup[variable_name]
-                    else:
-                        cur.execute(
-                            "SELECT variable_id FROM observation_types WHERE variable_name = %s",
-                            (variable_name,),
-                        )
-                        row = cur.fetchone()
-                        if row:
-                            variable_id = row[0]
-                        else:
-                            try:
-                                logger.debug(variable_name, tokens)
-                                cur.execute(
-                                    "INSERT INTO observation_types("
-                                    "variable_name, variable_units) VALUES (%s, %s) "
-                                    "ON CONFLICT (variable_name) DO NOTHING",
-                                    (variable_name, tokens[4].strip()),
-                                )
-                            except (
-                                Exception,
-                                psycopg2.DatabaseError,
-                            ) as error:
-                                logger.error(
-                                    "variable_id '%s' already exists in 'observation_types'. tokens:"
-                                    " '%s. \nerror: %s",
-                                    variable_name,
-                                    tokens,
-                                    error,
-                                )
-                                conn.rollback()
-                                cur.execute(
-                                    "SELECT nextval('observation_types_variable_id_seq')"
-                                )
-                            variable_id = cur.fetchone()[0]
-                            variable_lookup[variable_name] = variable_id
-                    date_time = None
-                    if tokens[0] != '""' and tokens[0] != "":
-                        if tokens[0].startswith('"'):
-                            tokens[0].replace('"', "")
-                        date_time = dt.strptime(
-                            tokens[0], "%Y-%m-%d %H:%M:%S"
-                        ).astimezone(pytz.utc)
-                    else:
-                        stripped_line = line.strip("\n")
-                        msg = (
-                            f"*{submission_filename}* _line:{line_counter}_ - "
-                            f"No datetime... skipping line: {stripped_line}"
-                        )
-                        logger.warning(msg)
-                        try:
-                            client.chat_postMessage(
-                                channel=slack_channel,
-                                text="<!channel> :warning: " + msg,
-                            )
-                        except SlackApiError as e:
-                            logger.error(e)
-                        continue
-                    proc_obs.append(
-                        [
-                            date_time,
-                            variable_id,
-                            tokens[2],
-                            submission_id,
-                            str(submission_id),
-                        ]
-                    )
+        lines_length = len(lines)
+
+        line_counter = 0
+        variable_lookup = {}
+
+        metadata_lines = process_global_attributes(
+            lines, cur, submission_id, metadata, submission_filename
+        )
+        line_counter += metadata_lines
+
+        for counter in range(metadata_lines, lines_length):
+            line = lines[line_counter]
+            line_counter += 1
+            tokens = line.split(",")
+            tokens = [token.replace('"', "") for token in tokens]
+            if tokens:
+                variable_name = tokens[3]
+                if variable_name in variable_lookup:
+                    variable_id = variable_lookup[variable_name]
+                else:
+                    cur.execute(
+                        "SELECT variable_id FROM observation_types WHERE variable_name = %s",
+                        (variable_name,),
+                    )
+                    row = cur.fetchone()
+                    if row:
+                        variable_id = row[0]
+                    else:
+                        try:
+                            logger.debug(
+                                "variable_name=%s\ttokens=%s", variable_name, tokens
+                            )
+                            cur.execute(
+                                "INSERT INTO observation_types("
+                                "variable_name, variable_units) VALUES (%s, %s) "
+                                "ON CONFLICT (variable_name) DO NOTHING",
+                                (variable_name, tokens[4].strip()),
+                            )
+                        except (
+                            Exception,
+                            psycopg2.DatabaseError,
+                        ) as error:
+                            logger.error(
+                                "variable_id '%s' already exists in 'observation_types'. tokens:"
+                                " '%s. \nerror: %s",
+                                variable_name,
+                                tokens,
+                                error,
+                            )
+                            conn.rollback()
+                            cur.execute(
+                                "SELECT nextval('observation_types_variable_id_seq')"
+                            )
+                        variable_id = cur.fetchone()[0]
+                        variable_lookup[variable_name] = variable_id
+                date_time = None
+                if tokens[0] != '""' and tokens[0] != "":
+                    if tokens[0].startswith('"'):
+                        tokens[0].replace('"', "")
+                    date_time = dt.strptime(
+                        tokens[0], "%Y-%m-%d %H:%M:%S"
+                    ).astimezone(pytz.utc)
+                else:
+                    stripped_line = line.strip("\n")
+                    msg = (
+                        f"*{submission_filename}* _line:{line_counter}_ - "
+                        f"No datetime... skipping line: {stripped_line}"
+                    )
+                    post_msg(msg)
+                    continue
+
+                proc_obs.append(
+                    [
+                        date_time,
+                        variable_id,
+                        tokens[2],
+                        submission_id,
+                        str(submission_id),
+                    ]
+                )
 
     len_proc_obs = len(proc_obs)
     e_time = time.perf_counter()
     sub_elapsed = round(e_time - s_time, 2)
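The rest of process_etuff_file is cut off above, but the StringIO import at the top of the file hints at how the accumulated proc_obs rows are likely flushed: streamed through an in-memory buffer into a single COPY rather than per-row INSERTs. A hedged sketch of that pattern — the helper name and the column list are assumptions, not the project's code; only the proc_observations table name comes from the old code's comment:

    from io import StringIO


    def copy_proc_obs(cur, proc_obs):
        # Stream the five-column rows through an in-memory buffer into one
        # COPY; tab-separated to match psycopg2's copy_from() default.
        buf = StringIO()
        for row in proc_obs:
            buf.write("\t".join(str(col) for col in row) + "\n")
        buf.seek(0)
        cur.copy_from(
            buf,
            "proc_observations",
            columns=(  # assumed column names, for illustration only
                "date_time",
                "variable_id",
                "measurement_value",
                "submission_id",
                "tag_id",
            ),
        )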
