Colly源码解析——框架
? ? ? ??Colly是一個(gè)使用golang實(shí)現(xiàn)的數(shù)據(jù)抓取框架,我們可以使用它快速搭建類似網(wǎng)絡(luò)爬蟲這樣的應(yīng)用。本文我們將剖析其源碼,以探析其中奧秘。(轉(zhuǎn)載請(qǐng)指明出于breaksoftware的csdn博客)
? ? ? ??Collector是Colly的核心結(jié)構(gòu)體,其中包含了用戶對(duì)框架行為的定義。一般情況下,我們可以使用NewCollector方法構(gòu)建一個(gè)它的指針
// NewCollector creates a new Collector instance with default configuration
func NewCollector(options ...func(*Collector)) *Collector {c := &Collector{}c.Init()for _, f := range options {f(c)}c.parseSettingsFromEnv()return c
}
? ? ? ? 第4行調(diào)用了Init方法初始化了Collector的一些成員。然后遍歷并調(diào)用不定長參數(shù),這些參數(shù)都是函數(shù)類型——func(*Collector)。我們看個(gè)例子
c := colly.NewCollector(// Visit only domains: coursera.org, www.coursera.orgcolly.AllowedDomains("coursera.org", "www.coursera.org"),// Cache responses to prevent multiple download of pages// even if the collector is restartedcolly.CacheDir("./coursera_cache"),)
? ? ? ? AllowedDomains和CacheDir都返回一個(gè)匿名函數(shù),其邏輯就是將Collector對(duì)象中對(duì)應(yīng)的成員設(shè)置為指定的值
// AllowedDomains sets the domain whitelist used by the Collector.
func AllowedDomains(domains ...string) func(*Collector) {return func(c *Collector) {c.AllowedDomains = domains}
}
? ? ? ? Collector中絕大部分成員均有對(duì)應(yīng)的方法,而且它們的名稱(函數(shù)名和成員名)也一致。但是其中只有3個(gè)方法——ParseHTTPErrorResponse、AllowURLRevisit和IgnoreRobotsTxt比較特殊,因?yàn)樗鼈儧]有參數(shù)。如果被調(diào)用,則對(duì)應(yīng)的Collector成員會(huì)被設(shè)置為true
// AllowURLRevisit instructs the Collector to allow multiple downloads of the same URL
func AllowURLRevisit() func(*Collector) {return func(c *Collector) {c.AllowURLRevisit = true}
}
? ? ? ? 再回到NewCollector函數(shù),其最后一個(gè)邏輯是調(diào)用parseSettingsFromEnv方法。從名稱我們可以看出它是用于解析環(huán)境變量的。將它放在最后是可以理解的,因?yàn)楹竺鎴?zhí)行的邏輯可以覆蓋前面的邏輯。這樣我們可以讓環(huán)境變量對(duì)應(yīng)的設(shè)置生效。
func (c *Collector) parseSettingsFromEnv() {for _, e := range os.Environ() {if !strings.HasPrefix(e, "COLLY_") {continue}pair := strings.SplitN(e[6:], "=", 2)if f, ok := envMap[pair[0]]; ok {f(c, pair[1])} else {log.Println("Unknown environment variable:", pair[0])}}
}
? ? ? ? 它從os.Environ()中獲取系統(tǒng)環(huán)境變量,然后遍歷它們。對(duì)于以COLLY_開頭的變量,找到其在envMap中的對(duì)應(yīng)方法,并調(diào)用之以覆蓋之前設(shè)置的Collector成員變量值。envMap是一個(gè)<string,func>的映射,它是包內(nèi)全局的。
var envMap = map[string]func(*Collector, string){"ALLOWED_DOMAINS": func(c *Collector, val string) {c.AllowedDomains = strings.Split(val, ",")},"CACHE_DIR": func(c *Collector, val string) {c.CacheDir = val},
……
? ? ? ? 初始化完Collector,我們就可以讓其發(fā)送請(qǐng)求。目前Colly公開了5個(gè)方法,其中3個(gè)是和Post相關(guān)的:Post、PostRaw和PostMultipart。一個(gè)Get請(qǐng)求方法:Visit。以及一個(gè)用戶可以高度定制的方法:Request。這些方法底層都調(diào)用了scrape方法。比如Visit的實(shí)現(xiàn)是
func (c *Collector) Visit(URL string) error {return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}
scrape
? ? ? ??scrape方法是需要我們展開分析的。因?yàn)樗荂olly庫中兩個(gè)最重要的方法之一。
// scrape method
func (c *Collector) scrape(u, method string, depth int, requestData io.Reader, ctx *Context, hdr http.Header, checkRevisit bool) error {if err := c.requestCheck(u, method, depth, checkRevisit); err != nil {return err}
? ? ? ? 首先requestCheck方法檢測一些和遞歸深度以及URL相關(guān)的信息
func (c *Collector) requestCheck(u, method string, depth int, checkRevisit bool) error {if u == "" {return ErrMissingURL}if c.MaxDepth > 0 && c.MaxDepth < depth {return ErrMaxDepth}
? ? ? ? Collector的MaxDepth默認(rèn)設(shè)置為0,即不用比較深度。如果它被設(shè)置值,則遞歸深度不可以超過它。
? ? ? ? 然后檢測URL是否在被禁止的URL過濾器中。如果在,則返回錯(cuò)誤。
if len(c.DisallowedURLFilters) > 0 {if isMatchingFilter(c.DisallowedURLFilters, []byte(u)) {return ErrForbiddenURL}}
? ? ? ? 之后檢測URL是否在準(zhǔn)入的URL過濾器中。如果不在,則返回錯(cuò)誤
if len(c.URLFilters) > 0 {if !isMatchingFilter(c.URLFilters, []byte(u)) {return ErrNoURLFiltersMatch}}
? ? ? ? 最后針對(duì)GET請(qǐng)求,檢查其是否被請(qǐng)求過。
if checkRevisit && !c.AllowURLRevisit && method == "GET" {h := fnv.New64a()h.Write([]byte(u))uHash := h.Sum64()visited, err := c.store.IsVisited(uHash)if err != nil {return err}if visited {return ErrAlreadyVisited}return c.store.Visited(uHash)}return nil
}
? ? ? ? 通過這些檢測后,scrape會(huì)對(duì)URL組成進(jìn)行分析補(bǔ)齊
// scrape methodparsedURL, err := url.Parse(u)if err != nil {return err}if parsedURL.Scheme == "" {parsedURL.Scheme = "http"}
? ? ? ? 然后針對(duì)host進(jìn)行精確匹配(在requestCheck中,是對(duì)URL使用正則進(jìn)行匹配)。先檢測host是否在被禁止的列表中,然后檢測其是否在準(zhǔn)入的列表中。
// scrape methodif !c.isDomainAllowed(parsedURL.Host) {return ErrForbiddenDomain}
func (c *Collector) isDomainAllowed(domain string) bool {for _, d2 := range c.DisallowedDomains {if d2 == domain {return false}}if c.AllowedDomains == nil || len(c.AllowedDomains) == 0 {return true}for _, d2 := range c.AllowedDomains {if d2 == domain {return true}}return false
}
? ? ? ? 通過上面檢測,還需要檢查是否需要遵從Robots協(xié)議
// scrape methodif !c.IgnoreRobotsTxt {if err = c.checkRobots(parsedURL); err != nil {return err}}
? ? ? ? 所有檢測通過后,就需要填充請(qǐng)求了
// scrape methodif hdr == nil {hdr = http.Header{"User-Agent": []string{c.UserAgent}}}rc, ok := requestData.(io.ReadCloser)if !ok && requestData != nil {rc = ioutil.NopCloser(requestData)}req := &http.Request{Method: method,URL: parsedURL,Proto: "HTTP/1.1",ProtoMajor: 1,ProtoMinor: 1,Header: hdr,Body: rc,Host: parsedURL.Host,}setRequestBody(req, requestData)
? ? ? ? 第5~8行,使用類型斷言等方法,將請(qǐng)求的數(shù)據(jù)(requestData)轉(zhuǎn)換成io.ReadCloser接口數(shù)據(jù)。setRequestBody方法則是根據(jù)數(shù)據(jù)(requestData)的原始類型,設(shè)置Request結(jié)構(gòu)中的GetBody方法
func setRequestBody(req *http.Request, body io.Reader) {if body != nil {switch v := body.(type) {case *bytes.Buffer:req.ContentLength = int64(v.Len())buf := v.Bytes()req.GetBody = func() (io.ReadCloser, error) {r := bytes.NewReader(buf)return ioutil.NopCloser(r), nil}case *bytes.Reader:req.ContentLength = int64(v.Len())snapshot := *vreq.GetBody = func() (io.ReadCloser, error) {r := snapshotreturn ioutil.NopCloser(&r), nil}case *strings.Reader:req.ContentLength = int64(v.Len())snapshot := *vreq.GetBody = func() (io.ReadCloser, error) {r := snapshotreturn ioutil.NopCloser(&r), nil}}if req.GetBody != nil && req.ContentLength == 0 {req.Body = http.NoBodyreq.GetBody = func() (io.ReadCloser, error) { return http.NoBody, nil }}}
}
? ? ? ? 這種抽象方式,使得不同類型的requestData都可以通過統(tǒng)一的GetBody方法獲取內(nèi)容。目前Colly中發(fā)送數(shù)據(jù)有3種復(fù)合結(jié)構(gòu),分別是:map[string]string、requestData []byte和map[string][]byte。對(duì)于普通的Post傳送map[string]string數(shù)據(jù),Colly會(huì)使用createFormReader方法將其轉(zhuǎn)換成Reader結(jié)構(gòu)指針
func createFormReader(data map[string]string) io.Reader {form := url.Values{}for k, v := range data {form.Add(k, v)}return strings.NewReader(form.Encode())
}
? ? ? ? 如果是一個(gè)二進(jìn)制切片,則使用bytes.NewReader直接將其轉(zhuǎn)換為Reader結(jié)構(gòu)指針
? ? ? ? 如果是map[string][]byte,則是Post數(shù)據(jù)的Multipart結(jié)構(gòu),使用createMultipartReader方法將其轉(zhuǎn)換成Buffer結(jié)構(gòu)指針。
func createMultipartReader(boundary string, data map[string][]byte) io.Reader {dashBoundary := "--" + boundarybody := []byte{}buffer := bytes.NewBuffer(body)buffer.WriteString("Content-type: multipart/form-data; boundary=" + boundary + "\n\n")for contentType, content := range data {buffer.WriteString(dashBoundary + "\n")buffer.WriteString("Content-Disposition: form-data; name=" + contentType + "\n")buffer.WriteString(fmt.Sprintf("Content-Length: %d \n\n", len(content)))buffer.Write(content)buffer.WriteString("\n")}buffer.WriteString(dashBoundary + "--\n\n")return buffer
}
? ? ? ? 回到scrape方法中,數(shù)據(jù)準(zhǔn)備結(jié)束,開始正式獲取數(shù)據(jù)
// scrape methodu = parsedURL.String()c.wg.Add(1)if c.Async {go c.fetch(u, method, depth, requestData, ctx, hdr, req)return nil}return c.fetch(u, method, depth, requestData, ctx, hdr, req)
}
? ? ? ? 通過第4行我們可以看到,可以通過Async參數(shù)決定是否異步的獲取數(shù)據(jù)。
fetch
? ? ? ? 在解析fetch方法前,我們要先介紹Collector的幾個(gè)回調(diào)函數(shù)
htmlCallbacks []*htmlCallbackContainerxmlCallbacks []*xmlCallbackContainerrequestCallbacks []RequestCallbackresponseCallbacks []ResponseCallbackerrorCallbacks []ErrorCallbackscrapedCallbacks []ScrapedCallback
? ? ? ? 以requestCallbacks為例,Colly提供了OnRequest方法用于注冊(cè)回調(diào)。由于這些回調(diào)函數(shù)通過切片保存,所以可以多次調(diào)用注冊(cè)方法。(即不是覆蓋之前的注冊(cè)回調(diào))
// OnRequest registers a function. Function will be executed on every
// request made by the Collector
func (c *Collector) OnRequest(f RequestCallback) {c.lock.Lock()if c.requestCallbacks == nil {c.requestCallbacks = make([]RequestCallback, 0, 4)}c.requestCallbacks = append(c.requestCallbacks, f)c.lock.Unlock()
}
? ? ? ? 用戶則可以使用下面方法進(jìn)行注冊(cè)
// Before making a request print "Visiting ..."c.OnRequest(func(r *colly.Request) {fmt.Println("Visiting", r.URL.String())})
? ? ? ? 這些回調(diào)會(huì)被在handleOnXXXX類型的函數(shù)中被調(diào)用。調(diào)用的順序和注冊(cè)的順序一致。
func (c *Collector) handleOnResponse(r *Response) {if c.debugger != nil {c.debugger.Event(createEvent("response", r.Request.ID, c.ID, map[string]string{"url": r.Request.URL.String(),"status": http.StatusText(r.StatusCode),}))}for _, f := range c.responseCallbacks {f(r)}
}
? ? ? ? 每次調(diào)用fetch方法都會(huì)構(gòu)建一個(gè)全新Request結(jié)構(gòu)。
// fetch method
func (c *Collector) fetch(u, method string, depth int, requestData io.Reader, ctx *Context, hdr http.Header, req *http.Request) error {defer c.wg.Done()if ctx == nil {ctx = NewContext()}request := &Request{URL: req.URL,Headers: &req.Header,Ctx: ctx,Depth: depth,Method: method,Body: requestData,collector: c,ID: atomic.AddUint32(&c.requestCount, 1),}
? ? ? ? 這兒注意一下3~5行ctx(上下文)的構(gòu)建邏輯。如果傳入的ctx為nil,則構(gòu)建一個(gè)新的,否則使用老的。這就意味著Request結(jié)構(gòu)體(以及之后出現(xiàn)的Response結(jié)構(gòu)體)中的ctx可以是每次調(diào)用fetch時(shí)全新產(chǎn)生的,也可以是各個(gè)Request公用的。我們回溯下ctx的調(diào)用棧,發(fā)現(xiàn)只有func (c *Collector) Request(……)方法使用的不是nil
func (c *Collector) Request(method, URL string, requestData io.Reader, ctx *Context, hdr http.Header) error {return c.scrape(URL, method, 1, requestData, ctx, hdr, true)
}
? ? ? ? 這也就意味著,調(diào)用Visit、Post、PostRaw和PostMultipart方法在每次調(diào)用fetch時(shí)都會(huì)產(chǎn)生一個(gè)新的上下文。
? ? ? ? 由于Context存在被多個(gè)goroutine共享訪問的可能性,所以其定義了讀寫鎖進(jìn)行保護(hù)
type Context struct {contextMap map[string]interface{}lock *sync.RWMutex
}
? ? ? ? 再回到fetch方法。數(shù)據(jù)填充完畢后,就提供了一次給用戶干預(yù)之后流程的機(jī)會(huì)
// fetch methodc.handleOnRequest(request)if request.abort {return nil}
? ? ? ? 之前我們講解過,handleOnRequest調(diào)用的是用戶通過OnRequest注冊(cè)個(gè)所有回調(diào)函數(shù)。如果用戶在該回調(diào)中調(diào)用了下面方法,則之后的流程都不走了。
// Abort cancels the HTTP request when called in an OnRequest callback
func (r *Request) Abort() {r.abort = true
}
? ? ? ? 如果用戶沒用終止執(zhí)行,則開始發(fā)送請(qǐng)求
// fetch methodif method == "POST" && req.Header.Get("Content-Type") == "" {req.Header.Add("Content-Type", "application/x-www-form-urlencoded")}if req.Header.Get("Accept") == "" {req.Header.Set("Accept", "*/*")}origURL := req.URLresponse, err := c.backend.Cache(req, c.MaxBodySize, c.CacheDir)
? ? ? ? 對(duì)于這次請(qǐng)求,不管是否出錯(cuò)都會(huì)觸發(fā)用戶定義的Error回調(diào)
// fetch methodif err := c.handleOnError(response, err, request, ctx); err != nil {return err}
? ? ? ? 在handleOnError函數(shù)中,回調(diào)函數(shù)會(huì)接收到err原因,所以用戶自定義的錯(cuò)誤處理函數(shù)需要通過該值來做區(qū)分。
for _, f := range c.errorCallbacks {f(response, err)}return err
? ? ? ? 正常請(qǐng)求后,fetch會(huì)使用ctx和修復(fù)后的request填充到response中
// fetch methodif req.URL != origURL {request.URL = req.URLrequest.Headers = &req.Header}if proxyURL, ok := req.Context().Value(ProxyURLKey).(string); ok {request.ProxyURL = proxyURL}atomic.AddUint32(&c.responseCount, 1)response.Ctx = ctxresponse.Request = requesterr = response.fixCharset(c.DetectCharset, request.ResponseCharacterEncoding)if err != nil {return err}
? ? ? ? 最后在一系列調(diào)用用戶回調(diào)中結(jié)束fetch
// fetch methodc.handleOnResponse(response)err = c.handleOnHTML(response)if err != nil {c.handleOnError(response, err, request, ctx)}err = c.handleOnXML(response)if err != nil {c.handleOnError(response, err, request, ctx)}c.handleOnScraped(response)return err
}
?
總結(jié)
以上是生活随笔為你收集整理的Colly源码解析——框架的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 并行计算——OpenMP加速矩阵相乘
- 下一篇: Colly源码解析——结合例子分析底层实