Merge pull request #113 from usc-isi-i2/development
Development
saggu committed Sep 5, 2017
2 parents e7f82aa + 94f04e7 commit f0618ff
Showing 70 changed files with 24,983 additions and 2,289 deletions.
144 changes: 144 additions & 0 deletions add_images_ads.py
@@ -0,0 +1,144 @@
from optparse import OptionParser
import time
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import codecs
from datetime import datetime


class Janitor(object):
    # Copies image records from the source 'dig-4' index into the
    # knowledge_graph of ad documents in the destination index.
    def __init__(self, ads_no_images_path, log_path):
        self.ad_uri_prefix = 'http://dig.isi.edu/ht/data/webpage/'
        self.image_query = {"_source": ["identifier", "url"],
                            "query": {
                                "term": {
                                    "isImagePartOf.uri": {
                                    }
                                }
                            }
                            }
        self.o_file = codecs.open(ads_no_images_path, 'a+')
        self.log_file = codecs.open(log_path, 'a+')
        self.source_index = 'dig-4'
        self.source_doc_type = 'image'
        self.source_es = Elasticsearch(['http://10.1.94.68:9200'])
        self.dest_index = 'dig-etk-search-10'
        self.dest_doc_type = 'ads'
        self.dest_es = Elasticsearch(['http://10.1.94.103:9201'])
        self.total_docs_processed = 0

    def get_images_from_es(self, ads):
        # For every ad in the batch, look up its images in the source index
        # and attach them as knowledge_graph.all_images, then bulk update.
        new_ads = list()
        for ad in ads:
            q = self.image_query
            q["query"]["term"]["isImagePartOf.uri"]["value"] = '{}{}'.format(self.ad_uri_prefix, ad['_id'])
            r = self.source_es.search(index=self.source_index, doc_type=self.source_doc_type, body=q)
            all_images = self.convert_image_to_kg(r['hits']['hits'])
            if len(all_images) > 0:
                source = ad['_source']
                if 'knowledge_graph' not in source:
                    source['knowledge_graph'] = dict()
                source['knowledge_graph']['all_images'] = all_images
                if 'dig_version' not in source:
                    source['dig_version'] = 1.0
                else:
                    source['dig_version'] += 1.0
                ad['_source'] = source
                new_ads.append(ad)
            else:
                print('No images for ad: {}'.format(ad['_id']))
                # self.o_file.write(ad['_id'])
                # self.o_file.write('\n')
        self.bulk_upload(new_ads)
        updated_docs = len(new_ads)
        self.total_docs_processed += updated_docs
        self.log_file.write('{}\n'.format(datetime.now().isoformat()))
        self.log_file.write('Documents updated in this batch: {}\n'.format(str(updated_docs)))
        self.log_file.write('Total documents processed to date: {}\n'.format(str(self.total_docs_processed)))

    @staticmethod
    def convert_image_to_kg(images_list):
        # Convert raw image hits into knowledge-graph objects (key/value/confidence).
        all_images = list()
        for image_obj in images_list:
            image = image_obj['_source']
            kg_object = dict()
            kg_object['confidence'] = 0.5
            kg_object['key'] = image['identifier']
            kg_object['value'] = image['url']
            all_images.append(kg_object)
        return all_images

    def get_ads(self, batch_size=10):
        # TODO PROCESS BACKPAGE first
        # Scan/scroll over backpage.com ads that do not yet have a dig_version field.
        q = {
            "query": {
                "filtered": {
                    "query": {"match_all": {}},
                    "filter": {
                        "and": {
                            "filters": [
                                {
                                    "term": {
                                        "knowledge_graph.website.key": "backpage.com"
                                    }
                                },
                                {
                                    "not": {
                                        "filter": {
                                            "exists": {
                                                "field": "dig_version"
                                            }
                                        }
                                    }
                                }
                            ]
                        }
                    }
                }
            }
        }
        page = self.dest_es.search(index=self.dest_index, doc_type=self.dest_doc_type, scroll='2m',
                                   search_type='scan', size=batch_size, body=q)
        sid = page['_scroll_id']
        scroll_size = page['hits']['total']

        # Start scrolling
        while scroll_size > 0:
            page = self.dest_es.scroll(scroll_id=sid, scroll='2m')
            # Update the scroll ID
            sid = page['_scroll_id']
            scroll_size = len(page['hits']['hits'])
            print("scroll size: " + str(scroll_size))

            self.get_images_from_es(page['hits']['hits'])

    def bulk_upload(self, ads):
        # Push partial-document updates for the batch through the bulk API.
        actions = list()
        for ad in ads:
            action = {
                "_op_type": "update",
                "_index": ad["_index"],
                "_type": ad["_type"],
                "_id": ad['_id'],
                "doc": ad['_source']
            }
            actions.append(action)
        helpers.bulk(self.dest_es, actions)


if __name__ == '__main__':
    parser = OptionParser()
    (c_options, args) = parser.parse_args()
    ads_no_images_path = args[0]
    log_path = args[1]
    j = Janitor(ads_no_images_path, log_path)
    start_time = time.time()
    for i in range(0, 1000):
        try:
            j.get_ads()
        except Exception:
            j.log_file.write('Failed attempt: {}\n'.format(i))
    j.log_file.write('The script took {0} seconds!'.format(time.time() - start_time))

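A minimal invocation sketch for add_images_ads.py, assuming a Python 2 environment with the elasticsearch client installed and network access to the hard-coded clusters; the script reads two positional arguments (a path for recording ads without images and a log path), and both file paths below are placeholders:

python add_images_ads.py /tmp/ads_no_images.txt /tmp/janitor.log

The __main__ block simply retries get_ads() up to 1,000 times and logs any failed attempt, so transient Elasticsearch errors do not stop the run.
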
87 changes: 87 additions & 0 deletions convert_to_cdr3.py
@@ -0,0 +1,87 @@
from optparse import OptionParser
from pyspark import SparkContext, SparkConf
import requests
# from elasticsearch import Elasticsearch
import json

__author__ = 'amandeep'

image_query = """{"_source": ["identifier", "url"],
                  "query": {
                    "term": {
                      "isImagePartOf.uri": {
                      }
                    }
                  }
                }"""

ad_uri_prefix = 'http://dig.isi.edu/ht/data/webpage/'

source_index = 'dig-4'
source_doc_type = 'image'
# source_es = Elasticsearch(['http://10.1.94.68:9200'])
es_url = 'http://10.1.94.68:9200'


def convert_image_to_cdr3(images_list):
    # Convert raw image hits into CDR3 'objects' entries.
    all_images = list()
    for image_obj in images_list:
        image = image_obj['_source']
        i_obj = dict()
        i_obj['identifier'] = image['identifier']
        i_obj['obj_stored_url'] = image['url']
        all_images.append(i_obj)
    return all_images


def search(query):
    # POST the query to the source index over plain HTTP and parse the JSON response.
    return json.loads(requests.post('{}/{}/{}/_search'.format(es_url, source_index, source_doc_type), json=query).text)


def get_images_from_es(ad):
    q = json.loads(image_query)
    q["query"]["term"]["isImagePartOf.uri"]["value"] = '{}{}'.format(ad_uri_prefix, ad['doc_id'])
    r = None
    # try 10 times in case of time out error
    for i in range(0, 10):
        try:
            r = search(q)
            break
        except Exception:
            continue
    if r:
        all_images = convert_image_to_cdr3(r['hits']['hits'])
        if len(all_images) > 0:
            ad['objects'] = all_images
        else:
            print('No images for ad: {}'.format(ad['doc_id']))
    return ad


if __name__ == '__main__':
    compression = "org.apache.hadoop.io.compress.GzipCodec"

    parser = OptionParser()
    parser.add_option("-p", "--partitions", action="store",
                      type="int", dest="partitions", default=0)
    (c_options, args) = parser.parse_args()
    input_path = args[0]
    output_path = args[1]

    partitions = c_options.partitions

    sc = SparkContext(appName="CONVERT-To-CDR3")
    conf = SparkConf()

    if partitions == 0:
        input_rdd = sc.sequenceFile(input_path)
    else:
        input_rdd = sc.sequenceFile(input_path).partitionBy(partitions)
    output_rdd = input_rdd.mapValues(json.loads).mapValues(get_images_from_es).mapValues(json.dumps)

    output_rdd.saveAsSequenceFile(output_path, compressionCodecClass=compression)

    # Re-read the output and report how many ads did / did not get image objects.
    output_rdd = sc.sequenceFile(output_path).mapValues(json.loads)
    print(output_rdd.filter(lambda x: 'objects' in x[1] and len(x[1]['objects']) > 0).count())
    print(output_rdd.filter(lambda x: 'objects' not in x[1]).count())
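
A possible way to launch this PySpark job, assuming spark-submit is available and the input is a Hadoop sequence file whose values are CDR JSON documents containing a doc_id field; the paths and partition count below are placeholders:

spark-submit convert_to_cdr3.py -p 200 hdfs:///path/to/input_seq hdfs:///path/to/output_seq

With the default -p 0 the input's existing partitioning is kept; otherwise the (key, value) pairs are repartitioned before the per-ad Elasticsearch lookups, and the output is written back as a gzip-compressed sequence file.
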
6 changes: 6 additions & 0 deletions environment.yml
@@ -52,3 +52,9 @@ dependencies:
- nose2==0.6.4
- ply==3.9
- requests-file==1.4
- scikit-learn==0.18.1
- scipy
- numpy
- kafka-python>=1.3.3
- digsandpaper>=0.1.4
- python-logstash>=0.4.6
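
These six lines extend the existing conda environment.yml; under a standard conda workflow the updated environment would be applied with something like the following (the command is an assumption, not part of this commit):

conda env update -f environment.yml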
Empty file added etk/classifiers/__init__.py
Empty file.