-
Notifications
You must be signed in to change notification settings - Fork 41
/
Copy pathrfp.go
115 lines (106 loc) · 2.55 KB
/
rfp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package dupefilter
import (
"bytes"
"crypto/md5"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"net/url"
"sort"
"strings"
"sync"
"github.com/antchfx/antch"
"github.com/tylertreat/BoomFilters"
)
// RFPDupeFilter is a middleware of the HTTP downloader that filters
// duplicate requests, based on a request fingerprint.
//
// Send computes a fingerprint for each request and refuses the request when
// boom.TestAndAdd reports that the fingerprint is already present.
type RFPDupeFilter struct {
// IncludeHeaders specifies which header values are included in the
// fingerprint computation.
IncludeHeaders []string
// mu is held around the boom.TestAndAdd call in Send.
mu sync.Mutex
// boom is the filter that fingerprints are tested against and added to.
// RFPDupeFilterMiddleware populates it with a scalable Bloom filter.
boom boom.Filter
// next is the handler that receives requests which are not filtered out.
next antch.HttpMessageHandler
}
// canonicalizeURL returns a normalized copy of u suitable for fingerprinting.
//
// The copy keeps the scheme, user info, host and path of u and drops the
// fragment. The scheme's default port is removed from the host (":80" for
// http, ":443" for https), an empty path becomes "/", and the query
// parameters are re-encoded as escaped "name=value" pairs in sorted order,
// so URLs that differ only in parameter order canonicalize identically.
func canonicalizeURL(u *url.URL) (n *url.URL) {
	n = &url.URL{
		Scheme:  u.Scheme,
		User:    u.User,
		Host:    u.Host,
		Path:    u.Path,
		RawPath: u.RawPath,
	}

	// SplitHostPort fails when the host carries no port; nothing is stripped
	// in that case.
	if _, port, err := net.SplitHostPort(u.Host); err == nil &&
		((u.Scheme == "http" && port == "80") || (u.Scheme == "https" && port == "443")) {
		// Trim the suffix instead of using the split host: SplitHostPort
		// removes the brackets of an IPv6 literal, which would produce an
		// invalid host such as "::1".
		n.Host = strings.TrimSuffix(u.Host, ":"+port)
	}

	if u.EscapedPath() == "" {
		n.Path = "/"
		n.RawPath = "/"
	}

	// Rebuild the query from the decoded parameters. Both the name and the
	// value are escaped so that a name containing '&' or '=' cannot be
	// confused with a pair separator.
	params := u.Query()
	pairs := make([]string, 0, len(params))
	for name, values := range params {
		key := url.QueryEscape(name)
		for _, v := range values {
			pairs = append(pairs, key+"="+url.QueryEscape(v))
		}
	}

	// Map iteration order is random; sorting makes the result deterministic.
	sort.Strings(pairs)
	if len(pairs) > 0 {
		n.RawQuery = strings.Join(pairs, "&")
	}
	return n
}
// fingerprint builds the byte sequence that identifies req for duplicate
// detection.
//
// It concatenates, in order:
//   - the HTTP method ("GET" when req.Method is empty);
//   - the canonicalized request URL;
//   - for POST, DELETE and PUT requests, the hex-encoded MD5 digest of the
//     body, when req.GetBody is set and the body can be read;
//   - the values (joined with "&") of every header named in includeHeaders,
//     in the order includeHeaders lists them.
//
// Header names are looked up in req.Header exactly as given, so they must
// match the stored keys.
func fingerprint(req *http.Request, includeHeaders []string) []byte {
	var fp bytes.Buffer

	method := req.Method
	if method == "" {
		method = "GET"
	}
	fp.WriteString(method)
	fp.WriteString(canonicalizeURL(req.URL).String())

	switch method {
	case "POST", "DELETE", "PUT":
		// GetBody is nil when the body cannot be replayed; calling it
		// unconditionally would panic.
		if req.GetBody == nil {
			break
		}
		if r, err := req.GetBody(); err == nil && r != nil && r != http.NoBody {
			b, err := ioutil.ReadAll(r)
			r.Close()
			if err == nil {
				// md5.Sum hashes b. (hash.Hash.Sum(b) would instead append
				// the digest of the empty input to b.)
				fmt.Fprintf(&fp, "%x", md5.Sum(b))
			}
		}
	}

	// Walk includeHeaders rather than req.Header: map iteration order is
	// random, which would make the fingerprint of a request with several
	// included headers non-deterministic.
	for _, name := range includeHeaders {
		if values, ok := req.Header[name]; ok {
			fp.WriteString(strings.Join(values, "&"))
		}
	}
	return fp.Bytes()
}
// Send forwards req to the next handler unless its fingerprint has been seen
// before, in which case it returns a nil response and an error.
//
// A request whose context carries the boolean value true under the key
// "dont_filter" bypasses the duplicate check entirely and is not recorded.
func (f *RFPDupeFilter) Send(req *http.Request) (*http.Response, error) {
	// A failed type assertion leaves force false, so only an explicit
	// boolean true skips the filter.
	force, _ := req.Context().Value("dont_filter").(bool)
	if !force {
		key := fingerprint(req, f.IncludeHeaders)

		f.mu.Lock()
		seen := f.boom.TestAndAdd(key)
		f.mu.Unlock()

		if seen {
			// The fingerprint was already present: treat as visited.
			return nil, errors.New("RFPDupeFilter: request was denied")
		}
	}
	return f.next.Send(req)
}
// RFPDupeFilterMiddleware returns an antch.Middleware that wraps the next
// handler in an RFPDupeFilter. Each wrapped handler gets its own filter,
// created by boom.NewDefaultScalableBloomFilter(0.01).
// NOTE(review): 0.01 is presumably the target false-positive rate — confirm
// against the BoomFilters documentation.
func RFPDupeFilterMiddleware() antch.Middleware {
	return func(next antch.HttpMessageHandler) antch.HttpMessageHandler {
		filter := &RFPDupeFilter{
			boom: boom.NewDefaultScalableBloomFilter(0.01),
			next: next,
		}
		return filter
	}
}