diff --git a/config.go b/config.go index d80ebef..718410b 100644 --- a/config.go +++ b/config.go @@ -13,38 +13,39 @@ import ( ) var config struct { - ServerUrl string - Token string - ServerTimeout time.Duration - Recheck time.Duration - ChunkSize int64 - Retries int - Workers int - UserAgent string - Tasks int32 - Verbose bool - PrintHTTP bool - JobBufferSize int + TrackerUrl string + TrackerProject string + Token string + ServerTimeout time.Duration + Recheck time.Duration + ChunkSize int64 + Retries int + Workers int + UserAgent string + Tasks int32 + Verbose bool + PrintHTTP bool + JobBufferSize int } var onlineMode bool const ( - ConfServerUrl = "server.url" - ConfToken = "server.token" - ConfServerTimeout = "server.timeout" - ConfRecheck = "server.recheck" - ConfCooldown = "server.cooldown" - ConfChunkSize = "server.upload_chunk" - ConfUploadRetries = "server.upload_retries" + ConfTrackerUrl = "server.url" + ConfTrackerProject = "server.project" + ConfServerTimeout = "server.timeout" + ConfRecheck = "server.recheck" + ConfCooldown = "server.cooldown" + ConfChunkSize = "server.upload_chunk" + ConfUploadRetries = "server.upload_retries" ConfUploadRetryInterval = "server.upload_retry_interval" - ConfTasks = "crawl.tasks" - ConfRetries = "crawl.retries" - ConfWorkers = "crawl.connections" - ConfUserAgent = "crawl.user-agent" - ConfDialTimeout = "crawl.dial_timeout" - ConfTimeout = "crawl.timeout" + ConfTasks = "crawl.tasks" + ConfRetries = "crawl.retries" + ConfWorkers = "crawl.connections" + ConfUserAgent = "crawl.user-agent" + ConfDialTimeout = "crawl.dial_timeout" + ConfTimeout = "crawl.timeout" ConfJobBufferSize = "crawl.job_buffer" ConfCrawlStats = "output.crawl_stats" @@ -61,21 +62,21 @@ func prepareConfig() { pf.StringVar(&configFile, "config", "", "Config file") configFile = os.Getenv("OD_CONFIG") - pf.String(ConfServerUrl, "http://od-db.the-eye.eu/api", "OD-DB server URL") + pf.String(ConfTrackerUrl, "http://tt.the-eye.eu/api", "task_tracker api URL") - pf.String(ConfToken, "", "OD-DB access token (env OD_SERVER_TOKEN)") + pf.String(ConfTrackerProject, "3", "task_tracker project id") - pf.Duration(ConfServerTimeout, 60 * time.Second, "OD-DB request timeout") + pf.Duration(ConfServerTimeout, 60*time.Second, "OD-DB request timeout") - pf.Duration(ConfRecheck, 1 * time.Second, "OD-DB: Poll interval for new jobs") + pf.Duration(ConfRecheck, 1*time.Second, "OD-DB: Poll interval for new jobs") - pf.Duration(ConfCooldown, 30 * time.Second, "OD-DB: Time to wait after a server-side error") + pf.Duration(ConfCooldown, 30*time.Second, "OD-DB: Time to wait after a server-side error") pf.String(ConfChunkSize, "1 MB", "OD-DB: Result upload chunk size") pf.Uint(ConfUploadRetries, 10, "OD-DB: Max upload retries") - pf.Duration(ConfUploadRetryInterval, 30 * time.Second, "OD-DB: Time to wait between upload retries") + pf.Duration(ConfUploadRetryInterval, 30*time.Second, "OD-DB: Time to wait between upload retries") pf.Uint(ConfTasks, 100, "Crawler: Max concurrent tasks") @@ -83,9 +84,9 @@ func prepareConfig() { pf.Uint(ConfRetries, 5, "Crawler: Request retries") - pf.Duration(ConfDialTimeout, 10 * time.Second, "Crawler: Handshake timeout") + pf.Duration(ConfDialTimeout, 10*time.Second, "Crawler: Handshake timeout") - pf.Duration(ConfTimeout, 30 * time.Second, "Crawler: Request timeout") + pf.Duration(ConfTimeout, 30*time.Second, "Crawler: Request timeout") pf.String(ConfUserAgent, "Mozilla/5.0 (X11; od-database-crawler) Gecko/20100101 Firefox/52.0", "Crawler: User-Agent") @@ -93,7 +94,7 @@ func prepareConfig() { pf.Duration(ConfCrawlStats, time.Second, "Log: Crawl stats interval") - pf.Duration(ConfAllocStats, 10 * time.Second, "Log: Resource stats interval") + pf.Duration(ConfAllocStats, 10*time.Second, "Log: Resource stats interval") pf.Bool(ConfVerbose, false, "Log: Print every listed dir") @@ -145,17 +146,13 @@ func readConfig() { } if onlineMode { - config.ServerUrl = viper.GetString(ConfServerUrl) - if config.ServerUrl == "" { - configMissing(ConfServerUrl) - } - config.ServerUrl = strings.TrimRight(config.ServerUrl, "/") - - config.Token = viper.GetString(ConfToken) - if config.Token == "" { - configMissing(ConfToken) + config.TrackerUrl = viper.GetString(ConfTrackerUrl) + if config.TrackerUrl == "" { + configMissing(ConfTrackerUrl) } + config.TrackerUrl = strings.TrimRight(config.TrackerUrl, "/") } + config.TrackerProject = viper.GetString(ConfTrackerProject) config.ServerTimeout = viper.GetDuration(ConfServerTimeout) @@ -195,9 +192,11 @@ func readConfig() { } if filePath := viper.GetString(ConfLogFile); filePath != "" { - f, err := os.OpenFile(filePath, os.O_CREATE | os.O_WRONLY | os.O_APPEND, 0644) + f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) bufWriter := bufio.NewWriter(f) - if err != nil { panic(err) } + if err != nil { + panic(err) + } exitHooks.Add(func() { bufWriter.Flush() f.Close() diff --git a/config.yml b/config.yml index a420c80..3408c0d 100644 --- a/config.yml +++ b/config.yml @@ -1,7 +1,8 @@ # OD-Database server settings server: # Connection URL - url: http://od-db.mine.terorie.com/api + url: http://localhost/api + project: 3 # Server auth token token: @@ -17,11 +18,7 @@ server: # Time to wait after receiving an error # from the server. Doesn't apply to uploads. - cooldown: 30s - - # Upload chunk size - # If the value is too high, the upload fails. - upload_chunk: 1 MB + cooldown: 1s upload_retries: 10 upload_retry_interval: 30s diff --git a/go.mod b/go.mod deleted file mode 100644 index 461b1c2..0000000 --- a/go.mod +++ /dev/null @@ -1,13 +0,0 @@ -module github.com/terorie/od-database-crawler - -require ( - github.com/beeker1121/goque v2.0.1+incompatible - github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect - github.com/sirupsen/logrus v1.3.0 - github.com/spf13/cobra v0.0.3 - github.com/spf13/viper v1.3.1 - github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 // indirect - github.com/valyala/fasthttp v1.2.0 - golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 - golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 -) diff --git a/go.sum b/go.sum deleted file mode 100644 index f7ef193..0000000 --- a/go.sum +++ /dev/null @@ -1,66 +0,0 @@ -github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/beeker1121/goque v2.0.1+incompatible h1:5nJHPMqQLxUvGFc8m/NW2QzxKyc0zICmqs/JUsmEjwE= -github.com/beeker1121/goque v2.0.1+incompatible/go.mod h1:L6dOWBhDOnxUVQsb0wkLve0VCnt2xJW/MI8pdRX4ANw= -github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= -github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= -github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= -github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w= -github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= -github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/klauspost/compress v1.4.0 h1:8nsMz3tWa9SWWPL60G1V6CUsf4lLjWLTNEtibhe8gh8= -github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= -github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc70QXJ07+2eg2Jy2EC7Mi11BWujeM= -github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= -github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= -github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= -github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= -github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= -github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= -github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= -github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/sirupsen/logrus v1.3.0 h1:hI/7Q+DtNZ2kINb6qt/lS+IyXnHQe9e90POfeewL/ME= -github.com/sirupsen/logrus v1.3.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= -github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= -github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= -github.com/spf13/cast v1.3.0 h1:oget//CVOEoFewqQxwr0Ej5yjygnqGkvggSE/gB35Q8= -github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= -github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= -github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= -github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= -github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= -github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= -github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= -github.com/spf13/viper v1.3.1 h1:5+8j8FTpnFV4nEImW/ofkzEt8VoOiLXxdYIDsB73T38= -github.com/spf13/viper v1.3.1/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= -github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= -github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2 h1:GnOzE5fEFN3b2zDhJJABEofdb51uMRNb8eqIVtdducs= -github.com/syndtr/goleveldb v0.0.0-20181128100959-b001fa50d6b2/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0= -github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.1.0 h1:3BohG7mqwj4lq7PTX//7gLbUlzNvZSPmuHFnloXT0lw= -github.com/valyala/fasthttp v1.1.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s= -github.com/valyala/fasthttp v1.2.0 h1:dzZJf2IuMiclVjdw0kkT+f9u4YdrapbNyGAN47E/qnk= -github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s= -github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= -github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= -golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613 h1:MQ/ZZiDsUapFFiMS+vzwXkCTeEKaum+Do5rINYJDmxc= -golang.org/x/crypto v0.0.0-20190131182504-b8fe1690c613/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3 h1:czFLhve3vsQetD6JOJ8NZZvGQIXlnN3/yXxbT6/awxI= -golang.org/x/net v0.0.0-20180911220305-26e67e76b6c3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A= -golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/model.go b/model.go index 8b8ad54..629c4ff 100644 --- a/model.go +++ b/model.go @@ -8,10 +8,11 @@ import ( ) type ResultCode int + const ( - TR_OK = ResultCode(iota) - TR_FAIL - TR_SKIP + TR_OK = ResultCode(iota) + TR_FAIL = 1 + TR_SKIP = 2 ) type Task struct { @@ -58,13 +59,16 @@ func (o *OD) LoadOrStoreKey(k *redblackhash.Key) (exists bool) { defer o.Scanned.Unlock() exists = o.Scanned.Get(k) - if exists { return true } + if exists { + return true + } o.Scanned.Put(k) return false } type errorString string + func (e errorString) Error() string { return string(e) } diff --git a/scheduler.go b/scheduler.go index 9abe491..716243a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -34,13 +34,16 @@ func Schedule(c context.Context, remotes <-chan *OD) { queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId)) // Delete existing queue - if err := os.RemoveAll(queuePath); - err != nil { panic(err) } + if err := os.RemoveAll(queuePath); err != nil { + panic(err) + } // Start new queue var err error remote.WCtx.Queue, err = OpenQueue(queuePath) - if err != nil { panic(err) } + if err != nil { + panic(err) + } // Spawn workers for i := 0; i < config.Workers; i++ { @@ -77,12 +80,12 @@ func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) { globalWait.Add(1) now := time.Now() - od := &OD { - Task: *t, + od := &OD{ + Task: *t, BaseUri: *u, - Result: TaskResult { - WebsiteId: t.WebsiteId, - StartTime: now, + Result: TaskResult{ + WebsiteId: t.WebsiteId, + StartTime: now, StartTimeUnix: now.Unix(), }, } @@ -117,7 +120,7 @@ func (o *OD) Watch(results chan File) { // Open crawl results file f, err := os.OpenFile( filePath, - os.O_CREATE | os.O_RDWR | os.O_TRUNC, + os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644, ) if err != nil { @@ -170,24 +173,18 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error // Log finish logrus.WithFields(logrus.Fields{ - "id": o.Task.WebsiteId, - "url": o.BaseUri.String(), + "id": o.Task.WebsiteId, + "url": o.BaseUri.String(), "duration": time.Since(o.Result.StartTime), }).Info("Crawler finished") // Set status code now := time.Now() o.Result.EndTimeUnix = now.Unix() - fileCount := atomic.LoadUint64(&o.Result.FileCount) - if fileCount == 0 { - errorCount := atomic.LoadUint64(&o.Result.ErrorCount) - if errorCount == 0 { - o.Result.StatusCode = "empty" - } else { - o.Result.StatusCode = "directory listing failed" - } + if atomic.LoadUint64(&o.Result.ErrorCount) != 0 { + o.Result.ResultCode = TR_FAIL } else { - o.Result.StatusCode = "success" + o.Result.ResultCode = TR_OK } } @@ -205,11 +202,17 @@ func (t *Task) collect(results chan File, f *os.File) error { result.Path = fasturl.PathUnescape(result.Path) result.Name = fasturl.PathUnescape(result.Name) resJson, err := json.Marshal(result) - if err != nil { panic(err) } + if err != nil { + panic(err) + } _, err = f.Write(resJson) - if err != nil { return err } + if err != nil { + return err + } _, err = f.Write([]byte{'\n'}) - if err != nil { return err } + if err != nil { + return err + } } return nil diff --git a/server.go b/server.go index f48c0ce..2f2f550 100644 --- a/server.go +++ b/server.go @@ -2,12 +2,15 @@ package main import ( "bytes" + "crypto" + "crypto/hmac" + "encoding/hex" "encoding/json" "fmt" + "github.com/pkg/errors" "github.com/sirupsen/logrus" - "github.com/spf13/viper" - "io" - "mime/multipart" + "golang.org/x/time/rate" + "io/ioutil" "net/http" "net/url" "os" @@ -15,34 +18,174 @@ import ( "time" ) -var serverClient = http.Client { - Timeout: config.ServerTimeout, +var serverWorker *TrackerWorker + +var serverClient = http.Client{ + Timeout: config.ServerTimeout, Transport: new(ServerTripper), } var serverUserAgent = "od-database-crawler/" + rootCmd.Version +//TODO: Move those elsewhere? +type WorkerAccessRequest struct { + Assign bool `json:"assign"` + Submit bool `json:"submit"` + Project int `json:"project"` +} + +//todo: only keep necessary info +type FetchTaskResponse struct { + Ok bool `json:"ok"` + Message string `json:"message"` + Content struct { + Task struct { + Id int64 `json:"id"` + Priority int64 `json:"priority"` + Project struct { + Id int64 `json:"id"` + Priority int64 `json:"priority"` + Name string `json:"name"` + CloneUrl string `json:"clone_url"` + GitRepo string `json:"git_repo"` + Version string `json:"version"` + Motd string `json:"motd"` + Public bool `json:"public"` + Hidden bool `json:"hidden"` + Chain int64 `json:"chain"` + Paused bool `json:"paused"` + AssignRate rate.Limit `json:"assign_rate"` + SubmitRate rate.Limit `json:"submit_rate"` + } `json:"project"` + Assignee int64 `json:"assignee"` + Retries int64 `json:"retries"` + MaxRetries int64 `json:"max_retries"` + Status int64 `json:"status"` + Recipe string `json:"recipe"` + MaxAssignTime int64 `json:"max_assign_time"` + AssignTime int64 `json:"assign_time"` + VerificationCount int64 `json:"verification_count"` + } `json:"task"` + } `json:"content"` +} + +type TrackerWorker struct { + Alias string `json:"alias"` + Id int `json:"id"` + Secret []byte `json:"secret"` +} + +type CreateTrackerWorkerResponse struct { + Ok bool `json:"ok"` + Message string `json:"message"` + Content struct { + Worker TrackerWorker `json:"worker"` + } `json:"content"` +} + +type CreateTrackerWorkerRequest struct { + Alias string `json:"alias"` +} + +func getOrCreateWorker() *TrackerWorker { + + var worker TrackerWorker + + if _, err := os.Stat("worker.json"); os.IsNotExist(err) { + req := CreateTrackerWorkerRequest{ + Alias: "crawler", //todo: load from config + } + body, _ := json.Marshal(&req) + buf := bytes.NewBuffer(body) + resp, _ := serverClient.Post(config.TrackerUrl+"/worker/create", "application/json", buf) + //todo: handle err + + fmt.Println(resp.StatusCode) + + workerResponse := CreateTrackerWorkerResponse{} + respBody, _ := ioutil.ReadAll(resp.Body) + _ = json.Unmarshal(respBody, &workerResponse) + //todo handle err + fmt.Println(workerResponse) + + workerJsonData, _ := json.Marshal(&workerResponse.Content.Worker) + fp, _ := os.OpenFile("worker.json", os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0600) + _, _ = fp.Write(workerJsonData) + //todo: handle err + } else { + fp, _ := os.OpenFile("worker.json", os.O_RDONLY, 0600) + workerJsonData, _ := ioutil.ReadAll(fp) + _ = json.Unmarshal(workerJsonData, &worker) + //todo: handle err + } + + return &worker +} + func FetchTask() (t *Task, err error) { - res, err := serverClient.PostForm( - config.ServerUrl + "/task/get", - url.Values{ "token": {config.Token} }) - if err != nil { return } + + //todo: this whole block should definitely be extracted elsewhere + if serverWorker == nil { + serverWorker = getOrCreateWorker() + + //Request ASSIGN permission + //todo: project ID should be stored as a int in the first place + pid, _ := strconv.Atoi(config.TrackerProject) + accessReq, _ := json.Marshal(WorkerAccessRequest{ + Project: int(pid), + Assign: true, + Submit: false, + }) + buf := bytes.NewBuffer(accessReq) + res, err := serverClient.Post(config.TrackerUrl+"/project/request_access", "application/json", buf) + if err != nil { + panic(err) + } + fmt.Println(res.StatusCode) + } + + res, err := serverClient.Get( + config.TrackerUrl + "/task/get/" + config.TrackerProject) + + if err != nil { + return + } defer res.Body.Close() switch res.StatusCode { case 200: break + //TODO: 404 should not happen. case 404, 500: return nil, nil default: return nil, fmt.Errorf("http %s", res.Status) } - t = new(Task) - err = json.NewDecoder(res.Body).Decode(t) + jsonResponse := FetchTaskResponse{} + err = json.NewDecoder(res.Body).Decode(&jsonResponse) if _, ok := err.(*json.SyntaxError); ok { return nil, fmt.Errorf("/task/get returned invalid JSON") - } else if err != nil { return } + } else if err != nil { + return + } + + if !jsonResponse.Ok { + //The tracker will return Ok=false when no tasks are available + err = errors.New(jsonResponse.Message) + return + } + + fmt.Println(jsonResponse.Content.Task.Recipe) + task := Task{} + err = json.Unmarshal([]byte(jsonResponse.Content.Task.Recipe), &task) + if _, ok := err.(*json.SyntaxError); ok { + return nil, fmt.Errorf("/task/get returned invalid JSON") + } else if err != nil { + return + } + + t = &task return } @@ -59,7 +202,7 @@ func PushResult(result *TaskResult, f *os.File) (err error) { return } - err = uploadChunks(result.WebsiteId, f) + err = uploadWebsocket(result.WebsiteId, f) if err != nil { logrus.Errorf("Failed to upload file list: %s", err) err2 := CancelTask(result.WebsiteId) @@ -75,93 +218,49 @@ func PushResult(result *TaskResult, f *os.File) (err error) { return } -func uploadChunks(websiteId uint64, f *os.File) error { - eof := false - for iter := 1; !eof; iter++ { - // TODO Stream with io.Pipe? - var b bytes.Buffer +func uploadWebsocket(websiteId uint64, f *os.File) error { - multi := multipart.NewWriter(&b) - - // Set upload fields - var err error - err = multi.WriteField("token", config.Token) - if err != nil { return err } - err = multi.WriteField("website_id", fmt.Sprintf("%d", websiteId)) - if err != nil { return err } - - // Copy chunk to file_list - formFile, err := multi.CreateFormFile("file_list", "file_list") - var n int64 - n, err = io.CopyN(formFile, f, config.ChunkSize) - if err != io.EOF && err != nil { - return err - } - if n == 0 { - // Don't upload, no content - return nil - } else if n < config.ChunkSize { - err = nil - // Break at end of iteration - eof = true - } - - multi.Close() - - for retries := 0; retries < viper.GetInt(ConfUploadRetries); retries++ { - if retries > 0 { - // Error occurred, retry upload - time.Sleep(viper.GetDuration(ConfUploadRetryInterval)) - } - - req, err := http.NewRequest( - http.MethodPost, - config.ServerUrl + "/task/upload", - &b) - req.Header.Set("content-type", multi.FormDataContentType()) - if err != nil { continue } - - res, err := serverClient.Do(req) - if err != nil { continue } - res.Body.Close() - - if res.StatusCode != http.StatusOK { - logrus.WithField("status", res.Status). - WithField("part", iter). - Errorf("Upload failed") - continue - } - - // Upload successful - break - } - - logrus.WithField("id", websiteId). - WithField("part", iter). - Infof("Uploaded files chunk") - } + //TODO: + /* + * OD-DB will give you an Upload token when you fetch the task + * Open a WS connection at {ws_bucket_addr}/upload with the 'X-Upload-Token' as header + * Stream whole file as a single WS message + * Close connection + */ return nil } func uploadResult(result *TaskResult) (err error) { - req := releaseTaskRequest { - TaskId: int64(result.WebsiteId), + + //TODO: + /* + * When the file has been uploaded, just release the task with the TR_OK code + * Don't bother sending the ODDB-related stuff, You just need the task id + * Probably a good idea to wrap this around a new releaseTask() function + */ + + req := releaseTaskRequest{ + TaskId: int64(result.WebsiteId), ResultCode: result.ResultCode, // TODO What is verification Verification: 0, } resultEnc, err := json.Marshal(&req) - if err != nil { panic(err) } + if err != nil { + panic(err) + } res, err := serverClient.PostForm( - config.ServerUrl + "/task/release", - url.Values { - "token": {config.Token}, + config.TrackerUrl+"/task/release", + url.Values{ + "token": {config.Token}, "result": {string(resultEnc)}, }, ) - if err != nil { return } + if err != nil { + return + } res.Body.Close() if res.StatusCode != http.StatusOK { @@ -172,22 +271,8 @@ func uploadResult(result *TaskResult) (err error) { } func CancelTask(websiteId uint64) (err error) { - // TODO Remove, no endpoint in task_tracker - - res, err := serverClient.PostForm( - config.ServerUrl + "/task/cancel", - url.Values{ - "token": {config.Token}, - "website_id": {strconv.FormatUint(websiteId, 10)}, - }, - ) - if err != nil { return } - res.Body.Close() - - if res.StatusCode != http.StatusOK { - return fmt.Errorf("failed to cancel task: %s", res.Status) - } + //TODO: Maybe wrap this function around releaseTask(cancel: bool) ? return } @@ -195,6 +280,35 @@ type ServerTripper struct{} func (t *ServerTripper) RoundTrip(req *http.Request) (res *http.Response, err error) { req.Header.Set("User-Agent", serverUserAgent) + + //TODO: Move this whole block elsewhere + if serverWorker != nil { + var content []byte + if req.Method == "GET" { + content = []byte("/task/get/" + config.TrackerProject) //todo: find a less retarded way of doing that + } else { + //todo: this is retarded and should be moved elsewhere + buf, _ := ioutil.ReadAll(req.Body) + rdr1 := ioutil.NopCloser(bytes.NewBuffer(buf)) + rdr2 := ioutil.NopCloser(bytes.NewBuffer(buf)) + + content, _ = ioutil.ReadAll(rdr1) + + req.Body = rdr2 + } + + ts := time.Now().Format(time.RFC1123) + + mac := hmac.New(crypto.SHA256.New, serverWorker.Secret) + mac.Write(content) + mac.Write([]byte(ts)) + sig := hex.EncodeToString(mac.Sum(nil)) + + req.Header.Add("X-Worker-Id", strconv.Itoa(serverWorker.Id)) + req.Header.Add("Timestamp", time.Now().Format(time.RFC1123)) + req.Header.Add("X-Signature", sig) + + } return http.DefaultTransport.RoundTrip(req) }