7 Commits

Author SHA1 Message Date
simon987
a962c60b82 Don't panic on file upload error 2019-04-06 14:56:22 -04:00
simon987
24f0bd91f7 Remove debug messages & don't use disk queue by default 2019-04-06 12:21:35 -04:00
simon987
84c10e1981 Change default config 2019-04-05 17:53:00 -04:00
simon987
860fa79327 Jenkins setup 2019-04-04 19:22:14 -04:00
simon987
76bc8293d6 minimum viable 2019-03-30 09:02:55 -04:00
simon987
3470be6086 More work on task_tracker integration 2019-03-09 16:38:04 -05:00
Richard Patel
60471a081e Switch to simon987/task_tracker 2019-02-28 23:51:26 +01:00
11 changed files with 402 additions and 302 deletions

View File

@@ -1,14 +1,11 @@
# OD-Database Crawler 🕷
# od-database Go crawler 🚀
[![Build Status](https://travis-ci.org/terorie/od-database-crawler.svg?branch=master)](https://travis-ci.org/terorie/od-database-crawler)
[![](https://tokei.rs/b1/github/terorie/od-database-crawler)](https://github.com/terorie/od-database-crawler)
[![CodeFactor](https://www.codefactor.io/repository/github/terorie/od-database-crawler/badge/master)](https://www.codefactor.io/repository/github/terorie/od-database-crawler/overview/master)
> by terorie 2018 :P
* Crawler for [__OD-Database__](https://github.com/simon987/od-database)
* In production at https://od-db.the-eye.eu/
* Over 880 TB actively crawled
* Crawls HTTP open directories (standard Web Server Listings)
* Gets name, path, size and modification time of all files
* Lightweight and fast
* Lightweight and fast: __over 9000 requests per second__ on a standard laptop
https://od-db.the-eye.eu/

View File

@@ -13,8 +13,11 @@ import (
)
var config struct {
ServerUrl string
Token string
TrackerUrl string
TrackerProject int
TrackerAlias string
WsBucketScheme string
WsBucketHost string
ServerTimeout time.Duration
Recheck time.Duration
ChunkSize int64
@@ -30,8 +33,11 @@ var config struct {
var onlineMode bool
const (
ConfServerUrl = "server.url"
ConfToken = "server.token"
ConfTrackerUrl = "server.url"
ConfTrackerProject = "server.project"
ConfTrackerAlias = "server.alias"
ConfWsBucketScheme = "server.ws_bucket_scheme"
ConfWsBucketHost = "server.ws_bucket_host"
ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown"
@@ -61,15 +67,21 @@ func prepareConfig() {
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
pf.String(ConfTrackerUrl, "https://tt.the-eye.eu/api", "task_tracker api URL")
pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
pf.String(ConfTrackerProject, "1", "task_tracker project id")
pf.String(ConfWsBucketScheme, "wss", "ws_bucket scheme")
pf.String(ConfWsBucketHost, "wsb.the-eye.eu", "ws_bucket host")
pf.String(ConfTrackerAlias, "changeme", "task_tracker worker alias")
pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout")
pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs")
pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
pf.Duration(ConfCooldown, 1*time.Minute, "OD-DB: Time to wait after a server-side error")
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
@@ -77,9 +89,9 @@ func prepareConfig() {
pf.Duration(ConfUploadRetryInterval, 30*time.Second, "OD-DB: Time to wait between upload retries")
pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
pf.Uint(ConfTasks, 25, "Crawler: Max concurrent tasks")
pf.Uint(ConfWorkers, 4, "Crawler: Connections per server")
pf.Uint(ConfWorkers, 1, "Crawler: Connections per server")
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
@@ -89,11 +101,11 @@ func prepareConfig() {
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
pf.Uint(ConfJobBufferSize, 5000, "Crawler: Task queue cache size")
pf.Int(ConfJobBufferSize, -1, "Crawler: Task queue cache size")
pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
pf.Duration(ConfCrawlStats, 500*time.Second, "Log: Crawl stats interval")
pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
pf.Duration(ConfAllocStats, 500*time.Second, "Log: Resource stats interval")
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
@@ -145,17 +157,19 @@ func readConfig() {
}
if onlineMode {
config.ServerUrl = viper.GetString(ConfServerUrl)
if config.ServerUrl == "" {
configMissing(ConfServerUrl)
config.TrackerUrl = viper.GetString(ConfTrackerUrl)
if config.TrackerUrl == "" {
configMissing(ConfTrackerUrl)
}
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/")
}
config.TrackerProject = viper.GetInt(ConfTrackerProject)
config.Token = viper.GetString(ConfToken)
if config.Token == "" {
configMissing(ConfToken)
}
}
config.TrackerAlias = viper.GetString(ConfTrackerAlias)
config.WsBucketHost = viper.GetString(ConfWsBucketHost)
config.WsBucketScheme = viper.GetString(ConfWsBucketScheme)
config.ServerTimeout = viper.GetDuration(ConfServerTimeout)
@@ -197,7 +211,9 @@ func readConfig() {
if filePath := viper.GetString(ConfLogFile); filePath != "" {
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
bufWriter := bufio.NewWriter(f)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
exitHooks.Add(func() {
bufWriter.Flush()
f.Close()

View File

@@ -1,10 +1,14 @@
# OD-Database server settings
server:
# Connection URL
url: http://od-db.mine.terorie.com/api
# Server auth token
token:
url: https://tt.the-eye.eu/api
# OD-Database project id (for crawling)
project: 1
# Your worker alias
alias: changeme
# Websocket bucket host & scheme (ws/wss)
ws_bucket_host: https://wsb.the-eye.eu
ws_bucket_scheme: wss
# Request timeout
timeout: 60s
@@ -17,11 +21,7 @@ server:
# Time to wait after receiving an error
# from the server. Doesn't apply to uploads.
cooldown: 30s
# Upload chunk size
# If the value is too high, the upload fails.
upload_chunk: 1 MB
cooldown: 1s
upload_retries: 10
upload_retry_interval: 30s
@@ -29,10 +29,10 @@ server:
# Log output settings
output:
# Crawl statistics
crawl_stats: 1s
crawl_stats: 1m
# CPU/RAM/Job queue stats
resource_stats: 10s
resource_stats: 1m
# More output? (Every listed dir)
verbose: false

13
go.mod
View File

@@ -1,13 +0,0 @@
module github.com/terorie/od-database-crawler
require (
github.com/beeker1121/goque v2.0.1+incompatible
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/sirupsen/logrus v1.4.0
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.2
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect
github.com/valyala/fasthttp v1.2.0
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3
)

70
go.sum
View File

@@ -1,70 +0,0 @@
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8gh8=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0 h1:yKenngtzGh+cUSSh6GWbxW2abRqhYUSR/t/6+2QqNvE=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38=
github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M=
github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.1.0 h1:3BohG7mqwj4lq7PTX//7gLbUlzNvZSPmuHFnloXT0lw=
github.com/valyala/fasthttp v1.1.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/fasthttp v1.2.0 h1:dzZJf2IuMiclVjdw0kkT+f9u4YdrapbNyGAN47E/qnk=
github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

47
jenkins/Jenkinsfile vendored Normal file
View File

@@ -0,0 +1,47 @@
def remote = [:]
remote.name = 'remote'
remote.host = env.DEPLOY_HOST
remote.user = env.DEPLOY_USER
remote.identityFile = '/var/lib/jenkins/.ssh/id_rsa'
remote.knownHosts = '/var/lib/jenkins/.ssh/known_hosts'
remote.allowAnyHosts = true
remote.retryCount = 3
remote.retryWaitSec = 3
logLevel = 'FINER'
pipeline {
agent none
environment {
GOOS='linux'
CGO_ENABLED='0'
HOME='.'
}
stages {
stage('Build') {
agent {
docker {
image 'golang:latest'
}
}
steps {
sh 'mkdir -p /go/src/github.com/terorie/od-database-crawler'
sh 'cp -r *.go fasturl ds jenkins/build.sh "/go/src/github.com/terorie/od-database-crawler"'
sh 'cd /go/src/github.com/terorie/od-database-crawler && go get ./...'
sh './jenkins/build.sh'
stash includes: 'dist/', name: 'dist'
}
}
stage('Deploy') {
agent none
steps {
node('master') {
unstash 'dist'
sshCommand remote: remote, command: "ls od-database-crawler/"
sshPut remote: remote, from: 'dist/', into: 'od-database-crawler'
}
}
}
}
}

23
jenkins/build.sh Executable file
View File

@@ -0,0 +1,23 @@
#!/usr/bin/env bash
appname="od-database-crawler"
outdir="dist/"
tag="${BUILD_ID}_$(date +%Y-%m-%d)"
rm -rf "./${outdir}"
mkdir build 2> /dev/null
name=${outdir}${appname}-${tag}-linux
GOOS="linux" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-mac
GOOS="darwin" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}
name=${outdir}${appname}-${tag}-freebsd
GOOS="freebsd" GOARCH="amd64" go build -ldflags="-s -w" -o ${name}
gzip -f ${name}
echo ${name}

View File

@@ -7,13 +7,23 @@ import (
"time"
)
type ResultCode int
const (
TR_OK = ResultCode(iota)
TR_FAIL = 1
TR_SKIP = 2
)
type Task struct {
WebsiteId uint64 `json:"website_id"`
Url string `json:"url"`
UploadToken string `json:"upload_token"`
TaskId int64
}
type TaskResult struct {
StatusCode string `json:"status_code"`
ResultCode ResultCode `json:"status_code"`
FileCount uint64 `json:"file_count"`
ErrorCount uint64 `json:"-"`
StartTime time.Time `json:"-"`
@@ -51,13 +61,16 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
defer o.Scanned.Unlock()
exists = o.Scanned.Get(k)
if exists { return true }
if exists {
return true
}
o.Scanned.Put(k)
return false
}
type errorString string
func (e errorString) Error() string {
return string(e)
}

View File

@@ -34,13 +34,16 @@ func Schedule(c context.Context, remotes <-chan *OD) {
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
// Delete existing queue
if err := os.RemoveAll(queuePath);
err != nil { panic(err) }
if err := os.RemoveAll(queuePath); err != nil {
panic(err)
}
// Start new queue
var err error
remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
// Spawn workers
for i := 0; i < config.Workers; i++ {
@@ -145,7 +148,7 @@ func (o *OD) Watch(results chan File) {
}
// Upload results
err = PushResult(&o.Result, f)
err = PushResult(&o.Task, f)
if err != nil {
logrus.WithError(err).
Error("Failed uploading crawl results")
@@ -178,16 +181,10 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
// Set status code
now := time.Now()
o.Result.EndTimeUnix = now.Unix()
fileCount := atomic.LoadUint64(&o.Result.FileCount)
if fileCount == 0 {
errorCount := atomic.LoadUint64(&o.Result.ErrorCount)
if errorCount == 0 {
o.Result.StatusCode = "empty"
if atomic.LoadUint64(&o.Result.ErrorCount) != 0 {
o.Result.ResultCode = TR_FAIL
} else {
o.Result.StatusCode = "directory listing failed"
}
} else {
o.Result.StatusCode = "success"
o.Result.ResultCode = TR_OK
}
}
@@ -205,11 +202,17 @@ func (t *Task) collect(results chan File, f *os.File) error {
result.Path = fasturl.PathUnescape(result.Path)
result.Name = fasturl.PathUnescape(result.Name)
resJson, err := json.Marshal(result)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
_, err = f.Write(resJson)
if err != nil { return err }
if err != nil {
return err
}
_, err = f.Write([]byte{'\n'})
if err != nil { return err }
if err != nil {
return err
}
}
return nil

288
server.go
View File

@@ -2,19 +2,23 @@ package main
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"github.com/fasthttp/websocket"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"golang.org/x/time/rate"
"io"
"mime/multipart"
"io/ioutil"
"net/http"
"net/url"
"os"
"strconv"
"time"
)
var serverWorker *TrackerWorker
var serverClient = http.Client{
Timeout: config.ServerTimeout,
Transport: new(ServerTripper),
@@ -22,33 +26,101 @@ var serverClient = http.Client {
var serverUserAgent = "od-database-crawler/" + rootCmd.Version
func getOrCreateWorker() {
if _, err := os.Stat("worker.json"); os.IsNotExist(err) {
req := CreateTrackerWorkerRequest{
Alias: config.TrackerAlias,
}
body, _ := json.Marshal(&req)
buf := bytes.NewBuffer(body)
resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf)
workerResponse := CreateTrackerWorkerResponse{}
respBody, _ := ioutil.ReadAll(resp.Body)
_ = json.Unmarshal(respBody, &workerResponse)
workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker)
fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
_, _ = fp.Write(workerJsonData)
//Request ASSIGN permission
serverWorker = &workerResponse.Content.Worker
accessReq, _ := json.Marshal(WorkerAccessRequest{
Project: config.TrackerProject,
Assign: true,
Submit: false,
})
buf = bytes.NewBuffer(accessReq)
res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf)
if err != nil {
panic(err)
}
logrus.WithFields(logrus.Fields{
"response": res.StatusCode,
}).Info("Requested ASSIGN permission")
} else {
var worker TrackerWorker
fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600)
workerJsonData, _ := ioutil.ReadAll(fp)
_ = json.Unmarshal(workerJsonData, &worker)
serverWorker = &worker
}
}
func FetchTask() (t *Task, err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/get",
url.Values{ "token": {config.Token} })
if err != nil { return }
if serverWorker == nil {
getOrCreateWorker()
}
res, err := serverClient.Get(config.TrackerUrl + "/task/get/" + strconv.Itoa(config.TrackerProject))
if err != nil {
return
}
defer res.Body.Close()
switch res.StatusCode {
case 200:
break
case 404, 500:
return nil, nil
default:
return nil, fmt.Errorf("http %s", res.Status)
}
t = new(Task)
err = json.NewDecoder(res.Body).Decode(t)
jsonResponse := FetchTaskResponse{}
err = json.NewDecoder(res.Body).Decode(&jsonResponse)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil { return }
} else if err != nil {
return
}
if !jsonResponse.Ok {
if jsonResponse.Message == "No task available" {
return nil, nil
}
return nil, errors.New(jsonResponse.Message)
}
task := Task{}
err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}
t = &task
t.TaskId = jsonResponse.Content.Task.Id
return
}
func PushResult(result *TaskResult, f *os.File) (err error) {
if result.WebsiteId == 0 {
func PushResult(task *Task, f *os.File) (err error) {
if task.WebsiteId == 0 {
// Not a real result, don't push
return nil
}
@@ -59,10 +131,10 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
return
}
err = uploadChunks(result.WebsiteId, f)
err = uploadWebsocket(f, task.UploadToken)
if err != nil {
logrus.Errorf("Failed to upload file list: %s", err)
err2 := CancelTask(result.WebsiteId)
err2 := releaseTask(task, TR_FAIL)
if err2 != nil {
logrus.Error(err2)
}
@@ -70,91 +142,62 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
}
// Upload result ignoring errors
uploadResult(result)
_ = releaseTask(task, TR_OK)
return
}
func uploadChunks(websiteId uint64, f *os.File) error {
eof := false
for iter := 1; !eof; iter++ {
// TODO Stream with io.Pipe?
var b bytes.Buffer
func uploadWebsocket(f *os.File, token string) (err error) {
multi := multipart.NewWriter(&b)
u := url.URL{Scheme: config.WsBucketScheme, Host: config.WsBucketHost, Path: "/upload"}
// Set upload fields
var err error
err = multi.WriteField("token", config.Token)
if err != nil { return err }
err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId))
if err != nil { return err }
// Copy chunk to file_list
formFile, err := multi.CreateFormFile("file_list", "file_list")
var n int64
n, err = io.CopyN(formFile, f, config.ChunkSize)
if err != io.EOF && err != nil {
return err
}
if n == 0 {
// Don't upload, no content
return nil
} else if n < config.ChunkSize {
err = nil
// Break at end of iteration
eof = true
header := http.Header{}
header.Add("X-Upload-Token", token)
conn, _, err := websocket.DefaultDialer.Dial(u.String(), header)
if err != nil {
return
}
multi.Close()
conn.EnableWriteCompression(true) //TODO: Is this necessary?
for retries := 0; retries < viper.GetInt(ConfUploadRetries); retries++ {
if retries > 0 {
// Error occurred, retry upload
time.Sleep(viper.GetDuration(ConfUploadRetryInterval))
socketWriter, _ := conn.NextWriter(websocket.BinaryMessage)
_, _ = io.Copy(socketWriter, f)
err = socketWriter.Close()
if err != nil {
logrus.Error("FIXME: couldn't do file upload")
return
}
err = conn.Close()
if err != nil {
return
}
req, err := http.NewRequest(
http.MethodPost,
config.ServerUrl + "/task/upload",
&b)
req.Header.Set("content-type", multi.FormDataContentType())
if err != nil { continue }
res, err := serverClient.Do(req)
if err != nil { continue }
res.Body.Close()
if res.StatusCode != http.StatusOK {
logrus.WithField("status", res.Status).
WithField("part", iter).
Errorf("Upload failed")
continue
}
// Upload successful
break
}
logrus.WithField("id", websiteId).
WithField("part", iter).
Infof("Uploaded files chunk")
}
return nil
}
func uploadResult(result *TaskResult) (err error) {
resultEnc, err := json.Marshal(result)
if err != nil { panic(err) }
func releaseTask(task *Task, taskResult ResultCode) (err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/complete",
url.Values {
"token": {config.Token},
"result": {string(resultEnc)},
},
req := releaseTaskRequest{
TaskId: task.TaskId,
ResultCode: taskResult,
// TODO Will implement verification in a later ODDB update
Verification: 0,
}
resultEnc, err := json.Marshal(&req)
if err != nil {
panic(err)
}
body := bytes.NewBuffer(resultEnc)
res, err := serverClient.Post(
config.TrackerUrl+"/task/release",
"application/json",
body,
)
if err != nil { return }
if err != nil {
return
}
res.Body.Close()
if res.StatusCode != http.StatusOK {
@@ -164,27 +207,66 @@ func uploadResult(result *TaskResult) (err error) {
return
}
func CancelTask(websiteId uint64) (err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/cancel",
url.Values{
"token": {config.Token},
"website_id": {strconv.FormatUint(websiteId, 10)},
},
)
if err != nil { return }
res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("failed to cancel task: %s", res.Status)
}
return
}
type ServerTripper struct{}
func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
req.Header.Set("User-Agent", serverUserAgent)
//TODO: Use task_tracker/client ?
if serverWorker != nil {
req.Header.Add("X-Worker-Id", strconv.Itoa(serverWorker.Id))
req.Header.Add("X-Secret", base64.StdEncoding.EncodeToString(serverWorker.Secret))
}
return http.DefaultTransport.RoundTrip(req)
}
// https://github.com/simon987/task_tracker/blob/master/api/models.go
type releaseTaskRequest struct {
TaskId int64 `json:"task_id"`
ResultCode ResultCode `json:"result"`
Verification int64 `json:"verification"`
}
type WorkerAccessRequest struct {
Assign bool `json:"assign"`
Submit bool `json:"submit"`
Project int `json:"project"`
}
type FetchTaskResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Project struct {
Id int64 `json:"id"`
Name string `json:"name"`
Version string `json:"version"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
} `json:"project"`
Recipe string `json:"recipe"`
} `json:"task"`
} `json:"content"`
}
type TrackerWorker struct {
Alias string `json:"alias"`
Id int `json:"id"`
Secret []byte `json:"secret"`
}
type CreateTrackerWorkerResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Worker TrackerWorker `json:"worker"`
} `json:"content"`
}
type CreateTrackerWorkerRequest struct {
Alias string `json:"alias"`
}

View File

@@ -56,16 +56,16 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
if !shouldRetry(err) {
atomic.AddUint64(&totalAborted, 1)
logrus.WithField("url", job.UriStr).
WithError(err).
Error("Giving up after failure")
//logrus.WithField("url", job.UriStr).
// WithError(err).
// Error("Giving up after failure")
return
}
if job.Fails > config.Retries {
atomic.AddUint64(&totalAborted, 1)
logrus.WithField("url", job.UriStr).
Errorf("Giving up after %d fails", job.Fails)
//logrus.WithField("url", job.UriStr).
// Errorf("Giving up after %d fails", job.Fails)
} else {
atomic.AddUint64(&totalRetries, 1)
if err == ErrRateLimit {
@@ -88,7 +88,9 @@ func (w *WorkerContext) step(results chan<- File, job Job) {
}
func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
if len(job.Uri.Path) == 0 { return }
if len(job.Uri.Path) == 0 {
return
}
if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
// Load directory
links, err := GetDir(job, f)