Job queue disk offloading

This commit is contained in:
Richard Patel
2018-11-20 02:03:10 +01:00
parent d332f06659
commit b6c0a45900
4 changed files with 113 additions and 72 deletions

View File

@@ -1,6 +1,7 @@
package main
import (
"github.com/beeker1121/goque"
"github.com/sirupsen/logrus"
"github.com/valyala/fasthttp"
"math"
@@ -14,24 +15,47 @@ import (
var globalWait sync.WaitGroup
type WorkerContext struct {
in chan<- Job
out <-chan Job
OD *OD
Queue *goque.Queue
lastRateLimit time.Time
numRateLimits int
}
func (w WorkerContext) Worker(results chan<- File) {
for job := range w.out {
func (w *WorkerContext) Worker(results chan<- File) {
for {
item, err := w.Queue.Dequeue()
switch err {
case goque.ErrEmpty:
time.Sleep(500 * time.Millisecond)
continue
case goque.ErrDBClosed:
return
case nil:
break
default:
panic(err)
}
var gob JobGob
if err := item.ToObject(&gob); err != nil {
panic(err)
}
var job Job
gob.FromGob(&job)
w.step(results, job)
}
}
func (w WorkerContext) step(results chan<- File, job Job) {
func (w *WorkerContext) step(results chan<- File, job Job) {
defer w.finishJob(&job)
var f File
newJobs, err := DoJob(&job, &f)
newJobs, err := w.DoJob(&job, &f)
atomic.AddUint64(&totalStarted, 1)
if err == ErrKnown {
return
@@ -75,7 +99,7 @@ func (w WorkerContext) step(results chan<- File, job Job) {
}
}
func DoJob(job *Job, f *File) (newJobs []Job, err error) {
func (w *WorkerContext) DoJob(job *Job, f *File) (newJobs []Job, err error) {
if len(job.Uri.Path) == 0 { return }
if job.Uri.Path[len(job.Uri.Path)-1] == '/' {
// Load directory
@@ -93,7 +117,7 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
hash := f.HashDir(links)
// Skip symlinked dirs
if job.OD.LoadOrStoreKey(&hash) {
if w.OD.LoadOrStoreKey(&hash) {
return nil, ErrKnown
}
@@ -114,7 +138,6 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
lastLink = uriStr
newJobs = append(newJobs, Job{
OD: job.OD,
Uri: link,
UriStr: uriStr,
Fails: 0,
@@ -139,13 +162,13 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
}
return nil, err
}
atomic.AddUint64(&job.OD.Result.FileCount, 1)
atomic.AddUint64(&w.OD.Result.FileCount, 1)
}
return
}
func (w WorkerContext) queueJob(job Job) {
job.OD.Wait.Add(1)
func (w *WorkerContext) queueJob(job Job) {
w.OD.Wait.Add(1)
if w.numRateLimits > 0 {
if time.Since(w.lastRateLimit) > 5 * time.Second {
@@ -156,11 +179,15 @@ func (w WorkerContext) queueJob(job Job) {
}
}
w.in <- job
var gob JobGob
gob.ToGob(&job)
if _, err := w.Queue.EnqueueObject(gob); err != nil {
panic(err)
}
}
func (w WorkerContext) finishJob(job *Job) {
job.OD.Wait.Done()
func (w *WorkerContext) finishJob(job *Job) {
w.OD.Wait.Done()
}
func isErrSilent(err error) bool {