Skip to content

Commit

Permalink
modify scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
stock committed Dec 6, 2014
1 parent 069e22e commit 7ad5208
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 14 deletions.
1 change: 1 addition & 0 deletions core/common/page/page.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/hu17889/go_spider/core/common/page_items"
"github.com/hu17889/go_spider/core/common/request"
"net/http"
//"fmt"
)

// Page represents an entity be crawled.
Expand Down
1 change: 1 addition & 0 deletions core/common/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func JsonpToJson(json string) string {
if end > start && end != -1 && start != -1 {
json = json[start : end+1]
}
json = strings.Replace(json, "\\'", "", -1)
regDetail, _ := regexp.Compile("([^\\s\\:\\{\\,\\d\"]+|[a-z][a-z\\d]*)\\s*\\:")
return regDetail.ReplaceAllString(json, "\"$1\":")
}
Expand Down
2 changes: 1 addition & 1 deletion core/downloader/downloader_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ func (this *HttpDownloader) downloadJson(p *page.Page, req *request.Request) *pa

var r *simplejson.Json
if r, err = simplejson.NewJson(body); err != nil {
mlog.LogInst().LogError(err.Error())
mlog.LogInst().LogError(string(body) + "\t" + err.Error())
p.SetStatus(true, err.Error())
return p
}
Expand Down
52 changes: 43 additions & 9 deletions core/scheduler/scheduler_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,30 +6,64 @@
package scheduler

import (
"container/list"
"crypto/md5"
"github.com/hu17889/go_spider/core/common/request"
"sync"
//"fmt"
)

type QueueScheduler struct {
queue chan *request.Request
locker *sync.Mutex
rm bool
rmKey map[[md5.Size]byte]*list.Element
queue *list.List
}

func NewQueueScheduler() *QueueScheduler {
ch := make(chan *request.Request, 1024)
return &QueueScheduler{ch}
func NewQueueScheduler(rmDuplicate bool) *QueueScheduler {
queue := list.New()
rmKey := make(map[[md5.Size]byte]*list.Element)
locker := new(sync.Mutex)
return &QueueScheduler{rm: rmDuplicate, queue: queue, rmKey: rmKey, locker: locker}
}

func (this *QueueScheduler) Push(requ *request.Request) {
this.queue <- requ
this.locker.Lock()
var key [md5.Size]byte
if this.rm {
key = md5.Sum([]byte(requ.GetUrl()))
if _, ok := this.rmKey[key]; ok {
this.locker.Unlock()
return
}
}
e := this.queue.PushBack(requ)
if this.rm {
this.rmKey[key] = e
}
this.locker.Unlock()
}

func (this *QueueScheduler) Poll() *request.Request {
if len(this.queue) == 0 {
this.locker.Lock()
if this.queue.Len() <= 0 {
this.locker.Unlock()
return nil
} else {
return <-this.queue
}
e := this.queue.Front()
requ := e.Value.(*request.Request)
key := md5.Sum([]byte(requ.GetUrl()))
this.queue.Remove(e)
if this.rm {
delete(this.rmKey, key)
}
this.locker.Unlock()
return requ
}

func (this *QueueScheduler) Count() int {
return len(this.queue)
this.locker.Lock()
len := this.queue.Len()
this.locker.Unlock()
return len
}
35 changes: 33 additions & 2 deletions core/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,46 @@ import (
func TestQueueScheduler(t *testing.T) {
var r *request.Request
r = request.NewRequest("http://baidu.com", "html")
fmt.Printf("%v\n", r)

var s *scheduler.QueueScheduler
s = scheduler.NewQueueScheduler()
s = scheduler.NewQueueScheduler(false)

s.Push(r)
var count int = s.Count()
if count != 1 {
t.Error("count error")
}
fmt.Println(count)

var r1 *request.Request
r1 = s.Poll()
fmt.Println(r1)
if r1 == nil {
t.Error("poll error")
}
fmt.Printf("%v\n", r1)

// remove duplicate
s = scheduler.NewQueueScheduler(true)

r2 := request.NewRequest("http://qq.com", "html")
s.Push(r)
s.Push(r2)
s.Push(r)
count = s.Count()
if count != 2 {
t.Error("count error")
}
fmt.Println(count)

r1 = s.Poll()
if r1 == nil {
t.Error("poll error")
}
fmt.Printf("%v\n", r1)
r1 = s.Poll()
if r1 == nil {
t.Error("poll error")
}
fmt.Printf("%v\n", r1)
}
6 changes: 4 additions & 2 deletions core/spider/spider.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/hu17889/go_spider/core/scheduler"
"math/rand"
"time"
//"fmt"
)

type Spider struct {
Expand Down Expand Up @@ -53,7 +54,7 @@ func NewSpider(pageinst page_processer.PageProcesser, taskname string) *Spider {

// init spider
if ap.pScheduler == nil {
ap.SetScheduler(scheduler.NewQueueScheduler())
ap.SetScheduler(scheduler.NewQueueScheduler(false))
}

if ap.pDownloader == nil {
Expand Down Expand Up @@ -128,7 +129,7 @@ func (this *Spider) Run() {
}

func (this *Spider) close() {
this.SetScheduler(scheduler.NewQueueScheduler())
this.SetScheduler(scheduler.NewQueueScheduler(false))
this.SetDownloader(downloader.NewHttpDownloader())
this.pPiplelines = make([]pipeline.Pipeline, 0)
this.exitWhenComplete = true
Expand Down Expand Up @@ -272,6 +273,7 @@ func (this *Spider) pageProcess(req *request.Request) {

this.pPageProcesser.Process(p)
for _, req := range p.GetTargetRequests() {
//fmt.Printf("%v\n",req)
this.addRequest(req)
}

Expand Down

0 comments on commit 7ad5208

Please sign in to comment.