od-database-crawler/scheduler.go
2018-10-28 02:47:20 +02:00

87 lines
1.4 KiB
Go

package main
import (
"context"
"net/url"
)
type Job struct {
Remote *RemoteDir
Uri url.URL
UriStr string
Fails int
}
func Schedule(c context.Context, remotes <-chan *RemoteDir) {
in, out := makeJobBuffer()
wCtx := WorkerContext{ in, out }
for i := 0; i < config.Workers; i++ {
go wCtx.Worker()
}
go Stats(c)
for {
select {
case <-c.Done():
close(in)
return
case remote := <-remotes:
// Enqueue initial job
queueJob(in, Job{
Remote: remote,
Uri: remote.BaseUri,
UriStr: remote.BaseUri.String(),
Fails: 0,
})
globalWait.Done()
// Upload result when ready
go remote.Watch()
}
}
}
func (r *RemoteDir) Watch() {
// Wait for all jobs on remote to finish
r.Wait.Wait()
}
func makeJobBuffer() (chan<- Job, <-chan Job) {
in := make(chan Job)
out := make(chan Job)
go bufferJobs(in, out)
return in, out
}
func bufferJobs(in chan Job, out chan Job) {
var inQueue []Job
outCh := func() chan Job {
if len(inQueue) == 0 {
return nil
}
return out
}
for len(inQueue) > 0 || in != nil {
if len(inQueue) == 0 {
v, ok := <-in
if !ok {
in = nil
} else {
inQueue = append(inQueue, v)
}
} else {
select {
case v, ok := <-in:
if !ok {
in = nil
} else {
inQueue = append(inQueue, v)
}
case outCh() <- inQueue[0]:
inQueue = inQueue[1:]
}
}
}
close(out)
}