7 Commits

Author | SHA1 | Message | Date
simon987 | a962c60b82 | Don't panic on file upload error | 2019-04-06 14:56:22 -04:00
simon987 | 24f0bd91f7 | Remove debug messages & don't use disk queue by default | 2019-04-06 12:21:35 -04:00
simon987 | 84c10e1981 | Change default config | 2019-04-05 17:53:00 -04:00
simon987 | 860fa79327 | Jenkins setup | 2019-04-04 19:22:14 -04:00
simon987 | 76bc8293d6 | minimum viable | 2019-03-30 09:02:55 -04:00
simon987 | 3470be6086 | More work on task_tracker integration | 2019-03-09 16:38:04 -05:00
Richard Patel | 60471a081e | Switch to simon987/task_tracker | 2019-02-28 23:51:26 +01:00
11 changed files with 402 additions and 302 deletions

README.md

@@ -1,14 +1,11 @@
# OD-Database Crawler 🕷
# od-database Go crawler 🚀
[![Build Status](https://travis-ci.org/terorie/od-database-crawler.svg?branch=master)](https://travis-ci.org/terorie/od-database-crawler)
[![](https://tokei.rs/b1/github/terorie/od-database-crawler)](https://github.com/terorie/od-database-crawler)
[![CodeFactor](https://www.codefactor.io/repository/github/terorie/od-database-crawler/badge/master)](https://www.codefactor.io/repository/github/terorie/od-database-crawler/overview/master)
> by terorie 2018 :P
* Crawler for [__OD-Database__](https://github.com/simon987/od-database)
* In production at https://od-db.the-eye.eu/
* Over 880 TB actively crawled
* Crawls HTTP open directories (standard Web Server Listings)
* Gets name, path, size and modification time of all files
* Lightweight and fast
* Lightweight and fast: __over 9000 requests per second__ on a standard laptop
https://od-db.the-eye.eu/

config.go

@@ -13,38 +13,44 @@ import (
)
var config struct {
ServerUrl string
Token string
ServerTimeout time.Duration
Recheck time.Duration
ChunkSize int64
Retries int
Workers int
UserAgent string
Tasks int32
Verbose bool
PrintHTTP bool
JobBufferSize int
TrackerUrl string
TrackerProject int
TrackerAlias string
WsBucketScheme string
WsBucketHost string
ServerTimeout time.Duration
Recheck time.Duration
ChunkSize int64
Retries int
Workers int
UserAgent string
Tasks int32
Verbose bool
PrintHTTP bool
JobBufferSize int
}
var onlineMode bool
const (
ConfServerUrl = "server.url"
ConfToken = "server.token"
ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown"
ConfChunkSize = "server.upload_chunk"
ConfUploadRetries = "server.upload_retries"
ConfTrackerUrl = "server.url"
ConfTrackerProject = "server.project"
ConfTrackerAlias = "server.alias"
ConfWsBucketScheme = "server.ws_bucket_scheme"
ConfWsBucketHost = "server.ws_bucket_host"
ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown"
ConfChunkSize = "server.upload_chunk"
ConfUploadRetries = "server.upload_retries"
ConfUploadRetryInterval = "server.upload_retry_interval"
ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout"
ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout"
ConfJobBufferSize = "crawl.job_buffer"
ConfCrawlStats = "output.crawl_stats"
@@ -61,39 +67,45 @@ func prepareConfig() {
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
pf.String(ConfTrackerUrl, "https://tt.the-eye.eu/api", "task_tracker api URL")
pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
pf.String(ConfTrackerProject, "1", "task_tracker project id")
pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
pf.String(ConfWsBucketScheme, "wss", "ws_bucket scheme")
pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
pf.String(ConfWsBucketHost, "wsb.the-eye.eu", "ws_bucket host")
pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
pf.String(ConfTrackerAlias, "changeme", "task_tracker worker alias")
pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout")
pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs")
pf.Duration(ConfCooldown, 1*time.Minute, "OD-DB: Time to wait after a server-side error")
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
pf.Duration(ConfUploadRetryInterval, 30*time.Second, "OD-DB: Time to wait between upload retries")
pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
pf.Uint(ConfTasks, 25, "Crawler: Max concurrent tasks")
pf.Uint(ConfWorkers, 4, "Crawler: Connections per server")
pf.Uint(ConfWorkers, 1, "Crawler: Connections per server")
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
pf.Duration(ConfDialTimeout, 10*time.Second, "Crawler: Handshake timeout")
pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
pf.Duration(ConfTimeout, 30*time.Second, "Crawler: Request timeout")
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
pf.Int(ConfJobBufferSize, -1, "Crawler: Task queue cache size")
pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
pf.Duration(ConfCrawlStats, 500*time.Second, "Log: Crawl stats interval")
pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
pf.Duration(ConfAllocStats, 500*time.Second, "Log: Resource stats interval")
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
@@ -145,17 +157,19 @@ func readConfig() {
}
if onlineMode {
config.ServerUrl = viper.GetString(ConfServerUrl)
if config.ServerUrl == "" {
configMissing(ConfServerUrl)
}
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
config.Token = viper.GetString(ConfToken)
if config.Token == "" {
configMissing(ConfToken)
config.TrackerUrl = viper.GetString(ConfTrackerUrl)
if config.TrackerUrl == "" {
configMissing(ConfTrackerUrl)
}
config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/")
}
config.TrackerProject = viper.GetInt(ConfTrackerProject)
config.TrackerAlias = viper.GetString(ConfTrackerAlias)
config.WsBucketHost = viper.GetString(ConfWsBucketHost)
config.WsBucketScheme = viper.GetString(ConfWsBucketScheme)
config.ServerTimeout = viper.GetDuration(ConfServerTimeout)
@@ -195,9 +209,11 @@ func readConfig() {
}
if filePath := viper.GetString(ConfLogFile); filePath != "" {
f, err := os.OpenFile(filePath, os.O_CREATE | os.O_WRONLY | os.O_APPEND, 0644)
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
bufWriter := bufio.NewWriter(f)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
exitHooks.Add(func() {
bufWriter.Flush()
f.Close()

config.yml

@@ -1,10 +1,14 @@
# OD-Database server settings
server:
# Connection URL
url: http://od-db.mine.terorie.com/api
# Server auth token
token:
url: https://tt.the-eye.eu/api
# OD-Database project id (for crawling)
project: 1
# Your worker alias
alias: changeme
# Websocket bucket host & scheme (ws/wss)
ws_bucket_host: https://wsb.the-eye.eu
ws_bucket_scheme: wss
# Request timeout
timeout: 60s
@@ -17,11 +21,7 @@ server:
# Time to wait after receiving an error
# from the server. Doesn't apply to uploads.
cooldown: 30s
# Upload chunk size
# If the value is too high, the upload fails.
upload_chunk: 1 MB
cooldown: 1s
upload_retries: 10
upload_retry_interval: 30s
@@ -29,10 +29,10 @@ server:
# Log output settings
output:
# Crawl statistics
crawl_stats: 1s
crawl_stats: 1m
# CPU/RAM/Job queue stats
resource_stats: 10s
resource_stats: 1m
# More output? (Every listed dir)
verbose: false
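
For context on the two new ws_bucket keys: server.go (further down in this diff) joins them into the upload endpoint, so ws_bucket_host is meant to be a bare host name, with the scheme given separately. A minimal sketch of that construction; the helper name is ours, not the crawler's:

```go
package main

import "net/url"

// wsBucketEndpoint mirrors how uploadWebsocket in server.go builds the upload
// URL from the two config keys; shown only to clarify what each key means.
func wsBucketEndpoint() string {
	u := url.URL{
		Scheme: config.WsBucketScheme, // "ws" or "wss"
		Host:   config.WsBucketHost,   // bare host, e.g. "wsb.the-eye.eu"
		Path:   "/upload",
	}
	return u.String() // e.g. "wss://wsb.the-eye.eu/upload"
}
```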

go.mod

@@ -1,13 +0,0 @@
module github.com/terorie/od-database-crawler
require (
github.com/beeker1121/goque v2.0.1+incompatible
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/sirupsen/logrus v1.4.0
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.2
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect
github.com/valyala/fasthttp v1.2.0
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3
)

go.sum

@@ -1,70 +0,0 @@
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8gh8=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0 h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38=
github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.1.0 h1:3BohG7mqwj4lq7PTX//7gLbUlzNvZSPmuHFnloXT0lw=
github.com/valyala/fasthttp v1.1.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/fasthttp v1.2.0 h1:dzZJf2IuMiclVjdw0kkT+f9u4YdrapbNyGAN47E/qnk=
github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

jenkins/Jenkinsfile (new file)

@@ -0,0 +1,47 @@
def remote = [:]
remote.name = 'remote'
remote.host = env.DEPLOY_HOST
remote.user = env.DEPLOY_USER
remote.identityFile = '/var/lib/jenkins/.ssh/id_rsa'
remote.knownHosts = '/var/lib/jenkins/.ssh/known_hosts'
remote.allowAnyHosts = true
remote.retryCount = 3
remote.retryWaitSec = 3
logLevel = 'FINER'
pipeline {
agent none
environment {
GOOS='linux'
CGO_ENABLED='0'
HOME='.'
}
stages {
stage('Build') {
agent {
docker {
image 'golang:latest'
}
}
steps {
sh 'mkdir -p /go/src/github.com/terorie/od-database-crawler'
sh 'cp -r *.go fasturl ds jenkins/build.sh "/go/src/github.com/terorie/od-database-crawler"'
sh 'cd /go/src/github.com/terorie/od-database-crawler && go get ./...'
sh './jenkins/build.sh'
stash includes: 'dist/', name: 'dist'
}
}
stage('Deploy') {
agent none
steps {
node('master') {
unstash 'dist'
sshCommand remote: remote, command: "ls od-database-crawler/"
sshPut remote: remote, from: 'dist/', into: 'od-database-crawler'
}
}
}
}
}

jenkins/build.sh (new file, executable)

@@ -0,0 +1,23 @@
#!/usr/bin/env bash
appname="od-database-crawler"
outdir="dist/"
tag="${BUILD_ID}_$(date +%Y-%m-%d)"
rm -rf "./${outdir}"
mkdir build 2> /dev/null
name=${outdir}${appname}-${tag}-linux
GOOS="linux" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-mac
GOOS="darwin" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-freebsd
GOOS="freebsd" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}

model.go

@@ -7,19 +7,29 @@ import (
"time"
)
type ResultCode int
const (
TR_OK = ResultCode(iota)
TR_FAIL = 1
TR_SKIP = 2
)
type Task struct {
WebsiteId uint64 `json:"website_id"`
Url string `json:"url"`
WebsiteId uint64 `json:"website_id"`
Url string `json:"url"`
UploadToken string `json:"upload_token"`
TaskId int64
}
type TaskResult struct {
StatusCode string `json:"status_code"`
FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"`
StartTimeUnix int64 `json:"start_time"`
EndTimeUnix int64 `json:"end_time"`
WebsiteId uint64 `json:"website_id"`
ResultCode ResultCode `json:"status_code"`
FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"`
StartTimeUnix int64 `json:"start_time"`
EndTimeUnix int64 `json:"end_time"`
WebsiteId uint64 `json:"website_id"`
}
type Job struct {
@@ -51,13 +61,16 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
defer o.Scanned.Unlock()
exists = o.Scanned.Get(k)
if exists { return true }
if exists {
return true
}
o.Scanned.Put(k)
return false
}
type errorString string
func (e errorString) Error() string {
return string(e)
}
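
The task itself now arrives wrapped in a task_tracker "recipe": the tracker returns an opaque JSON string, and FetchTask (in server.go below) unmarshals it into the Task struct above. A small sketch of that mapping; the helper and the sample recipe are illustrative, not part of the crawler:

```go
package main

import "encoding/json"

// decodeRecipe mirrors what FetchTask does with the tracker's recipe string.
// Field names follow the Task struct's JSON tags above; the example recipe in
// the comment below is made up.
func decodeRecipe(recipe string) (Task, error) {
	var t Task
	err := json.Unmarshal([]byte(recipe), &t)
	// t.TaskId is not part of the recipe; FetchTask copies it from the
	// tracker's task object after decoding.
	return t, err
}

// Example recipe: {"website_id": 42, "url": "http://example.com/files/", "upload_token": "..."}
```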

scheduler.go

@@ -34,13 +34,16 @@ func Schedule(c context.Context, remotes <-chan *OD) {
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
// Delete existing queue
if err := os.RemoveAll(queuePath);
err != nil { panic(err) }
if err := os.RemoveAll(queuePath); err != nil {
panic(err)
}
// Start new queue
var err error
remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
// Spawn workers
for i := 0; i < config.Workers; i++ {
@@ -77,12 +80,12 @@ func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
globalWait.Add(1)
now := time.Now()
od := &OD {
Task: *t,
od := &OD{
Task: *t,
BaseUri: *u,
Result: TaskResult {
WebsiteId: t.WebsiteId,
StartTime: now,
Result: TaskResult{
WebsiteId: t.WebsiteId,
StartTime: now,
StartTimeUnix: now.Unix(),
},
}
@@ -117,7 +120,7 @@ func (o *OD) Watch(results chan File) {
// Open crawl results file
f, err := os.OpenFile(
filePath,
os.O_CREATE | os.O_RDWR | os.O_TRUNC,
os.O_CREATE|os.O_RDWR|os.O_TRUNC,
0644,
)
if err != nil {
@@ -145,7 +148,7 @@ func (o *OD) Watch(results chan File) {
}
// Upload results
err = PushResult(&o.Result, f)
err = PushResult(&o.Task, f)
if err != nil {
logrus.WithError(err).
Error("Failed uploading crawl results")
@@ -170,24 +173,18 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
// Log finish
logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"duration": time.Since(o.Result.StartTime),
}).Info("Crawler finished")
// Set status code
now := time.Now()
o.Result.EndTimeUnix = now.Unix()
fileCount := atomic.LoadUint64(&o.Result.FileCount)
if fileCount == 0 {
errorCount := atomic.LoadUint64(&o.Result.ErrorCount)
if errorCount == 0 {
o.Result.StatusCode = "empty"
} else {
o.Result.StatusCode = "directory listing failed"
}
if atomic.LoadUint64(&o.Result.ErrorCount) != 0 {
o.Result.ResultCode = TR_FAIL
} else {
o.Result.StatusCode = "success"
o.Result.ResultCode = TR_OK
}
}
@@ -205,11 +202,17 @@ func (t *Task) collect(results chan File, f *os.File) error {
result.Path = fasturl.PathUnescape(result.Path)
result.Name = fasturl.PathUnescape(result.Name)
resJson, err := json.Marshal(result)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
_, err = f.Write(resJson)
if err != nil { return err }
if err != nil {
return err
}
_, err = f.Write([]byte{'\n'})
if err != nil { return err }
if err != nil {
return err
}
}
return nil

server.go

@@ -2,53 +2,125 @@ package main
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/fasthttp/websocket"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"golang.org/x/time/rate"
"io"
"mime/multipart"
"io/ioutil"
"net/http"
"net/url"
"os"
"strconv"
"time"
)
var serverClient = http.Client {
Timeout: config.ServerTimeout,
var serverWorker *TrackerWorker
var serverClient = http.Client{
Timeout: config.ServerTimeout,
Transport: new(ServerTripper),
}
var serverUserAgent = "od-database-crawler/" + rootCmd.Version
func getOrCreateWorker() {
if _, err := os.Stat("worker.json"); os.IsNotExist(err) {
req := CreateTrackerWorkerRequest{
Alias: config.TrackerAlias,
}
body, _ := json.Marshal(&req)
buf := bytes.NewBuffer(body)
resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf)
workerResponse := CreateTrackerWorkerResponse{}
respBody, _ := ioutil.ReadAll(resp.Body)
_ = json.Unmarshal(respBody, &workerResponse)
workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker)
fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
_, _ = fp.Write(workerJsonData)
//Request ASSIGN permission
serverWorker = &workerResponse.Content.Worker
accessReq, _ := json.Marshal(WorkerAccessRequest{
Project: config.TrackerProject,
Assign: true,
Submit: false,
})
buf = bytes.NewBuffer(accessReq)
res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf)
if err != nil {
panic(err)
}
logrus.WithFields(logrus.Fields{
"response": res.StatusCode,
}).Info("Requested ASSIGN permission")
} else {
var worker TrackerWorker
fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600)
workerJsonData, _ := ioutil.ReadAll(fp)
_ = json.Unmarshal(workerJsonData, &worker)
serverWorker = &worker
}
}
func FetchTask() (t *Task, err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/get",
url.Values{ "token": {config.Token} })
if err != nil { return }
if serverWorker == nil {
getOrCreateWorker()
}
res, err := serverClient.Get(config.TrackerUrl + "/task/get/" + strconv.Itoa(config.TrackerProject))
if err != nil {
return
}
defer res.Body.Close()
switch res.StatusCode {
case 200:
break
case 404, 500:
return nil, nil
default:
return nil, fmt.Errorf("http %s", res.Status)
}
t = new(Task)
err = json.NewDecoder(res.Body).Decode(t)
jsonResponse := FetchTaskResponse{}
err = json.NewDecoder(res.Body).Decode(&jsonResponse)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil { return }
} else if err != nil {
return
}
if !jsonResponse.Ok {
if jsonResponse.Message == "No task available" {
return nil, nil
}
return nil, errors.New(jsonResponse.Message)
}
task := Task{}
err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}
t = &task
t.TaskId = jsonResponse.Content.Task.Id
return
}
func PushResult(result *TaskResult, f *os.File) (err error) {
if result.WebsiteId == 0 {
func PushResult(task *Task, f *os.File) (err error) {
if task.WebsiteId == 0 {
// Not a real result, don't push
return nil
}
@@ -59,10 +131,10 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
return
}
err = uploadChunks(result.WebsiteId, f)
err = uploadWebsocket(f, task.UploadToken)
if err != nil {
logrus.Errorf("Failed to upload file list: %s", err)
err2 := CancelTask(result.WebsiteId)
err2 := releaseTask(task, TR_FAIL)
if err2 != nil {
logrus.Error(err2)
}
@@ -70,91 +142,62 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
}
// Upload result ignoring errors
uploadResult(result)
_ = releaseTask(task, TR_OK)
return
}
func uploadChunks(websiteId uint64, f *os.File) error {
eof := false
for iter := 1; !eof; iter++ {
// TODO Stream with io.Pipe?
var b bytes.Buffer
func uploadWebsocket(f *os.File, token string) (err error) {
multi := multipart.NewWriter(&b)
u := url.URL{Scheme: config.WsBucketScheme, Host: config.WsBucketHost, Path: "/upload"}
// Set upload fields
var err error
err = multi.WriteField("token", config.Token)
if err != nil { return err }
err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId))
if err != nil { return err }
// Copy chunk to file_list
formFile, err := multi.CreateFormFile("file_list", "file_list")
var n int64
n, err = io.CopyN(formFile, f, config.ChunkSize)
if err != io.EOF && err != nil {
return err
}
if n == 0 {
// Don't upload, no content
return nil
} else if n < config.ChunkSize {
err = nil
// Break at end of iteration
eof = true
}
multi.Close()
for retries := 0; retries < viper.GetInt(ConfUploadRetries); retries++ {
if retries > 0 {
// Error occurred, retry upload
time.Sleep(viper.GetDuration(ConfUploadRetryInterval))
}
req, err := http.NewRequest(
http.MethodPost,
config.ServerUrl + "/task/upload",
&b)
req.Header.Set("content-type", multi.FormDataContentType())
if err != nil { continue }
res, err := serverClient.Do(req)
if err != nil { continue }
res.Body.Close()
if res.StatusCode != http.StatusOK {
logrus.WithField("status", res.Status).
WithField("part", iter).
Errorf("Upload failed")
continue
}
// Upload successful
break
}
logrus.WithField("id", websiteId).
WithField("part", iter).
Infof("Uploaded files chunk")
header := http.Header{}
header.Add("X-Upload-Token", token)
conn, _, err := websocket.DefaultDialer.Dial(u.String(), header)
if err != nil {
return
}
conn.EnableWriteCompression(true) //TODO: Is this necessary?
socketWriter, _ := conn.NextWriter(websocket.BinaryMessage)
_, _ = io.Copy(socketWriter, f)
err = socketWriter.Close()
if err != nil {
logrus.Error("FIXME: couldn't do file upload")
return
}
err = conn.Close()
if err != nil {
return
}
return nil
}
func uploadResult(result *TaskResult) (err error) {
resultEnc, err := json.Marshal(result)
if err != nil { panic(err) }
func releaseTask(task *Task, taskResult ResultCode) (err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/complete",
url.Values {
"token": {config.Token},
"result": {string(resultEnc)},
},
req := releaseTaskRequest{
TaskId: task.TaskId,
ResultCode: taskResult,
// TODO Will implement verification in a later ODDB update
Verification: 0,
}
resultEnc, err := json.Marshal(&req)
if err != nil {
panic(err)
}
body := bytes.NewBuffer(resultEnc)
res, err := serverClient.Post(
config.TrackerUrl+"/task/release",
"application/json",
body,
)
if err != nil { return }
if err != nil {
return
}
res.Body.Close()
if res.StatusCode != http.StatusOK {
@@ -164,27 +207,66 @@ func uploadResult(result *TaskResult) (err error) {
return
}
func CancelTask(websiteId uint64) (err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/cancel",
url.Values{
"token": {config.Token},
"website_id": {strconv.FormatUint(websiteId, 10)},
},
)
if err != nil { return }
res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("failed to cancel task: %s", res.Status)
}
return
}
type ServerTripper struct{}
func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
req.Header.Set("User-Agent", serverUserAgent)
//TODO: Use task_tracker/client ?
if serverWorker != nil {
req.Header.Add("X-Worker-Id", strconv.Itoa(serverWorker.Id))
req.Header.Add("X-Secret", base64.StdEncoding.EncodeToString(serverWorker.Secret))
}
return http.DefaultTransport.RoundTrip(req)
}
// https://github.com/simon987/task_tracker/blob/master/api/models.go
type releaseTaskRequest struct {
TaskId int64 `json:"task_id"`
ResultCode ResultCode `json:"result"`
Verification int64 `json:"verification"`
}
type WorkerAccessRequest struct {
Assign bool `json:"assign"`
Submit bool `json:"submit"`
Project int `json:"project"`
}
type FetchTaskResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Project struct {
Id int64 `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
} `json:"project"`
Recipe string `json:"recipe"`
} `json:"task"`
} `json:"content"`
}
type TrackerWorker struct {
Alias string `json:"alias"`
Id int `json:"id"`
Secret []byte `json:"secret"`
}
type CreateTrackerWorkerResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Worker TrackerWorker `json:"worker"`
} `json:"content"`
}
type CreateTrackerWorkerRequest struct {
Alias string `json:"alias"`
}
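
Pulling the pieces of this file together, the new flow is: load or register a worker (worker.json), poll the tracker for a task, crawl it, stream the file list to ws_bucket, and release the task. A rough sketch of the calling side, using only functions defined in this diff; the driver function and temp-file handling are illustrative, and the real loop lives elsewhere in the crawler:

```go
package main

import (
	"io/ioutil"
	"os"
)

// runOnce is a hypothetical driver showing how FetchTask and PushResult fit
// together; error handling is trimmed and the crawl step is a placeholder.
func runOnce() error {
	t, err := FetchTask() // authenticated via worker.json (X-Worker-Id / X-Secret headers)
	if err != nil {
		return err
	}
	if t == nil {
		return nil // tracker reported "No task available"; poll again later
	}

	f, err := ioutil.TempFile("", "crawl-*.json")
	if err != nil {
		return err
	}
	defer os.Remove(f.Name())
	defer f.Close()

	// ... crawl t.Url and write one JSON object per discovered file to f ...

	// PushResult streams f over the ws_bucket websocket using t.UploadToken,
	// then releases the task as TR_OK, or TR_FAIL if the upload failed.
	return PushResult(t, f)
}
```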

worker.go

@@ -14,8 +14,8 @@ import (
var globalWait sync.WaitGroup
type WorkerContext struct {
OD *OD
Queue *BufferedQueue
OD *OD
Queue *BufferedQueue
lastRateLimit time.Time
numRateLimits int
}
@@ -56,16 +56,16 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
if !shouldRetry(err) {
atomic.AddUint64(&totalAborted, 1)
logrus.WithField("url", job.UriStr).
WithError(err).
Error("Giving up after failure")
//logrus.WithField("url", job.UriStr).
// WithError(err).
// Error("Giving up after failure")
return
}
if job.Fails > config.Retries {
atomic.AddUint64(&totalAborted, 1)
logrus.WithField("url", job.UriStr).
Errorf("Giving up after %d fails", job.Fails)
//logrus.WithField("url", job.UriStr).
// Errorf("Giving up after %d fails", job.Fails)
} else {
atomic.AddUint64(&totalRetries, 1)
if err == ErrRateLimit {
@@ -88,7 +88,9 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
}
func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
if len(job.Uri.Path) == 0 { return }
if len(job.Uri.Path) == 0 {
return
}
if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
// Load directory
links, err := GetDir(job, f)
@@ -159,10 +161,10 @@ func (w *WorkerContext) queueJob(job Job) {
w.OD.Wait.Add(1)
if w.numRateLimits > 0 {
if time.Since(w.lastRateLimit) > 5 * time.Second {
if time.Since(w.lastRateLimit) > 5*time.Second {
w.numRateLimits = 0
} else {
time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) *
time.Sleep(time.Duration(math.Sqrt(float64(50*w.numRateLimits))) *
100 * time.Millisecond)
}
}