25 Commits

Author SHA1 Message Date
Simon Fortier
88bf634cb6 Update config.yml 2019-04-05 09:30:20 -04:00
dependabot[bot]
796cf6ac23 Bump github.com/spf13/viper from 1.3.1 to 1.3.2 2019-03-14 10:57:44 +01:00
Richard Patel
defaf54e66 Bump github.com/sirupsen/logrus from 1.3.0 to 1.4.0 2019-03-12 19:37:06 +01:00
dependabot[bot]
230824c58f Bump github.com/sirupsen/logrus from 1.3.0 to 1.4.0
Bumps [github.com/sirupsen/logrus](https://github.com/sirupsen/logrus) from 1.3.0 to 1.4.0.
- [Release notes](https://github.com/sirupsen/logrus/releases)
- [Changelog](https://github.com/sirupsen/logrus/blob/master/CHANGELOG.md)
- [Commits](https://github.com/sirupsen/logrus/compare/v1.3.0...v1.4.0)

Signed-off-by: dependabot[bot] <support@dependabot.com>
2019-03-12 04:48:03 +00:00
terorie
d3c199b738 Update README
Add some badges and update description
2019-02-28 23:57:50 +01:00
dependabot[bot]
0b3f0d87fe Upgrade fasthttp to 1.2.0
Bumps [github.com/valyala/fasthttp](https://github.com/valyala/fasthttp) from 1.1.0 to 1.2.0.
- [Release notes](https://github.com/valyala/fasthttp/releases)
- [Commits](https://github.com/valyala/fasthttp/compare/v1.1.0...v1.2.0)

Thanks bot

Signed-off-by: dependabot[bot] <support@dependabot.com>
2019-02-28 22:42:40 +01:00
terorie
da9c75e392 Reduce Docker image size 2019-02-22 21:37:04 +01:00
Pascal
8947e05d0c Fix Dockerfile
Fixes #22
Credit to @pascaldulieu
2019-02-22 21:11:55 +01:00
Richard Patel
8c5f99d616 More descriptive error if /task/get returns invalid JSON 2019-02-22 20:17:59 +01:00
Richard Patel
206ea0e91d Simplify config 2019-02-22 18:50:35 +01:00
Richard Patel
8b9d8bfd17 Fix README.md format 2019-02-22 06:04:10 +01:00
Richard Patel
c9ff102d80 Fix Dockerfile 2019-02-22 06:00:57 +01:00
Richard Patel
88856c1c19 Flag explanation in README.md 2019-02-22 05:59:59 +01:00
Richard Patel
9e9b606250 Merge branch 'stable' 2019-02-22 05:37:52 +01:00
Richard Patel
326e29e5e4 Reset to stable branch 2019-02-22 05:37:45 +01:00
Richard Patel
c2acd5463f Restore .travis.yml
Now handling auto-build over Docker Hub directly
2019-02-22 05:16:25 +01:00
Richard Patel
e4d04e6a5f go.mod: Fix package path
lol
2019-02-22 05:10:43 +01:00
terorie
9f1402e841 New Dockerfile and Travis Config (#23) 2019-02-22 05:07:27 +01:00
terorie
7c8ab50ee4 Merge stable into master 2019-02-13 15:32:40 +01:00
terorie
281d2d17d6 Update config.yml 2019-02-13 15:32:00 +01:00
Richard Patel
45cbd4d535 Disable resume feature 2019-02-05 15:44:59 +01:00
Richard Patel
771d49f2dd Fix WaitGroup deadlock 2019-02-03 17:14:20 +01:00
Richard Patel
dbd787aa81 Fix WaitGroup crash 2019-02-03 17:09:43 +01:00
Richard Patel
cea6c1658b Bugfix: Don't schedule new tasks during shutdown 2019-02-03 17:02:44 +01:00
terorie
885af5bb3b Beta task resuming 2019-02-03 16:50:08 +01:00
15 changed files with 131 additions and 653 deletions

Dockerfile (new file, 15 lines)

@@ -0,0 +1,15 @@
FROM golang:alpine as builder
ADD . /go/src/github.com/terorie/od-database-crawler
RUN apk add git \
&& go get -d -v github.com/terorie/od-database-crawler \
&& CGO_ENABLED=0 go install -a \
-installsuffix cgo \
-ldflags="-s -w" \
github.com/terorie/od-database-crawler
FROM scratch
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
COPY --from=builder /go/bin/od-database-crawler /bin/
WORKDIR /oddb
VOLUME [ "/oddb" ]
CMD ["/bin/od-database-crawler", "server"]

README.md

@@ -1,15 +1,20 @@
-# od-database Go crawler 🚀
+# OD-Database Crawler 🕷
 [![Build Status](https://travis-ci.org/terorie/od-database-crawler.svg?branch=master)](https://travis-ci.org/terorie/od-database-crawler)
-> by terorie 2018 :P
+[![](https://tokei.rs/b1/github/terorie/od-database-crawler)](https://github.com/terorie/od-database-crawler)
+[![CodeFactor](https://www.codefactor.io/repository/github/terorie/od-database-crawler/badge/master)](https://www.codefactor.io/repository/github/terorie/od-database-crawler/overview/master)
 * Crawler for [__OD-Database__](https://github.com/simon987/od-database)
+* In production at https://od-db.the-eye.eu/
+* Over 880 TB actively crawled
 * Crawls HTTP open directories (standard Web Server Listings)
 * Gets name, path, size and modification time of all files
-* Lightweight and fast: __over 9000 requests per second__ on a standard laptop
+* Lightweight and fast
 https://od-db.the-eye.eu/
-#### Usage
+## Usage
+### Deploys
 1. With Config File (if `config.yml` found in working dir)
    - Download [default config](https://github.com/terorie/od-database-crawler/blob/master/config.yml)
@@ -22,3 +27,31 @@ https://od-db.the-eye.eu/
 - Every flag is available as an environment variable:
   `--server.crawl_stats` ➡️ `OD_SERVER_CRAWL_STATS`
 - Start with `./od-database-crawler server <flags>`
+3. With Docker
+   ```bash
+   docker run \
+       -e OD_SERVER_URL=xxx \
+       -e OD_SERVER_TOKEN=xxx \
+       terorie/od-database-crawler
+   ```
+### Flag reference
+Here are the most important config flags. For finer control, take a look at `/config.yml`.
+| Flag/Environment | Description | Example |
+| --- | --- | --- |
+| `server.url`<br />`OD_SERVER_URL` | OD-DB Server URL | `https://od-db.mine.the-eye.eu/api` |
+| `server.token`<br />`OD_SERVER_TOKEN` | OD-DB Server Access Token | _Ask Hexa **TM**_ |
+| `server.recheck`<br />`OD_SERVER_RECHECK` | Job Fetching Interval | `3s` |
+| `output.crawl_stats`<br />`OD_OUTPUT_CRAWL_STATS` | Crawl Stats Logging Interval (0 = disabled) | `500ms` |
+| `output.resource_stats`<br />`OD_OUTPUT_RESOURCE_STATS` | Resource Stats Logging Interval (0 = disabled) | `8s` |
+| `output.log`<br />`OD_OUTPUT_LOG` | Log File (none = disabled) | `crawler.log` |
+| `crawl.tasks`<br />`OD_CRAWL_TASKS` | Max number of sites to crawl concurrently | `500` |
+| `crawl.connections`<br />`OD_CRAWL_CONNECTIONS` | HTTP connections per site | `1` |
+| `crawl.retries`<br />`OD_CRAWL_RETRIES` | How often to retry after a temporary failure (e.g. `HTTP 429` or timeouts) | `5` |
+| `crawl.dial_timeout`<br />`OD_CRAWL_DIAL_TIMEOUT` | TCP connect timeout | `5s` |
+| `crawl.timeout`<br />`OD_CRAWL_TIMEOUT` | HTTP request timeout | `20s` |
+| `crawl.user-agent`<br />`OD_CRAWL_USER_AGENT` | HTTP Crawler User-Agent | `googlebot/1.2.3` |
+| `crawl.job_buffer`<br />`OD_CRAWL_JOB_BUFFER` | Number of URLs to keep in memory/cache, per job; the rest is offloaded to disk. Decrease this value if the crawler uses too much RAM. (0 = disable cache, -1 = only use cache) | `5000` |
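The flag-to-environment mapping above (`--server.crawl_stats` ➡️ `OD_SERVER_CRAWL_STATS`) is the usual spf13/viper + pflag pattern. A minimal sketch of how such a mapping is typically wired — the `od` prefix and dot-to-underscore replacer are assumptions; the crawler's actual `prepareConfig` may differ:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/spf13/pflag"
	"github.com/spf13/viper"
)

func main() {
	pf := pflag.NewFlagSet("od-database-crawler", pflag.ExitOnError)
	pf.Duration("server.recheck", 0, "Job fetching interval")
	_ = pf.Parse([]string{})

	_ = viper.BindPFlags(pf)                               // every flag becomes a viper key
	viper.SetEnvPrefix("od")                               // prepend OD_ on env lookups
	viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) // server.recheck -> SERVER_RECHECK
	viper.AutomaticEnv()                                   // so OD_SERVER_RECHECK fills the key

	fmt.Println(viper.GetDuration("server.recheck")) // 0s, or OD_SERVER_RECHECK if set
}
```

Note that in viper's precedence order an explicitly passed flag still wins over the environment variable; the env var fills in when the flag is left at its default.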

config.go

@@ -46,7 +46,6 @@ const (
 	ConfDialTimeout = "crawl.dial_timeout"
 	ConfTimeout = "crawl.timeout"
 	ConfJobBufferSize = "crawl.job_buffer"
-	ConfResume = "crawl.resume"
 	ConfCrawlStats = "output.crawl_stats"
 	ConfAllocStats = "output.resource_stats"
@@ -92,8 +91,6 @@ func prepareConfig() {
 	pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
-	pf.Duration(ConfResume, 72 * time.Hour, "Crawler: Resume tasks not older than x")
 	pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
 	pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")

config.yml

@@ -47,13 +47,13 @@ output:
 # Crawler settings
 crawl:
   # Number of sites that can be processed at once
-  tasks: 100
+  tasks: 25
   # Number of connections per site
   # Please be careful with this setting!
   # The crawler fires fast and more than
   # ten connections can overwhelm a server.
-  connections: 4
+  connections: 1
   # How often to retry getting data
   # from the site before giving up
@@ -81,4 +81,4 @@ crawl:
   # in memory.
   # A negative value will cause all jobs
   # to be stored in memory. (Don't do this)
-  job_buffer: 5000
+  job_buffer: -1

ds/redblackhash/redblackhash.go

@@ -15,10 +15,7 @@ package redblackhash
 import (
 	"bytes"
-	"encoding/binary"
-	"encoding/hex"
 	"fmt"
-	"io"
 	"sync"
 )
@@ -46,13 +43,6 @@ type Node struct {
 	Parent *Node
 }
-type nodeHeader struct {
-	Key *Key
-	Color color
-}
-var o = binary.BigEndian
 func (k *Key) Compare(o *Key) int {
 	return bytes.Compare(k[:], o[:])
 }
@@ -243,7 +233,7 @@ func (tree *Tree) String() string {
 }
 func (node *Node) String() string {
-	return hex.EncodeToString(node.Key[:16]) + "..."
+	return fmt.Sprintf("%v", node.Key)
 }
 func output(node *Node, prefix string, isTail bool, str *string) {
@@ -491,119 +481,6 @@ func (tree *Tree) deleteCase6(node *Node) {
 	}
 }
func (tree *Tree) Marshal(w io.Writer) (err error) {
tree.Lock()
defer tree.Unlock()
err = binary.Write(w, o, uint64(0x617979797979790A))
if err != nil { return err }
err = marshal(tree.Root, w)
if err != nil { return err }
err = binary.Write(w, o, uint64(0x6C6D616F6F6F6F0A))
if err != nil { return err }
return nil
}
func marshal(n *Node, w io.Writer) (err error) {
if n == nil {
err = binary.Write(w, o, uint64(0x796565656565740A))
return err
}
err = binary.Write(w, o, uint64(0xF09F85B1EFB88F0A))
if err != nil { return err }
_, err = w.Write(n.Key[:])
if err != nil { return err }
var colorI uint64
if n.color {
colorI = 0x7468652D6579657C
} else {
colorI = 0x6865782B7465727C
}
err = binary.Write(w, o, colorI)
if err != nil { return err }
err = marshal(n.Left, w)
if err != nil { return err }
err = marshal(n.Right, w)
if err != nil { return err }
return nil
}
func (tree *Tree) Unmarshal(r io.Reader) (err error) {
tree.Lock()
defer tree.Unlock()
var sof uint64
err = binary.Read(r, o, &sof)
if err != nil { return err }
if sof != 0x617979797979790A {
return fmt.Errorf("redblack: wrong format")
}
tree.Root, tree.size, err = unmarshal(r)
if err != nil { return err }
var eof uint64
err = binary.Read(r, o, &eof)
if err != nil { return err }
if eof != 0x6C6D616F6F6F6F0A {
return fmt.Errorf("redblack: end of file missing")
}
return nil
}
func unmarshal(r io.Reader) (n *Node, size int, err error) {
var head uint64
err = binary.Read(r, o, &head)
if err != nil { return nil, 0, err }
size = 1
switch head {
case 0x796565656565740A:
return nil, 0, nil
case 0xF09F85B1EFB88F0A:
n = new(Node)
_, err = io.ReadFull(r, n.Key[:])
if err != nil { return nil, 0, err }
var colorInt uint64
err = binary.Read(r, o, &colorInt)
if err != nil { return nil, 0, err }
switch colorInt {
case 0x7468652D6579657C:
n.color = true
case 0x6865782B7465727C:
n.color = false
default:
return nil, 0, fmt.Errorf("redblack: corrupt node color")
}
default:
return nil, 0, fmt.Errorf("redblack: corrupt node info")
}
var s2 int
n.Left, s2, err = unmarshal(r)
size += s2
if err != nil { return nil, 0, err }
n.Right, s2, err = unmarshal(r)
size += s2
if err != nil { return nil, 0, err }
return n, size, nil
}
 func nodeColor(node *Node) color {
 	if node == nil {
 		return black
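An aside on the deleted serializer: its eight-byte sentinel constants are ASCII/UTF-8 strings packed big-endian (via the removed `o = binary.BigEndian`). A standalone sketch that decodes them:

```go
package main

import (
	"encoding/binary"
	"fmt"
)

func main() {
	sentinels := []struct {
		name string
		v    uint64
	}{
		{"start of file", 0x617979797979790A}, // "ayyyyyy\n"
		{"node header", 0xF09F85B1EFB88F0A},   // "🅱️\n" (UTF-8)
		{"nil node", 0x796565656565740A},      // "yeeeeet\n"
		{"end of file", 0x6C6D616F6F6F6F0A},   // "lmaoooo\n"
		{"color true", 0x7468652D6579657C},    // "the-eye|"
		{"color false", 0x6865782B7465727C},   // "hex+ter|"
	}
	var buf [8]byte
	for _, s := range sentinels {
		binary.BigEndian.PutUint64(buf[:], s.v) // write the constant as 8 big-endian bytes
		fmt.Printf("%-13s %#016x = %q\n", s.name, s.v, buf[:])
	}
}
```

The Unmarshal code above relied on these tags matching exactly to detect corrupt pause files.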

Deleted test file (package redblackhash)

@@ -1,47 +0,0 @@
package redblackhash
import (
"bytes"
"math/rand"
"testing"
)
func TestTree_Marshal(t *testing.T) {
var t1, t2 Tree
// Generate 1000 random values to insert
for i := 0; i < 1000; i++ {
var key Key
rand.Read(key[:])
t1.Put(&key)
}
// Marshal tree
var wr bytes.Buffer
err := t1.Marshal(&wr)
if err != nil {
t.Error(err)
t.FailNow()
}
buf := wr.Bytes()
rd := bytes.NewBuffer(buf)
// Unmarshal tree
err = t2.Unmarshal(rd)
if err != nil {
t.Error(err)
t.FailNow()
}
if !compare(t1.Root, t2.Root) {
t.Error("trees are not equal")
t.FailNow()
}
}
func compare(n1, n2 *Node) bool {
return n1.Key.Compare(&n2.Key) == 0 &&
(n1.Left == nil || compare(n1.Left, n2.Left)) &&
(n1.Right == nil || compare(n1.Right, n2.Right))
}

go.mod (9 changed lines)

@@ -1,14 +1,13 @@
-module github.com/syndtr/od-database-crawler
+module github.com/terorie/od-database-crawler
 require (
 	github.com/beeker1121/goque v2.0.1+incompatible
 	github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
-	github.com/sirupsen/logrus v1.3.0
+	github.com/sirupsen/logrus v1.4.0
 	github.com/spf13/cobra v0.0.3
-	github.com/spf13/viper v1.3.1
+	github.com/spf13/viper v1.3.2
 	github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect
-	github.com/terorie/od-database-crawler v1.1.1
-	github.com/valyala/fasthttp v1.1.0
+	github.com/valyala/fasthttp v1.2.0
 	golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613
 	golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3
 )

go.sum (8 changed lines)

@@ -25,6 +25,8 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
 github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
+github.com/sirupsen/logrus v1.4.0 h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE=
+github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
 github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
 github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
 github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
@@ -37,17 +39,19 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
 github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
 github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38=
 github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
+github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M=
+github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs=
 github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
-github.com/terorie/od-database-crawler v1.1.1 h1:Ca+ZqbZX3rVWBR8SDRzvroyxjBtUs75MQXZ9YG0gqGo=
-github.com/terorie/od-database-crawler v1.1.1/go.mod h1:vVJ7pLkudrlUNp9qu24JCzQ8N6mFsrOmX1tPXr155DQ=
 github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
 github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
 github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
 github.com/valyala/fasthttp v1.1.0 h1:3BohG7mqwj4lq7PTX//7gLbUlzNvZSPmuHFnloXT0lw=
 github.com/valyala/fasthttp v1.1.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
+github.com/valyala/fasthttp v1.2.0 h1:dzZJf2IuMiclVjdw0kkT+f9u4YdrapbNyGAN47E/qnk=
+github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
 github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
 github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=

main.go (13 changed lines)

@@ -18,7 +18,7 @@ var configFile string
 var rootCmd = cobra.Command {
 	Use: "od-database-crawler",
-	Version: "1.2.1",
+	Version: "1.2.2",
 	Short: "OD-Database Go crawler",
 	Long: helpText,
 	PersistentPreRunE: preRun,
@@ -83,7 +83,6 @@ func cmdBase(_ *cobra.Command, _ []string) {
 	go listenCtrlC(soft, hard)
 	inRemotes := make(chan *OD)
-	go LoadResumeTasks(inRemotes)
 	go Schedule(appCtx, inRemotes)
 	ticker := time.NewTicker(config.Recheck)
@@ -115,13 +114,7 @@ func cmdBase(_ *cobra.Command, _ []string) {
 	if urlErr, ok := err.(*fasturl.Error); ok && urlErr.Err == fasturl.ErrUnknownScheme {
 		// Not an error
 		err = nil
-		// TODO FTP crawler
-		// Give back task
-		//err2 := CancelTask(t.WebsiteId)
-		//if err2 != nil {
-		//	logrus.Error(err2)
-		//}
 		continue
 	} else if err != nil {
 		logrus.WithError(err).
@@ -133,7 +126,7 @@ func cmdBase(_ *cobra.Command, _ []string) {
 		}
 	}
 shutdown:
 	globalWait.Wait()
 }

model.go

@@ -3,6 +3,7 @@ package main
 import (
 	"github.com/terorie/od-database-crawler/ds/redblackhash"
 	"github.com/terorie/od-database-crawler/fasturl"
+	"sync"
 	"time"
 )
@@ -29,19 +30,12 @@ type Job struct {
 }
 type OD struct {
 	Task Task
 	Result TaskResult
-	InProgress int64
+	Wait sync.WaitGroup
 	BaseUri fasturl.URL
 	WCtx WorkerContext
 	Scanned redblackhash.Tree
 }
-type PausedOD struct {
-	Task *Task
-	Result *TaskResult
-	BaseUri *fasturl.URL
-	InProgress int64
-}
 type File struct {

resume.go (deleted, 270 lines)

@@ -1,270 +0,0 @@
package main
import (
"bytes"
"encoding/binary"
"encoding/gob"
"fmt"
"github.com/beeker1121/goque"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"io"
"os"
"path/filepath"
"strconv"
"sync/atomic"
"time"
)
func init() {
gob.Register(&PausedOD{})
}
func LoadResumeTasks(inRemotes chan<- *OD) {
resumed, err := ResumeTasks()
if err != nil {
logrus.WithError(err).
Error("Failed to resume queued tasks. " +
"/queue is probably corrupt")
err = nil
}
for _, remote := range resumed {
inRemotes <- remote
}
}
func ResumeTasks() (tasks []*OD, err error) {
// Get files in /queue
var queueF *os.File
var entries []os.FileInfo
queueF, err = os.Open("queue")
if err != nil { return nil, err }
defer queueF.Close()
entries, err = queueF.Readdir(-1)
if err != nil { return nil, err }
resumeDur := viper.GetDuration(ConfResume)
for _, entry := range entries {
if !entry.IsDir() { continue }
// Check if name is a number
var id uint64
if id, err = strconv.ParseUint(entry.Name(), 10, 64); err != nil {
continue
}
// Too old to be resumed
timeDelta := time.Since(entry.ModTime())
if resumeDur >= 0 && timeDelta > resumeDur {
removeOldQueue(id)
continue
}
// Load queue
var od *OD
if od, err = resumeQueue(id); err != nil {
logrus.WithError(err).
WithField("id", id).
Warning("Failed to load paused task")
continue
} else if od == nil {
removeOldQueue(id)
continue
}
tasks = append(tasks, od)
}
return tasks, nil
}
func SaveTask(od *OD) (err error) {
dir := filepath.Join("queue",
strconv.FormatUint(od.Task.WebsiteId, 10))
fPath := filepath.Join(dir, "PAUSED")
err = os.Mkdir(dir, 0777)
if err != nil { return err }
// Open pause file
pausedF, err := os.OpenFile(fPath, os.O_CREATE | os.O_WRONLY | os.O_TRUNC, 0666)
if err != nil { return err }
defer pausedF.Close()
err = writePauseFile(od, pausedF)
if err != nil { return err }
return nil
}
func resumeQueue(id uint64) (od *OD, err error) {
logrus.WithField("id", id).
Info("Found unfinished")
fPath := filepath.Join("queue", strconv.FormatUint(id, 10))
// Try to find pause file
pausedF, err := os.Open(filepath.Join(fPath, "PAUSED"))
if os.IsNotExist(err) {
// No PAUSED file => not paused
// not paused => no error
return nil, nil
} else if err != nil {
return nil, err
}
defer pausedF.Close()
od = new(OD)
od.WCtx.OD = od
err = readPauseFile(od, pausedF)
if err != nil { return nil, err }
// Open queue
bq, err := OpenQueue(fPath)
if err != nil { return nil, err }
od.WCtx.Queue = bq
logrus.WithField("id", id).
Info("Resuming task")
return od, nil
}
func removeOldQueue(id uint64) {
if id == 0 {
// TODO Make custom crawl less of an ugly hack
return
}
logrus.WithField("id", id).
Warning("Deleting & returning old task")
name := strconv.FormatUint(id, 10)
fPath := filepath.Join("queue", name)
// Acquire old queue
q, err := goque.OpenQueue(fPath)
if err != nil {
// Queue lock exists, don't delete
logrus.WithField("err", err).
WithField("path", fPath).
Error("Failed to acquire old task")
return
}
// Delete old queue from disk
err = q.Drop()
if err != nil {
// Queue lock exists, don't delete
logrus.WithField("err", err).
WithField("path", fPath).
Error("Failed to delete old task")
return
}
// Delete old crawl result from disk
_ = os.Remove(filepath.Join("crawled", name + ".json"))
// Return task to server
if err := CancelTask(id); err != nil {
// Queue lock exists, don't delete
logrus.WithField("err", err).
WithField("id", id).
Warning("Failed to return unfinished task to server")
return
}
}
func writePauseFile(od *OD, w io.Writer) (err error) {
// Write pause file version
_, err = w.Write([]byte("ODPAUSE-"))
if err != nil { return err }
// Create save state
paused := PausedOD {
Task: &od.Task,
Result: &od.Result,
BaseUri: &od.BaseUri,
InProgress: atomic.LoadInt64(&od.InProgress),
}
// Prepare pause settings
var b bytes.Buffer
pauseEnc := gob.NewEncoder(&b)
err = pauseEnc.Encode(&paused)
if err != nil { return err }
// Write length of pause settings
err = binary.Write(w, binary.LittleEndian, uint64(b.Len()))
if err != nil { return err }
// Write pause settings
_, err = w.Write(b.Bytes())
if err != nil { return err }
// Write pause scan state
err = od.Scanned.Marshal(w)
if err != nil { return err }
// Save mark
_, err = w.Write([]byte("--------"))
if err != nil { return err }
return nil
}
func readPauseFile(od *OD, r io.Reader) (err error) {
// Make the paused struct point to OD fields
// So gob loads values into the OD struct
paused := PausedOD {
Task: &od.Task,
Result: &od.Result,
BaseUri: &od.BaseUri,
}
var version [8]byte
_, err = io.ReadFull(r, version[:])
if err != nil { return err }
if !bytes.Equal(version[:], []byte("ODPAUSE-")) {
return fmt.Errorf("unsupported pause file")
}
// Read pause settings len
var pauseSettingsLen uint64
err = binary.Read(r, binary.LittleEndian, &pauseSettingsLen)
// Read pause settings
pauseDec := gob.NewDecoder(io.LimitReader(r, int64(pauseSettingsLen)))
err = pauseDec.Decode(&paused)
if err != nil { return err }
atomic.StoreInt64(&od.InProgress, paused.InProgress)
err = readPauseStateTree(od, r)
if err != nil {
return fmt.Errorf("failed to read state tree: %s", err)
}
return nil
}
func readPauseStateTree(od *OD, r io.Reader) (err error) {
// Read pause scan state
err = od.Scanned.Unmarshal(r)
if err != nil { return err }
// Check mark
var mark [8]byte
_, err = io.ReadFull(r, mark[:])
if err != nil { return err }
if !bytes.Equal(mark[:], []byte("--------")) {
return fmt.Errorf("corrupt pause file")
}
return nil
}

Deleted test file (resume feature)

@@ -1,48 +0,0 @@
package main
import (
"bytes"
"github.com/terorie/od-database-crawler/fasturl"
"testing"
"time"
)
func TestResumeTasks_Empty(t *testing.T) {
start := time.Now().Add(-1 * time.Minute)
od := OD {
Task: Task {
WebsiteId: 213,
Url: "https://the-eye.eu/public/",
},
Result: TaskResult {
StartTime: start,
StartTimeUnix: start.Unix(),
EndTimeUnix: time.Now().Unix(),
WebsiteId: 213,
},
InProgress: 0,
BaseUri: fasturl.URL {
Scheme: fasturl.SchemeHTTPS,
Host: "the-eye.eu",
Path: "/public/",
},
}
od.WCtx.OD = &od
var b bytes.Buffer
var err error
err = writePauseFile(&od, &b)
if err != nil {
t.Fatal(err)
}
buf := b.Bytes()
var od2 OD
b2 := bytes.NewBuffer(buf)
err = readPauseFile(&od2, b2)
if err != nil {
t.Fatal(err)
}
}

scheduler.go

@@ -22,57 +22,54 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 	go Stats(c)
 	for remote := range remotes {
-		if !scheduleNewTask(c, remote) {
-			return
+		logrus.WithField("url", remote.BaseUri.String()).
+			Info("Starting crawler")
+		// Collect results
+		results := make(chan File)
+		remote.WCtx.OD = remote
+		// Get queue path
+		queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
+		// Delete existing queue
+		if err := os.RemoveAll(queuePath);
+			err != nil { panic(err) }
+		// Start new queue
+		var err error
+		remote.WCtx.Queue, err = OpenQueue(queuePath)
+		if err != nil { panic(err) }
+		// Spawn workers
+		for i := 0; i < config.Workers; i++ {
+			go remote.WCtx.Worker(results)
+		}
+		// Enqueue initial job
+		atomic.AddInt32(&numActiveTasks, 1)
+		remote.WCtx.queueJob(Job{
+			Uri: remote.BaseUri,
+			UriStr: remote.BaseUri.String(),
+			Fails: 0,
+		})
+		// Upload result when ready
+		go remote.Watch(results)
+		// Sleep if max number of tasks are active
+		for atomic.LoadInt32(&numActiveTasks) > config.Tasks {
+			select {
+			case <-c.Done():
+				return
+			case <-time.After(time.Second):
+				continue
+			}
 		}
 	}
 }
-func scheduleNewTask(c context.Context, remote *OD) bool {
-	logrus.WithField("url", remote.BaseUri.String()).
-		Info("Starting crawler")
-	// Collect results
-	results := make(chan File)
-	remote.WCtx.OD = remote
-	// Get queue path
-	queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
-	// Delete existing queue
-	if err := os.RemoveAll(queuePath);
-		err != nil { panic(err) }
-	// Start new queue
-	var err error
-	remote.WCtx.Queue, err = OpenQueue(queuePath)
-	if err != nil { panic(err) }
-	// Spawn workers
-	remote.WCtx.SpawnWorkers(c, results, config.Workers)
-	// Enqueue initial job
-	atomic.AddInt32(&numActiveTasks, 1)
-	remote.WCtx.queueJob(Job{
-		Uri: remote.BaseUri,
-		UriStr: remote.BaseUri.String(),
-		Fails: 0,
-	})
-	// Upload result when ready
-	go remote.Watch(results)
-	// Sleep if max number of tasks are active
-	for atomic.LoadInt32(&numActiveTasks) > config.Tasks {
-		if !sleep(time.Second, c) {
-			break
-		}
-	}
-	return true
-}
func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) { func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
if !t.register() { if !t.register() {
return return
@@ -120,7 +117,7 @@ func (o *OD) Watch(results chan File) {
 	// Open crawl results file
 	f, err := os.OpenFile(
 		filePath,
-		os.O_CREATE | os.O_RDWR | os.O_APPEND,
+		os.O_CREATE | os.O_RDWR | os.O_TRUNC,
 		0644,
 	)
 	if err != nil {
@@ -162,33 +159,16 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
 	defer close(results)
 	// Wait for all jobs on remote to finish
-	for {
-		// Natural finish
-		if atomic.LoadInt64(&o.InProgress) == 0 {
-			o.onTaskFinished()
-			return
-		}
-		// Abort
-		if atomic.LoadInt32(&o.WCtx.aborted) != 0 {
-			// Wait for all workers to finish
-			o.WCtx.workers.Wait()
-			o.onTaskPaused()
-			return
-		}
-		time.Sleep(500 * time.Millisecond)
-	}
-}
-func (o *OD) onTaskFinished() {
-	defer atomic.AddInt32(&numActiveTasks, -1)
+	o.Wait.Wait()
 	// Close queue
 	if err := o.WCtx.Queue.Close(); err != nil {
 		panic(err)
 	}
+	atomic.AddInt32(&numActiveTasks, -1)
 	// Log finish
 	logrus.WithFields(logrus.Fields{
 		"id": o.Task.WebsiteId,
 		"url": o.BaseUri.String(),
@@ -211,37 +191,6 @@
 	}
 }
func (o *OD) onTaskPaused() {
defer atomic.AddInt32(&numActiveTasks, -1)
// Close queue
if err := o.WCtx.Queue.Close(); err != nil {
panic(err)
}
// Set current end time
o.Result.EndTimeUnix = time.Now().Unix()
// Save task metadata
err := SaveTask(o)
if err != nil {
// Log finish
logrus.WithFields(logrus.Fields{
"err": err.Error(),
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
}).Error("Failed to save crawler state")
return
}
// Log finish
logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"duration": time.Since(o.Result.StartTime),
}).Info("Crawler paused")
}
 func (t *Task) Collect(results chan File, f *os.File, errC chan<- error) {
 	err := t.collect(results, f)
 	if err != nil {

server.go

@@ -40,7 +40,9 @@ func FetchTask() (t *Task, err error) {
 	t = new(Task)
 	err = json.NewDecoder(res.Body).Decode(t)
-	if err != nil { return }
+	if _, ok := err.(*json.SyntaxError); ok {
+		return nil, fmt.Errorf("/task/get returned invalid JSON")
+	} else if err != nil { return }
 	return
 }
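For reference, `encoding/json` reports malformed payloads as `*json.SyntaxError`, which is exactly what the new check matches. A self-contained sketch (the input string is hypothetical, not the crawler's API):

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	var t struct{ WebsiteId uint64 }
	err := json.NewDecoder(strings.NewReader("{not json")).Decode(&t)
	if _, ok := err.(*json.SyntaxError); ok {
		// The case FetchTask now reports as "/task/get returned invalid JSON".
		fmt.Println("syntax error:", err)
	}
}
```

Note that an empty response body surfaces as `io.EOF`, not a `*json.SyntaxError`, so it still falls through to the generic error branch.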

worker.go

@@ -1,7 +1,6 @@
 package main
 import (
-	"context"
 	"github.com/beeker1121/goque"
 	"github.com/sirupsen/logrus"
 	"math"
@@ -19,29 +18,10 @@ type WorkerContext struct {
 	Queue *BufferedQueue
 	lastRateLimit time.Time
 	numRateLimits int
-	workers sync.WaitGroup
-	aborted int32
 }
-func (w *WorkerContext) SpawnWorkers(c context.Context, results chan<- File, n int) {
-	w.workers.Add(n)
-	for i := 0; i < n; i++ {
-		go w.Worker(c, results)
-	}
-}
-func (w *WorkerContext) Worker(c context.Context, results chan<- File) {
-	defer w.workers.Done()
+func (w *WorkerContext) Worker(results chan<- File) {
 	for {
-		select {
-		case <-c.Done():
-			// Not yet done
-			atomic.StoreInt32(&w.aborted, 1)
-			return
-		default:
-		}
 		job, err := w.Queue.Dequeue()
 		switch err {
 		case goque.ErrEmpty:
@@ -176,7 +156,7 @@ func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
 }
 func (w *WorkerContext) queueJob(job Job) {
-	atomic.AddInt64(&w.OD.InProgress, 1)
+	w.OD.Wait.Add(1)
 	if w.numRateLimits > 0 {
 		if time.Since(w.lastRateLimit) > 5 * time.Second {
@@ -193,7 +173,7 @@ func (w *WorkerContext) queueJob(job Job) {
 }
 func (w *WorkerContext) finishJob() {
-	atomic.AddInt64(&w.OD.InProgress, -1)
+	w.OD.Wait.Done()
 }
 func isErrSilent(err error) bool {
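Taken together, the model.go, scheduler.go, and worker.go hunks replace the polled `InProgress` counter with a `sync.WaitGroup`: `queueJob` does `Add(1)`, `finishJob` does `Done()`, and `handleCollect` blocks on `Wait()`. A minimal self-contained sketch of that accounting pattern (names simplified, not the crawler's actual types):

```go
package main

import (
	"fmt"
	"sync"
)

// crawl stands in for the OD struct from model.go.
type crawl struct {
	wait sync.WaitGroup
}

func (c *crawl) queueJob()  { c.wait.Add(1) } // called before a job is enqueued
func (c *crawl) finishJob() { c.wait.Done() } // called when a job completes or fails

func main() {
	var c crawl
	for i := 0; i < 3; i++ {
		c.queueJob() // Add happens before the worker starts, so Wait can't pass early
		go func(n int) {
			defer c.finishJob()
			fmt.Println("crawled job", n)
		}(i)
	}
	c.wait.Wait() // replaces polling InProgress every 500ms
	fmt.Println("all jobs drained; close queue, upload results")
}
```

The ordering matters: each `Add` must happen before the watcher's `Wait` could observe a zero count, likely the kind of issue the "Fix WaitGroup deadlock" and "Fix WaitGroup crash" commits above were addressing.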