diff --git a/main.go b/main.go index fbf10a3..44a0f24 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,7 @@ import ( "net/http" _ "net/http/pprof" "os" + "strings" "sync/atomic" "time" ) @@ -21,6 +22,14 @@ var app = cli.App { Writer: os.Stdout, Compiled: buildDate, Action: cmdBase, + Commands: []cli.Command{ + { + Name: "crawl", + Usage: "Crawl a list of URLs", + ArgsUsage: "", + Action: cmdCrawler, + }, + }, } func init() { @@ -74,6 +83,7 @@ func cmdBase(clic *cli.Context) error { time.Sleep(30 * time.Second) continue } + globalWait.Add(1) inRemotes <- &OD { Task: t, BaseUri: baseUri, @@ -81,6 +91,46 @@ func cmdBase(clic *cli.Context) error { } } + return nil +} + +func cmdCrawler(clic *cli.Context) error { + readConfig() + + if clic.NArg() != 1 { + cli.ShowCommandHelpAndExit(clic, "crawl", 1) + } + + arg := clic.Args()[0] + // https://github.com/golang/go/issues/19779 + if !strings.Contains(arg, "://") { + arg = "http://" + arg + } + var u fasturl.URL + err := u.Parse(arg) + if !strings.HasSuffix(u.Path, "/") { + u.Path += "/" + } + if err != nil { return err } + + // TODO Graceful shutdown + forceCtx := context.Background() + + inRemotes := make(chan *OD) + go Schedule(forceCtx, inRemotes) + + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + + globalWait.Add(1) + inRemotes <- &OD { + Task: &Task{ + WebsiteId: 0, + Url: u.String(), + }, + BaseUri: u, + } + // Wait for all jobs to finish globalWait.Wait() diff --git a/scheduler.go b/scheduler.go index d972072..8c0938d 100644 --- a/scheduler.go +++ b/scheduler.go @@ -42,6 +42,7 @@ func Schedule(c context.Context, remotes <-chan *OD) { // Upload result when ready go remote.Watch(results) + // Sleep if max number of tasks are active for atomic.LoadInt32(&activeTasks) > config.Tasks { select { case <-c.Done(): diff --git a/worker.go b/worker.go index 813f49b..50d46d3 100644 --- a/worker.go +++ b/worker.go @@ -105,7 +105,6 @@ func DoJob(job *Job, f *File) (newJobs []Job, err error) { } lastLink = uriStr - job.OD.Wait.Add(1) newJobs = append(newJobs, Job{ OD: job.OD, Uri: link, @@ -143,11 +142,10 @@ func (w WorkerContext) queueJob(job Job) { } else { time.Sleep(time.Duration(math.Sqrt(float64(50 * w.numRateLimits))) * 100 * time.Millisecond) - w.in <- job } - } else { - w.in <- job } + + w.in <- job } func (w WorkerContext) finishJob(job *Job) {