From 85d2aac9d4bbef6ce46738142573f83ee4063af1 Mon Sep 17 00:00:00 2001
From: Richard Patel
Date: Tue, 20 Nov 2018 02:33:50 +0100
Subject: [PATCH] Performance patch

---
Reviewer notes (text in this section is ignored by git am):
 * Line breaks and tab indentation were reconstructed from a
   whitespace-collapsed copy of this patch. Confirm the whitespace of
   context and removed lines against the tree before applying.
 * queue.go and the second scheduler.go hunk were revised in review, so
   the post-image blob ids on their index lines are stale.

 model.go     |  24 ----------
 queue.go     | 132 +++++++++++++++++++++++++++++++++++++++++++++++++++
 scheduler.go |   3 +-
 worker.go    |  19 ++------
 4 files changed, 137 insertions(+), 41 deletions(-)
 create mode 100644 queue.go

diff --git a/model.go b/model.go
index ff1f048..0b24e91 100644
--- a/model.go
+++ b/model.go
@@ -29,30 +29,6 @@ type Job struct {
 	LastError error
 }
 
-type JobGob struct {
-	Uri string
-	Fails int
-	LastError string
-}
-
-func (g *JobGob) ToGob(j *Job) {
-	g.Uri = j.UriStr
-	g.Fails = j.Fails
-	if j.LastError != nil {
-		g.LastError = j.LastError.Error()
-	}
-}
-
-func (g *JobGob) FromGob(j *Job) {
-	if err := j.Uri.Parse(g.Uri);
-		err != nil { panic(err) }
-	j.UriStr = g.Uri
-	j.Fails = g.Fails
-	if g.LastError != "" {
-		j.LastError = errorString(g.LastError)
-	}
-}
-
 type OD struct {
 	Task Task
 	Result TaskResult
diff --git a/queue.go b/queue.go
new file mode 100644
index 0000000..88d2e1a
--- /dev/null
+++ b/queue.go
@@ -0,0 +1,132 @@
+package main
+
+import (
+	"sync"
+
+	"github.com/beeker1121/goque"
+)
+
+// threshold is the maximum number of jobs kept in the in-memory buffer;
+// once it is full, Enqueue writes to the on-disk queue instead.
+const threshold = 5000
+
+// BufferedQueue is a job queue that holds up to threshold jobs in memory
+// and hands any further jobs to a goque queue on disk.
+type BufferedQueue struct {
+	q      *goque.Queue // on-disk queue
+	inBuf  []Job        // not referenced by any method in this file
+	outBuf []Job        // in-memory jobs; guarded by m
+	m      sync.Mutex
+}
+
+// OpenQueue opens the goque queue stored at dataDir and wraps it in a
+// BufferedQueue with an empty in-memory buffer.
+func OpenQueue(dataDir string) (bq *BufferedQueue, err error) {
+	bq = new(BufferedQueue)
+	bq.q, err = goque.OpenQueue(dataDir)
+	if err != nil {
+		return nil, err
+	}
+	return bq, nil
+}
+
+// Enqueue adds job to the queue. The job is buffered in memory while there
+// is room; otherwise it is converted to a JobGob and passed to the on-disk
+// queue's EnqueueObject.
+func (bq *BufferedQueue) Enqueue(job *Job) error {
+	if bq.directEnqueue(job) {
+		return nil
+	}
+
+	var gob JobGob
+	gob.ToGob(job)
+	_, err := bq.q.EnqueueObject(gob)
+	return err
+}
+
+// Dequeue removes and returns the next job. Jobs buffered in memory are
+// always returned before jobs stored on disk. Errors from the on-disk queue
+// are returned unchanged so callers can compare them against goque's
+// sentinel errors.
+func (bq *BufferedQueue) Dequeue() (job Job, err error) {
+	if bq.directDequeue(&job) {
+		return job, nil
+	}
+
+	var item *goque.Item
+	item, err = bq.q.Dequeue()
+	if err != nil {
+		return job, err
+	}
+
+	var gob JobGob
+	if err = item.ToObject(&gob); err != nil {
+		return job, err
+	}
+	gob.FromGob(&job)
+	return job, nil
+}
+
+// directEnqueue appends a copy of job to the in-memory buffer when it holds
+// fewer than threshold jobs. It reports whether the job was buffered.
+func (bq *BufferedQueue) directEnqueue(job *Job) bool {
+	bq.m.Lock()
+	defer bq.m.Unlock()
+
+	if len(bq.outBuf) >= threshold {
+		return false
+	}
+	bq.outBuf = append(bq.outBuf, *job)
+	return true
+}
+
+// directDequeue pops the oldest buffered job into job. It reports whether
+// the in-memory buffer had a job to return.
+func (bq *BufferedQueue) directDequeue(job *Job) bool {
+	bq.m.Lock()
+	defer bq.m.Unlock()
+
+	if len(bq.outBuf) == 0 {
+		return false
+	}
+	*job = bq.outBuf[0]
+	// Clear the vacated slot so the backing array does not keep the popped
+	// job's contents reachable after the slice is advanced.
+	bq.outBuf[0] = Job{}
+	bq.outBuf = bq.outBuf[1:]
+	return true
+}
+
+// Close closes the on-disk queue. Jobs still held in the in-memory buffer
+// are not written to disk.
+func (bq *BufferedQueue) Close() error {
+	return bq.q.Close()
+}
+
+// JobGob is the flattened form of a Job that is stored in the on-disk queue.
+type JobGob struct {
+	Uri       string
+	Fails     int
+	LastError string
+}
+
+// ToGob copies the URI string, failure count and last error text of j into g.
+func (g *JobGob) ToGob(j *Job) {
+	g.Uri = j.UriStr
+	g.Fails = j.Fails
+	if j.LastError != nil {
+		g.LastError = j.LastError.Error()
+	}
+}
+
+// FromGob fills j from g. It panics if g.Uri cannot be parsed.
+func (g *JobGob) FromGob(j *Job) {
+	if err := j.Uri.Parse(g.Uri); err != nil {
+		panic(err)
+	}
+	j.UriStr = g.Uri
+	j.Fails = g.Fails
+	if g.LastError != "" {
+		j.LastError = errorString(g.LastError)
+	}
+}
diff --git a/scheduler.go b/scheduler.go
index 50e14bb..c348647 100644
--- a/scheduler.go
+++ b/scheduler.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
-	"github.com/beeker1121/goque"
 	"github.com/sirupsen/logrus"
 	"github.com/terorie/od-database-crawler/fasturl"
 	"os"
@@ -40,7 +39,7 @@ func Schedule(c context.Context, remotes <-chan *OD) {
 
 		// Start new queue
 		var err error
-		remote.WCtx.Queue, err = goque.OpenQueue(queuePath)
+		remote.WCtx.Queue, err = OpenQueue(queuePath)
 		if err != nil {
 			panic(err)
 		}
diff --git a/worker.go b/worker.go
index 837ffda..2d6e39a 100644
--- a/worker.go
+++ b/worker.go
@@ -16,14 +16,14 @@ var globalWait sync.WaitGroup
 
 type WorkerContext struct {
 	OD *OD
-	Queue *goque.Queue
+	Queue *BufferedQueue
 	lastRateLimit time.Time
 	numRateLimits int
 }
 
 func (w *WorkerContext) Worker(results chan<- File) {
 	for {
-		item, err := w.Queue.Dequeue()
+		job, err := w.Queue.Dequeue()
 		switch err {
 		case goque.ErrEmpty:
 			time.Sleep(500 * time.Millisecond)
@@ -33,20 +33,11 @@ func (w *WorkerContext) Worker(results chan<- File) {
 			return
 
 		case nil:
-			break
+			w.step(results, job)
 
 		default:
 			panic(err)
 		}
-
-		var gob JobGob
-		if err := item.ToObject(&gob); err != nil {
-			panic(err)
-		}
-
-		var job Job
-		gob.FromGob(&job)
-		w.step(results, job)
 	}
 }
 
@@ -179,9 +170,7 @@ func (w *WorkerContext) queueJob(job Job) {
 		}
 	}
 
-	var gob JobGob
-	gob.ToGob(&job)
-	if _, err := w.Queue.EnqueueObject(gob); err != nil {
+	if err := w.Queue.Enqueue(&job); err != nil {
 		panic(err)
 	}
 }