diff --git a/queue.go b/queue.go index a1e922e..66f57e1 100644 --- a/queue.go +++ b/queue.go @@ -2,6 +2,7 @@ package main import ( "github.com/beeker1121/goque" + "os" "sync" "sync/atomic" ) @@ -11,13 +12,15 @@ const ( ) type BufferedQueue struct { - q *goque.Queue - outBuf []Job - m sync.Mutex + dataDir string + q *goque.Queue + buf []Job + m sync.Mutex } func OpenQueue(dataDir string) (bq *BufferedQueue, err error) { bq = new(BufferedQueue) + bq.dataDir = dataDir bq.q, err = goque.OpenQueue(dataDir) if err != nil { return nil, err } return @@ -59,8 +62,8 @@ func (q *BufferedQueue) directEnqueue(job *Job) bool { q.m.Lock() defer q.m.Unlock() - if len(q.outBuf) < threshold { - q.outBuf = append(q.outBuf, *job) + if len(q.buf) < threshold { + q.buf = append(q.buf, *job) return true } else { return false @@ -71,17 +74,25 @@ func (q *BufferedQueue) directDequeue(job *Job) bool { q.m.Lock() defer q.m.Unlock() - if len(q.outBuf) > 0 { - *job = q.outBuf[0] - q.outBuf = q.outBuf[1:] + if len(q.buf) > 0 { + *job = q.buf[0] + q.buf = q.buf[1:] return true } else { return false } } +// Always returns nil (But implements io.Closer) func (q *BufferedQueue) Close() error { - return q.q.Close() + // Close ignoring errors + q.q.Close() + + // Delete files + if err := os.RemoveAll(q.dataDir); + err != nil { panic(err) } + + return nil } type JobGob struct { diff --git a/scheduler.go b/scheduler.go index 63e414b..9abe491 100644 --- a/scheduler.go +++ b/scheduler.go @@ -160,6 +160,8 @@ func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error // Wait for all jobs on remote to finish o.Wait.Wait() + + // Close queue if err := o.WCtx.Queue.Close(); err != nil { panic(err) }