Performance patch

This commit is contained in:
Richard Patel 2018-11-20 02:33:50 +01:00
parent b6c0a45900
commit 85d2aac9d4
No known key found for this signature in database
GPG Key ID: C268B2BBDA2ABECB
4 changed files with 111 additions and 44 deletions

View File

@ -29,30 +29,6 @@ type Job struct {
LastError error
}
type JobGob struct {
Uri string
Fails int
LastError string
}
func (g *JobGob) ToGob(j *Job) {
g.Uri = j.UriStr
g.Fails = j.Fails
if j.LastError != nil {
g.LastError = j.LastError.Error()
}
}
func (g *JobGob) FromGob(j *Job) {
if err := j.Uri.Parse(g.Uri);
err != nil { panic(err) }
j.UriStr = g.Uri
j.Fails = g.Fails
if g.LastError != "" {
j.LastError = errorString(g.LastError)
}
}
type OD struct {
Task Task
Result TaskResult

105
queue.go Normal file
View File

@ -0,0 +1,105 @@
package main
import (
"github.com/beeker1121/goque"
"sync"
)
const (
threshold = 5000
)
type BufferedQueue struct {
q *goque.Queue
inBuf []Job
outBuf []Job
m sync.Mutex
}
func OpenQueue(dataDir string) (bq *BufferedQueue, err error) {
bq = new(BufferedQueue)
bq.q, err = goque.OpenQueue(dataDir)
if err != nil { return nil, err }
return
}
func (q *BufferedQueue) Enqueue(job *Job) error {
if q.directEnqueue(job) {
return nil
}
var gob JobGob
gob.ToGob(job)
_, err := q.q.EnqueueObject(gob)
return err
}
func (q *BufferedQueue) Dequeue() (job Job, err error) {
if q.directDequeue(&job) {
return job, nil
}
var item *goque.Item
item, err = q.q.Dequeue()
if err != nil { return }
var gob JobGob
err = item.ToObject(&gob)
if err != nil { return }
gob.FromGob(&job)
return
}
func (q *BufferedQueue) directEnqueue(job *Job) bool {
q.m.Lock()
defer q.m.Unlock()
if len(q.outBuf) < threshold {
q.outBuf = append(q.outBuf, *job)
return true
} else {
return false
}
}
func (q *BufferedQueue) directDequeue(job *Job) bool {
q.m.Lock()
defer q.m.Unlock()
if len(q.outBuf) > 0 {
*job = q.outBuf[0]
q.outBuf = q.outBuf[1:]
return true
} else {
return false
}
}
func (q *BufferedQueue) Close() error {
return q.q.Close()
}
type JobGob struct {
Uri string
Fails int
LastError string
}
func (g *JobGob) ToGob(j *Job) {
g.Uri = j.UriStr
g.Fails = j.Fails
if j.LastError != nil {
g.LastError = j.LastError.Error()
}
}
func (g *JobGob) FromGob(j *Job) {
if err := j.Uri.Parse(g.Uri);
err != nil { panic(err) }
j.UriStr = g.Uri
j.Fails = g.Fails
if g.LastError != "" {
j.LastError = errorString(g.LastError)
}
}

View File

@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/beeker1121/goque"
"github.com/sirupsen/logrus"
"github.com/terorie/od-database-crawler/fasturl"
"os"
@ -40,10 +39,8 @@ func Schedule(c context.Context, remotes <-chan *OD) {
// Start new queue
var err error
remote.WCtx.Queue, err = goque.OpenQueue(queuePath)
if err != nil {
panic(err)
}
remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { panic(err) }
// Spawn workers
for i := 0; i < config.Workers; i++ {

View File

@ -16,14 +16,14 @@ var globalWait sync.WaitGroup
type WorkerContext struct {
OD *OD
Queue *goque.Queue
Queue *BufferedQueue
lastRateLimit time.Time
numRateLimits int
}
func (w *WorkerContext) Worker(results chan<- File) {
for {
item, err := w.Queue.Dequeue()
job, err := w.Queue.Dequeue()
switch err {
case goque.ErrEmpty:
time.Sleep(500 * time.Millisecond)
@ -33,20 +33,11 @@ func (w *WorkerContext) Worker(results chan<- File) {
return
case nil:
break
w.step(results, job)
default:
panic(err)
}
var gob JobGob
if err := item.ToObject(&gob); err != nil {
panic(err)
}
var job Job
gob.FromGob(&job)
w.step(results, job)
}
}
@ -179,9 +170,7 @@ func (w *WorkerContext) queueJob(job Job) {
}
}
var gob JobGob
gob.ToGob(&job)
if _, err := w.Queue.EnqueueObject(gob); err != nil {
if err := w.Queue.Enqueue(&job); err != nil {
panic(err)
}
}