diff --git a/queue.go b/queue.go index 88d2e1a..a1e922e 100644 --- a/queue.go +++ b/queue.go @@ -3,6 +3,7 @@ package main import ( "github.com/beeker1121/goque" "sync" + "sync/atomic" ) const ( @@ -11,7 +12,6 @@ const ( type BufferedQueue struct { q *goque.Queue - inBuf []Job outBuf []Job m sync.Mutex } @@ -24,6 +24,7 @@ func OpenQueue(dataDir string) (bq *BufferedQueue, err error) { } func (q *BufferedQueue) Enqueue(job *Job) error { + atomic.AddInt64(&totalQueued, 1) if q.directEnqueue(job) { return nil } @@ -36,6 +37,7 @@ func (q *BufferedQueue) Enqueue(job *Job) error { func (q *BufferedQueue) Dequeue() (job Job, err error) { if q.directDequeue(&job) { + atomic.AddInt64(&totalQueued, -1) return job, nil } @@ -43,6 +45,8 @@ func (q *BufferedQueue) Dequeue() (job Job, err error) { item, err = q.q.Dequeue() if err != nil { return } + atomic.AddInt64(&totalQueued, -1) + var gob JobGob err = item.ToObject(&gob) if err != nil { return } diff --git a/scheduler.go b/scheduler.go index c348647..63e414b 100644 --- a/scheduler.go +++ b/scheduler.go @@ -16,7 +16,7 @@ import ( var activeTasksLock sync.Mutex var activeTasks = make(map[uint64]bool) var numActiveTasks int32 -var totalBuffered int64 +var totalQueued int64 func Schedule(c context.Context, remotes <-chan *OD) { go Stats(c) diff --git a/stats.go b/stats.go index 73a846e..80c149e 100644 --- a/stats.go +++ b/stats.go @@ -57,7 +57,7 @@ func Stats(c context.Context) { runtime.ReadMemStats(&mem) logrus.WithFields(logrus.Fields{ - "queue_count": atomic.LoadInt64(&totalBuffered), + "queue_count": atomic.LoadInt64(&totalQueued), "heap": FormatByteCount(mem.Alloc), "objects": mem.HeapObjects, "num_gc": mem.NumGC,