
Commit

Multi-process version speed test
lucklygaj committed Sep 26, 2017
1 parent 1762814 commit 616c37b
Showing 10 changed files with 241 additions and 127 deletions.
208 changes: 118 additions & 90 deletions .idea/workspace.xml

Large diffs are not rendered by default.

Binary file modified com/anjie/base/__pycache__/spider.cpython-36.pyc
Binary file not shown.
51 changes: 39 additions & 12 deletions com/anjie/engine.py
@@ -2,18 +2,21 @@

from com.anjie.module.default_download import DefaultDownload;
from com.anjie.module.default_scheduler import DefaultScheduler;
from com.anjie.module.mongo_scheduler import MongoScheduler;
from com.anjie.utils.elog import Elog;
from com.anjie.spider.myspider import Spider
from com.anjie.spider.ThreadSpider import ThreadSpider;
from datetime import datetime
import threading,time
import threading, time
import multiprocessing


class Engine:
def __init__(self, spider=None, scheduler=DefaultScheduler(), download=DefaultDownload(delay=0), pipline=None):
def __init__(self, spider=None, scheduler=MongoScheduler(), download=DefaultDownload(delay=0), pipline=None):
super(Engine, self).__init__();
self.spider = spider;
self.scheduler = scheduler;
self.scheduler.mongo_queue.clear();
self.download = download;
self.pipline = pipline;

@@ -23,15 +26,18 @@ def addSpider(self, spider):
def start(self):
self.scheduler.addRequests(self.spider.getSeeds())
Elog.info('>>start time is %s' % datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'))
startTime = datetime.now();

self.thread_task()

def thread_task(self):
while True:
rq = self.scheduler.nextRequest();
if rq is None:
Elog.warning('Engine will stop because the scheduler has no more requests to schedule');
break;

threads = []
max_threads =1;
max_threads = 8;
lock = threading.Lock();
while threads or self.scheduler.isNotEmpty():
# the crawl is still active
@@ -54,11 +60,9 @@
finally:
lock.release();

# all threads have been processed
# sleep temporarily so CPU can focus execution on other threads
Elog.info('>>end time is %s' % datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'))
endTime = datetime.now();
Elog.info('cost time is %d' % (endTime - startTime).seconds);
# all threads have been processed
# sleep temporarily so CPU can focus execution on other threads


def sleep(self, time):
pass;
@@ -68,7 +72,7 @@ def stop(self):


class ThreadTask(threading.Thread):
def __init__(self, rq, download, pipline=None, scheduler=None,spider = None):
def __init__(self, rq, download, pipline=None, scheduler=None, spider=None):
super(ThreadTask, self).__init__();
self.rq = rq;
self.download = download;
Expand All @@ -89,7 +93,30 @@ def run(self):
pass


if __name__ == '__main__':
e = Engine();
def running_tas():
e = Engine(scheduler=MongoScheduler());
e.addSpider(ThreadSpider());
e.start()


def process_crawler():
num_cpus = multiprocessing.cpu_count()
# pool = multiprocessing.Pool(processes=num_cpus)
Elog.info('Starting {} processes'.format(num_cpus));
processes = []
startTime = datetime.now();
for i in range(num_cpus):
p = multiprocessing.Process(target=running_tas)
# parsed = pool.apply_async(threaded_link_crawler, args, kwargs)
p.start()
processes.append(p)
# wait for processes to complete
for p in processes:
p.join()
Elog.info('>>end time is %s' % datetime.strftime(datetime.now(), '%Y-%m-%d %H:%M:%S'))
endTime = datetime.now();
Elog.info('cost time is %d' % (endTime - startTime).seconds);


if __name__ == '__main__':
process_crawler()
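
A note on the new entry point: process_crawler() forks one worker process per CPU core, each running its own Engine (the commit also raises each engine's thread pool from 1 to 8 threads), and joins them all before reporting elapsed time. Below is a minimal, self-contained sketch of that fan-out/join pattern; crawl_worker is a hypothetical stand-in for running_tas and does no real crawling.

import multiprocessing
from datetime import datetime


def crawl_worker():
    # stand-in for running_tas(); a real worker would build an Engine and start it
    print('worker started')


if __name__ == '__main__':
    startTime = datetime.now()
    processes = []
    for _ in range(multiprocessing.cpu_count()):
        p = multiprocessing.Process(target=crawl_worker)
        p.start()
        processes.append(p)
    # wait for every worker to finish before timing, as process_crawler does
    for p in processes:
        p.join()
    print('cost time is %d' % (datetime.now() - startTime).seconds)
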
44 changes: 22 additions & 22 deletions com/anjie/mode/MongoQueue.py
@@ -35,35 +35,35 @@ def push(self, rq):
def pop(self):
record = self.db.crawl_queue.find_and_modify(
query={'status': self.OUTSTANDING},
update={'$set': {'status': self.PROCESSING}, 'timestamp': datetime.now()}
update={'$set': {'status': self.PROCESSING, 'timestamp': datetime.utcnow()}}
)
if record:
return pickle.loads(zlib.decompress(record['request']));
else:
self.repair()
raise KeyError()

def peek(self):
record = self.db.crawl_queue.find_one({'status': self.OUTSTANDING})
if record:
return pickle.loads(zlib.decompress(record['request']));

def complete(self, url):
self.db.crawl_queue.update({'_id': url}, {'$set': {'status': self.COMPLETE}})

def repair(self):
"""Release stalled jobs
"""
record = self.db.crawl_queue.find_and_modify(
query={
'timestamp': {'$lt': datetime.now() - timedelta(seconds=self.timeout)},
'status': {'$ne': self.COMPLETE}
},
update={'$set': {'status': self.OUTSTANDING}}
)
if record:
Elog.info('Released:', record['_id'])
Elog.info('Released:' + record['_id'])

def clear(self):
    Elog.info('mongo queue cleared')
    self.db.crawl_queue.drop()
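
The queue above uses MongoDB's atomic find_and_modify to claim a job, flipping its status from OUTSTANDING to PROCESSING in a single operation so concurrent workers never grab the same document; repair() later re-releases any claim older than the timeout. find_and_modify() is deprecated in pymongo 3.x, so here is a sketch of the same claim step with the newer find_one_and_update; the status codes and connection settings are assumptions, since MongoQueue's constructor is not shown in this diff.

from datetime import datetime
from pymongo import MongoClient

OUTSTANDING, PROCESSING = 1, 2                   # assumed status codes
db = MongoClient('localhost', 27017).crawler_db  # assumed host/port/db name

# atomically claim one outstanding job and stamp the claim time
record = db.crawl_queue.find_one_and_update(
    {'status': OUTSTANDING},
    {'$set': {'status': PROCESSING, 'timestamp': datetime.utcnow()}}
)
if record is None:
    pass  # queue drained or every job in flight; MongoQueue repairs, then raises KeyError
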
Binary file modified com/anjie/module/__pycache__/default_download.cpython-36.pyc
Binary file not shown.
Binary file modified com/anjie/module/__pycache__/default_scheduler.cpython-36.pyc
Binary file not shown.
59 changes: 59 additions & 0 deletions com/anjie/module/mongo_scheduler.py
@@ -0,0 +1,59 @@
from urllib import parse;
from urllib.robotparser import RobotFileParser;

from com.anjie.base.scheduler import BaseScheduler
from com.anjie.mode.spider_exception import SpiderException;
from com.anjie.utils.elog import Elog
from com.anjie.utils.utils import UserAgent;
from com.anjie.mode.MongoQueue import MongoQueue;


class MongoScheduler(BaseScheduler):
def __init__(self):
super(MongoScheduler, self).__init__();

# queue of requests waiting to be crawled
self.mongo_queue = MongoQueue();

def __iter__(self):
return self

def __next__(self):
try:
r = self.mongo_queue.pop();
except KeyError as e:
raise StopIteration
else:
return r;

def nextRequest(self):
try:
r = self.mongo_queue.pop();
except IndexError as e:
return None;
except KeyError as e:
return None
else:
return r;

def isNotEmpty(self):
if self.mongo_queue and self.mongo_queue.peek():
return True;
else:
return False;

def addRequest(self, rq):
rq.headers.setdefault(UserAgent.user_agent_key, UserAgent.user_agent_list[0]);
self.mongo_queue.push(rq);

def addRequests(self, rqs):
for r in rqs:
r.headers.setdefault(UserAgent.user_agent_key, UserAgent.user_agent_list[0]);
self.mongo_queue.push(r);

def addLoserRequest(self, rq):
rq.headers.setdefault(UserAgent.user_agent_key, UserAgent.user_agent_list[0]);
self.mongo_queue.push(rq);

def addCompleteRequest(self, rq):
self.mongo_queue.complete(rq.url);
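
A usage sketch for the new scheduler (it assumes a reachable MongoDB instance and the project's ERequest class, whose module path is not shown in this diff): MongoScheduler doubles as an iterator, converting the KeyError that MongoQueue.pop() raises on an empty queue into StopIteration.

# assumes: from com.anjie.module.mongo_scheduler import MongoScheduler
# plus the project's ERequest request class
scheduler = MongoScheduler()
scheduler.addRequests([ERequest(url='https://example.com/', needParse=False)])
for rq in scheduler:                   # __next__ ends iteration when the queue drains
    print(rq.url)
    scheduler.addCompleteRequest(rq)   # mark done so repair() will not re-release it
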
6 changes: 3 additions & 3 deletions com/anjie/spider/ThreadSpider.py
@@ -10,10 +10,10 @@ class ThreadSpider(BaseSpider):
def __init__(self):
super(ThreadSpider, self).__init__();
self.max_urls = 000;
ulist = 'https://www.baidu.com';
ulist = 'https://www.baidu.com/';
rlist = [];
for _ in range(0,100):
r = ERequest(url=ulist, needParse=False)
for i in range(0,100):
r = ERequest(url=ulist+str(i), needParse=False)
rlist.append(r);
self.seed_request = list(rlist)

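The seed change above is what makes the speed test meaningful: appending the loop index produces 100 distinct URLs, so a queue keyed by URL (MongoQueue uses the URL as the document _id) holds 100 jobs instead of collapsing them into one. The equivalent one-liner, for illustration:

# 100 unique seed URLs; with one repeated URL a de-duplicating queue would keep a single job
seeds = ['https://www.baidu.com/' + str(i) for i in range(100)]
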
Binary file modified com/anjie/utils/__pycache__/elog.cpython-36.pyc
Binary file not shown.
Empty file added mongod.lock
Empty file.
