5 Commits

Author SHA1 Message Date
Simon Fortier
88bf634cb6 Update config.yml 2019-04-05 09:30:20 -04:00
dependabot[bot]
796cf6ac23 Bump github.com/spf13/viper from 1.3.1 to 1.3.2 2019-03-14 10:57:44 +01:00
Richard Patel
defaf54e66 Bump github.com/sirupsen/logrus from 1.3.0 to 1.4.0 2019-03-12 19:37:06 +01:00
dependabot[bot]
230824c58f Bump github.com/sirupsen/logrus from 1.3.0 to 1.4.0
Bumps [github.com/sirupsen/logrus](https://github.com/sirupsen/logrus) from 1.3.0 to 1.4.0.
- [Release notes](https://github.com/sirupsen/logrus/releases)
- [Changelog](https://github.com/sirupsen/logrus/blob/master/CHANGELOG.md)
- [Commits](https://github.com/sirupsen/logrus/compare/v1.3.0...v1.4.0)

Signed-off-by: dependabot[bot] <support@dependabot.com>
2019-03-12 04:48:03 +00:00
terorie
d3c199b738 Update README
Add some badges and update description
2019-02-28 23:57:50 +01:00
11 changed files with 302 additions and 402 deletions

View File

@@ -1,11 +1,14 @@
# od-database Go crawler 🚀 # OD-Database Crawler 🕷
[![Build Status](https://travis-ci.org/terorie/od-database-crawler.svg?branch=master)](https://travis-ci.org/terorie/od-database-crawler) [![Build Status](https://travis-ci.org/terorie/od-database-crawler.svg?branch=master)](https://travis-ci.org/terorie/od-database-crawler)
> by terorie 2018 :P [![](https://tokei.rs/b1/github/terorie/od-database-crawler)](https://github.com/terorie/od-database-crawler)
[![CodeFactor](https://www.codefactor.io/repository/github/terorie/od-database-crawler/badge/master)](https://www.codefactor.io/repository/github/terorie/od-database-crawler/overview/master)
* Crawler for [__OD-Database__](https://github.com/simon987/od-database) * Crawler for [__OD-Database__](https://github.com/simon987/od-database)
* In production at https://od-db.the-eye.eu/
* Over 880 TB actively crawled
* Crawls HTTP open directories (standard Web Server Listings) * Crawls HTTP open directories (standard Web Server Listings)
* Gets name, path, size and modification time of all files * Gets name, path, size and modification time of all files
* Lightweight and fast: __over 9000 requests per second__ on a standard laptop * Lightweight and fast
https://od-db.the-eye.eu/ https://od-db.the-eye.eu/

114
config.go
View File

@@ -13,44 +13,38 @@ import (
) )
var config struct { var config struct {
TrackerUrl string ServerUrl string
TrackerProject int Token string
TrackerAlias string ServerTimeout time.Duration
WsBucketScheme string Recheck time.Duration
WsBucketHost string ChunkSize int64
ServerTimeout time.Duration Retries int
Recheck time.Duration Workers int
ChunkSize int64 UserAgent string
Retries int Tasks int32
Workers int Verbose bool
UserAgent string PrintHTTP bool
Tasks int32 JobBufferSize int
Verbose bool
PrintHTTP bool
JobBufferSize int
} }
var onlineMode bool var onlineMode bool
const ( const (
ConfTrackerUrl = "server.url" ConfServerUrl = "server.url"
ConfTrackerProject = "server.project" ConfToken = "server.token"
ConfTrackerAlias = "server.alias" ConfServerTimeout = "server.timeout"
ConfWsBucketScheme = "server.ws_bucket_scheme" ConfRecheck = "server.recheck"
ConfWsBucketHost = "server.ws_bucket_host" ConfCooldown = "server.cooldown"
ConfServerTimeout = "server.timeout" ConfChunkSize = "server.upload_chunk"
ConfRecheck = "server.recheck" ConfUploadRetries = "server.upload_retries"
ConfCooldown = "server.cooldown"
ConfChunkSize = "server.upload_chunk"
ConfUploadRetries = "server.upload_retries"
ConfUploadRetryInterval = "server.upload_retry_interval" ConfUploadRetryInterval = "server.upload_retry_interval"
ConfTasks = "crawl.tasks" ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries" ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections" ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent" ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout" ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout" ConfTimeout = "crawl.timeout"
ConfJobBufferSize = "crawl.job_buffer" ConfJobBufferSize = "crawl.job_buffer"
ConfCrawlStats = "output.crawl_stats" ConfCrawlStats = "output.crawl_stats"
@@ -67,45 +61,39 @@ func prepareConfig() {
pf.StringVar(&configFile, "config", "", "Config file") pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG") configFile = os.Getenv("OD_CONFIG")
pf.String(ConfTrackerUrl, "https://tt.the-eye.eu/api", "task_tracker api URL") pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
pf.String(ConfTrackerProject, "1", "task_tracker project id") pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
pf.String(ConfWsBucketScheme, "wss", "ws_bucket scheme") pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
pf.String(ConfWsBucketHost, "wsb.the-eye.eu", "ws_bucket host") pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
pf.String(ConfTrackerAlias, "changeme", "task_tracker worker alias") pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout")
pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs")
pf.Duration(ConfCooldown, 1*time.Minute, "OD-DB: Time to wait after a server-side error")
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size") pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries") pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
pf.Duration(ConfUploadRetryInterval, 30*time.Second, "OD-DB: Time to wait between upload retries") pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
pf.Uint(ConfTasks, 25, "Crawler: Max concurrent tasks") pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
pf.Uint(ConfWorkers, 1, "Crawler: Connections per server") pf.Uint(ConfWorkers, 4, "Crawler: Connections per server")
pf.Uint(ConfRetries, 5, "Crawler: Request retries") pf.Uint(ConfRetries, 5, "Crawler: Request retries")
pf.Duration(ConfDialTimeout, 10*time.Second, "Crawler: Handshake timeout") pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
pf.Duration(ConfTimeout, 30*time.Second, "Crawler: Request timeout") pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent") pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
pf.Int(ConfJobBufferSize, -1, "Crawler: Task queue cache size") pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
pf.Duration(ConfCrawlStats, 500*time.Second, "Log: Crawl stats interval") pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
pf.Duration(ConfAllocStats, 500*time.Second, "Log: Resource stats interval") pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
pf.Bool(ConfVerbose, false, "Log: Print every listed dir") pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
@@ -157,19 +145,17 @@ func readConfig() {
} }
if onlineMode { if onlineMode {
config.TrackerUrl = viper.GetString(ConfTrackerUrl) config.ServerUrl = viper.GetString(ConfServerUrl)
if config.TrackerUrl == "" { if config.ServerUrl == "" {
configMissing(ConfTrackerUrl) configMissing(ConfServerUrl)
}
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
config.Token = viper.GetString(ConfToken)
if config.Token == "" {
configMissing(ConfToken)
} }
config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/")
} }
config.TrackerProject = viper.GetInt(ConfTrackerProject)
config.TrackerAlias = viper.GetString(ConfTrackerAlias)
config.WsBucketHost = viper.GetString(ConfWsBucketHost)
config.WsBucketScheme = viper.GetString(ConfWsBucketScheme)
config.ServerTimeout = viper.GetDuration(ConfServerTimeout) config.ServerTimeout = viper.GetDuration(ConfServerTimeout)
@@ -209,11 +195,9 @@ func readConfig() {
} }
if filePath := viper.GetString(ConfLogFile); filePath != "" { if filePath := viper.GetString(ConfLogFile); filePath != "" {
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) f, err := os.OpenFile(filePath, os.O_CREATE | os.O_WRONLY | os.O_APPEND, 0644)
bufWriter := bufio.NewWriter(f) bufWriter := bufio.NewWriter(f)
if err != nil { if err != nil { panic(err) }
panic(err)
}
exitHooks.Add(func() { exitHooks.Add(func() {
bufWriter.Flush() bufWriter.Flush()
f.Close() f.Close()

View File

@@ -1,14 +1,10 @@
# OD-Database server settings # OD-Database server settings
server: server:
# Connection URL # Connection URL
url: https://tt.the-eye.eu/api url: http://od-db.mine.terorie.com/api
# OD-Database project id (for crawling)
project: 1 # Server auth token
# Your worker alias token:
alias: changeme
# Websocket bucket host & scheme (ws/wss)
ws_bucket_host: https://wsb.the-eye.eu
ws_bucket_scheme: wss
# Request timeout # Request timeout
timeout: 60s timeout: 60s
@@ -21,7 +17,11 @@ server:
# Time to wait after receiving an error # Time to wait after receiving an error
# from the server. Doesn't apply to uploads. # from the server. Doesn't apply to uploads.
cooldown: 1s cooldown: 30s
# Upload chunk size
# If the value is too high, the upload fails.
upload_chunk: 1 MB
upload_retries: 10 upload_retries: 10
upload_retry_interval: 30s upload_retry_interval: 30s
@@ -29,10 +29,10 @@ server:
# Log output settings # Log output settings
output: output:
# Crawl statistics # Crawl statistics
crawl_stats: 1m crawl_stats: 1s
# CPU/RAM/Job queue stats # CPU/RAM/Job queue stats
resource_stats: 1m resource_stats: 10s
# More output? (Every listed dir) # More output? (Every listed dir)
verbose: false verbose: false

13
go.mod Normal file
View File

@@ -0,0 +1,13 @@
module github.com/terorie/od-database-crawler
require (
github.com/beeker1121/goque v2.0.1+incompatible
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/sirupsen/logrus v1.4.0
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.2
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect
github.com/valyala/fasthttp v1.2.0
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3
)

70
go.sum Normal file
View File

@@ -0,0 +1,70 @@
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8gh8=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0 h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38=
github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.1.0 h1:3BohG7mqwj4lq7PTX//7gLbUlzNvZSPmuHFnloXT0lw=
github.com/valyala/fasthttp v1.1.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/fasthttp v1.2.0 h1:dzZJf2IuMiclVjdw0kkT+f9u4YdrapbNyGAN47E/qnk=
github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

47
jenkins/Jenkinsfile vendored
View File

@@ -1,47 +0,0 @@
def remote = [:]
remote.name = 'remote'
remote.host = env.DEPLOY_HOST
remote.user = env.DEPLOY_USER
remote.identityFile = '/var/lib/jenkins/.ssh/id_rsa'
remote.knownHosts = '/var/lib/jenkins/.ssh/known_hosts'
remote.allowAnyHosts = true
remote.retryCount = 3
remote.retryWaitSec = 3
logLevel = 'FINER'
pipeline {
agent none
environment {
GOOS='linux'
CGO_ENABLED='0'
HOME='.'
}
stages {
stage('Build') {
agent {
docker {
image 'golang:latest'
}
}
steps {
sh 'mkdir -p /go/src/github.com/terorie/od-database-crawler'
sh 'cp -r *.go fasturl ds jenkins/build.sh "/go/src/github.com/terorie/od-database-crawler"'
sh 'cd /go/src/github.com/terorie/od-database-crawler && go get ./...'
sh './jenkins/build.sh'
stash includes: 'dist/', name: 'dist'
}
}
stage('Deploy') {
agent none
steps {
node('master') {
unstash 'dist'
sshCommand remote: remote, command: "ls od-database-crawler/"
sshPut remote: remote, from: 'dist/', into: 'od-database-crawler'
}
}
}
}
}

View File

@@ -1,23 +0,0 @@
#!/usr/bin/env bash
appname="od-database-crawler"
outdir="dist/"
tag="${BUILD_ID}_$(date +%Y-%m-%d)"
rm -rf "./${outdir}"
mkdir build 2> /dev/null
name=${outdir}${appname}-${tag}-linux
GOOS="linux" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-mac
GOOS="darwin" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-freebsd
GOOS="freebsd" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}

View File

@@ -7,29 +7,19 @@ import (
"time" "time"
) )
type ResultCode int
const (
TR_OK = ResultCode(iota)
TR_FAIL = 1
TR_SKIP = 2
)
type Task struct { type Task struct {
WebsiteId uint64 `json:"website_id"` WebsiteId uint64 `json:"website_id"`
Url string `json:"url"` Url string `json:"url"`
UploadToken string `json:"upload_token"`
TaskId int64
} }
type TaskResult struct { type TaskResult struct {
ResultCode ResultCode `json:"status_code"` StatusCode string `json:"status_code"`
FileCount uint64 `json:"file_count"` FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"` ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"` StartTime time.Time `json:"-"`
StartTimeUnix int64 `json:"start_time"` StartTimeUnix int64 `json:"start_time"`
EndTimeUnix int64 `json:"end_time"` EndTimeUnix int64 `json:"end_time"`
WebsiteId uint64 `json:"website_id"` WebsiteId uint64 `json:"website_id"`
} }
type Job struct { type Job struct {
@@ -61,16 +51,13 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
defer o.Scanned.Unlock() defer o.Scanned.Unlock()
exists = o.Scanned.Get(k) exists = o.Scanned.Get(k)
if exists { if exists { return true }
return true
}
o.Scanned.Put(k) o.Scanned.Put(k)
return false return false
} }
type errorString string type errorString string
func (e errorString) Error() string { func (e errorString) Error() string {
return string(e) return string(e)
} }

View File

@@ -34,16 +34,13 @@ func Schedule(c context.Context, remotes <-chan *OD) {
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId)) queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
// Delete existing queue // Delete existing queue
if err := os.RemoveAll(queuePath); err != nil { if err := os.RemoveAll(queuePath);
panic(err) err != nil { panic(err) }
}
// Start new queue // Start new queue
var err error var err error
remote.WCtx.Queue, err = OpenQueue(queuePath) remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { if err != nil { panic(err) }
panic(err)
}
// Spawn workers // Spawn workers
for i := 0; i < config.Workers; i++ { for i := 0; i < config.Workers; i++ {
@@ -80,12 +77,12 @@ func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
globalWait.Add(1) globalWait.Add(1)
now := time.Now() now := time.Now()
od := &OD{ od := &OD {
Task: *t, Task: *t,
BaseUri: *u, BaseUri: *u,
Result: TaskResult{ Result: TaskResult {
WebsiteId: t.WebsiteId, WebsiteId: t.WebsiteId,
StartTime: now, StartTime: now,
StartTimeUnix: now.Unix(), StartTimeUnix: now.Unix(),
}, },
} }
@@ -120,7 +117,7 @@ func (o *OD) Watch(results chan File) {
// Open crawl results file // Open crawl results file
f, err := os.OpenFile( f, err := os.OpenFile(
filePath, filePath,
os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.O_CREATE | os.O_RDWR | os.O_TRUNC,
0644, 0644,
) )
if err != nil { if err != nil {
@@ -148,7 +145,7 @@ func (o *OD) Watch(results chan File) {
} }
// Upload results // Upload results
err = PushResult(&o.Task, f) err = PushResult(&o.Result, f)
if err != nil { if err != nil {
logrus.WithError(err). logrus.WithError(err).
Error("Failed uploading crawl results") Error("Failed uploading crawl results")
@@ -173,18 +170,24 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
// Log finish // Log finish
logrus.WithFields(logrus.Fields{ logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId, "id": o.Task.WebsiteId,
"url": o.BaseUri.String(), "url": o.BaseUri.String(),
"duration": time.Since(o.Result.StartTime), "duration": time.Since(o.Result.StartTime),
}).Info("Crawler finished") }).Info("Crawler finished")
// Set status code // Set status code
now := time.Now() now := time.Now()
o.Result.EndTimeUnix = now.Unix() o.Result.EndTimeUnix = now.Unix()
if atomic.LoadUint64(&o.Result.ErrorCount) != 0 { fileCount := atomic.LoadUint64(&o.Result.FileCount)
o.Result.ResultCode = TR_FAIL if fileCount == 0 {
errorCount := atomic.LoadUint64(&o.Result.ErrorCount)
if errorCount == 0 {
o.Result.StatusCode = "empty"
} else {
o.Result.StatusCode = "directory listing failed"
}
} else { } else {
o.Result.ResultCode = TR_OK o.Result.StatusCode = "success"
} }
} }
@@ -202,17 +205,11 @@ func (t *Task) collect(results chan File, f *os.File) error {
result.Path = fasturl.PathUnescape(result.Path) result.Path = fasturl.PathUnescape(result.Path)
result.Name = fasturl.PathUnescape(result.Name) result.Name = fasturl.PathUnescape(result.Name)
resJson, err := json.Marshal(result) resJson, err := json.Marshal(result)
if err != nil { if err != nil { panic(err) }
panic(err)
}
_, err = f.Write(resJson) _, err = f.Write(resJson)
if err != nil { if err != nil { return err }
return err
}
_, err = f.Write([]byte{'\n'}) _, err = f.Write([]byte{'\n'})
if err != nil { if err != nil { return err }
return err
}
} }
return nil return nil

300
server.go
View File

@@ -2,125 +2,53 @@ package main
import ( import (
"bytes" "bytes"
"encoding/base64"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/fasthttp/websocket"
"github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/time/rate" "github.com/spf13/viper"
"io" "io"
"io/ioutil" "mime/multipart"
"net/http" "net/http"
"net/url" "net/url"
"os" "os"
"strconv" "strconv"
"time"
) )
var serverWorker *TrackerWorker var serverClient = http.Client {
Timeout: config.ServerTimeout,
var serverClient = http.Client{
Timeout: config.ServerTimeout,
Transport: new(ServerTripper), Transport: new(ServerTripper),
} }
var serverUserAgent = "od-database-crawler/" + rootCmd.Version var serverUserAgent = "od-database-crawler/" + rootCmd.Version
func getOrCreateWorker() {
if _, err := os.Stat("worker.json"); os.IsNotExist(err) {
req := CreateTrackerWorkerRequest{
Alias: config.TrackerAlias,
}
body, _ := json.Marshal(&req)
buf := bytes.NewBuffer(body)
resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf)
workerResponse := CreateTrackerWorkerResponse{}
respBody, _ := ioutil.ReadAll(resp.Body)
_ = json.Unmarshal(respBody, &workerResponse)
workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker)
fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
_, _ = fp.Write(workerJsonData)
//Request ASSIGN permission
serverWorker = &workerResponse.Content.Worker
accessReq, _ := json.Marshal(WorkerAccessRequest{
Project: config.TrackerProject,
Assign: true,
Submit: false,
})
buf = bytes.NewBuffer(accessReq)
res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf)
if err != nil {
panic(err)
}
logrus.WithFields(logrus.Fields{
"response": res.StatusCode,
}).Info("Requested ASSIGN permission")
} else {
var worker TrackerWorker
fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600)
workerJsonData, _ := ioutil.ReadAll(fp)
_ = json.Unmarshal(workerJsonData, &worker)
serverWorker = &worker
}
}
func FetchTask() (t *Task, err error) { func FetchTask() (t *Task, err error) {
res, err := serverClient.PostForm(
if serverWorker == nil { config.ServerUrl + "/task/get",
getOrCreateWorker() url.Values{ "token": {config.Token} })
} if err != nil { return }
res, err := serverClient.Get(config.TrackerUrl + "/task/get/" + strconv.Itoa(config.TrackerProject))
if err != nil {
return
}
defer res.Body.Close() defer res.Body.Close()
switch res.StatusCode { switch res.StatusCode {
case 200: case 200:
break break
case 404, 500:
return nil, nil
default: default:
return nil, fmt.Errorf("http %s", res.Status) return nil, fmt.Errorf("http %s", res.Status)
} }
jsonResponse := FetchTaskResponse{} t = new(Task)
err = json.NewDecoder(res.Body).Decode(&jsonResponse) err = json.NewDecoder(res.Body).Decode(t)
if _, ok := err.(*json.SyntaxError); ok { if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON") return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil { } else if err != nil { return }
return
}
if !jsonResponse.Ok {
if jsonResponse.Message == "No task available" {
return nil, nil
}
return nil, errors.New(jsonResponse.Message)
}
task := Task{}
err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}
t = &task
t.TaskId = jsonResponse.Content.Task.Id
return return
} }
func PushResult(task *Task, f *os.File) (err error) { func PushResult(result *TaskResult, f *os.File) (err error) {
if task.WebsiteId == 0 { if result.WebsiteId == 0 {
// Not a real result, don't push // Not a real result, don't push
return nil return nil
} }
@@ -131,10 +59,10 @@ func PushResult(task *Task, f *os.File) (err error) {
return return
} }
err = uploadWebsocket(f, task.UploadToken) err = uploadChunks(result.WebsiteId, f)
if err != nil { if err != nil {
logrus.Errorf("Failed to upload file list: %s", err) logrus.Errorf("Failed to upload file list: %s", err)
err2 := releaseTask(task, TR_FAIL) err2 := CancelTask(result.WebsiteId)
if err2 != nil { if err2 != nil {
logrus.Error(err2) logrus.Error(err2)
} }
@@ -142,62 +70,91 @@ func PushResult(task *Task, f *os.File) (err error) {
} }
// Upload result ignoring errors // Upload result ignoring errors
_ = releaseTask(task, TR_OK) uploadResult(result)
return return
} }
func uploadWebsocket(f *os.File, token string) (err error) { func uploadChunks(websiteId uint64, f *os.File) error {
eof := false
for iter := 1; !eof; iter++ {
// TODO Stream with io.Pipe?
var b bytes.Buffer
u := url.URL{Scheme: config.WsBucketScheme, Host: config.WsBucketHost, Path: "/upload"} multi := multipart.NewWriter(&b)
header := http.Header{} // Set upload fields
header.Add("X-Upload-Token", token) var err error
conn, _, err := websocket.DefaultDialer.Dial(u.String(), header) err = multi.WriteField("token", config.Token)
if err != nil { if err != nil { return err }
return err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId))
if err != nil { return err }
// Copy chunk to file_list
formFile, err := multi.CreateFormFile("file_list", "file_list")
var n int64
n, err = io.CopyN(formFile, f, config.ChunkSize)
if err != io.EOF && err != nil {
return err
}
if n == 0 {
// Don't upload, no content
return nil
} else if n < config.ChunkSize {
err = nil
// Break at end of iteration
eof = true
}
multi.Close()
for retries := 0; retries < viper.GetInt(ConfUploadRetries); retries++ {
if retries > 0 {
// Error occurred, retry upload
time.Sleep(viper.GetDuration(ConfUploadRetryInterval))
}
req, err := http.NewRequest(
http.MethodPost,
config.ServerUrl + "/task/upload",
&b)
req.Header.Set("content-type", multi.FormDataContentType())
if err != nil { continue }
res, err := serverClient.Do(req)
if err != nil { continue }
res.Body.Close()
if res.StatusCode != http.StatusOK {
logrus.WithField("status", res.Status).
WithField("part", iter).
Errorf("Upload failed")
continue
}
// Upload successful
break
}
logrus.WithField("id", websiteId).
WithField("part", iter).
Infof("Uploaded files chunk")
} }
conn.EnableWriteCompression(true) //TODO: Is this necessary?
socketWriter, _ := conn.NextWriter(websocket.BinaryMessage)
_, _ = io.Copy(socketWriter, f)
err = socketWriter.Close()
if err != nil {
logrus.Error("FIXME: couldn't do file upload")
return
}
err = conn.Close()
if err != nil {
return
}
return nil return nil
} }
func releaseTask(task *Task, taskResult ResultCode) (err error) { func uploadResult(result *TaskResult) (err error) {
resultEnc, err := json.Marshal(result)
if err != nil { panic(err) }
req := releaseTaskRequest{ res, err := serverClient.PostForm(
TaskId: task.TaskId, config.ServerUrl + "/task/complete",
ResultCode: taskResult, url.Values {
// TODO Will implement verification in a later ODDB update "token": {config.Token},
Verification: 0, "result": {string(resultEnc)},
} },
resultEnc, err := json.Marshal(&req)
if err != nil {
panic(err)
}
body := bytes.NewBuffer(resultEnc)
res, err := serverClient.Post(
config.TrackerUrl+"/task/release",
"application/json",
body,
) )
if err != nil { if err != nil { return }
return
}
res.Body.Close() res.Body.Close()
if res.StatusCode != http.StatusOK { if res.StatusCode != http.StatusOK {
@@ -207,66 +164,27 @@ func releaseTask(task *Task, taskResult ResultCode) (err error) {
return return
} }
func CancelTask(websiteId uint64) (err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/cancel",
url.Values{
"token": {config.Token},
"website_id": {strconv.FormatUint(websiteId, 10)},
},
)
if err != nil { return }
res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("failed to cancel task: %s", res.Status)
}
return
}
type ServerTripper struct{} type ServerTripper struct{}
func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) { func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
req.Header.Set("User-Agent", serverUserAgent) req.Header.Set("User-Agent", serverUserAgent)
//TODO: Use task_tracker/client ?
if serverWorker != nil {
req.Header.Add("X-Worker-Id", strconv.Itoa(serverWorker.Id))
req.Header.Add("X-Secret", base64.StdEncoding.EncodeToString(serverWorker.Secret))
}
return http.DefaultTransport.RoundTrip(req) return http.DefaultTransport.RoundTrip(req)
} }
// https://github.com/simon987/task_tracker/blob/master/api/models.go
type releaseTaskRequest struct {
TaskId int64 `json:"task_id"`
ResultCode ResultCode `json:"result"`
Verification int64 `json:"verification"`
}
type WorkerAccessRequest struct {
Assign bool `json:"assign"`
Submit bool `json:"submit"`
Project int `json:"project"`
}
type FetchTaskResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Project struct {
Id int64 `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
} `json:"project"`
Recipe string `json:"recipe"`
} `json:"task"`
} `json:"content"`
}
type TrackerWorker struct {
Alias string `json:"alias"`
Id int `json:"id"`
Secret []byte `json:"secret"`
}
type CreateTrackerWorkerResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Worker TrackerWorker `json:"worker"`
} `json:"content"`
}
type CreateTrackerWorkerRequest struct {
Alias string `json:"alias"`
}

View File

@@ -14,8 +14,8 @@ import (
var globalWait sync.WaitGroup var globalWait sync.WaitGroup
type WorkerContext struct { type WorkerContext struct {
OD *OD OD *OD
Queue *BufferedQueue Queue *BufferedQueue
lastRateLimit time.Time lastRateLimit time.Time
numRateLimits int numRateLimits int
} }
@@ -56,16 +56,16 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
if !shouldRetry(err) { if !shouldRetry(err) {
atomic.AddUint64(&totalAborted, 1) atomic.AddUint64(&totalAborted, 1)
//logrus.WithField("url", job.UriStr). logrus.WithField("url", job.UriStr).
// WithError(err). WithError(err).
// Error("Giving up after failure") Error("Giving up after failure")
return return
} }
if job.Fails > config.Retries { if job.Fails > config.Retries {
atomic.AddUint64(&totalAborted, 1) atomic.AddUint64(&totalAborted, 1)
//logrus.WithField("url", job.UriStr). logrus.WithField("url", job.UriStr).
// Errorf("Giving up after %d fails", job.Fails) Errorf("Giving up after %d fails", job.Fails)
} else { } else {
atomic.AddUint64(&totalRetries, 1) atomic.AddUint64(&totalRetries, 1)
if err == ErrRateLimit { if err == ErrRateLimit {
@@ -88,9 +88,7 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
} }
func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) { func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
if len(job.Uri.Path) == 0 { if len(job.Uri.Path) == 0 { return }
return
}
if job.Uri.Path[len(job.Uri.Path)-1] == '/' { if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
// Load directory // Load directory
links, err := GetDir(job, f) links, err := GetDir(job, f)
@@ -161,10 +159,10 @@ func (w *WorkerContext) queueJob(job Job) {
w.OD.Wait.Add(1) w.OD.Wait.Add(1)
if w.numRateLimits > 0 { if w.numRateLimits > 0 {
if time.Since(w.lastRateLimit) > 5*time.Second { if time.Since(w.lastRateLimit) > 5 * time.Second {
w.numRateLimits = 0 w.numRateLimits = 0
} else { } else {
time.Sleep(time.Duration(math.Sqrt(float64(50*w.numRateLimits))) * time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) *
100 * time.Millisecond) 100 * time.Millisecond)
} }
} }