Reset to stable branch

This commit is contained in:
Richard Patel
2019-02-22 05:37:45 +01:00
parent 7c8ab50ee4
commit 326e29e5e4
13 changed files with 162 additions and 758 deletions

View File

@@ -21,69 +21,61 @@ var totalQueued int64
// Schedule consumes open-directory tasks from the remotes channel and
// starts a crawl for each; Stats runs alongside for the lifetime of c.
//
// NOTE(review): the body below is diff residue — lines from two revisions
// of this function are interleaved without +/- markers (both the
// `select { case remote := <-remotes:` loop head and the
// `for remote := range remotes {` loop head are present, and a stray
// `case <-c.Done():` sits outside any select near the end). This block is
// not valid Go as written; recover one side of the diff from version
// control before building.
func Schedule(c context.Context, remotes <-chan *OD) {
go Stats(c)
for {
select {
case remote := <-remotes:
if !scheduleNewTask(c, remote) {
// NOTE(review): the lines from here down mirror scheduleNewTask below —
// presumably the older revision had this body inline; confirm via VCS.
for remote := range remotes {
logrus.WithField("url", remote.BaseUri.String()).
Info("Starting crawler")
// Collect results
results := make(chan File)
remote.WCtx.OD = remote
// Get queue path
queuePath := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
// Delete existing queue
if err := os.RemoveAll(queuePath);
err != nil { panic(err) }
// Start new queue
var err error
remote.WCtx.Queue, err = OpenQueue(queuePath)
if err != nil { panic(err) }
// Spawn workers
for i := 0; i < config.Workers; i++ {
go remote.WCtx.Worker(results)
}
// Enqueue initial job
atomic.AddInt32(&numActiveTasks, 1)
remote.WCtx.queueJob(Job{
Uri: remote.BaseUri,
UriStr: remote.BaseUri.String(),
Fails: 0,
})
// Upload result when ready
go remote.Watch(results)
// Sleep if max number of tasks are active
for atomic.LoadInt32(&numActiveTasks) > config.Tasks {
select {
case <-c.Done():
return
case <-time.After(time.Second):
continue
}
case <-c.Done():
return
}
}
}
// scheduleNewTask launches a crawl of remote: it rebuilds the on-disk
// queue for the website, spawns the configured number of workers,
// enqueues the root URL as the first job, and starts the result watcher.
// It then blocks while the number of active tasks exceeds config.Tasks,
// re-checking once per second, and stops waiting early if c is
// cancelled. It always reports true.
func scheduleNewTask(c context.Context, remote *OD) bool {
	logrus.WithField("url", remote.BaseUri.String()).
		Info("Starting crawler")

	// Channel on which workers publish crawl results.
	resultChan := make(chan File)
	remote.WCtx.OD = remote

	// Rebuild the per-website queue directory from scratch; stale state
	// from a previous run must not leak into this crawl.
	queueDir := path.Join("queue", fmt.Sprintf("%d", remote.Task.WebsiteId))
	if err := os.RemoveAll(queueDir); err != nil {
		panic(err)
	}
	var err error
	remote.WCtx.Queue, err = OpenQueue(queueDir)
	if err != nil {
		panic(err)
	}

	// Workers drain the queue and feed resultChan.
	remote.WCtx.SpawnWorkers(c, resultChan, config.Workers)

	// Count this task as active and seed the queue with the site root.
	atomic.AddInt32(&numActiveTasks, 1)
	remote.WCtx.queueJob(Job{
		Uri:    remote.BaseUri,
		UriStr: remote.BaseUri.String(),
		Fails:  0,
	})

	// Watch uploads the collected result once the crawl completes.
	globalWait.Add(1)
	go remote.Watch(resultChan)

	// Back-pressure: hold off scheduling more work while too many tasks
	// are running, unless the context is cancelled first.
	for atomic.LoadInt32(&numActiveTasks) > config.Tasks {
		if !sleep(time.Second, c) {
			break
		}
	}
	return true
}
func ScheduleTask(remotes chan<- *OD, t *Task, u *fasturl.URL) {
if !t.register() {
return
}
globalWait.Add(1)
now := time.Now()
od := &OD {
Task: *t,
@@ -125,7 +117,7 @@ func (o *OD) Watch(results chan File) {
// Open crawl results file
f, err := os.OpenFile(
filePath,
os.O_CREATE | os.O_RDWR | os.O_APPEND,
os.O_CREATE | os.O_RDWR | os.O_TRUNC,
0644,
)
if err != nil {
@@ -141,9 +133,7 @@ func (o *OD) Watch(results chan File) {
// Block until all results are written
// (closes results channel)
if !o.handleCollect(results, f, collectErrC) {
return
}
o.handleCollect(results, f, collectErrC)
// Exit code of Collect()
err = <-collectErrC
@@ -163,40 +153,22 @@ func (o *OD) Watch(results chan File) {
}
}
// Returns if aborted naturally (results ready for upload)
//
// NOTE(review): this block is diff residue — the two consecutive
// signature lines below (one returning bool, one returning nothing) are
// the old and new revisions of the same function merged without change
// markers, and the body still contains the old revision's
// `return true`/`return false` statements. Not valid Go as written;
// recover one revision from version control before building.
func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error) bool {
func (o *OD) handleCollect(results chan File, f *os.File, collectErrC chan error) {
// Begin collecting results
go o.Task.Collect(results, f, collectErrC)
// Closing results signals Collect (the sender consumer pair) to stop.
defer close(results)
// Wait for all jobs on remote to finish
for {
// Natural finish: no jobs left in flight on this remote.
if atomic.LoadInt64(&o.InProgress) == 0 {
o.onTaskFinished()
return true
}
// Abort: a worker flagged the crawl as aborted.
if atomic.LoadInt32(&o.WCtx.aborted) != 0 {
// Wait for all workers to finish
o.WCtx.workers.Wait()
o.onTaskPaused()
return false
}
// Poll twice per second rather than busy-waiting.
time.Sleep(500 * time.Millisecond)
}
}
func (o *OD) onTaskFinished() {
defer atomic.AddInt32(&numActiveTasks, -1)
o.Wait.Wait()
// Close queue
if err := o.WCtx.Queue.Close(); err != nil {
panic(err)
}
atomic.AddInt32(&numActiveTasks, -1)
// Log finish
logrus.WithFields(logrus.Fields{
"id": o.Task.WebsiteId,
"url": o.BaseUri.String(),
@@ -219,37 +191,6 @@ func (o *OD) onTaskFinished() {
}
}
// onTaskPaused tears down a paused crawl: it closes the job queue,
// stamps the current end time on the result, and persists the task
// state so the crawl can be resumed later. The active-task counter is
// decremented on the way out regardless of outcome.
func (o *OD) onTaskPaused() {
	defer atomic.AddInt32(&numActiveTasks, -1)

	// A queue that fails to close leaves on-disk state in an unknown
	// condition; treat it as fatal.
	if closeErr := o.WCtx.Queue.Close(); closeErr != nil {
		panic(closeErr)
	}

	// Record when this partial run stopped.
	o.Result.EndTimeUnix = time.Now().Unix()

	// Persist crawler state for a later resume; on failure, log and bail.
	if saveErr := SaveTask(o); saveErr != nil {
		logrus.WithFields(logrus.Fields{
			"err": saveErr.Error(),
			"id":  o.Task.WebsiteId,
			"url": o.BaseUri.String(),
		}).Error("Failed to save crawler state")
		return
	}

	logrus.WithFields(logrus.Fields{
		"id":       o.Task.WebsiteId,
		"url":      o.BaseUri.String(),
		"duration": time.Since(o.Result.StartTime),
	}).Info("Crawler paused")
}
func (t *Task) Collect(results chan File, f *os.File, errC chan<- error) {
err := t.collect(results, f)
if err != nil {