Fix wait, add back crawl command

This commit is contained in:
Richard Patel 2018-11-17 00:49:09 +01:00
parent cc777bcaeb
commit 145d37f84a
No known key found for this signature in database
GPG Key ID: C268B2BBDA2ABECB
3 changed files with 53 additions and 4 deletions

50
main.go
View File

@ -9,6 +9,7 @@ import (
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"strings"
"sync/atomic" "sync/atomic"
"time" "time"
) )
@ -21,6 +22,14 @@ var app = cli.App {
Writer: os.Stdout, Writer: os.Stdout,
Compiled: buildDate, Compiled: buildDate,
Action: cmdBase, Action: cmdBase,
Commands: []cli.Command{
{
Name: "crawl",
Usage: "Crawl a list of URLs",
ArgsUsage: "<site>",
Action: cmdCrawler,
},
},
} }
func init() { func init() {
@ -74,6 +83,7 @@ func cmdBase(clic *cli.Context) error {
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
continue continue
} }
globalWait.Add(1)
inRemotes <- &OD { inRemotes <- &OD {
Task: t, Task: t,
BaseUri: baseUri, BaseUri: baseUri,
@ -81,6 +91,46 @@ func cmdBase(clic *cli.Context) error {
} }
} }
return nil
}
func cmdCrawler(clic *cli.Context) error {
readConfig()
if clic.NArg() != 1 {
cli.ShowCommandHelpAndExit(clic, "crawl", 1)
}
arg := clic.Args()[0]
// https://github.com/golang/go/issues/19779
if !strings.Contains(arg, "://") {
arg = "http://" + arg
}
var u fasturl.URL
err := u.Parse(arg)
if !strings.HasSuffix(u.Path, "/") {
u.Path += "/"
}
if err != nil { return err }
// TODO Graceful shutdown
forceCtx := context.Background()
inRemotes := make(chan *OD)
go Schedule(forceCtx, inRemotes)
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
globalWait.Add(1)
inRemotes <- &OD {
Task: &Task{
WebsiteId: 0,
Url: u.String(),
},
BaseUri: u,
}
// Wait for all jobs to finish // Wait for all jobs to finish
globalWait.Wait() globalWait.Wait()

View File

@ -42,6 +42,7 @@ func Schedule(c context.Context, remotes <-chan *OD) {
// Upload result when ready // Upload result when ready
go remote.Watch(results) go remote.Watch(results)
// Sleep if max number of tasks are active
for atomic.LoadInt32(&activeTasks) > config.Tasks { for atomic.LoadInt32(&activeTasks) > config.Tasks {
select { select {
case <-c.Done(): case <-c.Done():

View File

@ -105,7 +105,6 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) {
} }
lastLink = uriStr lastLink = uriStr
job.OD.Wait.Add(1)
newJobs = append(newJobs, Job{ newJobs = append(newJobs, Job{
OD: job.OD, OD: job.OD,
Uri: link, Uri: link,
@ -143,11 +142,10 @@ func (w WorkerContext) queueJob(job Job) {
} else { } else {
time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) * time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) *
100 * time.Millisecond) 100 * time.Millisecond)
w.in <- job
} }
} else {
w.in <- job
} }
w.in <- job
} }
func (w WorkerContext) finishJob(job *Job) { func (w WorkerContext) finishJob(job *Job) {