31 Commits

Author SHA1 Message Date
Richard Patel
8ed2cf3b93 Bump to v1.0.1 2018-11-18 14:49:07 +01:00
Richard Patel
f3620262fc Add log file support 2018-11-18 14:46:52 +01:00
Richard Patel
dc4e4212a0 Add freebsd to release.sh 2018-11-18 14:38:18 +01:00
Richard Patel
6e6a4edd27 Ignore all HTTP errors 2018-11-18 14:25:06 +01:00
Richard Patel
a71157b4d8 Add User-Agent parameter 2018-11-18 14:24:04 +01:00
Richard Patel
6dbec8c789 Add release script 2018-11-18 02:36:22 +01:00
Richard Patel
605f6db5a5 Don't call /task/upload for websites with no results 2018-11-18 01:42:57 +01:00
Richard Patel
d593ba2d0b Bump to 1.0 2018-11-18 00:54:58 +01:00
Richard Patel
6793086c22 Ignore HTTPS errors 2018-11-18 00:37:30 +01:00
Richard Patel
4464f34779 Add recheck and timeout parameters 2018-11-18 00:29:29 +01:00
Richard Patel
339175220d Refactor uploading & chunk size parameter 2018-11-18 00:19:43 +01:00
Richard Patel
1e6687c519 Upload result ignoring errors 2018-11-17 15:04:20 +01:00
Richard Patel
8060556089 Fix: make crawled dir 2018-11-17 13:36:35 +01:00
Richard Patel
73ba848e17 Grammar 2018-11-17 13:35:29 +01:00
Richard Patel
115983f70e Silent HTTP errors 2018-11-17 13:22:46 +01:00
Richard Patel
9210996b4c Fix multiple part file upload 2018-11-17 12:52:24 +01:00
Richard Patel
7b29da9340 Fix file uploads 2018-11-17 12:47:16 +01:00
Richard Patel
24ee6fcba2 Quickfix: Revert FTP give back 2018-11-17 12:43:30 +01:00
Richard Patel
bfb18d62b2 mini fix 2018-11-17 05:27:09 +01:00
Richard Patel
f4054441ab Return FTP tasks 2018-11-17 05:07:52 +01:00
Richard Patel
f8d2bf386d Fix FTP error ignore 2018-11-17 04:57:19 +01:00
Richard Patel
f41198b00c Ignore FTP URLs 2018-11-17 04:50:59 +01:00
Richard Patel
7fdffff58f Update config.yml 2018-11-17 04:19:04 +01:00
Richard Patel
d596882b40 Fix ton of bugs 2018-11-17 04:18:22 +01:00
Richard Patel
0fe97a8058 Update README.md 2018-11-17 01:36:07 +01:00
Richard Patel
718f9d7fbc Rename project 2018-11-17 01:33:15 +01:00
Richard Patel
f1687679ab Unescape results & don't recrawl 404 2018-11-17 01:21:20 +01:00
Richard Patel
145d37f84a Fix wait, add back crawl command 2018-11-17 00:49:09 +01:00
Richard Patel
cc777bcaeb redblackhash: Use bytes.Compare 2018-11-16 21:17:39 +01:00
Simon
1e78cea7e7 Saved path should not contain file name 2018-11-16 13:58:12 -05:00
Richard Patel
3f85cf679b Getting tasks 2018-11-16 04:47:08 +01:00
18 changed files with 633 additions and 294 deletions

BIN
.github/stress.png vendored

Binary file not shown.

Before

Width:  |  Height:  |  Size: 369 KiB

2
.gitignore vendored
View File

@@ -1,3 +1,3 @@
/.idea/ /.idea/
.DS_Store .DS_Store
/oddb-go /od-database-crawler

View File

