From faad19f121604ec76937ec066ffc47f3c1622483 Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Sun, 28 Oct 2018 03:41:16 +0100 Subject: [PATCH] more stuff --- config.yml | 3 ++- main.go | 2 +- scheduler.go | 8 ++++++-- worker.go | 31 +++++++++++++++++++++++++------ 4 files changed, 34 insertions(+), 10 deletions(-) diff --git a/config.yml b/config.yml index 7a4ac78..fbfa1c5 100644 --- a/config.yml +++ b/config.yml @@ -2,4 +2,5 @@ server_url: localhost:6969 token: abc stats_interval: 1s verbose: true -retries: 0 \ No newline at end of file +retries: 5 +workers: 80 \ No newline at end of file diff --git a/main.go b/main.go index 6bb46c2..ba8a427 100644 --- a/main.go +++ b/main.go @@ -14,7 +14,7 @@ func main() { remotes := make(chan *RemoteDir) go Schedule(c, remotes) - u, _ := url.Parse("https://the-eye.eu/public/rom/") + u, _ := url.Parse("http://mine.terorie.com:420/") remote := NewRemoteDir(*u) globalWait.Add(1) diff --git a/scheduler.go b/scheduler.go index 87ec34f..2224c4a 100644 --- a/scheduler.go +++ b/scheduler.go @@ -10,11 +10,15 @@ type Job struct { Uri url.URL UriStr string Fails int + LastError error } func Schedule(c context.Context, remotes <-chan *RemoteDir) { in, out := makeJobBuffer() - wCtx := WorkerContext{ in, out } + wCtx := WorkerContext{ + in: in, + out: out, + } for i := 0; i < config.Workers; i++ { go wCtx.Worker() } @@ -28,7 +32,7 @@ func Schedule(c context.Context, remotes <-chan *RemoteDir) { case remote := <-remotes: // Enqueue initial job - queueJob(in, Job{ + wCtx.queueJob(Job{ Remote: remote, Uri: remote.BaseUri, UriStr: remote.BaseUri.String(), diff --git a/worker.go b/worker.go index 481aee2..2fd2785 100644 --- a/worker.go +++ b/worker.go @@ -2,9 +2,11 @@ package main import ( "github.com/sirupsen/logrus" + "math" "strings" "sync" "sync/atomic" + "time" ) var globalWait sync.WaitGroup @@ -12,6 +14,8 @@ var globalWait sync.WaitGroup type WorkerContext struct { in chan<- Job out <-chan Job + lastRateLimit time.Time + numRateLimits int } func (w WorkerContext) Worker() { @@ -21,7 +25,7 @@ func (w WorkerContext) Worker() { } func (w WorkerContext) step(job Job) { - defer finishJob(&job) + defer w.finishJob(&job) var f File @@ -37,14 +41,18 @@ func (w WorkerContext) step(job Job) { Errorf("Giving up after %d fails", job.Fails) } else { atomic.AddUint64(&totalRetries, 1) - queueJob(w.in, job) + if err == ErrRateLimit { + w.lastRateLimit = time.Now() + w.numRateLimits++ + } + w.queueJob(job) } return } atomic.AddUint64(&totalDone, 1) for _, job := range newJobs { - queueJob(w.in, job) + w.queueJob(job) } job.Remote.Files = append(job.Remote.Files, f) @@ -86,13 +94,24 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) { return } -func queueJob(in chan<- Job, job Job) { +func (w WorkerContext) queueJob(job Job) { job.Remote.Wait.Add(1) globalWait.Add(1) - in <- job + + if w.numRateLimits > 0 { + if time.Since(w.lastRateLimit) > 5 * time.Second { + w.numRateLimits = 0 + } else { + time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) * + 100 * time.Millisecond) + w.in <- job + } + } else { + w.in <- job + } } -func finishJob(job *Job) { +func (w WorkerContext) finishJob(job *Job) { job.Remote.Wait.Done() globalWait.Done() }