Merge pull request #142 from usc-isi-i2/development
Development
saggu committed Nov 9, 2017
2 parents 5d2a001 + efc9b78 commit d964c5e
Showing 3 changed files with 89 additions and 26 deletions.
73 changes: 51 additions & 22 deletions etk/core.py
@@ -46,7 +46,11 @@
import logging
import logstash
import signal
import datetime

_KEY = 'key'
_VALUE = 'value'
_QUALIFIERS = 'qualifiers'
_KNOWLEDGE_GRAPH = "knowledge_graph"
_EXTRACTION_POLICY = 'extraction_policy'
_KEEP_EXISTING = 'keep_existing'
@@ -129,6 +133,7 @@
_SEGMENT_TITLE = "title"
_SEGMENT_INFERLINK_DESC = "inferlink_description"
_SEGMENT_OTHER = "other_segment"
_SEGMENT_NAME = "segment_name"

_METHOD_INFERLINK = "inferlink"

@@ -662,7 +667,13 @@ def process(self, doc, create_knowledge_graph=False, html_description=False):
raise e
else:
return None
time_taken_process = time.time() - start_time_process
end_time_process = time.time()
time_taken_process = end_time_process - start_time_process
if '@execution_profile' not in doc:
doc['@execution_profile'] = dict()
doc['@execution_profile']['@etk_start_time'] = datetime.datetime.utcfromtimestamp(start_time_process).isoformat()
doc['@execution_profile']['@etk_end_time'] = datetime.datetime.utcfromtimestamp(end_time_process).isoformat()
doc['@execution_profile']['@etk_process_time'] = end_time_process - start_time_process
if time_taken_process > 5:
extra = dict()
extra['time_taken'] = time_taken_process
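
The hunk above attaches per-document timing to the document itself under @execution_profile. A minimal sketch of the resulting structure, with made-up values:

# Shape of the profiling block written by process()
# (timestamps and durations are illustrative, not real output).
doc['@execution_profile'] = {
    '@etk_start_time': '2017-11-09T18:20:31.000000',  # UTC, ISO 8601
    '@etk_end_time': '2017-11-09T18:20:33.500000',
    '@etk_process_time': 2.5                           # seconds, end - start
}
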
@@ -675,7 +686,7 @@ def process(self, doc, create_knowledge_graph=False, html_description=False):

def convert_json_content(self, doc, json_content_extractor):
input_path = json_content_extractor[_INPUT_PATH]
field_name = json_content_extractor[_FIELD_NAME]
segment_name = json_content_extractor[_SEGMENT_NAME]
val_list = list()

if input_path not in self.json_content_paths:
Expand All @@ -690,24 +701,40 @@ def convert_json_content(self, doc, json_content_extractor):
o = dict()
o[_TEXT] = str(val)
val_list.append(o)
elif isinstance(val, dict):
if _VALUE in val:
o = dict()
o[_TEXT] = val[_VALUE]
if _KEY in val:
o[_KEY] = val[_KEY]
if _QUALIFIERS in val:
o[_QUALIFIERS] = val[_QUALIFIERS]
val_list.append(o)
else:
if val:
msg = 'Error while extracting json content, input path: {} is not a leaf node in the json ' \
'document'.format(input_path)
msg = 'Error while extracting json content, input path: {} is either not a leaf node in ' \
'the json document or not a dict with keys \'value\', \'key\' and/or ' \
'\'qualifiers\''.format(input_path)
self.log(msg, _ERROR)
print msg
if self.global_error_handling == _RAISE_ERROR:
raise ValueError(msg)
if len(val_list) > 0:
if _CONTENT_EXTRACTION not in doc:
doc[_CONTENT_EXTRACTION] = dict()
if field_name not in doc[_CONTENT_EXTRACTION]:
doc[_CONTENT_EXTRACTION][field_name] = list()
doc[_CONTENT_EXTRACTION][field_name].extend(val_list)
if segment_name not in doc[_CONTENT_EXTRACTION]:
doc[_CONTENT_EXTRACTION][segment_name] = list()
doc[_CONTENT_EXTRACTION][segment_name].extend(val_list)
return doc

def extract_as_is(self, d, config=None):
return self._relevant_text_from_context(d[_TEXT], {"value": d[_TEXT]}, config[_FIELD_NAME])
result = dict()
result[_VALUE] = d[_TEXT]
if _KEY in d:
result[_KEY] = d[_KEY]
if _QUALIFIERS in d:
result[_QUALIFIERS] = d[_QUALIFIERS]
return self._relevant_text_from_context(d[_TEXT], result, config[_FIELD_NAME])

def pseudo_extraction_results(self, values, method, segment, doc_id=None, score=1.0):
results = list()
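
With these changes, convert_json_content accepts dict-shaped leaves carrying 'value', 'key' and 'qualifiers', and extract_as_is carries the extra keys through to the extraction result. A rough sketch of an input leaf and the object appended to the segment (contents are illustrative only):

# A dict-shaped leaf in the source JSON (illustrative contents).
val = {
    'value': 'Los Angeles',
    'key': 'city',
    'qualifiers': {'state': 'CA'}
}
# convert_json_content appends it to the configured segment as:
# {_TEXT: 'Los Angeles', _KEY: 'city', _QUALIFIERS: {'state': 'CA'}}
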
@@ -758,13 +785,14 @@ def rearrange_description(doc, html_description=False):
description = Core.remove_line_breaks(description)
if _KNOWLEDGE_GRAPH not in doc:
doc[_KNOWLEDGE_GRAPH] = dict()
doc[_KNOWLEDGE_GRAPH][_DESCRIPTION] = list()
o = dict()
o['value'] = description
o['key'] = 'description'
o['confidence'] = 1
o['provenance'] = [Core.custom_provenance_object(method, segment, doc[_DOCUMENT_ID])]
doc[_KNOWLEDGE_GRAPH][_DESCRIPTION].append(o)
if _DESCRIPTION not in doc[_KNOWLEDGE_GRAPH]:
doc[_KNOWLEDGE_GRAPH][_DESCRIPTION] = list()
o = dict()
o['value'] = description
o['key'] = 'description'
o['confidence'] = 1
o['provenance'] = [Core.custom_provenance_object(method, segment, doc[_DOCUMENT_ID])]
doc[_KNOWLEDGE_GRAPH][_DESCRIPTION].append(o)
return doc

@staticmethod
@@ -811,13 +839,14 @@ def rearrange_title(doc):
if title and title != '':
if _KNOWLEDGE_GRAPH not in doc:
doc[_KNOWLEDGE_GRAPH] = dict()
doc[_KNOWLEDGE_GRAPH][_TITLE] = list()
o = dict()
o['value'] = title
o['key'] = 'title'
o['confidence'] = 1
o['provenance'] = [Core.custom_provenance_object(method, segment, doc[_DOCUMENT_ID])]
doc[_KNOWLEDGE_GRAPH][_TITLE].append(o)
if _TITLE not in doc[_KNOWLEDGE_GRAPH]:
doc[_KNOWLEDGE_GRAPH][_TITLE] = list()
o = dict()
o['value'] = title
o['key'] = 'title'
o['confidence'] = 1
o['provenance'] = [Core.custom_provenance_object(method, segment, doc[_DOCUMENT_ID])]
doc[_KNOWLEDGE_GRAPH][_TITLE].append(o)

return doc
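
Both rearrange helpers now create the knowledge-graph list only when it is missing, so existing entries are appended to rather than overwritten. For example (illustrative values):

# A document that already carries a title in its knowledge graph.
# Other fields rearrange_title reads (extracted title, document id)
# are omitted here for brevity; values are illustrative.
doc['knowledge_graph'] = {'title': [{'value': 'existing title', 'key': 'title'}]}
# After rearrange_title(doc), a second entry is appended to
# doc['knowledge_graph']['title'] instead of replacing the list.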

30 changes: 28 additions & 2 deletions etk/run_core_kafka.py
@@ -1,4 +1,5 @@
import time
from datetime import datetime
import json
import codecs
import sys
@@ -42,30 +43,50 @@ def run_serial(input, output, core, prefix='', kafka_server=None, kafka_topic=No
if kafka_producer is None:
output.close()

def run_serial_cdrs(etk_core, consumer, producer, producer_topic, indexing=False):

def run_serial_cdrs(etk_core, consumer, producer, producer_topic, indexing=False, worker_id=0):
prev_doc_sent_time = None

# high level api will handle batch thing
# will exit once timeout
for msg in consumer:
cdr = msg.value
cdr['@execution_profile'] = {'@worker_id': worker_id}
doc_arrived_time = time.time()
cdr['@execution_profile']['@doc_arrived_time'] = datetime.utcfromtimestamp(doc_arrived_time).isoformat()
cdr['@execution_profile']['@doc_wait_time'] = 0 if not prev_doc_sent_time \
else doc_arrived_time - prev_doc_sent_time

if 'doc_id' not in cdr:
cdr['doc_id'] = cdr.get('_id', cdr.get('document_id', ''))
if len(cdr['doc_id']) == 0:
print 'invalid cdr: unknown doc_id'
print 'processing', cdr['doc_id']

try:
start_run_core_time = time.time()
# run core
result = etk_core.process(cdr, create_knowledge_graph=True)
if not result:
raise Exception('run core error')

# indexing
if indexing:
result = index_knowledge_graph_fields(result)
cdr['@execution_profile']['@run_core_time'] = time.time() - start_run_core_time

doc_sent_time = time.time()
cdr['@execution_profile']['@doc_sent_time'] = datetime.utcfromtimestamp(doc_sent_time).isoformat()
prev_doc_sent_time = doc_sent_time
cdr['@execution_profile']['@doc_processed_time'] = doc_sent_time - doc_arrived_time
# dumping result
if result:
r = producer.send(producer_topic, result)
r.get(timeout=60) # wait till sent
else:
etk_core.log('failed to index doc {}'.format(cdr['doc_id']), core._ERROR)
print 'done'

except Exception as e:
# print e
exc_type, exc_value, exc_traceback = sys.exc_info()
@@ -169,6 +190,8 @@ def usage():
parser.add_argument("--kafka-output-args", action="store", type=str, dest="kafkaOutputArgs")
parser.add_argument("--indexing", action="store_true", dest="indexing")

parser.add_argument("--worker-id", action="store", type=str, dest="workerId")

c_options, args = parser.parse_known_args()

if not c_options.configPath and \
@@ -178,6 +201,8 @@ def usage():
usage()
sys.exit()

worker_id = int(c_options.workerId) if c_options.workerId is not None else 0

# kafka input
if c_options.kafkaInputServer is not None:
try:
@@ -208,7 +233,8 @@ def usage():
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
**output_args
)
run_serial_cdrs(c, consumer, producer, c_options.kafkaOutputTopic, indexing=c_options.indexing)
run_serial_cdrs(c, consumer, producer, c_options.kafkaOutputTopic, indexing=c_options.indexing,
worker_id=worker_id)

except Exception as e:
# print e
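
Each CDR consumed from Kafka now carries queue and processing timings alongside the worker id. A sketch of the fields written per document (values are hypothetical):

# Profiling fields added to each consumed CDR (hypothetical values).
cdr['@execution_profile'] = {
    '@worker_id': 3,                              # from --worker-id
    '@doc_arrived_time': '2017-11-09T18:20:31',   # UTC, ISO 8601
    '@doc_wait_time': 0.8,     # seconds since the previous doc was sent
    '@run_core_time': 2.1,     # seconds inside etk_core.process
    '@doc_sent_time': '2017-11-09T18:20:34',
    '@doc_processed_time': 3.0  # arrival-to-send wall time in seconds
}
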
12 changes: 10 additions & 2 deletions run_etk_spark.py
@@ -25,6 +25,15 @@ def remove_if_no_html(x):
return False
return True


def remove_landmark_extractions(x):
if 'content_extraction' in x:
ce = x['content_extraction']
if 'inferlink_extractions' in ce:
ce.pop('inferlink_extractions')
x['content_extraction'] = ce
return x

def remove_extra_fields(x):
if 'content_extraction' in x:
ce = x['content_extraction']
@@ -61,8 +70,7 @@ def remove_extra_fields(x):

input_rdd = sc.sequenceFile(input_path)#.partitionBy(partitions)

output_rdd = input_rdd.mapValues(json.loads).filter(lambda x: remove_if_no_html(x[1])).mapValues(add_doc_id)\
.mapValues(lambda x: c.process(x, create_knowledge_graph=True))
output_rdd = input_rdd.mapValues(json.loads).filter(lambda x: remove_if_no_html(x[1])).mapValues(add_doc_id).mapValues(remove_landmark_extractions).mapValues(lambda x: c.process(x, create_knowledge_graph=True))

output_rdd = output_rdd.filter(lambda x: x[1] is not None).mapValues(remove_extra_fields).mapValues(json.dumps)
output_rdd.saveAsSequenceFile(output_path, compressionCodecClass=compression)
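
remove_landmark_extractions strips any previously stored inferlink output from content_extraction before the document is re-processed. A small before/after sketch (field contents are illustrative):

# Before: a document carrying stale inferlink/landmark output
# (illustrative contents).
doc = {
    'content_extraction': {
        'inferlink_extractions': {'title': {'text': 'old title'}},
        'content_strict': {'text': '...'}
    }
}
doc = remove_landmark_extractions(doc)
# Now doc['content_extraction'] == {'content_strict': {'text': '...'}}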
