Mirror of https://github.com/terorie/od-database-crawler.git, synced 2025-12-13 15:19:03 +00:00
Compare commits
8 Commits
| Author | SHA1 | Date |
|---|---|---|
|  | 9bc3455ee0 |  |
|  | c72f4ba475 |  |
|  | d69cd4400e |  |
|  | b18b70f798 |  |
|  | 9d5f549774 |  |
|  | 5239af08f7 |  |
|  | 46c0e0bd32 |  |
|  | 0ca6deede8 |  |
config.go (25 changes)

```diff
@@ -7,7 +7,6 @@ import (
 	"github.com/spf13/viper"
 	"io"
 	"os"
-	"path/filepath"
 	"strings"
 	"time"
 )
@@ -135,17 +134,23 @@ func prepareConfig() {
 
 func readConfig() {
-	// If config.yml in working dir, use it
-	if _, err := os.Stat("config.yml"); err == nil {
-		configFile = "config.yml"
+	if configFile == "" {
+		_, err := os.Stat("config.yml")
+		if err == nil {
+			configFile = "config.yml"
+		}
 	}
 
 	if configFile != "" {
-		var err error
-		confPath, err := filepath.Abs(configFile)
-		if err != nil { panic(err) }
+		confF, err := os.Open(configFile)
+		if err != nil {
+			fmt.Fprintln(os.Stderr, err)
+			os.Exit(1)
+		}
+		defer confF.Close()
 
-		viper.SetConfigFile(confPath)
-		err = viper.ReadInConfig()
+		viper.SetConfigType("yml")
+		err = viper.ReadConfig(confF)
 		if err != nil {
 			fmt.Fprintln(os.Stderr, err)
 			os.Exit(1)
@@ -189,10 +194,6 @@ func readConfig() {
 
 	config.UserAgent = viper.GetString(ConfUserAgent)
 
-	setDialTimeout(viper.GetDuration(ConfDialTimeout))
-
-	setTimeout(viper.GetDuration(ConfTimeout))
-
 	config.JobBufferSize = viper.GetInt(ConfJobBufferSize)
 
 	config.Verbose = viper.GetBool(ConfVerbose)
```
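The net effect in config.go is that the config file is now opened explicitly and streamed into viper with its type declared, instead of letting viper resolve a path and load it itself. A minimal sketch of that flow, assuming spf13/viper v1.x; `loadConfig` is an illustrative helper, not a function in this repository:

```go
package main

import (
	"fmt"
	"os"

	"github.com/spf13/viper"
)

// loadConfig mirrors the new readConfig flow: open the file, declare the
// content type, then hand the reader to viper.
func loadConfig(path string) error {
	f, err := os.Open(path)
	if err != nil {
		return fmt.Errorf("open config: %w", err)
	}
	defer f.Close()

	viper.SetConfigType("yml") // required when reading from an io.Reader
	return viper.ReadConfig(f) // replaces SetConfigFile + ReadInConfig
}
```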
crawl.go (52 changes)

```diff
@@ -3,6 +3,7 @@ package main
 import (
 	"bytes"
 	"crypto/tls"
+	"github.com/spf13/viper"
 	"github.com/terorie/od-database-crawler/ds/redblackhash"
 	"github.com/terorie/od-database-crawler/fasturl"
 	"github.com/valyala/fasthttp"
@@ -15,24 +16,35 @@ import (
 	"time"
 )
 
-var client = fasthttp.Client {
-	TLSConfig: &tls.Config{
-		InsecureSkipVerify: true,
-	},
+var tlsConfig = tls.Config {
+	InsecureSkipVerify: true,
 }
 
-func setDialTimeout(d time.Duration) {
-	client.Dial = func(addr string) (net.Conn, error) {
-		return fasthttp.DialTimeout(addr, d)
+func newHTTPClient(url fasturl.URL) *fasthttp.PipelineClient {
+	var isTLS bool
+	switch url.Scheme {
+	case fasturl.SchemeHTTP:
+		isTLS = false
+	case fasturl.SchemeHTTPS:
+		isTLS = true
+	}
+
+	url.AddDefaultPort()
+
+	return &fasthttp.PipelineClient {
+		MaxConns: viper.GetInt(ConfWorkers),
+		Addr: url.Host,
+		IsTLS: isTLS,
+		TLSConfig: &tlsConfig,
+		ReadTimeout: viper.GetDuration(ConfTimeout),
+		WriteTimeout: viper.GetDuration(ConfTimeout) / 2,
+		Dial: func(addr string) (conn net.Conn, e error) {
+			return fasthttp.DialTimeout(addr, viper.GetDuration(ConfDialTimeout))
+		},
 	}
 }
 
-func setTimeout(d time.Duration) {
-	client.ReadTimeout = d
-	client.WriteTimeout = d / 2
-}
-
-func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
+func (w *WorkerContext) GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 	f.IsDir = true
 	f.Name = path.Base(j.Uri.Path)
 
@@ -45,7 +57,7 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
 	res := fasthttp.AcquireResponse()
 	defer fasthttp.ReleaseResponse(res)
 
-	err = client.Do(req, res)
+	err = w.client.Do(req, res)
 	fasthttp.ReleaseRequest(req)
 
 	if err != nil {
@@ -123,6 +135,14 @@ func ParseDir(body []byte, baseUri *fasturl.URL) (links []fasturl.URL, err error
 			continue
 		}
 
+		if strings.HasSuffix(link.Path, ".php") {
+			continue
+		}
+
+		if strings.Contains(link.Path, "/cgi-bin/") {
+			continue
+		}
+
 		links = append(links, link)
 	}
 }
@@ -131,7 +151,7 @@ func ParseDir(body []byte, baseUri *fasturl.URL) (links []fasturl.URL, err error
 	return
 }
 
-func GetFile(u fasturl.URL, f *File) (err error) {
+func (w *WorkerContext) GetFile(u fasturl.URL, f *File) (err error) {
 	f.IsDir = false
 	u.Path = path.Clean(u.Path)
 	f.Name = path.Base(u.Path)
@@ -148,7 +168,7 @@ func GetFile(u fasturl.URL, f *File) (err error) {
 	res.SkipBody = true
 	defer fasthttp.ReleaseResponse(res)
 
-	err = client.Do(req, res)
+	err = w.client.Do(req, res)
 	fasthttp.ReleaseRequest(req)
 
 	if err != nil {
```
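The crawl.go change replaces the single shared fasthttp.Client with one fasthttp.PipelineClient per crawled site, bound to that site's host and sized by the worker count. A rough usage sketch of that client type follows; the host, timeouts, and connection count are placeholder values, not the crawler's configuration:

```go
package main

import (
	"crypto/tls"
	"fmt"
	"time"

	"github.com/valyala/fasthttp"
)

func main() {
	// A PipelineClient is pinned to a single Addr, so the crawler creates
	// one per open directory instead of sharing a global client.
	client := &fasthttp.PipelineClient{
		Addr:         "example.com:443", // placeholder host
		IsTLS:        true,
		TLSConfig:    &tls.Config{InsecureSkipVerify: true},
		MaxConns:     4,
		ReadTimeout:  10 * time.Second,
		WriteTimeout: 5 * time.Second,
	}

	req := fasthttp.AcquireRequest()
	res := fasthttp.AcquireResponse()
	defer fasthttp.ReleaseRequest(req)
	defer fasthttp.ReleaseResponse(res)

	req.SetRequestURI("https://example.com/")
	if err := client.Do(req, res); err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println("status:", res.StatusCode())
}
```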
errors.go (28 changes)

```diff
@@ -3,6 +3,8 @@ package main
 import (
 	"errors"
 	"fmt"
+	"github.com/valyala/fasthttp"
+	"net"
 )
 
 var ErrRateLimit = errors.New("too many requests")
@@ -15,3 +17,29 @@ type HttpError struct {
 func (e HttpError) Error() string {
 	return fmt.Sprintf("http status %d", e.code)
 }
+
+func shouldRetry(err error) bool {
+	// HTTP errors
+	if httpErr, ok := err.(*HttpError); ok {
+		switch httpErr.code {
+		case fasthttp.StatusTooManyRequests:
+			return true
+		default:
+			// Don't retry HTTP error codes
+			return false
+		}
+	}
+
+	if dnsError, ok := err.(*net.DNSError); ok {
+		// Don't retry permanent DNS errors
+		return dnsError.IsTemporary
+	}
+
+	if netErr, ok := err.(*net.OpError); ok {
+		// Don't retry permanent network errors
+		return netErr.Temporary()
+	}
+
+	// Retry by default
+	return true
+}
```
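The new shouldRetry helper centralizes the retry decision: HTTP 429, temporary DNS failures, and temporary network errors are retried, everything else gives up. A small self-contained sketch of the same classification, using errors.As instead of the direct type assertions above and leaving out the repo-specific HttpError branch:

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// retryable mirrors the network half of shouldRetry: permanent DNS and
// network failures are fatal, unknown errors are retried by default.
func retryable(err error) bool {
	var dnsErr *net.DNSError
	if errors.As(err, &dnsErr) {
		return dnsErr.IsTemporary
	}
	var opErr *net.OpError
	if errors.As(err, &opErr) {
		return opErr.Temporary()
	}
	return true
}

func main() {
	// A permanent "no such host" error is not retried.
	fmt.Println(retryable(&net.DNSError{Err: "no such host", Name: "example.invalid"})) // false

	// An unclassified error falls through to the default and is retried.
	fmt.Println(retryable(errors.New("connection reset by peer"))) // true
}
```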
```diff
@@ -562,6 +562,20 @@ func validOptionalPort(port string) bool {
 	return true
 }
 
+// TODO Check if RFC-compliant (99% sure not)
+func (u *URL) AddDefaultPort() {
+	if strings.ContainsRune(u.Host, ':') {
+		return
+	}
+
+	switch u.Scheme {
+	case SchemeHTTP:
+		u.Host += ":80"
+	case SchemeHTTPS:
+		u.Host += ":443"
+	}
+}
+
 // String reassembles the URL into a valid URL string.
 // The general form of the result is one of:
 //
```
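The hunk above adds AddDefaultPort to the fasturl URL type so that newHTTPClient always gets a host:port address. Its TODO points at the weak spot: checking for a ':' misfires on IPv6 literals such as "[::1]", which contain colons even when no port is present. A hedged sketch of a stricter variant built on the standard library; this is not the crawler's implementation:

```go
package main

import (
	"fmt"
	"net"
	"strings"
)

// addDefaultPort appends the scheme's default port only when none is present,
// handling bracketed IPv6 hosts via net.SplitHostPort / net.JoinHostPort.
func addDefaultPort(host, scheme string) string {
	if _, _, err := net.SplitHostPort(host); err == nil {
		return host // a port is already there
	}
	port := "80"
	if scheme == "https" {
		port = "443"
	}
	return net.JoinHostPort(strings.Trim(host, "[]"), port)
}

func main() {
	fmt.Println(addDefaultPort("example.com", "http"))      // example.com:80
	fmt.Println(addDefaultPort("[::1]", "https"))           // [::1]:443
	fmt.Println(addDefaultPort("example.com:8080", "http")) // example.com:8080
}
```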
main.go (4 changes)

```diff
@@ -17,7 +17,7 @@ var configFile string
 
 var rootCmd = cobra.Command {
 	Use: "od-database-crawler",
-	Version: "1.2.0",
+	Version: "1.2.1",
 	Short: "OD-Database Go crawler",
 	Long: helpText,
 	PersistentPreRunE: preRun,
@@ -35,7 +35,7 @@ var serverCmd = cobra.Command {
 }
 
 var crawlCmd = cobra.Command {
-	Use: "crawl",
+	Use: "crawl <url>",
 	Short: "Crawl an URL",
 	Long: "Crawl the URL specified.\n" +
 		"Results will not be uploaded to the database,\n" +
```

```diff
@@ -22,14 +22,16 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 	go Stats(c)
 
 	for remote := range remotes {
+		// Create HTTP client
+		remote.WCtx.OD = remote
+		remote.WCtx.Prepare()
+
 		logrus.WithField("url", remote.BaseUri.String()).
 			Info("Starting crawler")
 
 		// Collect results
 		results := make(chan File)
 
-		remote.WCtx.OD = remote
-
 		// Get queue path
 		queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
 
```
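The Schedule hunk above moves the WCtx.OD assignment to the top of the loop and adds a Prepare() call, because Prepare builds the per-site HTTP client from the OD's base URL and must run before any worker touches the client. A trimmed sketch of that ordering with stand-in types, not the crawler's real structs:

```go
package main

import "fmt"

type OD struct{ BaseUri string }

type WorkerContext struct {
	OD     *OD
	client string // stands in for *fasthttp.PipelineClient
}

// Prepare depends on w.OD already being set; calling it earlier would
// build a client for an empty host.
func (w *WorkerContext) Prepare() {
	w.client = "pipeline client for " + w.OD.BaseUri
}

func main() {
	remote := &OD{BaseUri: "http://example.com/"}
	wctx := &WorkerContext{}

	wctx.OD = remote // 1. bind the context to the site
	wctx.Prepare()   // 2. build the host-bound client

	fmt.Println(wctx.client)
}
```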
server.go (10 changes)

```diff
@@ -17,8 +17,11 @@ import (
 
 var serverClient = http.Client {
 	Timeout: config.ServerTimeout,
+	Transport: new(ServerTripper),
 }
 
+var serverUserAgent = "od-database-crawler/" + rootCmd.Version
+
 func FetchTask() (t *Task, err error) {
 	res, err := serverClient.PostForm(
 		config.ServerUrl + "/task/get",
@@ -176,3 +179,10 @@ func CancelTask(websiteId uint64) (err error) {
 
 	return
 }
+
+type ServerTripper struct{}
+
+func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
+	req.Header.Set("User-Agent", serverUserAgent)
+	return http.DefaultTransport.RoundTrip(req)
+}
```
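server.go now routes every request to the od-database server through a custom RoundTripper that stamps a versioned User-Agent header. The pattern in isolation, as a minimal sketch; the endpoint below is a placeholder and the agent string simply mirrors the version bump:

```go
package main

import (
	"fmt"
	"net/http"
	"time"
)

// uaTripper sets a fixed User-Agent before delegating to the default
// transport, the same idea as ServerTripper in the diff above.
type uaTripper struct{ agent string }

func (t *uaTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	req.Header.Set("User-Agent", t.agent)
	return http.DefaultTransport.RoundTrip(req)
}

func main() {
	client := http.Client{
		Timeout:   30 * time.Second,
		Transport: &uaTripper{agent: "od-database-crawler/1.2.1"},
	}

	res, err := client.Get("https://example.com/") // placeholder endpoint
	if err != nil {
		fmt.Println(err)
		return
	}
	defer res.Body.Close()
	fmt.Println(res.Status)
}
```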
worker.go (23 changes)

```diff
@@ -19,6 +19,11 @@ type WorkerContext struct {
 	Queue *BufferedQueue
 	lastRateLimit time.Time
 	numRateLimits int
+	client *fasthttp.PipelineClient
+}
+
+func (w *WorkerContext) Prepare() {
+	w.client = newHTTPClient(w.OD.BaseUri)
 }
 
 func (w *WorkerContext) Worker(results chan<- File) {
@@ -55,14 +60,12 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
 	if err != nil {
 		job.Fails++
 
-		if httpErr, ok := err.(*HttpError); ok {
-			switch httpErr.code {
-			case fasthttp.StatusTooManyRequests:
-				err = ErrRateLimit
-			default:
-				// Don't retry HTTP error codes
-				return
-			}
+		if !shouldRetry(err) {
+			atomic.AddUint64(&totalAborted, 1)
+			logrus.WithField("url", job.UriStr).
+				WithError(err).
+				Error("Giving up after failure")
+			return
 		}
 
 		if job.Fails > config.Retries {
@@ -94,7 +97,7 @@ func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
 	if len(job.Uri.Path) == 0 { return }
 	if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
 		// Load directory
-		links, err := GetDir(job, f)
+		links, err := w.GetDir(job, f)
 		if err != nil {
 			if !isErrSilent(err) {
 				logrus.WithError(err).
@@ -144,7 +147,7 @@ func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
 		}
 	} else {
 		// Load file
-		err := GetFile(job.Uri, f)
+		err := w.GetFile(job.Uri, f)
 		if err != nil {
 			if !isErrSilent(err) {
 				logrus.WithError(err).
```
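In worker.step the inline status-code switch is gone; failures now go through one shouldRetry gate that either aborts the job or lets the existing fail counter requeue it. A condensed sketch of that failure path with simplified stand-ins for Job, the retry limit, and the queue:

```go
package main

import (
	"errors"
	"fmt"
)

type Job struct {
	UriStr string
	Fails  int
}

const maxRetries = 3 // stands in for config.Retries

// shouldRetrySketch stands in for shouldRetry from errors.go.
func shouldRetrySketch(err error) bool { return true }

// handleFailure mirrors the shape of the new error branch in step():
// count the failure, consult the retry gate, then give up or requeue.
func handleFailure(job *Job, err error, requeue func(*Job)) {
	job.Fails++

	if !shouldRetrySketch(err) {
		fmt.Println("giving up after failure:", job.UriStr)
		return
	}
	if job.Fails > maxRetries {
		fmt.Println("too many failures:", job.UriStr)
		return
	}
	requeue(job)
}

func main() {
	job := &Job{UriStr: "http://example.com/dir/"}
	handleFailure(job, errors.New("timeout"), func(j *Job) {
		fmt.Println("requeued:", j.UriStr, "fails:", j.Fails)
	})
}
```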