Mirror of https://github.com/terorie/od-database-crawler.git (synced 2025-04-19 18:36:43 +00:00)
Use fasthttp.PipelineClient
This commit is contained in:
parent b18b70f798
commit d69cd4400e
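Context for the diff below: the crawler previously sent every request through one global fasthttp.Client; this commit switches to one fasthttp.PipelineClient per crawled host, which multiplexes requests over a fixed set of pipelined connections. The following is a minimal sketch of how such a client is built and driven; the host, connection count, and timeouts are placeholder values, not the repo's configuration.

// Minimal sketch (not part of this commit): constructing and using a
// fasthttp.PipelineClient. Addr, MaxConns and the timeouts are placeholders;
// the commit derives them from the crawl target and viper config instead.
package main

import (
	"fmt"
	"time"

	"github.com/valyala/fasthttp"
)

func main() {
	client := &fasthttp.PipelineClient{
		Addr:         "example.com:80", // one client per host
		MaxConns:     4,                // cap on concurrent pipelined connections
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 5 * time.Second,
	}

	req := fasthttp.AcquireRequest()
	res := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(req)
	defer fasthttp.ReleaseResponse(res)

	req.SetRequestURI("http://example.com/")
	if err := client.Do(req, res); err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println("status:", res.StatusCode())
}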
@@ -194,10 +194,6 @@ func readConfig() {
 
 	config.UserAgent = viper.GetString(ConfUserAgent)
 
-	setDialTimeout(viper.GetDuration(ConfDialTimeout))
-
-	setTimeout(viper.GetDuration(ConfTimeout))
-
 	config.JobBufferSize = viper.GetInt(ConfJobBufferSize)
 
 	config.Verbose = viper.GetBool(ConfVerbose)
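Aside, not part of the commit: the two calls removed above used to apply the dial and request timeouts to the global client while the config was being read. After this change the same viper durations are read lazily inside newHTTPClient (see crawl.go below). A self-contained sketch of that lookup pattern, with hypothetical key names standing in for ConfDialTimeout and ConfTimeout:

// Sketch only: hypothetical keys and default values, not the repo's config.
package main

import (
	"fmt"
	"time"

	"github.com/spf13/viper"
)

func main() {
	viper.SetDefault("crawl.dial_timeout", 10*time.Second) // hypothetical key
	viper.SetDefault("crawl.timeout", 60*time.Second)      // hypothetical key

	fmt.Println("dial timeout:", viper.GetDuration("crawl.dial_timeout"))
	fmt.Println("request timeout:", viper.GetDuration("crawl.timeout"))
}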
crawl.go (50 changed lines)
@@ -3,6 +3,7 @@ package main
 import (
 	"bytes"
 	"crypto/tls"
+	"github.com/spf13/viper"
 	"github.com/terorie/od-database-crawler/ds/redblackhash"
 	"github.com/terorie/od-database-crawler/fasturl"
 	"github.com/valyala/fasthttp"
@@ -15,24 +16,33 @@ import (
 	"time"
 )
 
-var client = fasthttp.Client {
-	TLSConfig: &tls.Config{
-		InsecureSkipVerify: true,
-	},
+var tlsConfig = tls.Config {
+	InsecureSkipVerify: true,
 }
 
-func setDialTimeout(d time.Duration) {
-	client.Dial = func(addr string) (net.Conn, error) {
-		return fasthttp.DialTimeout(addr, d)
+func newHTTPClient(url *fasturl.URL) *fasthttp.PipelineClient {
+	var isTLS bool
+	switch url.Scheme {
+	case fasturl.SchemeHTTP:
+		isTLS = false
+	case fasturl.SchemeHTTPS:
+		isTLS = true
+	}
+
+	return &fasthttp.PipelineClient {
+		MaxConns: viper.GetInt(ConfWorkers),
+		Addr: url.Host,
+		IsTLS: isTLS,
+		TLSConfig: &tlsConfig,
+		ReadTimeout: viper.GetDuration(ConfTimeout),
+		WriteTimeout: viper.GetDuration(ConfTimeout) / 2,
+		Dial: func(addr string) (conn net.Conn, e error) {
+			return fasthttp.DialTimeout(addr, viper.GetDuration(ConfDialTimeout))
+		},
 	}
 }
 
-func setTimeout(d time.Duration) {
-	client.ReadTimeout = d
-	client.WriteTimeout = d / 2
-}
-
-func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
+func (w *WorkerContext) GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 	f.IsDir = true
 	f.Name = path.Base(j.Uri.Path)
 
@@ -45,7 +55,7 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 	res := fasthttp.AcquireResponse()
 	defer fasthttp.ReleaseResponse(res)
 
-	err = client.Do(req, res)
+	err = w.client.Do(req, res)
 	fasthttp.ReleaseRequest(req)
 
 	if err != nil {
@@ -123,6 +133,14 @@ func ParseDir(body []byte, baseUri *fasturl.URL) (links []fasturl.URL, err error
 			continue
 		}
 
+		if strings.HasSuffix(link.Path, ".php") {
+			continue
+		}
+
+		if strings.Contains(link.Path, "/cgi-bin/") {
+			continue
+		}
+
 		links = append(links, link)
 	}
 }
@@ -131,7 +149,7 @@ func ParseDir(body []byte, baseUri *fasturl.URL) (links []fasturl.URL, err error
 	return
 }
 
-func GetFile(u fasturl.URL, f *File) (err error) {
+func (w *WorkerContext) GetFile(u fasturl.URL, f *File) (err error) {
 	f.IsDir = false
 	u.Path = path.Clean(u.Path)
 	f.Name = path.Base(u.Path)
@@ -148,7 +166,7 @@ func GetFile(u fasturl.URL, f *File) (err error) {
 	res.SkipBody = true
 	defer fasthttp.ReleaseResponse(res)
 
-	err = client.Do(req, res)
+	err = w.client.Do(req, res)
 	fasthttp.ReleaseRequest(req)
 
 	if err != nil {
main.go (2 changed lines)
@@ -35,7 +35,7 @@ var serverCmd = cobra.Command {
 }
 
 var crawlCmd = cobra.Command {
-	Use: "crawl",
+	Use: "crawl <url>",
 	Short: "Crawl an URL",
 	Long: "Crawl the URL specified.\n" +
 		"Results will not be uploaded to the database,\n" +
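Aside on the cobra convention used above: a placeholder like <url> in the Use string only changes the generated usage line; argument validation is declared separately. A minimal hedged sketch, not the repo's actual command wiring:

// Sketch only; hypothetical wiring to illustrate the "crawl <url>" usage string.
package main

import (
	"fmt"

	"github.com/spf13/cobra"
)

func main() {
	crawlCmd := &cobra.Command{
		Use:   "crawl <url>", // "<url>" shows up in the usage/help line
		Short: "Crawl an URL",
		Args:  cobra.ExactArgs(1), // enforce exactly one positional argument
		Run: func(cmd *cobra.Command, args []string) {
			fmt.Println("would crawl", args[0])
		},
	}
	if err := crawlCmd.Execute(); err != nil {
		fmt.Println(err)
	}
}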
@@ -25,6 +25,9 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 		logrus.WithField("url", remote.BaseUri.String()).
 			Info("Starting crawler")
 
+		// Create HTTP client
+		remote.WCtx.Prepare()
+
 		// Collect results
 		results := make(chan File)
 
worker.go (10 changed lines)
@@ -3,6 +3,7 @@ package main
 import (
 	"github.com/beeker1121/goque"
 	"github.com/sirupsen/logrus"
+	"github.com/valyala/fasthttp"
 	"math"
 	"sort"
 	"strings"
@@ -18,6 +19,11 @@ type WorkerContext struct {
 	Queue *BufferedQueue
 	lastRateLimit time.Time
 	numRateLimits int
+	client *fasthttp.PipelineClient
+}
+
+func (w *WorkerContext) Prepare() {
+	w.client = newHTTPClient(&w.OD.BaseUri)
 }
 
 func (w *WorkerContext) Worker(results chan<- File) {
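Hedged sketch of the wiring introduced here: each open directory gets one WorkerContext, Prepare builds the per-host pipeline client once, and every worker of that context reuses it through w.client. The types and values below are simplified stand-ins for the repo's own:

// Simplified stand-ins; not the repo's actual types or configuration.
package main

import (
	"fmt"
	"time"

	"github.com/valyala/fasthttp"
)

type workerContext struct {
	host   string // e.g. "files.example.com:80"
	client *fasthttp.PipelineClient
}

// prepare builds the single pipelined client shared by all workers of this context.
func (w *workerContext) prepare(maxConns int) {
	w.client = &fasthttp.PipelineClient{
		Addr:         w.host,
		MaxConns:     maxConns, // the commit sizes this from ConfWorkers
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 5 * time.Second,
	}
}

func main() {
	w := &workerContext{host: "files.example.com:80"}
	w.prepare(4)
	fmt.Println("pipeline client ready for", w.host)
}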
@@ -91,7 +97,7 @@ func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
 	if len(job.Uri.Path) == 0 { return }
 	if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
 		// Load directory
-		links, err := GetDir(job, f)
+		links, err := w.GetDir(job, f)
 		if err != nil {
 			if !isErrSilent(err) {
 				logrus.WithError(err).
@@ -141,7 +147,7 @@ func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
 		}
 	} else {
 		// Load file
-		err := GetFile(job.Uri, f)
+		err := w.GetFile(job.Uri, f)
 		if err != nil {
 			if !isErrSilent(err) {
 				logrus.WithError(err).