Merge 3470be6086461aa613d8e5957b656252df280bf2 into d3c199b738870d5934213e3152a8af43610ad484

Richard Patel 2019-03-09 21:38:11 +00:00 committed by GitHub
commit 4e9714e5a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 332 additions and 257 deletions

View File

@@ -13,38 +13,39 @@ import (
)

var config struct {
-   ServerUrl      string
+   TrackerUrl     string
+   TrackerProject string
    Token          string
    ServerTimeout  time.Duration
    Recheck        time.Duration
    ChunkSize      int64
    Retries        int
    Workers        int
    UserAgent      string
    Tasks          int32
    Verbose        bool
    PrintHTTP      bool
    JobBufferSize  int
}

var onlineMode bool

const (
-   ConfServerUrl      = "server.url"
-   ConfToken          = "server.token"
+   ConfTrackerUrl     = "server.url"
+   ConfTrackerProject = "server.project"
    ConfServerTimeout  = "server.timeout"
    ConfRecheck        = "server.recheck"
    ConfCooldown       = "server.cooldown"
    ConfChunkSize      = "server.upload_chunk"
    ConfUploadRetries  = "server.upload_retries"
    ConfUploadRetryInterval = "server.upload_retry_interval"
    ConfTasks          = "crawl.tasks"
    ConfRetries        = "crawl.retries"
    ConfWorkers        = "crawl.connections"
    ConfUserAgent      = "crawl.user-agent"
    ConfDialTimeout    = "crawl.dial_timeout"
    ConfTimeout        = "crawl.timeout"
    ConfJobBufferSize  = "crawl.job_buffer"
    ConfCrawlStats     = "output.crawl_stats"
@@ -61,21 +62,21 @@ func prepareConfig() {
    pf.StringVar(&configFile, "config", "", "Config file")
    configFile = os.Getenv("OD_CONFIG")

-   pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
-   pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
-   pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
-   pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
-   pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
+   pf.String(ConfTrackerUrl, "http://tt.the-eye.eu/api", "task_tracker api URL")
+   pf.String(ConfTrackerProject, "3", "task_tracker project id")
+   pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout")
+   pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs")
+   pf.Duration(ConfCooldown, 30*time.Second, "OD-DB: Time to wait after a server-side error")
    pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
    pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
-   pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
+   pf.Duration(ConfUploadRetryInterval, 30*time.Second, "OD-DB: Time to wait between upload retries")
    pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
@@ -83,9 +84,9 @@ func prepareConfig() {
    pf.Uint(ConfRetries, 5, "Crawler: Request retries")
-   pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
-   pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
+   pf.Duration(ConfDialTimeout, 10*time.Second, "Crawler: Handshake timeout")
+   pf.Duration(ConfTimeout, 30*time.Second, "Crawler: Request timeout")
    pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
@@ -93,7 +94,7 @@ func prepareConfig() {
    pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
-   pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
+   pf.Duration(ConfAllocStats, 10*time.Second, "Log: Resource stats interval")
    pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
@@ -145,17 +146,13 @@ func readConfig() {
    }

    if onlineMode {
-       config.ServerUrl = viper.GetString(ConfServerUrl)
-       if config.ServerUrl == "" {
-           configMissing(ConfServerUrl)
+       config.TrackerUrl = viper.GetString(ConfTrackerUrl)
+       if config.TrackerUrl == "" {
+           configMissing(ConfTrackerUrl)
        }
-       config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
-
-       config.Token = viper.GetString(ConfToken)
-       if config.Token == "" {
-           configMissing(ConfToken)
-       }
+       config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/")
    }

+   config.TrackerProject = viper.GetString(ConfTrackerProject)
+
    config.ServerTimeout = viper.GetDuration(ConfServerTimeout)
@@ -195,9 +192,11 @@ func readConfig() {
    }

    if filePath := viper.GetString(ConfLogFile); filePath != "" {
-       f, err := os.OpenFile(filePath, os.O_CREATE | os.O_WRONLY | os.O_APPEND, 0644)
+       f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
        bufWriter := bufio.NewWriter(f)
-       if err != nil { panic(err) }
+       if err != nil {
+           panic(err)
+       }
        exitHooks.Add(func() {
            bufWriter.Flush()
            f.Close()

View File

@@ -1,7 +1,8 @@
 # OD-Database server settings
 server:
   # Connection URL
-  url: http://od-db.mine.terorie.com/api
+  url: http://localhost/api
+  project: 3
   # Server auth token
   token:
@@ -17,11 +18,7 @@ server:
   # Time to wait after receiving an error
   # from the server. Doesn't apply to uploads.
-  cooldown: 30s
+  cooldown: 1s

-  # Upload chunk size
-  # If the value is too high, the upload fails.
-  upload_chunk: 1 MB
   upload_retries: 10
   upload_retry_interval: 30s

go.mod (13 changed lines)
View File

@@ -1,13 +0,0 @@
module github.com/terorie/od-database-crawler
require (
github.com/beeker1121/goque v2.0.1+incompatible
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.1
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect
github.com/valyala/fasthttp v1.2.0
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3
)

go.sum (66 changed lines)
View File

@@ -1,66 +0,0 @@
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8gh8=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38=
github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.1.0 h1:3BohG7mqwj4lq7PTX//7gLbUlzNvZSPmuHFnloXT0lw=
github.com/valyala/fasthttp v1.1.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/fasthttp v1.2.0 h1:dzZJf2IuMiclVjdw0kkT+f9u4YdrapbNyGAN47E/qnk=
github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@@ -7,19 +7,27 @@ import (
    "time"
)

+   type ResultCode int
+
+   const (
+       TR_OK   = ResultCode(iota)
+       TR_FAIL = 1
+       TR_SKIP = 2
+   )
+
type Task struct {
    WebsiteId uint64 `json:"website_id"`
    Url       string `json:"url"`
}

type TaskResult struct {
-   StatusCode    string     `json:"status_code"`
+   ResultCode    ResultCode `json:"status_code"`
    FileCount     uint64     `json:"file_count"`
    ErrorCount    uint64     `json:"-"`
    StartTime     time.Time  `json:"-"`
    StartTimeUnix int64      `json:"start_time"`
    EndTimeUnix   int64      `json:"end_time"`
    WebsiteId     uint64     `json:"website_id"`
}

type Job struct {
@@ -51,13 +59,16 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
    defer o.Scanned.Unlock()

    exists = o.Scanned.Get(k)
-   if exists { return true }
+   if exists {
+       return true
+   }

    o.Scanned.Put(k)
    return false
}

type errorString string

func (e errorString) Error() string {
    return string(e)
}

View File

@@ -34,13 +34,16 @@ func Schedule(c context.Context, remotes <-chan *OD) {
    queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))

    // Delete existing queue
-   if err := os.RemoveAll(queuePath);
-       err != nil { panic(err) }
+   if err := os.RemoveAll(queuePath); err != nil {
+       panic(err)
+   }

    // Start new queue
    var err error
    remote.WCtx.Queue, err = OpenQueue(queuePath)
-   if err != nil { panic(err) }
+   if err != nil {
+       panic(err)
+   }

    // Spawn workers
    for i := 0; i < config.Workers; i++ {
@@ -77,12 +80,12 @@ func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
    globalWait.Add(1)
    now := time.Now()
-   od := &OD {
+   od := &OD{
        Task:    *t,
        BaseUri: *u,
-       Result: TaskResult {
+       Result: TaskResult{
            WebsiteId:     t.WebsiteId,
            StartTime:     now,
            StartTimeUnix: now.Unix(),
        },
    }
@@ -117,7 +120,7 @@ func (o *OD) Watch(results chan File) {
    // Open crawl results file
    f, err := os.OpenFile(
        filePath,
-       os.O_CREATE | os.O_RDWR | os.O_TRUNC,
+       os.O_CREATE|os.O_RDWR|os.O_TRUNC,
        0644,
    )
    if err != nil {
@@ -170,24 +173,18 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
    // Log finish
    logrus.WithFields(logrus.Fields{
        "id":       o.Task.WebsiteId,
        "url":      o.BaseUri.String(),
        "duration": time.Since(o.Result.StartTime),
    }).Info("Crawler finished")

    // Set status code
    now := time.Now()
    o.Result.EndTimeUnix = now.Unix()
-   fileCount := atomic.LoadUint64(&o.Result.FileCount)
-   if fileCount == 0 {
-       errorCount := atomic.LoadUint64(&o.Result.ErrorCount)
-       if errorCount == 0 {
-           o.Result.StatusCode = "empty"
-       } else {
-           o.Result.StatusCode = "directory listing failed"
-       }
+   if atomic.LoadUint64(&o.Result.ErrorCount) != 0 {
+       o.Result.ResultCode = TR_FAIL
    } else {
-       o.Result.StatusCode = "success"
+       o.Result.ResultCode = TR_OK
    }
}
@@ -205,11 +202,17 @@ func (t *Task) collect(results chan File, f *os.File) error {
    result.Path = fasturl.PathUnescape(result.Path)
    result.Name = fasturl.PathUnescape(result.Name)
    resJson, err := json.Marshal(result)
-   if err != nil { panic(err) }
+   if err != nil {
+       panic(err)
+   }
    _, err = f.Write(resJson)
-   if err != nil { return err }
+   if err != nil {
+       return err
+   }
    _, err = f.Write([]byte{'\n'})
-   if err != nil { return err }
+   if err != nil {
+       return err
+   }
    }

    return nil

server.go (336 changed lines)
View File

@@ -2,12 +2,15 @@ package main
import (
    "bytes"
+   "crypto"
+   "crypto/hmac"
+   "encoding/hex"
    "encoding/json"
    "fmt"
+   "github.com/pkg/errors"
    "github.com/sirupsen/logrus"
-   "github.com/spf13/viper"
-   "io"
-   "mime/multipart"
+   "golang.org/x/time/rate"
+   "io/ioutil"
    "net/http"
    "net/url"
    "os"
@@ -15,34 +18,174 @@ import (
    "time"
)

-var serverClient = http.Client {
-   Timeout: config.ServerTimeout,
+var serverWorker *TrackerWorker
+
+var serverClient = http.Client{
+   Timeout:   config.ServerTimeout,
    Transport: new(ServerTripper),
}

var serverUserAgent = "od-database-crawler/" + rootCmd.Version
//TODO: Move those elsewhere?
type WorkerAccessRequest struct {
Assign bool `json:"assign"`
Submit bool `json:"submit"`
Project int `json:"project"`
}
//todo: only keep necessary info
type FetchTaskResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Project struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Name string `json:"name"`
CloneUrl string `json:"clone_url"`
GitRepo string `json:"git_repo"`
Version string `json:"version"`
Motd string `json:"motd"`
Public bool `json:"public"`
Hidden bool `json:"hidden"`
Chain int64 `json:"chain"`
Paused bool `json:"paused"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
} `json:"project"`
Assignee int64 `json:"assignee"`
Retries int64 `json:"retries"`
MaxRetries int64 `json:"max_retries"`
Status int64 `json:"status"`
Recipe string `json:"recipe"`
MaxAssignTime int64 `json:"max_assign_time"`
AssignTime int64 `json:"assign_time"`
VerificationCount int64 `json:"verification_count"`
} `json:"task"`
} `json:"content"`
}
type TrackerWorker struct {
Alias string `json:"alias"`
Id int `json:"id"`
Secret []byte `json:"secret"`
}
type CreateTrackerWorkerResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Worker TrackerWorker `json:"worker"`
} `json:"content"`
}
type CreateTrackerWorkerRequest struct {
Alias string `json:"alias"`
}
func getOrCreateWorker() *TrackerWorker {
var worker TrackerWorker
if _, err := os.Stat("worker.json"); os.IsNotExist(err) {
req := CreateTrackerWorkerRequest{
Alias: "crawler", //todo: load from config
}
body, _ := json.Marshal(&req)
buf := bytes.NewBuffer(body)
resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf)
//todo: handle err
fmt.Println(resp.StatusCode)
workerResponse := CreateTrackerWorkerResponse{}
respBody, _ := ioutil.ReadAll(resp.Body)
_ = json.Unmarshal(respBody, &workerResponse)
//todo handle err
fmt.Println(workerResponse)
workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker)
fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
_, _ = fp.Write(workerJsonData)
//todo: handle err
} else {
fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600)
workerJsonData, _ := ioutil.ReadAll(fp)
_ = json.Unmarshal(workerJsonData, &worker)
//todo: handle err
}
return &worker
}
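
For reference, the worker.json file persisted above is just the marshalled TrackerWorker struct. With hypothetical values it would look roughly like this (Secret is a []byte, which encoding/json stores base64-encoded):

{"alias":"crawler","id":42,"secret":"c2VjcmV0LWJ5dGVz"}
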
func FetchTask() (t *Task, err error) {
-   res, err := serverClient.PostForm(
-       config.ServerUrl + "/task/get",
-       url.Values{ "token": {config.Token} })
-   if err != nil { return }
+   //todo: this whole block should definitely be extracted elsewhere
+   if serverWorker == nil {
+       serverWorker = getOrCreateWorker()
+
+       //Request ASSIGN permission
+       //todo: project ID should be stored as a int in the first place
+       pid, _ := strconv.Atoi(config.TrackerProject)
+       accessReq, _ := json.Marshal(WorkerAccessRequest{
+           Project: int(pid),
+           Assign:  true,
+           Submit:  false,
+       })
+       buf := bytes.NewBuffer(accessReq)
+       res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf)
+       if err != nil {
+           panic(err)
+       }
+       fmt.Println(res.StatusCode)
+   }
+
+   res, err := serverClient.Get(
+       config.TrackerUrl + "/task/get/" + config.TrackerProject)
+   if err != nil {
+       return
+   }
    defer res.Body.Close()

    switch res.StatusCode {
    case 200:
        break
+   //TODO: 404 should not happen.
    case 404, 500:
        return nil, nil
    default:
        return nil, fmt.Errorf("http %s", res.Status)
    }

-   t = new(Task)
-   err = json.NewDecoder(res.Body).Decode(t)
+   jsonResponse := FetchTaskResponse{}
+   err = json.NewDecoder(res.Body).Decode(&jsonResponse)
    if _, ok := err.(*json.SyntaxError); ok {
        return nil, fmt.Errorf("/task/get returned invalid JSON")
-   } else if err != nil { return }
+   } else if err != nil {
+       return
+   }
+
+   if !jsonResponse.Ok {
+       //The tracker will return Ok=false when no tasks are available
+       err = errors.New(jsonResponse.Message)
+       return
+   }
+
+   fmt.Println(jsonResponse.Content.Task.Recipe)
+
+   task := Task{}
+   err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task)
+   if _, ok := err.(*json.SyntaxError); ok {
+       return nil, fmt.Errorf("/task/get returned invalid JSON")
+   } else if err != nil {
+       return
+   }
+
+   t = &task
    return
}
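
Note that the task_tracker recipe decoded above is expected to carry the old OD-DB task fields, i.e. JSON matching the Task struct from the model file. A hypothetical recipe value would look like:

{"website_id": 1234, "url": "http://example.com/files/"}
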
@@ -59,7 +202,7 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
        return
    }

-   err = uploadWebsocket(result.WebsiteId, f)
+   err = uploadWebsocket(result.WebsiteId, f)
    if err != nil {
        logrus.Errorf("Failed to upload file list: %s", err)
        err2 := CancelTask(result.WebsiteId)
@@ -75,86 +218,49 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
    return
}

-func uploadChunks(websiteId uint64, f *os.File) error {
-   eof := false
-   for iter := 1; !eof; iter++ {
-       // TODO Stream with io.Pipe?
-       var b bytes.Buffer
-       multi := multipart.NewWriter(&b)
-
-       // Set upload fields
-       var err error
-       err = multi.WriteField("token", config.Token)
-       if err != nil { return err }
-       err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId))
-       if err != nil { return err }
-
-       // Copy chunk to file_list
-       formFile, err := multi.CreateFormFile("file_list", "file_list")
-       var n int64
-       n, err = io.CopyN(formFile, f, config.ChunkSize)
-       if err != io.EOF && err != nil {
-           return err
-       }
-       if n == 0 {
-           // Don't upload, no content
-           return nil
-       } else if n < config.ChunkSize {
-           err = nil
-           // Break at end of iteration
-           eof = true
-       }
-       multi.Close()
-
-       for retries := 0; retries < viper.GetInt(ConfUploadRetries); retries++ {
-           if retries > 0 {
-               // Error occurred, retry upload
-               time.Sleep(viper.GetDuration(ConfUploadRetryInterval))
-           }
-           req, err := http.NewRequest(
-               http.MethodPost,
-               config.ServerUrl + "/task/upload",
-               &b)
-           req.Header.Set("content-type", multi.FormDataContentType())
-           if err != nil { continue }
-           res, err := serverClient.Do(req)
-           if err != nil { continue }
-           res.Body.Close()
-           if res.StatusCode != http.StatusOK {
-               logrus.WithField("status", res.Status).
-                   WithField("part", iter).
-                   Errorf("Upload failed")
-               continue
-           }
-           // Upload successful
-           break
-       }
-
-       logrus.WithField("id", websiteId).
-           WithField("part", iter).
-           Infof("Uploaded files chunk")
-   }
+func uploadWebsocket(websiteId uint64, f *os.File) error {
+   //TODO:
+   /*
+    * OD-DB will give you an Upload token when you fetch the task
+    * Open a WS connection at {ws_bucket_addr}/upload with the 'X-Upload-Token' as header
+    * Stream whole file as a single WS message
+    * Close connection
+    */
    return nil
}
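
uploadWebsocket is left as a stub in this commit. As a rough illustration of the flow the TODO describes, a client built on github.com/gorilla/websocket might look like the sketch below; wsBucketUrl and uploadToken are hypothetical stand-ins for values the tracker would hand out with the task, and none of this is part of the commit.

package sketch // not part of the commit

import (
    "io/ioutil"
    "net/http"
    "os"

    "github.com/gorilla/websocket"
)

// uploadWebsocketSketch follows the TODO above: open a WS connection with the
// upload token as a header, send the whole result file as one message, close.
func uploadWebsocketSketch(wsBucketUrl, uploadToken string, f *os.File) error {
    data, err := ioutil.ReadAll(f) // whole file as a single WS message
    if err != nil {
        return err
    }
    hdr := http.Header{"X-Upload-Token": {uploadToken}}
    conn, _, err := websocket.DefaultDialer.Dial(wsBucketUrl+"/upload", hdr)
    if err != nil {
        return err
    }
    defer conn.Close()
    return conn.WriteMessage(websocket.BinaryMessage, data)
}
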
func uploadResult(result *TaskResult) (err error) {
-   resultEnc, err := json.Marshal(result)
-   if err != nil { panic(err) }
+   //TODO:
+   /*
+    * When the file has been uploaded, just release the task with the TR_OK code
+    * Don't bother sending the ODDB-related stuff, You just need the task id
+    * Probably a good idea to wrap this around a new releaseTask() function
+    */
+
+   req := releaseTaskRequest{
+       TaskId:     int64(result.WebsiteId),
+       ResultCode: result.ResultCode,
+       // TODO What is verification
+       Verification: 0,
+   }
+
+   resultEnc, err := json.Marshal(&req)
+   if err != nil {
+       panic(err)
+   }

    res, err := serverClient.PostForm(
-       config.ServerUrl + "/task/complete",
-       url.Values {
+       config.TrackerUrl+"/task/release",
+       url.Values{
            "token":  {config.Token},
            "result": {string(resultEnc)},
        },
    )
-   if err != nil { return }
+   if err != nil {
+       return
+   }
    res.Body.Close()

    if res.StatusCode != http.StatusOK {
@@ -165,20 +271,8 @@ func uploadResult(result *TaskResult) (err error) {
}

func CancelTask(websiteId uint64) (err error) {
-   res, err := serverClient.PostForm(
-       config.ServerUrl + "/task/cancel",
-       url.Values{
-           "token": {config.Token},
-           "website_id": {strconv.FormatUint(websiteId, 10)},
-       },
-   )
-   if err != nil { return }
-   res.Body.Close()
-
-   if res.StatusCode != http.StatusOK {
-       return fmt.Errorf("failed to cancel task: %s", res.Status)
-   }
-
+   //TODO: Maybe wrap this function around releaseTask(cancel: bool) ?
    return
}
@@ -186,5 +280,55 @@ type ServerTripper struct{}

func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
    req.Header.Set("User-Agent", serverUserAgent)
//TODO: Move this whole block elsewhere
if serverWorker != nil {
var content []byte
if req.Method == "GET" {
content = []byte("/task/get/" + config.TrackerProject) //todo: find a less retarded way of doing that
} else {
//todo: this is retarded and should be moved elsewhere
buf, _ := ioutil.ReadAll(req.Body)
rdr1 := ioutil.NopCloser(bytes.NewBuffer(buf))
rdr2 := ioutil.NopCloser(bytes.NewBuffer(buf))
content, _ = ioutil.ReadAll(rdr1)
req.Body = rdr2
}
ts := time.Now().Format(time.RFC1123)
mac := hmac.New(crypto.SHA256.New, serverWorker.Secret)
mac.Write(content)
mac.Write([]byte(ts))
sig := hex.EncodeToString(mac.Sum(nil))
req.Header.Add("X-Worker-Id", strconv.Itoa(serverWorker.Id))
req.Header.Add("Timestamp", time.Now().Format(time.RFC1123))
req.Header.Add("X-Signature", sig)
}
    return http.DefaultTransport.RoundTrip(req)
}
const mimeJSON = "application/json"
// https://github.com/simon987/task_tracker/blob/master/api/models.go
type submitTaskRequest struct {
Project int64 `json:"project"`
MaxRetries int64 `json:"max_retries"`
Recipe string `json:"recipe"`
Priority int64 `json:"priority"`
MaxAssignTime int64 `json:"max_assign_time"`
Hash64 int64 `json:"hash_u64"`
UniqueString string `json:"unique_string"`
VerificationCount int64 `json:"verification_count"`
}
type releaseTaskRequest struct {
TaskId int64 `json:"task_id"`
ResultCode ResultCode `json:"result"`
Verification int64 `json:"verification"`
}
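
The request signing added to ServerTripper.RoundTrip above amounts to hex(HMAC-SHA256(worker secret, content || timestamp)), where content is the request body (or the request path for GET), plus X-Worker-Id and Timestamp headers. As a minimal sketch, not part of the commit, the matching server-side check could look like this, assuming the tracker looks the worker's secret up by X-Worker-Id:

package sketch // not part of the commit

import (
    "crypto/hmac"
    "crypto/sha256"
    "encoding/hex"
)

// verifySignature recomputes the signature the crawler sends in X-Signature.
// content is the request body, or the path for GET requests, as in RoundTrip.
func verifySignature(secret, content []byte, timestamp, gotSig string) bool {
    mac := hmac.New(sha256.New, secret)
    mac.Write(content)
    mac.Write([]byte(timestamp)) // the Timestamp header (RFC1123)
    want := hex.EncodeToString(mac.Sum(nil))
    return hmac.Equal([]byte(want), []byte(gotSig)) // constant-time compare
}
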