diff --git a/core/common/page/page.go b/core/common/page/page.go index 8853a1b..cc68513 100644 --- a/core/common/page/page.go +++ b/core/common/page/page.go @@ -8,6 +8,7 @@ import ( "github.com/hu17889/go_spider/core/common/page_items" "github.com/hu17889/go_spider/core/common/request" "net/http" + //"fmt" ) // Page represents an entity be crawled. diff --git a/core/common/util/util.go b/core/common/util/util.go index cf86bcb..79ca0f7 100644 --- a/core/common/util/util.go +++ b/core/common/util/util.go @@ -20,6 +20,7 @@ func JsonpToJson(json string) string { if end > start && end != -1 && start != -1 { json = json[start : end+1] } + json = strings.Replace(json, "\\'", "", -1) regDetail, _ := regexp.Compile("([^\\s\\:\\{\\,\\d\"]+|[a-z][a-z\\d]*)\\s*\\:") return regDetail.ReplaceAllString(json, "\"$1\":") } diff --git a/core/downloader/downloader_http.go b/core/downloader/downloader_http.go index 3535b48..437816c 100644 --- a/core/downloader/downloader_http.go +++ b/core/downloader/downloader_http.go @@ -186,7 +186,7 @@ func (this *HttpDownloader) downloadJson(p *page.Page, req *request.Request) *pa var r *simplejson.Json if r, err = simplejson.NewJson(body); err != nil { - mlog.LogInst().LogError(err.Error()) + mlog.LogInst().LogError(string(body) + "\t" + err.Error()) p.SetStatus(true, err.Error()) return p } diff --git a/core/scheduler/scheduler_queue.go b/core/scheduler/scheduler_queue.go index 26552f8..dc28d8b 100644 --- a/core/scheduler/scheduler_queue.go +++ b/core/scheduler/scheduler_queue.go @@ -6,30 +6,64 @@ package scheduler import ( + "container/list" + "crypto/md5" "github.com/hu17889/go_spider/core/common/request" + "sync" + //"fmt" ) type QueueScheduler struct { - queue chan *request.Request + locker *sync.Mutex + rm bool + rmKey map[[md5.Size]byte]*list.Element + queue *list.List } -func NewQueueScheduler() *QueueScheduler { - ch := make(chan *request.Request, 1024) - return &QueueScheduler{ch} +func NewQueueScheduler(rmDuplicate bool) *QueueScheduler { + queue := list.New() + rmKey := make(map[[md5.Size]byte]*list.Element) + locker := new(sync.Mutex) + return &QueueScheduler{rm: rmDuplicate, queue: queue, rmKey: rmKey, locker: locker} } func (this *QueueScheduler) Push(requ *request.Request) { - this.queue <- requ + this.locker.Lock() + var key [md5.Size]byte + if this.rm { + key = md5.Sum([]byte(requ.GetUrl())) + if _, ok := this.rmKey[key]; ok { + this.locker.Unlock() + return + } + } + e := this.queue.PushBack(requ) + if this.rm { + this.rmKey[key] = e + } + this.locker.Unlock() } func (this *QueueScheduler) Poll() *request.Request { - if len(this.queue) == 0 { + this.locker.Lock() + if this.queue.Len() <= 0 { + this.locker.Unlock() return nil - } else { - return <-this.queue } + e := this.queue.Front() + requ := e.Value.(*request.Request) + key := md5.Sum([]byte(requ.GetUrl())) + this.queue.Remove(e) + if this.rm { + delete(this.rmKey, key) + } + this.locker.Unlock() + return requ } func (this *QueueScheduler) Count() int { - return len(this.queue) + this.locker.Lock() + len := this.queue.Len() + this.locker.Unlock() + return len } diff --git a/core/scheduler/scheduler_test.go b/core/scheduler/scheduler_test.go index 5bec640..318aa6d 100644 --- a/core/scheduler/scheduler_test.go +++ b/core/scheduler/scheduler_test.go @@ -15,15 +15,46 @@ import ( func TestQueueScheduler(t *testing.T) { var r *request.Request r = request.NewRequest("http://baidu.com", "html") + fmt.Printf("%v\n", r) var s *scheduler.QueueScheduler - s = scheduler.NewQueueScheduler() + s = scheduler.NewQueueScheduler(false) s.Push(r) var count int = s.Count() + if count != 1 { + t.Error("count error") + } fmt.Println(count) var r1 *request.Request r1 = s.Poll() - fmt.Println(r1) + if r1 == nil { + t.Error("poll error") + } + fmt.Printf("%v\n", r1) + + // remove duplicate + s = scheduler.NewQueueScheduler(true) + + r2 := request.NewRequest("http://qq.com", "html") + s.Push(r) + s.Push(r2) + s.Push(r) + count = s.Count() + if count != 2 { + t.Error("count error") + } + fmt.Println(count) + + r1 = s.Poll() + if r1 == nil { + t.Error("poll error") + } + fmt.Printf("%v\n", r1) + r1 = s.Poll() + if r1 == nil { + t.Error("poll error") + } + fmt.Printf("%v\n", r1) } diff --git a/core/spider/spider.go b/core/spider/spider.go index 0781024..da929c5 100644 --- a/core/spider/spider.go +++ b/core/spider/spider.go @@ -13,6 +13,7 @@ import ( "github.com/hu17889/go_spider/core/scheduler" "math/rand" "time" + //"fmt" ) type Spider struct { @@ -53,7 +54,7 @@ func NewSpider(pageinst page_processer.PageProcesser, taskname string) *Spider { // init spider if ap.pScheduler == nil { - ap.SetScheduler(scheduler.NewQueueScheduler()) + ap.SetScheduler(scheduler.NewQueueScheduler(false)) } if ap.pDownloader == nil { @@ -128,7 +129,7 @@ func (this *Spider) Run() { } func (this *Spider) close() { - this.SetScheduler(scheduler.NewQueueScheduler()) + this.SetScheduler(scheduler.NewQueueScheduler(false)) this.SetDownloader(downloader.NewHttpDownloader()) this.pPiplelines = make([]pipeline.Pipeline, 0) this.exitWhenComplete = true @@ -272,6 +273,7 @@ func (this *Spider) pageProcess(req *request.Request) { this.pPageProcesser.Process(p) for _, req := range p.GetTargetRequests() { + //fmt.Printf("%v\n",req) this.addRequest(req) }