@@ -1,12 +1,7 @@
# oddb Go crawler 🚀 # od-database Go crawler 🚀
> by terorie 2018 :P > by terorie 2018 :P
* Crawler for [__OD-Database__](https://github.com/simon987/od-database)
* Crawls HTTP open directories (standard Web Server Listings) * Crawls HTTP open directories (standard Web Server Listings)
* Gets name, path, size and modification time of all files * Gets name, path, size and modification time of all files
* Soon: Will work as a crawler for [OD-Database](https://github.com/simon987/od-database)! * Lightweight and fast: __over 9000 requests per second__ on a standard laptop
Stress test crawling [pandoradir](https://github.com/terorie/pandoradir)
on an average laptop (~10K requests per second, 4 connections):
![image](.github/stress.png)
Memory usage is being optimized :P

View File

@@ -1,45 +1,64 @@
package main package main
import ( import (
"bufio"
"fmt" "fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/spf13/viper" "github.com/spf13/viper"
"io"
"os" "os"
"strings"
"time" "time"
) )
var config struct { var config struct {
ServerUrl string ServerUrl string
Token string Token string
ServerTimeout time.Duration
Recheck time.Duration
ChunkSize int64
Retries int Retries int
Workers int Workers int
UserAgent string
Timeout time.Duration Timeout time.Duration
Tasks int32 Tasks int32
CrawlStats time.Duration CrawlStats time.Duration
AllocStats time.Duration AllocStats time.Duration
Verbose bool Verbose bool
PrintHTTP bool
} }
const ( const (
ConfServerUrl = "server.url" ConfServerUrl = "server.url"
ConfToken = "server.token" ConfToken = "server.token"
ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck"
ConfChunkSize = "server.upload_chunk"
ConfTasks = "crawl.tasks" ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries" ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections" ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent"
ConfTimeout = "crawl.timeout" ConfTimeout = "crawl.timeout"
ConfCrawlStats = "output.crawl_stats" ConfCrawlStats = "output.crawl_stats"
ConfAllocStats = "output.resource_stats" ConfAllocStats = "output.resource_stats"
ConfVerbose = "output.verbose" ConfVerbose = "output.verbose"
ConfPrintHTTP = "output.http"
ConfLogFile = "output.log"
) )
func prepareConfig() { func prepareConfig() {
viper.SetDefault(ConfRetries, 5) viper.SetDefault(ConfRetries, 5)
viper.SetDefault(ConfWorkers, 2) viper.SetDefault(ConfWorkers, 2)
viper.SetDefault(ConfTasks, 3) viper.SetDefault(ConfTasks, 3)
viper.SetDefault(ConfUserAgent, "")
viper.SetDefault(ConfTimeout, 10 * time.Second) viper.SetDefault(ConfTimeout, 10 * time.Second)
viper.SetDefault(ConfCrawlStats, 3 * time.Second) viper.SetDefault(ConfCrawlStats, 3 * time.Second)
viper.SetDefault(ConfAllocStats, 0) viper.SetDefault(ConfAllocStats, 0)
viper.SetDefault(ConfVerbose, false) viper.SetDefault(ConfVerbose, false)
viper.SetDefault(ConfPrintHTTP, false)
viper.SetDefault(ConfLogFile, "")
viper.SetDefault(ConfRecheck, 3 * time.Second)
viper.SetDefault(ConfChunkSize, "1 MB")
} }
func readConfig() { func readConfig() {
@@ -52,14 +71,24 @@ func readConfig() {
} }
config.ServerUrl = viper.GetString(ConfServerUrl) config.ServerUrl = viper.GetString(ConfServerUrl)
//if config.ServerUrl == "" { if config.ServerUrl == "" {
// configMissing(ConfServerUrl) configMissing(ConfServerUrl)
//} }
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
config.Token = viper.GetString(ConfToken) config.Token = viper.GetString(ConfToken)
//if config.Token == "" { if config.Token == "" {
// configMissing(ConfToken) configMissing(ConfToken)
//} }
config.ServerTimeout = viper.GetDuration(ConfServerTimeout)
config.Recheck = viper.GetDuration(ConfRecheck)
config.ChunkSize = int64(viper.GetSizeInBytes(ConfChunkSize))
if config.ChunkSize < 100 {
configOOB(ConfChunkSize, config.ChunkSize)
}
config.Retries = viper.GetInt(ConfRetries) config.Retries = viper.GetInt(ConfRetries)
if config.Retries < 0 { if config.Retries < 0 {
@@ -76,6 +105,8 @@ func readConfig() {
configOOB(ConfTasks, int(config.Tasks)) configOOB(ConfTasks, int(config.Tasks))
} }
config.UserAgent = viper.GetString(ConfUserAgent)
config.Timeout = viper.GetDuration(ConfTimeout) config.Timeout = viper.GetDuration(ConfTimeout)
config.CrawlStats = viper.GetDuration(ConfCrawlStats) config.CrawlStats = viper.GetDuration(ConfCrawlStats)
@@ -86,6 +117,19 @@ func readConfig() {
if config.Verbose { if config.Verbose {
logrus.SetLevel(logrus.DebugLevel) logrus.SetLevel(logrus.DebugLevel)
} }
if filePath := viper.GetString(ConfLogFile); filePath != "" {
f, err := os.OpenFile(filePath, os.O_CREATE | os.O_WRONLY | os.O_APPEND, 0644)
bufWriter := bufio.NewWriter(f)
if err != nil { panic(err) }
exitHooks.Add(func() {
bufWriter.Flush()
f.Close()
})
logrus.SetOutput(io.MultiWriter(os.Stdout, bufWriter))
}
config.PrintHTTP = viper.GetBool(ConfPrintHTTP)
} }
func configMissing(key string) { func configMissing(key string) {
@@ -93,7 +137,7 @@ func configMissing(key string) {
os.Exit(1) os.Exit(1)
} }
func configOOB(key string, v int) { func configOOB(key string, v interface{}) {
fmt.Fprintf(os.Stderr, "config: illegal value %d for %key!\n", v, key) fmt.Fprintf(os.Stderr, "config: illegal value %v for key %s!\n", v, key)
os.Exit(1) os.Exit(1)
} }

View File

@@ -1,28 +1,60 @@
# OD-Database server settings # OD-Database server settings
server: server:
# Connection URL # Connection URL
url: localhost:6969 url: http://od-db.mine.terorie.com/api
# Server auth token # Server auth token
token: token:
# Request timeout
timeout: 60s
# Recheck interval
# The crawler periodically asks the server
# for new jobs. Sets the minimum wait time
# between /task/get requests to the server.
recheck: 1s
# Upload chunk size
# If the value is too high, the upload fails.
upload_chunk: 1 MB
# Log output settings # Log output settings
output: output:
# Crawl statistics # Crawl statistics
crawl_stats: 1s crawl_stats: 1s
# CPU/RAM/Job queue stats # CPU/RAM/Job queue stats
resource_stats: 1s resource_stats: 10s
# More output? (Every listed dir) # More output? (Every listed dir)
verbose: false verbose: false
# Print HTTP errors (Super spammy)
http: false
# Log file
# If empty, no log file is created.
log: crawler.log
# Crawler settings # Crawler settings
crawl: crawl:
# Number of sites that can be # Number of sites that can be processed at once
# processed at once tasks: 100
tasks: 3
# Number of connections per site # Number of connections per site
connections: 2 # Please be careful with this setting!
# The crawler fires fast and more than
# ten connections can overwhelm a server.
connections: 10
# How often to retry getting data # How often to retry getting data
# from the site before giving up # from the site before giving up
retries: 5 retries: 5
# Time before discarding a network request # Time before discarding a network request
timeout: 10s timeout: 10s
# Crawler User-Agent
# If empty, no User-Agent header is sent.
user-agent: "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0"

View File

@@ -2,9 +2,9 @@ package main
import ( import (
"bytes" "bytes"
"fmt" "crypto/tls"
"github.com/terorie/oddb-go/ds/redblackhash" "github.com/terorie/od-database-crawler/ds/redblackhash"
"github.com/terorie/oddb-go/fasturl" "github.com/terorie/od-database-crawler/fasturl"
"github.com/valyala/fasthttp" "github.com/valyala/fasthttp"
"golang.org/x/crypto/blake2b" "golang.org/x/crypto/blake2b"
"golang.org/x/net/html" "golang.org/x/net/html"
@@ -14,13 +14,20 @@ import (
"time" "time"
) )
var client fasthttp.Client var client = fasthttp.Client {
TLSConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
func GetDir(j *Job, f *File) (links []fasturl.URL, err error) { func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
f.IsDir = true f.IsDir = true
f.Name = path.Base(j.Uri.Path) f.Name = path.Base(j.Uri.Path)
req := fasthttp.AcquireRequest() req := fasthttp.AcquireRequest()
if config.UserAgent != "" {
req.Header.SetUserAgent(config.UserAgent)
}
req.SetRequestURI(j.UriStr) req.SetRequestURI(j.UriStr)
res := fasthttp.AcquireResponse() res := fasthttp.AcquireResponse()
@@ -29,10 +36,14 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
err = client.DoTimeout(req, res, config.Timeout) err = client.DoTimeout(req, res, config.Timeout)
fasthttp.ReleaseRequest(req) fasthttp.ReleaseRequest(req)
if err != nil { return } if err != nil {
return
}
err = checkStatusCode(res.StatusCode()) err = checkStatusCode(res.StatusCode())
if err != nil { return } if err != nil {
return
}
body := res.Body() body := res.Body()
doc := html.NewTokenizer(bytes.NewReader(body)) doc := html.NewTokenizer(bytes.NewReader(body))
@@ -83,7 +94,9 @@ func GetDir(j *Job, f *File) (links []fasturl.URL, err error) {
var link fasturl.URL var link fasturl.URL
err = j.Uri.ParseRel(&link, href) err = j.Uri.ParseRel(&link, href)
if err != nil { continue } if err != nil {
continue
}
if link.Scheme != j.Uri.Scheme || if link.Scheme != j.Uri.Scheme ||
link.Host != j.Uri.Host || link.Host != j.Uri.Host ||
@@ -106,10 +119,13 @@ func GetFile(u fasturl.URL, f *File) (err error) {
f.IsDir = false f.IsDir = false
u.Path = path.Clean(u.Path) u.Path = path.Clean(u.Path)
f.Name = path.Base(u.Path) f.Name = path.Base(u.Path)
f.Path = strings.Trim(u.Path, "/") f.Path = strings.Trim(path.Dir(u.Path), "/")
req := fasthttp.AcquireRequest() req := fasthttp.AcquireRequest()
req.Header.SetMethod("HEAD") req.Header.SetMethod("HEAD")
if config.UserAgent != "" {
req.Header.SetUserAgent(config.UserAgent)
}
req.SetRequestURI(u.String()) req.SetRequestURI(u.String())
res := fasthttp.AcquireResponse() res := fasthttp.AcquireResponse()
@@ -119,10 +135,14 @@ func GetFile(u fasturl.URL, f *File) (err error) {
err = client.DoTimeout(req, res, config.Timeout) err = client.DoTimeout(req, res, config.Timeout)
fasthttp.ReleaseRequest(req) fasthttp.ReleaseRequest(req)
if err != nil { return } if err != nil {
return
}
err = checkStatusCode(res.StatusCode()) err = checkStatusCode(res.StatusCode())
if err != nil { return } if err != nil {
return
}
f.applyContentLength(string(res.Header.Peek("content-length"))) f.applyContentLength(string(res.Header.Peek("content-length")))
f.applyLastModified(string(res.Header.Peek("last-modified"))) f.applyLastModified(string(res.Header.Peek("last-modified")))
@@ -143,38 +163,49 @@ func (f *File) HashDir(links []fasturl.URL) (o redblackhash.Key) {
} }
func (f *File) applyContentLength(v string) { func (f *File) applyContentLength(v string) {
if v == "" { return } if v == "" {
return
}
size, err := strconv.ParseInt(v, 10, 64) size, err := strconv.ParseInt(v, 10, 64)
if err != nil { return } if err != nil {
if size < 0 { return } return
}
if size < 0 {
return
}
f.Size = size f.Size = size
} }
// TODO Cleanup
func (f *File) applyLastModified(v string) { func (f *File) applyLastModified(v string) {
if v == "" { return } if v == "" {
return
}
var t time.Time
var err error var err error
f.MTime, err = time.Parse(time.RFC1123, v) t, err = time.Parse(time.RFC1123, v)
if err == nil { return } if err == nil {
f.MTime, err = time.Parse(time.RFC850, v) f.MTime = t.Unix()
if err == nil { return } return
}
t, err = time.Parse(time.RFC850, v)
if err == nil {
f.MTime = t.Unix()
return
}
// TODO Parse asctime // TODO Parse asctime
f.MTime, err = time.Parse("2006-01-02", v[:10]) t, err = time.Parse("2006-01-02", v[:10])
if err == nil { return } if err == nil {
f.MTime = t.Unix()
return
}
} }
func checkStatusCode(status int) error { func checkStatusCode(status int) error {
switch status { switch status {
case fasthttp.StatusOK: case fasthttp.StatusOK:
return nil return nil
case fasthttp.StatusTooManyRequests:
return ErrRateLimit
case fasthttp.StatusForbidden,
fasthttp.StatusUnauthorized:
return ErrForbidden
default: default:
return fmt.Errorf("got HTTP status %d", status) return &HttpError{status}
} }
} }

View File

@@ -14,7 +14,9 @@
package redblackhash package redblackhash
import ( import (
"bytes"
"fmt" "fmt"
"sync"
) )
const ( const (
@@ -27,6 +29,7 @@ type Key [KeySize]byte
// Tree holds elements of the red-black tree // Tree holds elements of the red-black tree
type Tree struct { type Tree struct {
sync.Mutex
Root *Node Root *Node
size int size int
} }
@@ -41,42 +44,7 @@ type Node struct {
} }
func (k *Key) Compare(o *Key) int { func (k *Key) Compare(o *Key) int {
// TODO Assembly return bytes.Compare(k[:], o[:])
/*for i := 0; i < KeySize / 8; i++ {
a := uint64(k[i+0] ) |
uint64(k[i+1] >> 8) |
uint64(k[i+2] >> 16) |
uint64(k[i+3] >> 24) |
uint64(k[i+4] >> 32) |
uint64(k[i+5] >> 40) |
uint64(k[i+6] >> 48) |
uint64(k[i+7] >> 56)
b := uint64(o[i+0] ) |
uint64(o[i+1] >> 8) |
uint64(o[i+2] >> 16) |
uint64(o[i+3] >> 24) |
uint64(o[i+4] >> 32) |
uint64(o[i+5] >> 40) |
uint64(o[i+6] >> 48) |
uint64(o[i+7] >> 56)
switch {
case a < b:
return -1
case a > b:
return 1
}
}*/
for i := 0; i < KeySize; i++ {
switch {
case k[i] < o[i]:
return -1
case k[i] > o[i]:
return 1
}
}
return 0
} }
// Put inserts node into the tree. // Put inserts node into the tree.

View File

@@ -1,8 +1,17 @@
package main package main
import "errors" import (
"errors"
"fmt"
)
var ErrRateLimit = errors.New("too many requests") var ErrRateLimit = errors.New("too many requests")
var ErrForbidden = errors.New("access denied")
var ErrKnown = errors.New("already crawled") var ErrKnown = errors.New("already crawled")
type HttpError struct {
code int
}
func (e HttpError) Error() string {
return fmt.Sprintf("http status %d", e.code)
}

View File

@@ -33,6 +33,8 @@ var Schemes = [SchemeCount]string {
"https", "https",
} }
var ErrUnknownScheme = errors.New("unknown protocol scheme")
// Error reports an error and the operation and URL that caused it. // Error reports an error and the operation and URL that caused it.
type Error struct { type Error struct {
Op string Op string
@@ -353,7 +355,7 @@ func getscheme(rawurl string) (scheme Scheme, path string, err error) {
case "https": case "https":
scheme = SchemeHTTPS scheme = SchemeHTTPS
default: default:
return SchemeInvalid, "", errors.New("unknown protocol scheme") return SchemeInvalid, "", ErrUnknownScheme
} }
path = rawurl[i+1:] path = rawurl[i+1:]
@@ -811,3 +813,57 @@ func validUserinfo(s string) bool {
} }
return true return true
} }
func PathUnescape(s string) string {
newStr, err := pathUnescape(s)
if err != nil {
return s
} else {
return newStr
}
}
func pathUnescape(s string) (string, error) {
// Count %, check that they're well-formed.
n := 0
for i := 0; i < len(s); {
switch s[i] {
case '%':
n++
if i+2 >= len(s) || !ishex(s[i+1]) || !ishex(s[i+2]) {
s = s[i:]
if len(s) > 3 {
s = s[:3]
}
return "", EscapeError(s)
}
i += 3
default:
i++
}
}
if n == 0 {
return s, nil
}
t := make([]byte, len(s)-2*n)
j := 0
for i := 0; i < len(s); {
switch s[i] {
case '%':
t[j] = unhex(s[i+1])<<4 | unhex(s[i+2])
j++
i += 3
case '+':
t[j] = '+'
j++
i++
default:
t[j] = s[i]
j++
i++
}
}
return string(t), nil
}

123
main.go
View File

@@ -3,54 +3,113 @@ package main
import ( import (
"context" "context"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/terorie/oddb-go/fasturl" "github.com/terorie/od-database-crawler/fasturl"
"github.com/urfave/cli" "github.com/urfave/cli"
"log"
"net/http"
_ "net/http/pprof"
"os" "os"
"strings" "strings"
"sync/atomic"
"time" "time"
) )
var app = cli.App { var app = cli.App {
Name: "oddb-go", Name: "od-database-crawler",
Usage: "OD-Database Go crawler", Usage: "OD-Database Go crawler",
Version: "0.2", Version: "1.0.1",
BashComplete: cli.DefaultAppComplete, BashComplete: cli.DefaultAppComplete,
Writer: os.Stdout, Writer: os.Stdout,
Compiled: buildDate, Action: cmdBase,
Commands: []cli.Command{ Commands: []cli.Command{
{ {
Name: "crawl", Name: "crawl",
Usage: "Crawl a list of URLs", Usage: "Crawl a list of URLs",
ArgsUsage: "[site, site, ...]", ArgsUsage: "<site>",
Action: cmdCrawler, Action: cmdCrawler,
}, },
}, },
After: func(i *cli.Context) error {
exitHooks.Execute()
return nil
},
} }
var exitHooks Hooks
func init() { func init() {
prepareConfig() prepareConfig()
} }
func main() { func main() {
go func() { err := os.MkdirAll("crawled", 0755)
log.Println(http.ListenAndServe("localhost:42069", nil)) if err != nil {
}() panic(err)
}
readConfig()
app.Run(os.Args) app.Run(os.Args)
} }
func cmdCrawler(clic *cli.Context) error { func cmdBase(_ *cli.Context) error {
readConfig() // TODO Graceful shutdown
appCtx := context.Background()
forceCtx := context.Background()
if clic.NArg() == 0 { inRemotes := make(chan *OD)
go Schedule(forceCtx, inRemotes)
ticker := time.NewTicker(config.Recheck)
defer ticker.Stop()
for {
select {
case <-appCtx.Done():
return nil
case <-ticker.C:
t, err := FetchTask()
if err != nil {
logrus.WithError(err).
Error("Failed to get new task")
time.Sleep(30 * time.Second)
continue
}
if t == nil {
// No new task
if atomic.LoadInt32(&numActiveTasks) == 0 {
logrus.Info("Waiting …")
}
continue
}
var baseUri fasturl.URL
err = baseUri.Parse(t.Url)
if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme {
// Not an error
err = nil
// Give back task
//err2 := CancelTask(t.WebsiteId)
//if err2 != nil {
// logrus.Error(err2)
//}
continue
} else if err != nil {
logrus.WithError(err).
Error("Failed to get new task")
time.Sleep(30 * time.Second)
continue
}
ScheduleTask(inRemotes, t, &baseUri)
}
}
return nil
}
func cmdCrawler(clic *cli.Context) error {
if clic.NArg() != 1 {
cli.ShowCommandHelpAndExit(clic, "crawl", 1) cli.ShowCommandHelpAndExit(clic, "crawl", 1)
} }
args := clic.Args() arg := clic.Args()[0]
remotes := make([]*OD, len(args))
for i, arg := range args {
// https://github.com/golang/go/issues/19779 // https://github.com/golang/go/issues/19779
if !strings.Contains(arg, "://") { if !strings.Contains(arg, "://") {
arg = "http://" + arg arg = "http://" + arg
@@ -61,34 +120,24 @@ func cmdCrawler(clic *cli.Context) error {
u.Path += "/" u.Path += "/"
} }
if err != nil { return err } if err != nil { return err }
remotes[i] = &OD {
Task: &Task{
WebsiteId: 0,
Url: u.String(),
},
BaseUri: u,
}
}
c := context.Background() // TODO Graceful shutdown
forceCtx := context.Background()
inRemotes := make(chan *OD) inRemotes := make(chan *OD)
go Schedule(c, inRemotes) go Schedule(forceCtx, inRemotes)
for _, remote := range remotes { ticker := time.NewTicker(3 * time.Second)
globalWait.Add(1) defer ticker.Stop()
inRemotes <- remote
task := Task {
WebsiteId: 0,
Url: u.String(),
} }
ScheduleTask(inRemotes, &task, &u)
// Wait for all jobs to finish // Wait for all jobs to finish
globalWait.Wait() globalWait.Wait()
logrus.Info("All dirs processed!")
return nil return nil
} }
var buildDate = time.Date(
2018, 11, 15,
23, 24, 0, 0,
time.UTC)

View File

@@ -1,12 +1,27 @@
package main package main
import ( import (
"github.com/terorie/oddb-go/ds/redblackhash" "github.com/terorie/od-database-crawler/ds/redblackhash"
"github.com/terorie/oddb-go/fasturl" "github.com/terorie/od-database-crawler/fasturl"
"sync" "sync"
"time" "time"
) )
type Task struct {
WebsiteId uint64 `json:"website_id"`
Url string `json:"url"`
}
type TaskResult struct {
StatusCode string `json:"status_code"`
FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"`
StartTimeUnix int64 `json:"start_time"`
EndTimeUnix int64 `json:"end_time"`
WebsiteId uint64 `json:"website_id"`
}
type Job struct { type Job struct {
OD *OD OD *OD
Uri fasturl.URL Uri fasturl.URL
@@ -16,26 +31,25 @@ type Job struct {
} }
type OD struct { type OD struct {
Task *Task Task Task
Result TaskResult
Wait sync.WaitGroup Wait sync.WaitGroup
BaseUri fasturl.URL BaseUri fasturl.URL
WCtx WorkerContext WCtx WorkerContext
Scanned redblackhash.Tree Scanned redblackhash.Tree
lock sync.Mutex
} }
type File struct { type File struct {
Name string `json:"name"` Name string `json:"name"`
Size int64 `json:"size"` Size int64 `json:"size"`
MTime time.Time `json:"mtime"` MTime int64 `json:"mtime"`
Path string `json:"path"` Path string `json:"path"`
IsDir bool `json:"-"` IsDir bool `json:"-"`
} }
func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) { func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
o.lock.Lock() o.Scanned.Lock()
defer o.lock.Unlock() defer o.Scanned.Unlock()
exists = o.Scanned.Get(k) exists = o.Scanned.Get(k)
if exists { return true } if exists { return true }

25
release.sh Executable file
View File

@@ -0,0 +1,25 @@
#!/usr/bin/env bash
appname="od-database-crawler"
tag=$1
[ -z "$tag" ] && echo "Usage: build <version>" && exit 1
name=${appname}-${tag}-windows.exe
GOOS="windows" GOARCH="amd64" go build -ldflags="-s -w" -o $name
gzip -f $name
echo $name
name=${appname}-${tag}-linux
GOOS="linux" GOARCH="amd64" go build -ldflags="-s -w" -o $name
gzip -f $name
echo $name
name=${appname}-${tag}-mac
GOOS="darwin" GOARCH="amd64" go build -ldflags="-s -w" -o $name
gzip -f $name
echo $name
name=${appname}-${tag}-freebsd
GOOS="freebsd" GOARCH="amd64" go build -ldflags="-s -w" -o $name
gzip -f $name
echo $name

View File

@@ -5,12 +5,17 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/terorie/od-database-crawler/fasturl"
"os" "os"
"path" "path"
"sync"
"sync/atomic" "sync/atomic"
"time"
) )
var activeTasks int32 var activeTasksLock sync.Mutex
var activeTasks = make(map[uint64]bool)
var numActiveTasks int32
var totalBuffered int64 var totalBuffered int64
func Schedule(c context.Context, remotes <-chan *OD) { func Schedule(c context.Context, remotes <-chan *OD) {
@@ -30,7 +35,7 @@ func Schedule(c context.Context, remotes <-chan *OD) {
} }
// Enqueue initial job // Enqueue initial job
atomic.AddInt32(&activeTasks, 1) atomic.AddInt32(&numActiveTasks, 1)
remote.WCtx.queueJob(Job{ remote.WCtx.queueJob(Job{
OD: remote, OD: remote,
Uri: remote.BaseUri, Uri: remote.BaseUri,
@@ -40,46 +45,149 @@ func Schedule(c context.Context, remotes <-chan *OD) {
// Upload result when ready // Upload result when ready
go remote.Watch(results) go remote.Watch(results)
// Sleep if max number of tasks are active
for atomic.LoadInt32(&numActiveTasks) > config.Tasks {
select {
case <-c.Done():
return
case <-time.After(time.Second):
continue
}
}
} }
} }
func (r *OD) Watch(results chan File) { func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
go r.Task.Collect(results) if !t.register() {
return
}
globalWait.Add(1)
now := time.Now()
od := &OD {
Task: *t,
BaseUri: *u,
Result: TaskResult {
WebsiteId: t.WebsiteId,
StartTime: now,
StartTimeUnix: now.Unix(),
},
}
remotes <- od
}
func (t *Task) register() bool {
activeTasksLock.Lock()
defer activeTasksLock.Unlock()
if _, known := activeTasks[t.WebsiteId]; known {
return false
} else {
activeTasks[t.WebsiteId] = true
return true
}
}
func (t *Task) unregister() {
activeTasksLock.Lock()
delete(activeTasks, t.WebsiteId)
activeTasksLock.Unlock()
}
func (o *OD) Watch(results chan File) {
// Mark job as completely done
defer globalWait.Done()
defer o.Task.unregister()
filePath := path.Join("crawled", fmt.Sprintf("%d.json", o.Task.WebsiteId))
// Open crawl results file
f, err := os.OpenFile(
filePath,
os.O_CREATE | os.O_RDWR | os.O_TRUNC,
0644,
)
if err != nil {
logrus.WithError(err).
Error("Failed saving crawl results")
return
}
defer f.Close()
defer os.Remove(filePath)
// Listen for exit code of Collect()
collectErrC := make(chan error)
// Block until all results are written
// (closes results channel)
o.handleCollect(results, f, collectErrC)
// Exit code of Collect()
err = <-collectErrC
close(collectErrC)
if err != nil {
logrus.WithError(err).
Error("Failed saving crawl results")
return
}
// Upload results
err = PushResult(&o.Result, f)
if err != nil {
logrus.WithError(err).
Error("Failed uploading crawl results")
return
}
}
func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error) {
// Begin collecting results
go o.Task.Collect(results, f, collectErrC)
defer close(results)
// Wait for all jobs on remote to finish // Wait for all jobs on remote to finish
r.Wait.Wait() o.Wait.Wait()
close(r.WCtx.in) close(o.WCtx.in)
atomic.AddInt32(&activeTasks, -1) atomic.AddInt32(&numActiveTasks, -1)
logrus.WithField("url", r.BaseUri.String()). // Log finish
Info("Crawler finished")
globalWait.Done() logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"duration": time.Since(o.Result.StartTime),
}).Info("Crawler finished")
close(results) // Set status code
now := time.Now()
o.Result.EndTimeUnix = now.Unix()
fileCount := atomic.LoadUint64(&o.Result.FileCount)
if fileCount == 0 {
errorCount := atomic.LoadUint64(&o.Result.ErrorCount)
if errorCount == 0 {
o.Result.StatusCode = "empty"
} else {
o.Result.StatusCode = "directory listing failed"
}
} else {
o.Result.StatusCode = "success"
}
} }
func (t *Task) Collect(results chan File) { func (t *Task) Collect(results chan File, f *os.File, errC chan<- error) {
err := t.collect(results) err := t.collect(results, f)
if err != nil { if err != nil {
logrus.WithError(err). logrus.WithError(err).
Error("Failed saving crawl results") Error("Failed saving crawl results")
} }
errC <- err
} }
func (t *Task) collect(results chan File) error { func (t *Task) collect(results chan File, f *os.File) error {
err := os.MkdirAll("crawled", 0755)
if err != nil { return err }
f, err := os.OpenFile(
path.Join("crawled", fmt.Sprintf("%d.json", t.WebsiteId)),
os.O_CREATE | os.O_WRONLY | os.O_TRUNC,
0755,
)
if err != nil { return err }
defer f.Close()
for result := range results { for result := range results {
result.Path = fasturl.PathUnescape(result.Path)
result.Name = fasturl.PathUnescape(result.Name)
resJson, err := json.Marshal(result) resJson, err := json.Marshal(result)
if err != nil { panic(err) } if err != nil { panic(err) }
_, err = f.Write(resJson) _, err = f.Write(resJson)

130
server.go
View File

@@ -10,34 +10,27 @@ import (
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"path/filepath"
"strconv" "strconv"
"strings"
) )
const ( var serverClient = http.Client {
fileListChunkSize int64 = 5000000 // 5 mb Timeout: config.ServerTimeout,
) }
var serverClient = http.DefaultClient
func FetchTask() (t *Task, err error) { func FetchTask() (t *Task, err error) {
escToken, _ := json.Marshal(config.Token) res, err := serverClient.PostForm(
payload := `{"token":` + string(escToken) + `}`
req, err := http.NewRequest(
http.MethodPost,
config.ServerUrl + "/task/get", config.ServerUrl + "/task/get",
strings.NewReader(payload)) url.Values{ "token": {config.Token} })
if err != nil { return }
res, err := serverClient.Do(req)
if err != nil { return } if err != nil { return }
defer res.Body.Close() defer res.Body.Close()
if res.StatusCode != 200 { switch res.StatusCode {
err = fmt.Errorf("http %s", res.Status) case 200:
return break
case 404, 500:
return nil, nil
default:
return nil, fmt.Errorf("http %s", res.Status)
} }
t = new(Task) t = new(Task)
@@ -47,21 +40,17 @@ func FetchTask() (t *Task, err error) {
return return
} }
func PushResult(result *TaskResult) (err error) { func PushResult(result *TaskResult, f *os.File) (err error) {
filePath := filepath.Join( if result.WebsiteId == 0 {
".", "crawled", // Not a real result, don't push
fmt.Sprintf("%d.json", result.WebsiteId)) return nil
}
defer os.Remove(filePath) // Rewind to the beginning of the file
_, err = f.Seek(0, 0)
f, err := os.Open(filePath) if err != nil {
if os.IsNotExist(err) {
err = fmt.Errorf("cannot upload result: %s does not exist", filePath)
return
} else if err != nil {
return return
} }
defer f.Close()
err = uploadChunks(result.WebsiteId, f) err = uploadChunks(result.WebsiteId, f)
if err != nil { if err != nil {
@@ -73,47 +62,50 @@ func PushResult(result *TaskResult) (err error) {
return return
} }
err = uploadResult(result) // Upload result ignoring errors
if err != nil { uploadResult(result)
logrus.Errorf("Failed to upload result: %s", err)
err2 := CancelTask(result.WebsiteId)
if err2 != nil {
logrus.Error(err2)
}
return
}
return return
} }
func uploadChunks(websiteId uint64, f *os.File) (err error) { func uploadChunks(websiteId uint64, f *os.File) error {
for iter := 1; iter > 0; iter++ { eof := false
for iter := 1; !eof; iter++ {
// TODO Stream with io.Pipe? // TODO Stream with io.Pipe?
var b bytes.Buffer var b bytes.Buffer
multi := multipart.NewWriter(&b) multi := multipart.NewWriter(&b)
// Set upload fields // Set upload fields
var err error
err = multi.WriteField("token", config.Token) err = multi.WriteField("token", config.Token)
if err != nil { return } if err != nil { return err }
err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId)) err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId))
if err != nil { return } if err != nil { return err }
// Copy chunk to file_list // Copy chunk to file_list
formFile, err := multi.CreateFormFile("file_list", "file_list") formFile, err := multi.CreateFormFile("file_list", "file_list")
_, err = io.CopyN(formFile, f, fileListChunkSize) var n int64
if err == io.EOF { n, err = io.CopyN(formFile, f, config.ChunkSize)
break if err != io.EOF && err != nil {
} else if err == io.ErrUnexpectedEOF { return err
}
if n == 0 {
// Don't upload, no content
return nil
} else if n < config.ChunkSize {
err = nil err = nil
// Break at end of iteration // Break at end of iteration
iter = -420 eof = true
} }
multi.Close()
req, err := http.NewRequest( req, err := http.NewRequest(
http.MethodPost, http.MethodPost,
config.ServerUrl + "/task/upload", config.ServerUrl + "/task/upload",
&b) &b)
req.Header.Set("content-type", multi.FormDataContentType())
if err != nil { return err } if err != nil { return err }
res, err := serverClient.Do(req) res, err := serverClient.Do(req)
@@ -125,52 +117,42 @@ func uploadChunks(websiteId uint64, f *os.File) (err error) {
iter, res.Status) iter, res.Status)
} }
logrus.Infof("Uploading file list part %d: %s", logrus.WithField("id", websiteId).
iter, res.Status) WithField("part", iter).
Infof("Uploading files chunk")
} }
return return nil
} }
func uploadResult(result *TaskResult) (err error) { func uploadResult(result *TaskResult) (err error) {
resultEnc, err := json.Marshal(result) resultEnc, err := json.Marshal(result)
if err != nil { panic(err) } if err != nil { panic(err) }
payload := url.Values { res, err := serverClient.PostForm(
config.ServerUrl + "/task/complete",
url.Values {
"token": {config.Token}, "token": {config.Token},
"result": {string(resultEnc)}, "result": {string(resultEnc)},
}.Encode() },
)
req, err := http.NewRequest(
http.MethodPost,
config.ServerUrl + "/task/complete",
strings.NewReader(payload))
if err != nil { return }
res, err := serverClient.Do(req)
if err != nil { return } if err != nil { return }
res.Body.Close() res.Body.Close()
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
return fmt.Errorf("failed to cancel task: %s", res.Status) return HttpError{res.StatusCode}
} }
return return
} }
func CancelTask(websiteId uint64) (err error) { func CancelTask(websiteId uint64) (err error) {
form := url.Values{ res, err := serverClient.PostForm(
config.ServerUrl + "/task/cancel",
url.Values{
"token": {config.Token}, "token": {config.Token},
"website_id": {strconv.FormatUint(websiteId, 10)}, "website_id": {strconv.FormatUint(websiteId, 10)},
} },
encForm := form.Encode() )
req, err := http.NewRequest(
http.MethodPost,
config.ServerUrl + "/task/cancel",
strings.NewReader(encForm))
if err != nil { return }
res, err := serverClient.Do(req)
if err != nil { return } if err != nil { return }
res.Body.Close() res.Body.Close()

View File

@@ -39,6 +39,10 @@ func Stats(c context.Context) {
perSecond = math.Round(perSecond) perSecond = math.Round(perSecond)
perSecond /= 2 perSecond /= 2
if perSecond <= 0 {
continue
}
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"per_second": perSecond, "per_second": perSecond,
"done": atomic.LoadUint64(&totalDone), "done": atomic.LoadUint64(&totalDone),
@@ -53,7 +57,7 @@ func Stats(c context.Context) {
runtime.ReadMemStats(&mem) runtime.ReadMemStats(&mem)
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"queue_count": totalBuffered, "queue_count": atomic.LoadInt64(&totalBuffered),
"heap": FormatByteCount(mem.Alloc), "heap": FormatByteCount(mem.Alloc),
"objects": mem.HeapObjects, "objects": mem.HeapObjects,
"num_gc": mem.NumGC, "num_gc": mem.NumGC,

View File

@@ -1,16 +0,0 @@
package main
import "time"
type Task struct {
WebsiteId int `json:"website_id"`
Url string `json:"url"`
}
type TaskResult struct {
StatusCode int `json:"status_code"`
FileCount uint64 `json:"file_count"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
WebsiteId uint64 `json:"website_id"`
}

22
util.go
View File

@@ -1,6 +1,9 @@
package main package main
import "fmt" import (
"fmt"
"sync"
)
// https://programming.guide/go/formatting-byte-size-to-human-readable-format.html // https://programming.guide/go/formatting-byte-size-to-human-readable-format.html
func FormatByteCount(b uint64) string { func FormatByteCount(b uint64) string {
@@ -16,3 +19,20 @@ func FormatByteCount(b uint64) string {
return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp]) return fmt.Sprintf("%.1f %ciB", float64(b)/float64(div), "KMGTPE"[exp])
} }
} }
type Hooks struct {
m sync.Mutex
l []func()
}
func (h *Hooks) Add(hook func()) {
h.m.Lock()
h.l = append(h.l, hook)
h.m.Unlock()
}
func (h *Hooks) Execute() {
for _, hook := range h.l {
hook()
}
}

View File

@@ -2,6 +2,7 @@ package main
import ( import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/valyala/fasthttp"
"math" "math"
"sort" "sort"
"strings" "strings"
@@ -39,10 +40,15 @@ func (w WorkerContext) step(results chan<- File, job Job) {
if err != nil { if err != nil {
job.Fails++ job.Fails++
if err == ErrForbidden { if httpErr, ok := err.(*HttpError); ok {
// Don't attempt crawling again switch httpErr.code {
case fasthttp.StatusTooManyRequests:
err = ErrRateLimit
default:
// Don't retry HTTP error codes
return return
} }
}
if job.Fails > config.Retries { if job.Fails > config.Retries {
atomic.AddUint64(&totalAborted, 1) atomic.AddUint64(&totalAborted, 1)
@@ -75,9 +81,11 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
// Load directory // Load directory
links, err := GetDir(job, f) links, err := GetDir(job, f)
if err != nil { if err != nil {
if !isErrSilent(err) {
logrus.WithError(err). logrus.WithError(err).
WithField("url", job.UriStr). WithField("url", job.UriStr).
Error("Failed getting dir") Error("Failed to crawl dir")
}
return nil, err return nil, err
} }
@@ -105,7 +113,6 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
} }
lastLink = uriStr lastLink = uriStr
job.OD.Wait.Add(1)
newJobs = append(newJobs, Job{ newJobs = append(newJobs, Job{
OD: job.OD, OD: job.OD,
Uri: link, Uri: link,
@@ -125,11 +132,14 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
// Load file // Load file
err := GetFile(job.Uri, f) err := GetFile(job.Uri, f)
if err != nil { if err != nil {
if !isErrSilent(err) {
logrus.WithError(err). logrus.WithError(err).
WithField("url", job.UriStr). WithField("url", job.UriStr).
Error("Failed getting file") Error("Failed to crawl file")
}
return nil, err return nil, err
} }
atomic.AddUint64(&job.OD.Result.FileCount, 1)
} }
return return
} }
@@ -143,13 +153,21 @@ func (w WorkerContext) queueJob(job Job) {
} else { } else {
time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) * time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) *
100 * time.Millisecond) 100 * time.Millisecond)
w.in <- job
} }
} else {
w.in <- job
} }
w.in <- job
} }
func (w WorkerContext) finishJob(job *Job) { func (w WorkerContext) finishJob(job *Job) {
job.OD.Wait.Done() job.OD.Wait.Done()
} }
func isErrSilent(err error) bool {
if !config.PrintHTTP {
if _, ok := err.(*HttpError); ok {
return true
}
}
return false
}