From 3f85cf679b8331551c6a7822706d49d6089ee703 Mon Sep 17 00:00:00 2001 From: Richard Patel Date: Fri, 16 Nov 2018 04:47:08 +0100 Subject: [PATCH] Getting tasks --- config.go | 14 +++++---- config.yml | 8 ++--- main.go | 85 ++++++++++++++++++++++++++-------------------------- scheduler.go | 10 +++++++ server.go | 21 ++++++------- stats.go | 4 +++ 6 files changed, 77 insertions(+), 65 deletions(-) diff --git a/config.go b/config.go index 566aeab..9fc40a0 100644 --- a/config.go +++ b/config.go @@ -5,6 +5,7 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/viper" "os" + "strings" "time" ) @@ -52,14 +53,15 @@ func readConfig() { } config.ServerUrl = viper.GetString(ConfServerUrl) - //if config.ServerUrl == "" { - // configMissing(ConfServerUrl) - //} + if config.ServerUrl == "" { + configMissing(ConfServerUrl) + } + config.ServerUrl = strings.TrimRight(config.ServerUrl, "/") config.Token = viper.GetString(ConfToken) - //if config.Token == "" { - // configMissing(ConfToken) - //} + if config.Token == "" { + configMissing(ConfToken) + } config.Retries = viper.GetInt(ConfRetries) if config.Retries < 0 { diff --git a/config.yml b/config.yml index e940981..55cab36 100644 --- a/config.yml +++ b/config.yml @@ -1,7 +1,7 @@ # OD-Database server settings server: # Connection URL - url: localhost:6969 + url: http://od-db.mine.terorie.com/api # Server auth token token: @@ -10,7 +10,7 @@ output: # Crawl statistics crawl_stats: 1s # CPU/RAM/Job queue stats - resource_stats: 1s + resource_stats: 10s # More output? (Every listed dir) verbose: false @@ -18,9 +18,9 @@ output: crawl: # Number of sites that can be # processed at once - tasks: 3 + tasks: 300 # Number of connections per site - connections: 2 + connections: 20 # How often to retry getting data # from the site before giving up retries: 5 diff --git a/main.go b/main.go index 260b27f..fbf10a3 100644 --- a/main.go +++ b/main.go @@ -9,7 +9,7 @@ import ( "net/http" _ "net/http/pprof" "os" - "strings" + "sync/atomic" "time" ) @@ -20,14 +20,7 @@ var app = cli.App { BashComplete: cli.DefaultAppComplete, Writer: os.Stdout, Compiled: buildDate, - Commands: []cli.Command{ - { - Name: "crawl", - Usage: "Crawl a list of URLs", - ArgsUsage: "[site, site, ...]", - Action: cmdCrawler, - }, - }, + Action: cmdBase, } func init() { @@ -41,50 +34,56 @@ func main() { app.Run(os.Args) } -func cmdCrawler(clic *cli.Context) error { +func cmdBase(clic *cli.Context) error { readConfig() - if clic.NArg() == 0 { - cli.ShowCommandHelpAndExit(clic, "crawl", 1) - } - - args := clic.Args() - remotes := make([]*OD, len(args)) - for i, arg := range args { - // https://github.com/golang/go/issues/19779 - if !strings.Contains(arg, "://") { - arg = "http://" + arg - } - var u fasturl.URL - err := u.Parse(arg) - if !strings.HasSuffix(u.Path, "/") { - u.Path += "/" - } - if err != nil { return err } - remotes[i] = &OD { - Task: &Task{ - WebsiteId: 0, - Url: u.String(), - }, - BaseUri: u, - } - } - - c := context.Background() + // TODO Graceful shutdown + appCtx := context.Background() + forceCtx := context.Background() inRemotes := make(chan *OD) - go Schedule(c, inRemotes) + go Schedule(forceCtx, inRemotes) - for _, remote := range remotes { - globalWait.Add(1) - inRemotes <- remote + ticker := time.NewTicker(3 * time.Second) + defer ticker.Stop() + for { + select { + case <-appCtx.Done(): + return nil + case <-ticker.C: + t, err := FetchTask() + if err != nil { + logrus.WithError(err). + Error("Failed getting new task") + time.Sleep(30 * time.Second) + continue + } + if t == nil { + // No new task + if atomic.LoadInt32(&activeTasks) == 0 { + logrus.Info("Waiting …") + } + continue + } + + var baseUri fasturl.URL + err = baseUri.Parse(t.Url) + if err != nil { + logrus.WithError(err). + Error("Failed getting new task") + time.Sleep(30 * time.Second) + continue + } + inRemotes <- &OD { + Task: t, + BaseUri: baseUri, + } + } } // Wait for all jobs to finish globalWait.Wait() - logrus.Info("All dirs processed!") - return nil } diff --git a/scheduler.go b/scheduler.go index 84683c7..d972072 100644 --- a/scheduler.go +++ b/scheduler.go @@ -8,6 +8,7 @@ import ( "os" "path" "sync/atomic" + "time" ) var activeTasks int32 @@ -40,6 +41,15 @@ func Schedule(c context.Context, remotes <-chan *OD) { // Upload result when ready go remote.Watch(results) + + for atomic.LoadInt32(&activeTasks) > config.Tasks { + select { + case <-c.Done(): + return + case <-time.After(time.Second): + continue + } + } } } diff --git a/server.go b/server.go index 7c86ef9..c3987da 100644 --- a/server.go +++ b/server.go @@ -22,22 +22,19 @@ const ( var serverClient = http.DefaultClient func FetchTask() (t *Task, err error) { - escToken, _ := json.Marshal(config.Token) - payload := `{"token":` + string(escToken) + `}` - - req, err := http.NewRequest( - http.MethodPost, + res, err := serverClient.PostForm( config.ServerUrl + "/task/get", - strings.NewReader(payload)) - if err != nil { return } - - res, err := serverClient.Do(req) + url.Values{ "token": {config.Token} }) if err != nil { return } defer res.Body.Close() - if res.StatusCode != 200 { - err = fmt.Errorf("http %s", res.Status) - return + switch res.StatusCode { + case 200: + break + case 500: + return nil, nil + default: + return nil, fmt.Errorf("http %s", res.Status) } t = new(Task) diff --git a/stats.go b/stats.go index 61e14da..64337e1 100644 --- a/stats.go +++ b/stats.go @@ -39,6 +39,10 @@ func Stats(c context.Context) { perSecond = math.Round(perSecond) perSecond /= 2 + if perSecond <= 0 { + continue + } + logrus.WithFields(logrus.Fields{ "per_second": perSecond, "done": atomic.LoadUint64(&totalDone),