more stuff

This commit is contained in:
Richard Patel 2018-10-28 03:41:16 +01:00
parent 4ea5f8a410
commit faad19f121
No known key found for this signature in database
GPG Key ID: C268B2BBDA2ABECB
4 changed files with 34 additions and 10 deletions

View File

@ -2,4 +2,5 @@ server_url: localhost:6969
token: abc
stats_interval: 1s
verbose: true
retries: 0
retries: 5
workers: 80

View File

@ -14,7 +14,7 @@ func main() {
remotes := make(chan *RemoteDir)
go Schedule(c, remotes)
u, _ := url.Parse("https://the-eye.eu/public/rom/")
u, _ := url.Parse("http://mine.terorie.com:420/")
remote := NewRemoteDir(*u)
globalWait.Add(1)

View File

@ -10,11 +10,15 @@ type Job struct {
Uri url.URL
UriStr string
Fails int
LastError error
}
func Schedule(c context.Context, remotes <-chan *RemoteDir) {
in, out := makeJobBuffer()
wCtx := WorkerContext{ in, out }
wCtx := WorkerContext{
in: in,
out: out,
}
for i := 0; i < config.Workers; i++ {
go wCtx.Worker()
}
@ -28,7 +32,7 @@ func Schedule(c context.Context, remotes <-chan *RemoteDir) {
case remote := <-remotes:
// Enqueue initial job
queueJob(in, Job{
wCtx.queueJob(Job{
Remote: remote,
Uri: remote.BaseUri,
UriStr: remote.BaseUri.String(),

View File

@ -2,9 +2,11 @@ package main
import (
"github.com/sirupsen/logrus"
"math"
"strings"
"sync"
"sync/atomic"
"time"
)
var globalWait sync.WaitGroup
@ -12,6 +14,8 @@ var globalWait sync.WaitGroup
type WorkerContext struct {
in chan<- Job
out <-chan Job
lastRateLimit time.Time
numRateLimits int
}
func (w WorkerContext) Worker() {
@ -21,7 +25,7 @@ func (w WorkerContext) Worker() {
}
func (w WorkerContext) step(job Job) {
defer finishJob(&job)
defer w.finishJob(&job)
var f File
@ -37,14 +41,18 @@ func (w WorkerContext) step(job Job) {
Errorf("Giving up after %d fails", job.Fails)
} else {
atomic.AddUint64(&totalRetries, 1)
queueJob(w.in, job)
if err == ErrRateLimit {
w.lastRateLimit = time.Now()
w.numRateLimits++
}
w.queueJob(job)
}
return
}
atomic.AddUint64(&totalDone, 1)
for _, job := range newJobs {
queueJob(w.in, job)
w.queueJob(job)
}
job.Remote.Files = append(job.Remote.Files, f)
@ -86,13 +94,24 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
return
}
func queueJob(in chan<- Job, job Job) {
func (w WorkerContext) queueJob(job Job) {
job.Remote.Wait.Add(1)
globalWait.Add(1)
in <- job
if w.numRateLimits > 0 {
if time.Since(w.lastRateLimit) > 5 * time.Second {
w.numRateLimits = 0
} else {
time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) *
100 * time.Millisecond)
w.in <- job
}
} else {
w.in <- job
}
}
func finishJob(job *Job) {
func (w WorkerContext) finishJob(job *Job) {
job.Remote.Wait.Done()
globalWait.Done()
}