More work on task_tracker integration

This commit is contained in:
simon987 2019-03-09 16:38:04 -05:00
parent 60471a081e
commit 3470be6086
7 changed files with 294 additions and 256 deletions

View File

@ -13,38 +13,39 @@ import (
)
var config struct {
ServerUrl string
Token string
ServerTimeout time.Duration
Recheck time.Duration
ChunkSize int64
Retries int
Workers int
UserAgent string
Tasks int32
Verbose bool
PrintHTTP bool
JobBufferSize int
TrackerUrl string
TrackerProject string
Token string
ServerTimeout time.Duration
Recheck time.Duration
ChunkSize int64
Retries int
Workers int
UserAgent string
Tasks int32
Verbose bool
PrintHTTP bool
JobBufferSize int
}
var onlineMode bool
const (
ConfServerUrl = "server.url"
ConfToken = "server.token"
ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown"
ConfChunkSize = "server.upload_chunk"
ConfUploadRetries = "server.upload_retries"
ConfTrackerUrl = "server.url"
ConfTrackerProject = "server.project"
ConfServerTimeout = "server.timeout"
ConfRecheck = "server.recheck"
ConfCooldown = "server.cooldown"
ConfChunkSize = "server.upload_chunk"
ConfUploadRetries = "server.upload_retries"
ConfUploadRetryInterval = "server.upload_retry_interval"
ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout"
ConfTasks = "crawl.tasks"
ConfRetries = "crawl.retries"
ConfWorkers = "crawl.connections"
ConfUserAgent = "crawl.user-agent"
ConfDialTimeout = "crawl.dial_timeout"
ConfTimeout = "crawl.timeout"
ConfJobBufferSize = "crawl.job_buffer"
ConfCrawlStats = "output.crawl_stats"
@ -61,21 +62,21 @@ func prepareConfig() {
pf.StringVar(&configFile, "config", "", "Config file")
configFile = os.Getenv("OD_CONFIG")
pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL")
pf.String(ConfTrackerUrl, "http://tt.the-eye.eu/api", "task_tracker api URL")
pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)")
pf.String(ConfTrackerProject, "3", "task_tracker project id")
pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout")
pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout")
pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs")
pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs")
pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error")
pf.Duration(ConfCooldown, 30*time.Second, "OD-DB: Time to wait after a server-side error")
pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size")
pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries")
pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries")
pf.Duration(ConfUploadRetryInterval, 30*time.Second, "OD-DB: Time to wait between upload retries")
pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks")
@ -83,9 +84,9 @@ func prepareConfig() {
pf.Uint(ConfRetries, 5, "Crawler: Request retries")
pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout")
pf.Duration(ConfDialTimeout, 10*time.Second, "Crawler: Handshake timeout")
pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout")
pf.Duration(ConfTimeout, 30*time.Second, "Crawler: Request timeout")
pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent")
@ -93,7 +94,7 @@ func prepareConfig() {
pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval")
pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval")
pf.Duration(ConfAllocStats, 10*time.Second, "Log: Resource stats interval")
pf.Bool(ConfVerbose, false, "Log: Print every listed dir")
@ -145,17 +146,13 @@ func readConfig() {
}
if onlineMode {
config.ServerUrl = viper.GetString(ConfServerUrl)
if config.ServerUrl == "" {
configMissing(ConfServerUrl)
}
config.ServerUrl = strings.TrimRight(config.ServerUrl, "/")
config.Token = viper.GetString(ConfToken)
if config.Token == "" {
configMissing(ConfToken)
config.TrackerUrl = viper.GetString(ConfTrackerUrl)
if config.TrackerUrl == "" {
configMissing(ConfTrackerUrl)
}
config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/")
}
config.TrackerProject = viper.GetString(ConfTrackerProject)
config.ServerTimeout = viper.GetDuration(ConfServerTimeout)
@ -195,9 +192,11 @@ func readConfig() {
}
if filePath := viper.GetString(ConfLogFile); filePath != "" {
f, err := os.OpenFile(filePath, os.O_CREATE | os.O_WRONLY | os.O_APPEND, 0644)
f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
bufWriter := bufio.NewWriter(f)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
exitHooks.Add(func() {
bufWriter.Flush()
f.Close()

View File

@ -1,7 +1,8 @@
# OD-Database server settings
server:
# Connection URL
url: http://od-db.mine.terorie.com/api
url: http://localhost/api
project: 3
# Server auth token
token:
@ -17,11 +18,7 @@ server:
# Time to wait after receiving an error
# from the server. Doesn't apply to uploads.
cooldown: 30s
# Upload chunk size
# If the value is too high, the upload fails.
upload_chunk: 1 MB
cooldown: 1s
upload_retries: 10
upload_retry_interval: 30s

13
go.mod
View File

@ -1,13 +0,0 @@
module github.com/terorie/od-database-crawler
require (
github.com/beeker1121/goque v2.0.1+incompatible
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
github.com/sirupsen/logrus v1.3.0
github.com/spf13/cobra v0.0.3
github.com/spf13/viper v1.3.1
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect
github.com/valyala/fasthttp v1.2.0
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3
)

66
go.sum
View File

@ -1,66 +0,0 @@
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE=
github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8gh8=
github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM=
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME=
github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI=
github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ=
github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8=
github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk=
github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo=
github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38=
github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs=
github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.1.0 h1:3BohG7mqwj4lq7PTX//7gLbUlzNvZSPmuHFnloXT0lw=
github.com/valyala/fasthttp v1.1.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/fasthttp v1.2.0 h1:dzZJf2IuMiclVjdw0kkT+f9u4YdrapbNyGAN47E/qnk=
github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc=
golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI=
golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A=
golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=

View File

@ -8,10 +8,11 @@ import (
)
type ResultCode int
const (
TR_OK = ResultCode(iota)
TR_FAIL
TR_SKIP
TR_OK = ResultCode(iota)
TR_FAIL = 1
TR_SKIP = 2
)
type Task struct {
@ -58,13 +59,16 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) {
defer o.Scanned.Unlock()
exists = o.Scanned.Get(k)
if exists { return true }
if exists {
return true
}
o.Scanned.Put(k)
return false
}
type errorString string
func (e errorString) Error() string {
return string(e)
}

View File

@ -34,13 +34,16 @@ func Schedule(c context.Context, remotes <-chan *OD) {
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
// Delete existing queue
if err := os.RemoveAll(queuePath);
err != nil { panic(err) }
if err := os.RemoveAll(queuePath); err != nil {
panic(err)
}
// Start new queue
var err error
remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
// Spawn workers
for i := 0; i < config.Workers; i++ {
@ -77,12 +80,12 @@ func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
globalWait.Add(1)
now := time.Now()
od := &OD {
Task: *t,
od := &OD{
Task: *t,
BaseUri: *u,
Result: TaskResult {
WebsiteId: t.WebsiteId,
StartTime: now,
Result: TaskResult{
WebsiteId: t.WebsiteId,
StartTime: now,
StartTimeUnix: now.Unix(),
},
}
@ -117,7 +120,7 @@ func (o *OD) Watch(results chan File) {
// Open crawl results file
f, err := os.OpenFile(
filePath,
os.O_CREATE | os.O_RDWR | os.O_TRUNC,
os.O_CREATE|os.O_RDWR|os.O_TRUNC,
0644,
)
if err != nil {
@ -170,24 +173,18 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error
// Log finish
logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
"duration": time.Since(o.Result.StartTime),
}).Info("Crawler finished")
// Set status code
now := time.Now()
o.Result.EndTimeUnix = now.Unix()
fileCount := atomic.LoadUint64(&o.Result.FileCount)
if fileCount == 0 {
errorCount := atomic.LoadUint64(&o.Result.ErrorCount)
if errorCount == 0 {
o.Result.StatusCode = "empty"
} else {
o.Result.StatusCode = "directory listing failed"
}
if atomic.LoadUint64(&o.Result.ErrorCount) != 0 {
o.Result.ResultCode = TR_FAIL
} else {
o.Result.StatusCode = "success"
o.Result.ResultCode = TR_OK
}
}
@ -205,11 +202,17 @@ func (t *Task) collect(results chan File, f *os.File) error {
result.Path = fasturl.PathUnescape(result.Path)
result.Name = fasturl.PathUnescape(result.Name)
resJson, err := json.Marshal(result)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
_, err = f.Write(resJson)
if err != nil { return err }
if err != nil {
return err
}
_, err = f.Write([]byte{'\n'})
if err != nil { return err }
if err != nil {
return err
}
}
return nil

312
server.go
View File

@ -2,12 +2,15 @@ package main
import (
"bytes"
"crypto"
"crypto/hmac"
"encoding/hex"
"encoding/json"
"fmt"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
"io"
"mime/multipart"
"golang.org/x/time/rate"
"io/ioutil"
"net/http"
"net/url"
"os"
@ -15,34 +18,174 @@ import (
"time"
)
var serverClient = http.Client {
Timeout: config.ServerTimeout,
var serverWorker *TrackerWorker
var serverClient = http.Client{
Timeout: config.ServerTimeout,
Transport: new(ServerTripper),
}
var serverUserAgent = "od-database-crawler/" + rootCmd.Version
//TODO: Move those elsewhere?
type WorkerAccessRequest struct {
Assign bool `json:"assign"`
Submit bool `json:"submit"`
Project int `json:"project"`
}
//todo: only keep necessary info
type FetchTaskResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Task struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Project struct {
Id int64 `json:"id"`
Priority int64 `json:"priority"`
Name string `json:"name"`
CloneUrl string `json:"clone_url"`
GitRepo string `json:"git_repo"`
Version string `json:"version"`
Motd string `json:"motd"`
Public bool `json:"public"`
Hidden bool `json:"hidden"`
Chain int64 `json:"chain"`
Paused bool `json:"paused"`
AssignRate rate.Limit `json:"assign_rate"`
SubmitRate rate.Limit `json:"submit_rate"`
} `json:"project"`
Assignee int64 `json:"assignee"`
Retries int64 `json:"retries"`
MaxRetries int64 `json:"max_retries"`
Status int64 `json:"status"`
Recipe string `json:"recipe"`
MaxAssignTime int64 `json:"max_assign_time"`
AssignTime int64 `json:"assign_time"`
VerificationCount int64 `json:"verification_count"`
} `json:"task"`
} `json:"content"`
}
type TrackerWorker struct {
Alias string `json:"alias"`
Id int `json:"id"`
Secret []byte `json:"secret"`
}
type CreateTrackerWorkerResponse struct {
Ok bool `json:"ok"`
Message string `json:"message"`
Content struct {
Worker TrackerWorker `json:"worker"`
} `json:"content"`
}
type CreateTrackerWorkerRequest struct {
Alias string `json:"alias"`
}
func getOrCreateWorker() *TrackerWorker {
var worker TrackerWorker
if _, err := os.Stat("worker.json"); os.IsNotExist(err) {
req := CreateTrackerWorkerRequest{
Alias: "crawler", //todo: load from config
}
body, _ := json.Marshal(&req)
buf := bytes.NewBuffer(body)
resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf)
//todo: handle err
fmt.Println(resp.StatusCode)
workerResponse := CreateTrackerWorkerResponse{}
respBody, _ := ioutil.ReadAll(resp.Body)
_ = json.Unmarshal(respBody, &workerResponse)
//todo handle err
fmt.Println(workerResponse)
workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker)
fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600)
_, _ = fp.Write(workerJsonData)
//todo: handle err
} else {
fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600)
workerJsonData, _ := ioutil.ReadAll(fp)
_ = json.Unmarshal(workerJsonData, &worker)
//todo: handle err
}
return &worker
}
func FetchTask() (t *Task, err error) {
res, err := serverClient.PostForm(
config.ServerUrl + "/task/get",
url.Values{ "token": {config.Token} })
if err != nil { return }
//todo: this whole block should definitely be extracted elsewhere
if serverWorker == nil {
serverWorker = getOrCreateWorker()
//Request ASSIGN permission
//todo: project ID should be stored as a int in the first place
pid, _ := strconv.Atoi(config.TrackerProject)
accessReq, _ := json.Marshal(WorkerAccessRequest{
Project: int(pid),
Assign: true,
Submit: false,
})
buf := bytes.NewBuffer(accessReq)
res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf)
if err != nil {
panic(err)
}
fmt.Println(res.StatusCode)
}
res, err := serverClient.Get(
config.TrackerUrl + "/task/get/" + config.TrackerProject)
if err != nil {
return
}
defer res.Body.Close()
switch res.StatusCode {
case 200:
break
//TODO: 404 should not happen.
case 404, 500:
return nil, nil
default:
return nil, fmt.Errorf("http %s", res.Status)
}
t = new(Task)
err = json.NewDecoder(res.Body).Decode(t)
jsonResponse := FetchTaskResponse{}
err = json.NewDecoder(res.Body).Decode(&jsonResponse)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil { return }
} else if err != nil {
return
}
if !jsonResponse.Ok {
//The tracker will return Ok=false when no tasks are available
err = errors.New(jsonResponse.Message)
return
}
fmt.Println(jsonResponse.Content.Task.Recipe)
task := Task{}
err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task)
if _, ok := err.(*json.SyntaxError); ok {
return nil, fmt.Errorf("/task/get returned invalid JSON")
} else if err != nil {
return
}
t = &task
return
}
@ -59,7 +202,7 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
return
}
err = uploadChunks(result.WebsiteId, f)
err = uploadWebsocket(result.WebsiteId, f)
if err != nil {
logrus.Errorf("Failed to upload file list: %s", err)
err2 := CancelTask(result.WebsiteId)
@ -75,93 +218,49 @@ func PushResult(result *TaskResult, f *os.File) (err error) {
return
}
func uploadChunks(websiteId uint64, f *os.File) error {
eof := false
for iter := 1; !eof; iter++ {
// TODO Stream with io.Pipe?
var b bytes.Buffer
func uploadWebsocket(websiteId uint64, f *os.File) error {
multi := multipart.NewWriter(&b)
// Set upload fields
var err error
err = multi.WriteField("token", config.Token)
if err != nil { return err }
err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId))
if err != nil { return err }
// Copy chunk to file_list
formFile, err := multi.CreateFormFile("file_list", "file_list")
var n int64
n, err = io.CopyN(formFile, f, config.ChunkSize)
if err != io.EOF && err != nil {
return err
}
if n == 0 {
// Don't upload, no content
return nil
} else if n < config.ChunkSize {
err = nil
// Break at end of iteration
eof = true
}
multi.Close()
for retries := 0; retries < viper.GetInt(ConfUploadRetries); retries++ {
if retries > 0 {
// Error occurred, retry upload
time.Sleep(viper.GetDuration(ConfUploadRetryInterval))
}
req, err := http.NewRequest(
http.MethodPost,
config.ServerUrl + "/task/upload",
&b)
req.Header.Set("content-type", multi.FormDataContentType())
if err != nil { continue }
res, err := serverClient.Do(req)
if err != nil { continue }
res.Body.Close()
if res.StatusCode != http.StatusOK {
logrus.WithField("status", res.Status).
WithField("part", iter).
Errorf("Upload failed")
continue
}
// Upload successful
break
}
logrus.WithField("id", websiteId).
WithField("part", iter).
Infof("Uploaded files chunk")
}
//TODO:
/*
* OD-DB will give you an Upload token when you fetch the task
* Open a WS connection at {ws_bucket_addr}/upload with the 'X-Upload-Token' as header
* Stream whole file as a single WS message
* Close connection
*/
return nil
}
func uploadResult(result *TaskResult) (err error) {
req := releaseTaskRequest {
TaskId: int64(result.WebsiteId),
//TODO:
/*
* When the file has been uploaded, just release the task with the TR_OK code
* Don't bother sending the ODDB-related stuff, You just need the task id
* Probably a good idea to wrap this around a new releaseTask() function
*/
req := releaseTaskRequest{
TaskId: int64(result.WebsiteId),
ResultCode: result.ResultCode,
// TODO What is verification
Verification: 0,
}
resultEnc, err := json.Marshal(&req)
if err != nil { panic(err) }
if err != nil {
panic(err)
}
res, err := serverClient.PostForm(
config.ServerUrl + "/task/release",
url.Values {
"token": {config.Token},
config.TrackerUrl+"/task/release",
url.Values{
"token": {config.Token},
"result": {string(resultEnc)},
},
)
if err != nil { return }
if err != nil {
return
}
res.Body.Close()
if res.StatusCode != http.StatusOK {
@ -172,22 +271,8 @@ func uploadResult(result *TaskResult) (err error) {
}
func CancelTask(websiteId uint64) (err error) {
// TODO Remove, no endpoint in task_tracker
res, err := serverClient.PostForm(
config.ServerUrl + "/task/cancel",
url.Values{
"token": {config.Token},
"website_id": {strconv.FormatUint(websiteId, 10)},
},
)
if err != nil { return }
res.Body.Close()
if res.StatusCode != http.StatusOK {
return fmt.Errorf("failed to cancel task: %s", res.Status)
}
//TODO: Maybe wrap this function around releaseTask(cancel: bool) ?
return
}
@ -195,6 +280,35 @@ type ServerTripper struct{}
func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) {
req.Header.Set("User-Agent", serverUserAgent)
//TODO: Move this whole block elsewhere
if serverWorker != nil {
var content []byte
if req.Method == "GET" {
content = []byte("/task/get/" + config.TrackerProject) //todo: find a less retarded way of doing that
} else {
//todo: this is retarded and should be moved elsewhere
buf, _ := ioutil.ReadAll(req.Body)
rdr1 := ioutil.NopCloser(bytes.NewBuffer(buf))
rdr2 := ioutil.NopCloser(bytes.NewBuffer(buf))
content, _ = ioutil.ReadAll(rdr1)
req.Body = rdr2
}
ts := time.Now().Format(time.RFC1123)
mac := hmac.New(crypto.SHA256.New, serverWorker.Secret)
mac.Write(content)
mac.Write([]byte(ts))
sig := hex.EncodeToString(mac.Sum(nil))
req.Header.Add("X-Worker-Id", strconv.Itoa(serverWorker.Id))
req.Header.Add("Timestamp", time.Now().Format(time.RFC1123))
req.Header.Add("X-Signature", sig)
}
return http.DefaultTransport.RoundTrip(req)
}